diff options
author | Richard Braun <rbraun@sceen.net> | 2013-06-03 22:02:01 +0200 |
---|---|---|
committer | Richard Braun <rbraun@sceen.net> | 2013-06-03 22:02:01 +0200 |
commit | 195093aee7f240ac341b428fa758a598d10dbaab (patch) | |
tree | 4d1844e7c881ff1cd838651459ace16c62b0012a | |
parent | 7b712748e94fadbcb9131c5c1b19caa3d360db5c (diff) |
kern/work: new module
This module implements thread pools to concurrently process queues of
deferred works.
-rw-r--r-- | Makefrag.am | 4 | ||||
-rw-r--r-- | kern/kernel.c | 2 | ||||
-rw-r--r-- | kern/work.c | 297 | ||||
-rw-r--r-- | kern/work.h | 146 |
4 files changed, 448 insertions, 1 deletions
diff --git a/Makefrag.am b/Makefrag.am index e4a09e27..fdfbf6c5 100644 --- a/Makefrag.am +++ b/Makefrag.am @@ -46,7 +46,9 @@ x15_SOURCES += \ kern/task.h \ kern/thread.c \ kern/thread.h \ - kern/types.h + kern/types.h \ + kern/work.c \ + kern/work.h x15_SOURCES += \ vm/vm_inherit.h \ diff --git a/kern/kernel.c b/kern/kernel.c index afeb1719..5508d569 100644 --- a/kern/kernel.c +++ b/kern/kernel.c @@ -22,6 +22,7 @@ #include <kern/panic.h> #include <kern/task.h> #include <kern/thread.h> +#include <kern/work.h> #include <machine/cpu.h> void __init @@ -36,6 +37,7 @@ kernel_main(void) cpumap_setup(); task_setup(); thread_setup(); + work_setup(); llsync_setup(); /* Rendezvous with APs */ diff --git a/kern/work.c b/kern/work.c new file mode 100644 index 00000000..de80cd06 --- /dev/null +++ b/kern/work.c @@ -0,0 +1,297 @@ +/* + * Copyright (c) 2013 Richard Braun. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * + * TODO Per-processor pools. + */ + +#include <kern/error.h> +#include <kern/kmem.h> +#include <kern/list.h> +#include <kern/panic.h> +#include <kern/printk.h> +#include <kern/spinlock.h> +#include <kern/sprintf.h> +#include <kern/stddef.h> +#include <kern/thread.h> +#include <kern/work.h> +#include <machine/cpu.h> + +#define WORK_PRIO_NORMAL THREAD_SCHED_TS_PRIO_DEFAULT +#define WORK_PRIO_HIGH THREAD_SCHED_TS_PRIO_MAX + +/* + * Keep at least that many threads alive when a work pool is idle. + * + * TODO Use time instead of a raw value to keep threads available. + */ +#define WORK_THREADS_SPARE 4 + +/* + * When computing the maximum number of worker threads, start with multiplying + * the number of processors by the ratio below. If the result is greater than + * the threshold, retry by decreasing the ratio until either the result is + * less than the threshold or the ratio is 1. + */ +#define WORK_THREADS_RATIO 4 +#define WORK_THREADS_THRESHOLD 512 + +#define WORK_NAME_SIZE 16 + +/* + * Work pool flags. + */ +#define WORK_PF_HIGHPRIO 0x1 /* High priority worker threads */ + +struct work_thread { + struct list node; + struct work_pool *pool; + struct thread *thread; +}; + +/* + * Pool of threads and works. + * + * Interrupts must be disabled when acquiring the pool lock. + */ +struct work_pool { + struct spinlock lock; + int flags; + struct work_queue queue; + struct work_thread *manager; + unsigned int nr_threads; + unsigned int nr_available_threads; + struct list available_threads; + char name[WORK_NAME_SIZE]; +}; + +static int work_thread_create(struct work_pool *pool); +static void work_thread_destroy(struct work_thread *worker); + +static struct work_pool work_pool_main; +static struct work_pool work_pool_highprio; + +static struct kmem_cache work_thread_cache; + +static unsigned int work_max_threads; + +static void +work_pool_init(struct work_pool *pool, const char *name, int flags) +{ + int error; + + spinlock_init(&pool->lock); + pool->flags = flags; + work_queue_init(&pool->queue); + pool->manager = NULL; + pool->nr_threads = 1; + pool->nr_available_threads = 0; + list_init(&pool->available_threads); + strlcpy(pool->name, name, sizeof(pool->name)); + error = work_thread_create(pool); + + if (error) + panic("work: unable to create initial worker thread"); +} + +static void +work_pool_wakeup_manager(struct work_pool *pool) +{ + if (pool->queue.nr_works == 0) + return; + + if ((pool->manager != NULL) && (pool->manager->thread != thread_self())) + thread_wakeup(pool->manager->thread); +} + +static inline struct work_pool * +work_pool_select(int flags) +{ + return (flags & WORK_HIGHPRIO) ? &work_pool_highprio : &work_pool_main; +} + +static void +work_process(void *arg) +{ + struct work_thread *self, *worker; + struct work_pool *pool; + struct work *work; + unsigned long flags; + int error; + + self = arg; + pool = self->pool; + + for (;;) { + spinlock_lock_intr_save(&pool->lock, &flags); + + if (pool->manager != NULL) { + list_insert_tail(&pool->available_threads, &self->node); + pool->nr_available_threads++; + + do + thread_sleep(&pool->lock); + while (pool->manager != NULL); + + list_remove(&self->node); + pool->nr_available_threads--; + } + + if (pool->queue.nr_works == 0) { + if (pool->nr_threads > WORK_THREADS_SPARE) + break; + + pool->manager = self; + + do + thread_sleep(&pool->lock); + while (pool->queue.nr_works == 0); + + pool->manager = NULL; + } + + work = work_queue_pop(&pool->queue); + + if (pool->queue.nr_works != 0) { + if (pool->nr_available_threads != 0) { + worker = list_first_entry(&pool->available_threads, + struct work_thread, node); + thread_wakeup(worker->thread); + } else if (pool->nr_threads < work_max_threads) { + pool->nr_threads++; + spinlock_unlock_intr_restore(&pool->lock, flags); + + error = work_thread_create(pool); + + spinlock_lock_intr_save(&pool->lock, &flags); + + if (error) { + pool->nr_threads--; + printk("work: warning: unable to create worker thread\n"); + } + } + } + + spinlock_unlock_intr_restore(&pool->lock, flags); + + work->fn(work); + } + + pool->nr_threads--; + spinlock_unlock_intr_restore(&pool->lock, flags); + work_thread_destroy(self); +} + +static int +work_thread_create(struct work_pool *pool) +{ + char name[THREAD_NAME_SIZE]; + struct thread_attr attr; + struct work_thread *worker; + int error; + + worker = kmem_cache_alloc(&work_thread_cache); + + if (worker == NULL) + return ERROR_NOMEM; + + worker->pool = pool; + + /* TODO Allocate numeric IDs to better identify worker threads */ + snprintf(name, sizeof(name), "x15_work_process:%s", pool->name); + attr.name = name; + attr.cpumap = NULL; + attr.task = NULL; + attr.policy = THREAD_SCHED_POLICY_TS; + attr.priority = (pool->flags & WORK_PF_HIGHPRIO) + ? WORK_PRIO_HIGH + : WORK_PRIO_NORMAL; + error = thread_create(&worker->thread, &attr, work_process, worker); + + if (error) + goto error_thread; + + return 0; + +error_thread: + kmem_cache_free(&work_thread_cache, worker); + return error; +} + +static void +work_thread_destroy(struct work_thread *worker) +{ + kmem_cache_free(&work_thread_cache, worker); +} + +static void +work_compute_max_threads(void) +{ + unsigned int max_threads, nr_cpus, ratio; + + nr_cpus = cpu_count(); + ratio = WORK_THREADS_RATIO; + max_threads = nr_cpus * ratio; + + while ((ratio > 1) && (max_threads > WORK_THREADS_THRESHOLD)) { + ratio--; + max_threads = nr_cpus * ratio; + } + + work_max_threads = max_threads; + printk("work: threads per pool (spare/limit): %u/%u\n", + WORK_THREADS_SPARE, max_threads); +} + +void +work_setup(void) +{ + kmem_cache_init(&work_thread_cache, "work_thread", + sizeof(struct work_thread), 0, NULL, NULL, NULL, 0); + + work_compute_max_threads(); + + work_pool_init(&work_pool_main, "main", 0); + work_pool_init(&work_pool_highprio, "highprio", WORK_PF_HIGHPRIO); +} + +void +work_schedule(struct work *work, int flags) +{ + struct work_pool *pool; + unsigned long lock_flags; + + pool = work_pool_select(flags); + + spinlock_lock_intr_save(&pool->lock, &lock_flags); + work_queue_push(&pool->queue, work); + work_pool_wakeup_manager(pool); + spinlock_unlock_intr_restore(&pool->lock, lock_flags); +} + +void +work_queue_schedule(struct work_queue *queue, int flags) +{ + struct work_pool *pool; + unsigned long lock_flags; + + pool = work_pool_select(flags); + + spinlock_lock_intr_save(&pool->lock, &lock_flags); + work_queue_concat(&pool->queue, queue); + work_pool_wakeup_manager(pool); + spinlock_unlock_intr_restore(&pool->lock, lock_flags); +} diff --git a/kern/work.h b/kern/work.h new file mode 100644 index 00000000..b3668df9 --- /dev/null +++ b/kern/work.h @@ -0,0 +1,146 @@ +/* + * Copyright (c) 2013 Richard Braun. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * + * Deferred work queues. + * + * Works, like threads, are scheduled activities, but they are much shorter + * and (usually) consume a lot less resources. They are allowed to block + * and must run in thread context. This module provides thread pools to + * concurrently handle queued works. + */ + +#ifndef _KERN_WORK_H +#define _KERN_WORK_H + +/* + * Work scheduling flags. + */ +#define WORK_HIGHPRIO 0x1 /* Use a high priority worker thread */ + +struct work; + +/* + * Type for work functions. + */ +typedef void (*work_fn_t)(struct work *); + +/* + * Deferred work. + * + * This structure should be embedded in objects related to the work. It + * stores the work function and is passed to it as its only parameter. + * The function can then find the containing object with the structof macro. + */ +struct work { + struct work *next; + work_fn_t fn; +}; + +/* + * Queue of deferred works for batch scheduling. + */ +struct work_queue { + struct work *first; + struct work *last; + unsigned int nr_works; +}; + +static inline void +work_queue_init(struct work_queue *queue) +{ + queue->first = NULL; + queue->last = NULL; + queue->nr_works = 0; +} + +static inline unsigned int +work_queue_nr_works(const struct work_queue *queue) +{ + return queue->nr_works; +} + +static inline void +work_queue_push(struct work_queue *queue, struct work *work) +{ + work->next = NULL; + + if (queue->last == NULL) + queue->first = work; + else + queue->last->next = work; + + queue->last = work; + queue->nr_works++; +} + +static inline struct work * +work_queue_pop(struct work_queue *queue) +{ + struct work *work; + + work = queue->first; + queue->first = work->next; + + if (queue->last == work) + queue->last = NULL; + + queue->nr_works--; + return work; +} + +static inline void +work_queue_transfer(struct work_queue *dest, struct work_queue *src) +{ + *dest = *src; +} + +static inline void +work_queue_concat(struct work_queue *queue1, struct work_queue *queue2) +{ + if (queue2->nr_works == 0) + return; + + if (queue1->nr_works == 0) { + *queue1 = *queue2; + return; + } + + queue1->last->next = queue2->first; + queue1->last = queue2->last; + queue1->nr_works += queue2->nr_works; +} + +static inline void +work_init(struct work *work, work_fn_t fn) +{ + work->fn = fn; +} + +/* + * Initialize the work module. + */ +void work_setup(void); + +/* + * Schedule work for deferred processing. + * + * This function may be called from interrupt context. + */ +void work_schedule(struct work *work, int flags); +void work_queue_schedule(struct work_queue *queue, int flags); + +#endif /* _KERN_WORK_H */ |