author     Richard Braun <rbraun@sceen.net>    2014-06-17 23:30:24 +0200
committer  Richard Braun <rbraun@sceen.net>    2014-06-17 23:30:24 +0200
commit     bbbc53a1f0893703d1a5cc0293f6bc5ae6181fea (patch)
tree       0560467446ddd2718d62fffe3681f29e56f7c865
parent     718be3a92dd91dacaf7f629561aac62eacb9744e (diff)
kern/work: per-processor work pools
-rw-r--r--  kern/thread.c    2
-rw-r--r--  kern/work.c    297
-rw-r--r--  kern/work.h     10
3 files changed, 248 insertions, 61 deletions
diff --git a/kern/thread.c b/kern/thread.c
index 345d01e6..b660a15d 100644
--- a/kern/thread.c
+++ b/kern/thread.c
@@ -100,6 +100,7 @@
 #include <kern/string.h>
 #include <kern/task.h>
 #include <kern/thread.h>
+#include <kern/work.h>
 #include <machine/atomic.h>
 #include <machine/cpu.h>
 #include <machine/mb.h>
@@ -2005,6 +2006,7 @@ thread_tick_intr(void)
     runq = thread_runq_local();
     evcnt_inc(&runq->ev_tick_intr);
     llsync_report_periodic_event();
+    work_report_periodic_event();
     thread = thread_self();

     spinlock_lock(&runq->lock);
diff --git a/kern/work.c b/kern/work.c
index 1d41069e..d74fc8bd 100644
--- a/kern/work.c
+++ b/kern/work.c
@@ -13,14 +13,12 @@
  *
  * You should have received a copy of the GNU General Public License
  * along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- *
- * TODO Per-processor pools.
  */

 #include <kern/assert.h>
 #include <kern/bitmap.h>
 #include <kern/error.h>
+#include <kern/evcnt.h>
 #include <kern/kmem.h>
 #include <kern/list.h>
 #include <kern/macros.h>
@@ -52,12 +50,13 @@
 #define WORK_THREADS_THRESHOLD 512
 #define WORK_MAX_THREADS MAX(MAX_CPUS, WORK_THREADS_THRESHOLD)

-#define WORK_NAME_SIZE 16
+#define WORK_POOL_NAME_SIZE 16

 /*
  * Work pool flags.
  */
-#define WORK_PF_HIGHPRIO 0x1 /* High priority worker threads */
+#define WORK_PF_GLOBAL 0x1 /* System-wide work queue */
+#define WORK_PF_HIGHPRIO 0x2 /* High priority worker threads */

 struct work_thread {
     struct list node;
@@ -70,37 +69,57 @@ struct work_thread {
  * Pool of threads and works.
  *
  * Interrupts must be disabled when acquiring the pool lock.
+ *
+ * There are two internal queues of pending works. When first scheduling
+ * a work, it is inserted into queue0. After a periodic event, works still
+ * present in queue0 are moved to queue1. If these works are still present
+ * in queue1 at the next periodic event, it means they couldn't be processed
+ * for a complete period between two periodic events, at which point it is
+ * assumed that processing works on the same processor they were queued on
+ * becomes less relevant. As a result, periodic events also trigger the
+ * transfer of works from queue1 to the matching global pool. Global pools
+ * only use one queue.
+ *
+ * TODO While it's not strictly necessary to hold the lock when accessing a
+ * per-processor pool, since disabling interrupts and preemption could be
+ * used instead, it's currently enforced by the programming model of the
+ * thread module. The thread_sleep() function could be changed to accept a
+ * NULL interlock but this requires clearly defining constraints for safe
+ * usage.
  */
 struct work_pool {
     struct spinlock lock;
     int flags;
-    struct work_queue queue;
+    struct work_queue queue0;
+    struct work_queue queue1;
     struct work_thread *manager;
+    struct evcnt ev_transfer;
+    unsigned int max_threads;
     unsigned int nr_threads;
     unsigned int nr_available_threads;
     struct list available_threads;
     BITMAP_DECLARE(bitmap, WORK_MAX_THREADS);
-    char name[WORK_NAME_SIZE];
-};
+    char name[WORK_POOL_NAME_SIZE];
+} __aligned(CPU_L1_SIZE);

 static int work_thread_create(struct work_pool *pool, unsigned int id);
 static void work_thread_destroy(struct work_thread *worker);

+static struct work_pool work_pool_cpu_main[MAX_CPUS];
+static struct work_pool work_pool_cpu_highprio[MAX_CPUS];
 static struct work_pool work_pool_main;
 static struct work_pool work_pool_highprio;

 static struct kmem_cache work_thread_cache;

-static unsigned int work_max_threads __read_mostly;
-
 static unsigned int
 work_pool_alloc_id(struct work_pool *pool)
 {
     int bit;

-    assert(pool->nr_threads < work_max_threads);
+    assert(pool->nr_threads < pool->max_threads);
     pool->nr_threads++;
-    bit = bitmap_find_first_zero(pool->bitmap, work_max_threads);
+    bit = bitmap_find_first_zero(pool->bitmap, pool->max_threads);
     assert(bit >= 0);
     bitmap_set(pool->bitmap, bit);
     return bit;
@@ -114,20 +133,69 @@ work_pool_free_id(struct work_pool *pool, unsigned int id)
     bitmap_clear(pool->bitmap, id);
 }

+static unsigned int
+work_pool_cpu_id(const struct work_pool *pool)
+{
+    const struct work_pool *array;
+
+    assert(!(pool->flags & WORK_PF_GLOBAL));
+
+    array = (pool->flags & WORK_PF_HIGHPRIO)
+            ? work_pool_cpu_highprio
+            : work_pool_cpu_main;
+    return pool - array;
+}
+
+static unsigned int
+work_pool_compute_max_threads(unsigned int nr_cpus)
+{
+    unsigned int max_threads, ratio;
+
+    ratio = WORK_THREADS_RATIO;
+    max_threads = nr_cpus * ratio;
+
+    while ((ratio > 1) && (max_threads > WORK_THREADS_THRESHOLD)) {
+        ratio--;
+        max_threads = nr_cpus * ratio;
+    }
+
+    assert(max_threads != 0);
+    assert(max_threads <= WORK_MAX_THREADS);
+    return max_threads;
+}
+
 static void
 work_pool_init(struct work_pool *pool, const char *name, int flags)
 {
-    unsigned int id;
+    char ev_name[EVCNT_NAME_SIZE];
+    const char *suffix;
+    unsigned int id, nr_cpus, pool_id, max_threads;
     int error;

-    spinlock_init(&pool->lock);
     pool->flags = flags;
-    work_queue_init(&pool->queue);
+
+    if (flags & WORK_PF_GLOBAL)
+        nr_cpus = cpu_count();
+    else {
+        nr_cpus = 1;
+        suffix = (flags & WORK_PF_HIGHPRIO) ? "h" : "";
+        pool_id = work_pool_cpu_id(pool);
+        snprintf(ev_name, sizeof(ev_name), "work_transfer/%u%s",
+                 pool_id, suffix);
+        evcnt_register(&pool->ev_transfer, ev_name);
+    }
+
+    max_threads = work_pool_compute_max_threads(nr_cpus);
+
+    spinlock_init(&pool->lock);
+    work_queue_init(&pool->queue0);
+    work_queue_init(&pool->queue1);
     pool->manager = NULL;
+    pool->max_threads = max_threads;
     pool->nr_threads = 0;
     pool->nr_available_threads = 0;
     list_init(&pool->available_threads);
-    bitmap_zero(pool->bitmap, work_max_threads);
+    bitmap_zero(pool->bitmap, WORK_MAX_THREADS);
     strlcpy(pool->name, name, sizeof(pool->name));

     id = work_pool_alloc_id(pool);
@@ -142,20 +210,70 @@ error_thread:
     panic("work: unable to create initial worker thread");
 }

+static struct work_pool *
+work_pool_cpu_select(int flags)
+{
+    unsigned int cpu;
+
+    cpu = cpu_id();
+    return (flags & WORK_HIGHPRIO)
+           ? &work_pool_cpu_highprio[cpu]
+           : &work_pool_cpu_main[cpu];
+}
+
+static int
+work_pool_nr_works(const struct work_pool *pool)
+{
+    return (work_queue_nr_works(&pool->queue0)
+            + work_queue_nr_works(&pool->queue1));
+}
+
+static struct work *
+work_pool_pop_work(struct work_pool *pool)
+{
+    if (!(pool->flags & WORK_PF_GLOBAL)) {
+        if (work_queue_nr_works(&pool->queue1) != 0)
+            return work_queue_pop(&pool->queue1);
+    }
+
+    return work_queue_pop(&pool->queue0);
+}
+
 static void
 work_pool_wakeup_manager(struct work_pool *pool)
 {
-    if (pool->queue.nr_works == 0)
+    if (work_pool_nr_works(pool) == 0)
         return;

     if ((pool->manager != NULL) && (pool->manager->thread != thread_self()))
         thread_wakeup(pool->manager->thread);
 }

-static inline struct work_pool *
-work_pool_select(int flags)
+static void
+work_pool_shift_queues(struct work_pool *pool, struct work_queue *old_queue)
+{
+    assert(!(pool->flags & WORK_PF_GLOBAL));
+
+    work_queue_transfer(old_queue, &pool->queue1);
+    work_queue_transfer(&pool->queue1, &pool->queue0);
+    work_queue_init(&pool->queue0);
+
+    if (work_queue_nr_works(old_queue) != 0)
+        evcnt_inc(&pool->ev_transfer);
+}
+
+static void
+work_pool_push_work(struct work_pool *pool, struct work *work)
 {
-    return (flags & WORK_HIGHPRIO) ? &work_pool_highprio : &work_pool_main;
+    work_queue_push(&pool->queue0, work);
+    work_pool_wakeup_manager(pool);
+}
+
+static void
+work_pool_concat_queue(struct work_pool *pool, struct work_queue *queue)
+{
+    work_queue_concat(&pool->queue0, queue);
+    work_pool_wakeup_manager(pool);
 }

 static void
@@ -186,7 +304,7 @@ work_process(void *arg)
             pool->nr_available_threads--;
         }

-        if (pool->queue.nr_works == 0) {
+        if (work_pool_nr_works(pool) == 0) {
             if (pool->nr_threads > WORK_THREADS_SPARE)
                 break;

@@ -194,19 +312,19 @@ work_process(void *arg)

             do
                 thread_sleep(&pool->lock);
-            while (pool->queue.nr_works == 0);
+            while (work_pool_nr_works(pool) == 0);

             pool->manager = NULL;
         }

-        work = work_queue_pop(&pool->queue);
+        work = work_pool_pop_work(pool);

-        if (pool->queue.nr_works != 0) {
+        if (work_pool_nr_works(pool) != 0) {
             if (pool->nr_available_threads != 0) {
                 worker = list_first_entry(&pool->available_threads,
                                           struct work_thread, node);
                 thread_wakeup(worker->thread);
-            } else if (pool->nr_threads < work_max_threads) {
+            } else if (pool->nr_threads < pool->max_threads) {
                 id = work_pool_alloc_id(pool);
                 spinlock_unlock_intr_restore(&pool->lock, flags);

@@ -237,7 +355,9 @@ work_thread_create(struct work_pool *pool, unsigned int id)
 {
     char name[THREAD_NAME_SIZE];
     struct thread_attr attr;
+    struct cpumap *cpumap;
     struct work_thread *worker;
+    const char *suffix;
     unsigned short priority;
     int error;

@@ -249,21 +369,51 @@ work_thread_create(struct work_pool *pool, unsigned int id)
     worker->pool = pool;
     worker->id = id;

-    snprintf(name, sizeof(name), "x15_work_process:%s:%u", pool->name,
-             worker->id);
-    priority = (pool->flags & WORK_PF_HIGHPRIO)
-               ? WORK_PRIO_HIGH
-               : WORK_PRIO_NORMAL;
+    if (pool->flags & WORK_PF_HIGHPRIO) {
+        suffix = "h";
+        priority = WORK_PRIO_HIGH;
+    } else {
+        suffix = "";
+        priority = WORK_PRIO_NORMAL;
+    }
+
+    if (pool->flags & WORK_PF_GLOBAL) {
+        cpumap = NULL;
+        snprintf(name, sizeof(name), "x15_work_process/g:%u%s",
+                 worker->id, suffix);
+    } else {
+        unsigned int pool_id;
+
+        error = cpumap_create(&cpumap);
+
+        if (error)
+            goto error_cpumap;
+
+        pool_id = work_pool_cpu_id(pool);
+        cpumap_zero(cpumap);
+        cpumap_set(cpumap, pool_id);
+        snprintf(name, sizeof(name), "x15_work_process/%u:%u%s",
+                 pool_id, worker->id, suffix);
+    }
+
     thread_attr_init(&attr, name);
     thread_attr_set_priority(&attr, priority);
+
+    if (cpumap != NULL)
+        thread_attr_set_cpumap(&attr, cpumap);
+
     error = thread_create(&worker->thread, &attr, work_process, worker);

+    if (cpumap != NULL)
+        cpumap_destroy(cpumap);
+
     if (error)
         goto error_thread;

     return 0;

 error_thread:
+error_cpumap:
     kmem_cache_free(&work_thread_cache, worker);
     return error;
 }
@@ -274,36 +424,29 @@ work_thread_destroy(struct work_thread *worker)
     kmem_cache_free(&work_thread_cache, worker);
 }

-static void
-work_compute_max_threads(void)
-{
-    unsigned int max_threads, nr_cpus, ratio;
-
-    nr_cpus = cpu_count();
-    ratio = WORK_THREADS_RATIO;
-    max_threads = nr_cpus * ratio;
-
-    while ((ratio > 1) && (max_threads > WORK_THREADS_THRESHOLD)) {
-        ratio--;
-        max_threads = nr_cpus * ratio;
-    }
-
-    assert(max_threads <= WORK_MAX_THREADS);
-    work_max_threads = max_threads;
-    printk("work: threads per pool (spare/limit): %u/%u\n",
-           WORK_THREADS_SPARE, max_threads);
-}
-
 void
 work_setup(void)
 {
+    char name[WORK_POOL_NAME_SIZE];
+    unsigned int i;
+
     kmem_cache_init(&work_thread_cache, "work_thread",
                     sizeof(struct work_thread), 0, NULL, NULL, NULL, 0);

-    work_compute_max_threads();
+    for (i = 0; i < cpu_count(); i++) {
+        snprintf(name, sizeof(name), "main%u", i);
+        work_pool_init(&work_pool_cpu_main[i], name, 0);
+        snprintf(name, sizeof(name), "highprio%u", i);
+        work_pool_init(&work_pool_cpu_highprio[i], name, WORK_PF_HIGHPRIO);
+    }
+
+    work_pool_init(&work_pool_main, "main", WORK_PF_GLOBAL);
+    work_pool_init(&work_pool_highprio, "highprio",
+                   WORK_PF_GLOBAL | WORK_PF_HIGHPRIO);

-    work_pool_init(&work_pool_main, "main", 0);
-    work_pool_init(&work_pool_highprio, "highprio", WORK_PF_HIGHPRIO);
+    printk("work: threads per pool (per-cpu/global): %u/%u, spare: %u\n",
+           work_pool_cpu_main[0].max_threads, work_pool_main.max_threads,
+           WORK_THREADS_SPARE);
 }

 void
@@ -312,12 +455,12 @@ work_schedule(struct work *work, int flags)
     struct work_pool *pool;
     unsigned long lock_flags;

-    pool = work_pool_select(flags);
-
+    thread_pin();
+    pool = work_pool_cpu_select(flags);
     spinlock_lock_intr_save(&pool->lock, &lock_flags);
-    work_queue_push(&pool->queue, work);
-    work_pool_wakeup_manager(pool);
+    work_pool_push_work(pool, work);
     spinlock_unlock_intr_restore(&pool->lock, lock_flags);
+    thread_unpin();
 }

 void
@@ -326,10 +469,42 @@ work_queue_schedule(struct work_queue *queue, int flags)
     struct work_pool *pool;
     unsigned long lock_flags;

-    pool = work_pool_select(flags);
-
+    thread_pin();
+    pool = work_pool_cpu_select(flags);
     spinlock_lock_intr_save(&pool->lock, &lock_flags);
-    work_queue_concat(&pool->queue, queue);
-    work_pool_wakeup_manager(pool);
+    work_pool_concat_queue(pool, queue);
     spinlock_unlock_intr_restore(&pool->lock, lock_flags);
+    thread_unpin();
+}
+
+void
+work_report_periodic_event(void)
+{
+    struct work_queue queue, highprio_queue;
+    unsigned int cpu;
+
+    assert(!cpu_intr_enabled());
+    assert(!thread_preempt_enabled());
+
+    cpu = cpu_id();
+
+    spinlock_lock(&work_pool_cpu_main[cpu].lock);
+    work_pool_shift_queues(&work_pool_cpu_main[cpu], &queue);
+    spinlock_unlock(&work_pool_cpu_main[cpu].lock);
+
+    spinlock_lock(&work_pool_cpu_highprio[cpu].lock);
+    work_pool_shift_queues(&work_pool_cpu_highprio[cpu], &highprio_queue);
+    spinlock_unlock(&work_pool_cpu_highprio[cpu].lock);
+
+    if (work_queue_nr_works(&queue) != 0) {
+        spinlock_lock(&work_pool_main.lock);
+        work_pool_concat_queue(&work_pool_main, &queue);
+        spinlock_unlock(&work_pool_main.lock);
+    }
+
+    if (work_queue_nr_works(&highprio_queue) != 0) {
+        spinlock_lock(&work_pool_highprio.lock);
+        work_pool_concat_queue(&work_pool_highprio, &highprio_queue);
+        spinlock_unlock(&work_pool_highprio.lock);
+    }
 }
diff --git a/kern/work.h b/kern/work.h
index b3668df9..fb22db90 100644
--- a/kern/work.h
+++ b/kern/work.h
@@ -143,4 +143,14 @@ void work_setup(void);
 void work_schedule(struct work *work, int flags);
 void work_queue_schedule(struct work_queue *queue, int flags);

+/*
+ * Report a periodic event (normally the periodic timer interrupt) on the
+ * current processor.
+ *
+ * Periodic events are used internally for optimizations.
+ *
+ * Interrupts and preemption must be disabled when calling this function.
+ */
+void work_report_periodic_event(void);
+
 #endif /* _KERN_WORK_H */
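
The queue0/queue1 aging described in the new work_pool comment can be summarized with a small standalone sketch. The toy_* names below are invented for illustration and are not part of the x15 sources; the sketch only mirrors the shift performed by work_pool_shift_queues() and the handover done in work_report_periodic_event(), using plain counters instead of real work items.

#include <stdio.h>

struct toy_pool {
    unsigned int queue0;    /* works scheduled since the last periodic event */
    unsigned int queue1;    /* works that already survived one periodic event */
    unsigned int global;    /* works handed over to the global pool */
};

/* Scheduling always targets queue0, as work_pool_push_work() does. */
static void
toy_schedule(struct toy_pool *pool, unsigned int nr_works)
{
    pool->queue0 += nr_works;
}

/*
 * One periodic event: works that sat in queue1 for a whole period are handed
 * to the global pool, and queue0 ages into queue1.
 */
static void
toy_periodic_event(struct toy_pool *pool)
{
    pool->global += pool->queue1;
    pool->queue1 = pool->queue0;
    pool->queue0 = 0;
}

int
main(void)
{
    struct toy_pool pool = { 0, 0, 0 };
    unsigned int i;

    /* Three works queued on this processor and never processed locally. */
    toy_schedule(&pool, 3);

    for (i = 1; i <= 2; i++) {
        toy_periodic_event(&pool);
        printf("after event %u: queue0=%u queue1=%u global=%u\n",
               i, pool.queue0, pool.queue1, pool.global);
    }

    /*
     * Prints:
     *   after event 1: queue0=0 queue1=3 global=0
     *   after event 2: queue0=0 queue1=0 global=3
     */
    return 0;
}

A work therefore stays local for at most two periodic events before it becomes eligible for processing by any processor through the matching global pool.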
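The work_pool_compute_max_threads() loop caps the number of worker threads a pool may create. The standalone sketch below reproduces the same arithmetic; WORK_THREADS_THRESHOLD (512) is taken from this diff, while the value of WORK_THREADS_RATIO is not visible here, so the ratio of 4 and the toy_* names are placeholders for illustration only.

#include <assert.h>
#include <stdio.h>

#define TOY_THREADS_RATIO     4     /* placeholder; WORK_THREADS_RATIO is not shown in this diff */
#define TOY_THREADS_THRESHOLD 512   /* WORK_THREADS_THRESHOLD from the diff */

static unsigned int
toy_compute_max_threads(unsigned int nr_cpus)
{
    unsigned int max_threads, ratio;

    ratio = TOY_THREADS_RATIO;
    max_threads = nr_cpus * ratio;

    /*
     * Lower the per-processor ratio until the total fits under the
     * threshold, but never below one thread per processor.
     */
    while ((ratio > 1) && (max_threads > TOY_THREADS_THRESHOLD)) {
        ratio--;
        max_threads = nr_cpus * ratio;
    }

    assert(max_threads != 0);
    return max_threads;
}

int
main(void)
{
    unsigned int nr_cpus;

    /* Per-processor pools use nr_cpus == 1, global pools use cpu_count(). */
    for (nr_cpus = 1; nr_cpus <= 256; nr_cpus *= 4)
        printf("%3u cpu(s) -> at most %u worker threads\n",
               nr_cpus, toy_compute_max_threads(nr_cpus));

    return 0;
}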