| author | Richard Braun <rbraun@sceen.net> | 2013-03-11 00:33:55 +0100 |
|---|---|---|
| committer | Richard Braun <rbraun@sceen.net> | 2013-03-11 00:33:55 +0100 |
| commit | 98371badcb26e68fde275fa250162692204d6694 (patch) | |
| tree | 3c16af8c89b4a1353aa56ad8754502596493c896 /kern/thread.c | |
| parent | 59550d5c2fa1e9f5dcec20a4ae444afe923ca1c1 (diff) | |
kern/thread: inter-processor load balancing
This change introduces per-run-queue balancer threads, which regularly scan
remote run queues to balance threads among processors. The balancers use a
variant of the distributed weighted round-robin (DWRR) algorithm that
directly uses GR3, rather than WRR, as the local scheduling algorithm. In
addition, the thread_wakeup function has been extended so that it can also
dispatch threads on remote processors.
Note that real-time threads are not affected by this change.
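The pull policy in thread_sched_ts_balance_migrate() below is the heart of the change: a balancer pulls a fixed fraction of the threads eligible for migration, bounded above because the migration loop runs with interrupts disabled. The following stand-alone sketch restates that computation outside the kernel; the two constants are mirrored from the diff, while the main() harness is purely illustrative.

```c
#include <stdio.h>

/* Constants mirrored from the diff (kern/thread.c) */
#define THREAD_MAX_MIGRATIONS     16 /* cap: migration runs with interrupts disabled */
#define THREAD_TS_MIGRATION_RATIO 2  /* pull 1/ratio of the eligible threads */

/*
 * Number of threads a balancer pulls from a remote run queue, given the
 * number of threads eligible for migration. In the kernel, the caller
 * (thread_sched_ts_balance_migrate) has already checked that the remote
 * queue is eligible, so nr_eligible is never 0 there.
 */
static unsigned int
migration_budget(unsigned int nr_eligible)
{
    unsigned int nr_threads;

    nr_threads = nr_eligible / THREAD_TS_MIGRATION_RATIO;

    if (nr_threads == 0)
        nr_threads = 1;
    else if (nr_threads > THREAD_MAX_MIGRATIONS)
        nr_threads = THREAD_MAX_MIGRATIONS;

    return nr_threads;
}

int
main(void)
{
    unsigned int nr_eligible;

    for (nr_eligible = 1; nr_eligible <= 64; nr_eligible *= 2)
        printf("eligible %2u -> pull %2u\n",
               nr_eligible, migration_budget(nr_eligible));

    return 0;
}
```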
Diffstat (limited to 'kern/thread.c')
-rw-r--r-- | kern/thread.c | 561
1 file changed, 503 insertions, 58 deletions
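One idiom worth seeing in isolation before reading the diff: direct flag manipulation (thread->flags |= THREAD_RESCHEDULE) becomes an atomic helper, and remote wakeups issue a store barrier before testing the flag and sending a rescheduling IPI. Below is a user-space C11 analogue of that pattern; atomic_fetch_or_explicit and atomic_thread_fence stand in for the kernel's atomic_or and mb_store, which is an assumption about their intended semantics, not the kernel's actual primitives.

```c
#include <stdatomic.h>
#include <stdio.h>

#define THREAD_RESCHEDULE 0x1UL /* flag value is hypothetical */

struct thread {
    atomic_ulong flags;
};

/* Analogue of thread_set_flag(): atomic, but deliberately unordered */
static void
thread_set_flag(struct thread *thread, unsigned long flag)
{
    atomic_fetch_or_explicit(&thread->flags, flag, memory_order_relaxed);
}

/* Analogue of thread_test_flag() */
static int
thread_test_flag(struct thread *thread, unsigned long flag)
{
    return (atomic_load_explicit(&thread->flags, memory_order_relaxed)
            & flag) != 0;
}

int
main(void)
{
    struct thread t = { 0 };

    thread_set_flag(&t, THREAD_RESCHEDULE);

    /*
     * Publish the flag before notifying the remote processor. The diff
     * uses mb_store() followed by tcb_send_reschedule(); a C11 release
     * fence plays that role in this sketch.
     */
    atomic_thread_fence(memory_order_release);

    if (thread_test_flag(&t, THREAD_RESCHEDULE))
        printf("would send a rescheduling IPI\n");

    return 0;
}
```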
diff --git a/kern/thread.c b/kern/thread.c
index 7a9a55b3..7eb49097 100644
--- a/kern/thread.c
+++ b/kern/thread.c
@@ -19,6 +19,7 @@
  */
 
 #include <kern/assert.h>
+#include <kern/bitmap.h>
 #include <kern/error.h>
 #include <kern/init.h>
 #include <kern/kmem.h>
@@ -35,6 +36,7 @@
 #include <kern/thread.h>
 #include <machine/atomic.h>
 #include <machine/cpu.h>
+#include <machine/mb.h>
 #include <machine/pmap.h>
 #include <machine/tcb.h>
 #include <vm/vm_map.h>
@@ -45,6 +47,12 @@
 #define THREAD_DEFAULT_RR_TIME_SLICE (HZ / 10)
 
 /*
+ * Maximum number of threads which can be pulled from a remote run queue
+ * while interrupts are disabled.
+ */
+#define THREAD_MAX_MIGRATIONS 16
+
+/*
  * Run queue properties for real-time threads.
  */
 struct thread_rt_runq {
@@ -53,6 +61,13 @@ struct thread_rt_runq {
 };
 
 /*
+ * When pulling threads from a run queue, this value is used to determine
+ * the total number of threads to pull by dividing the number of eligible
+ * threads with it.
+ */
+#define THREAD_TS_MIGRATION_RATIO 2
+
+/*
  * Group of threads sharing the same weight.
  */
 struct thread_ts_group {
@@ -72,7 +87,9 @@ struct thread_ts_runq {
     unsigned long round;
     struct thread_ts_group group_array[THREAD_SCHED_TS_PRIO_MAX + 1];
     struct list groups;
+    struct list threads;
     struct thread_ts_group *current;
+    unsigned int nr_threads;
     unsigned int weight;
     unsigned int work;
 };
@@ -83,11 +100,14 @@ struct thread_ts_runq {
 struct thread_runq {
     struct spinlock lock;
     struct thread *current;
+    unsigned int nr_threads;
     struct thread_rt_runq rt_runq;
+    unsigned int ts_weight;
     struct thread_ts_runq ts_runqs[2];
     struct thread_ts_runq *ts_runq_active;
     struct thread_ts_runq *ts_runq_expired;
     struct thread *idler;
+    struct thread *balancer;
 } __aligned(CPU_L1_SIZE);
 
 /*
@@ -95,6 +115,7 @@ struct thread_runq {
  */
 struct thread_sched_ops {
     void (*init_thread)(struct thread *thread, unsigned short priority);
+    struct thread_runq * (*select_runq)(void);
     void (*add)(struct thread_runq *runq, struct thread *thread);
     void (*remove)(struct thread_runq *runq, struct thread *thread);
     void (*put_prev)(struct thread_runq *runq, struct thread *thread);
@@ -134,6 +155,19 @@ static struct thread_attr thread_default_attr = {
     THREAD_SCHED_TS_PRIO_DEFAULT
 };
 
+BITMAP_DECLARE(thread_active_runqs, MAX_CPUS);
+
+/*
+ * System-wide value of the current highest round.
+ *
+ * There can be moderate bouncing on this word so give it its own cache line.
+ */
+static struct {
+    volatile unsigned long value __aligned(CPU_L1_SIZE);
+} thread_ts_highest_round_struct;
+
+#define thread_ts_highest_round (thread_ts_highest_round_struct.value)
+
 static void __init
 thread_runq_init_rt(struct thread_runq *runq)
 {
@@ -166,6 +200,8 @@ thread_ts_runq_init(struct thread_ts_runq *ts_runq, unsigned long round)
         thread_ts_group_init(&ts_runq->group_array[i]);
 
     list_init(&ts_runq->groups);
+    list_init(&ts_runq->threads);
+    ts_runq->nr_threads = 0;
     ts_runq->weight = 0;
     ts_runq->work = 0;
 }
@@ -173,6 +209,7 @@ thread_ts_runq_init(struct thread_ts_runq *ts_runq, unsigned long round)
 static void __init
 thread_runq_init_ts(struct thread_runq *runq)
 {
+    runq->ts_weight = 0;
     runq->ts_runq_active = &runq->ts_runqs[0];
     runq->ts_runq_expired = &runq->ts_runqs[1];
     thread_ts_runq_init(runq->ts_runq_active, 0);
@@ -198,30 +235,72 @@ static void __init
 thread_runq_init(struct thread_runq *runq)
 {
     spinlock_init(&runq->lock);
+    runq->nr_threads = 0;
     thread_runq_init_rt(runq);
     thread_runq_init_ts(runq);
     thread_runq_init_idle(runq);
     runq->current = runq->idler;
+    runq->balancer = NULL;
+}
+
+static inline int
+thread_runq_id(struct thread_runq *runq)
+{
+    return runq - thread_runqs;
+}
+
+static inline struct thread_runq *
+thread_runq_local(void)
+{
+    assert(!thread_preempt_enabled() || thread_pinned());
+    return &thread_runqs[cpu_id()];
+}
+
+static inline void
+thread_set_flag(struct thread *thread, unsigned long flag)
+{
+    atomic_or(&thread->flags, flag);
+}
+
+static inline void
+thread_clear_flag(struct thread *thread, unsigned long flag)
+{
+    atomic_and(&thread->flags, ~flag);
+}
+
+static inline int
+thread_test_flag(struct thread *thread, unsigned long flag)
+{
+    barrier();
+    return ((thread->flags & flag) != 0);
 }
 
 static void
 thread_runq_add(struct thread_runq *runq, struct thread *thread)
 {
-    assert(!cpu_intr_enabled());
     spinlock_assert_locked(&runq->lock);
 
     thread_sched_ops[thread->sched_class].add(runq, thread);
 
+    if (runq->nr_threads == 0)
+        bitmap_set_atomic(thread_active_runqs, thread_runq_id(runq));
+
+    runq->nr_threads++;
+
     if (thread->sched_class < runq->current->sched_class)
-        runq->current->flags |= THREAD_RESCHEDULE;
+        thread_set_flag(runq->current, THREAD_RESCHEDULE);
 }
 
 static void
 thread_runq_remove(struct thread_runq *runq, struct thread *thread)
 {
-    assert(!cpu_intr_enabled());
     spinlock_assert_locked(&runq->lock);
 
+    runq->nr_threads--;
+
+    if (runq->nr_threads == 0)
+        bitmap_clear_atomic(thread_active_runqs, thread_runq_id(runq));
+
     thread_sched_ops[thread->sched_class].remove(runq, thread);
 }
 
@@ -256,11 +335,45 @@ thread_runq_get_next(struct thread_runq *runq)
     panic("thread: unable to find next thread");
 }
 
-static inline struct thread_runq *
-thread_runq_local(void)
+static void
+thread_runq_wakeup(struct thread_runq *runq, struct thread *thread)
 {
-    assert(!thread_preempt_enabled() || thread_pinned());
-    return &thread_runqs[cpu_id()];
+    spinlock_assert_locked(&runq->lock);
+    assert(thread->on_rq);
+
+    thread->state = THREAD_RUNNING;
+    thread_runq_add(runq, thread);
+
+    if (runq != thread_runq_local()) {
+        /*
+         * Make the new state and flags globally visible so that a remote
+         * rescheduling operation sees the correct values.
+         *
+         * Although scheduling implies a load memory barrier before testing
+         * the state of a thread (because of the spin lock acquire semantics),
+         * this isn't the case with thread flags. They are set atomically,
+         * but not ordered. As a result, reenabling preemption may miss a
+         * rescheduling request. But interrupts imply full memory barriers
+         * so the request won't be missed when the rescheduling IPI is
+         * received by the remote processor.
+         */
+        mb_store();
+
+        if (thread_test_flag(runq->current, THREAD_RESCHEDULE))
+            tcb_send_reschedule(thread_runq_id(runq));
+    }
+}
+
+static void
+thread_runq_double_lock(struct thread_runq *a, struct thread_runq *b)
+{
+    if (a <= b) {
+        spinlock_lock(&a->lock);
+        spinlock_lock(&b->lock);
+    } else {
+        spinlock_lock(&b->lock);
+        spinlock_lock(&a->lock);
+    }
 }
 
 static void
@@ -271,6 +384,16 @@ thread_sched_rt_init_thread(struct thread *thread, unsigned short priority)
     thread->rt_ctx.time_slice = THREAD_DEFAULT_RR_TIME_SLICE;
 }
 
+static struct thread_runq *
+thread_sched_rt_select_runq(void)
+{
+    struct thread_runq *runq;
+
+    runq = thread_runq_local();
+    spinlock_lock(&runq->lock);
+    return runq;
+}
+
 static void
 thread_sched_rt_add(struct thread_runq *runq, struct thread *thread)
 {
@@ -286,7 +409,7 @@ thread_sched_rt_add(struct thread_runq *runq, struct thread *thread)
 
     if ((thread->sched_class == runq->current->sched_class)
         && (thread->rt_ctx.priority > runq->current->rt_ctx.priority))
-        runq->current->flags |= THREAD_RESCHEDULE;
+        thread_set_flag(runq->current, THREAD_RESCHEDULE);
 }
 
 static void
@@ -344,7 +467,7 @@ thread_sched_rt_tick(struct thread_runq *runq, struct thread *thread)
         return;
 
     thread->rt_ctx.time_slice = THREAD_DEFAULT_RR_TIME_SLICE;
-    thread->flags |= THREAD_RESCHEDULE;
+    thread_set_flag(thread, THREAD_RESCHEDULE);
 }
 
 static void
@@ -357,6 +480,51 @@ thread_sched_ts_init_thread(struct thread *thread, unsigned short priority)
     thread->ts_ctx.work = 0;
 }
 
+static struct thread_runq *
+thread_sched_ts_select_runq(void)
+{
+    struct thread_runq *runq;
+    unsigned long highest_round;
+    unsigned int lowest_weight;
+    int i, runq_id, nr_runqs;
+
+    nr_runqs = cpu_count();
+    i = bitmap_find_first_zero(thread_active_runqs, nr_runqs);
+
+    if (i != -1) {
+        runq_id = i;
+        goto out;
+    }
+
+    runq_id = 0;
+    highest_round = 0;
+    lowest_weight = (unsigned int)-1;
+
+    for (i = 0; i < nr_runqs; i++) {
+        runq = &thread_runqs[i];
+
+        spinlock_lock(&runq->lock);
+
+        /* The run queue may have become idle */
+        if (runq->current == runq->idler)
+            return runq;
+
+        if ((runq->ts_runq_active->round >= highest_round)
+            && (runq->ts_weight < lowest_weight)) {
+            highest_round = runq->ts_runq_active->round;
+            lowest_weight = runq->ts_weight;
+            runq_id = i;
+        }
+
+        spinlock_unlock(&runq->lock);
+    }
+
+out:
+    runq = &thread_runqs[runq_id];
+    spinlock_lock(&runq->lock);
+    return runq;
+}
+
 static unsigned int
 thread_sched_ts_enqueue_scale(unsigned int work, unsigned int old_weight,
                               unsigned int new_weight)
@@ -382,16 +550,7 @@ thread_sched_ts_enqueue(struct thread_ts_runq *ts_runq, struct thread *thread)
 
     group = &ts_runq->group_array[thread->ts_ctx.weight - 1];
     group_weight = group->weight + thread->ts_ctx.weight;
-
-    /* TODO Limit the maximum number of threads to prevent this situation */
-    if (group_weight < group->weight)
-        panic("thread: weight overflow");
-
     total_weight = ts_runq->weight + thread->ts_ctx.weight;
-
-    if (total_weight < ts_runq->weight)
-        panic("thread: weight overflow");
-
     node = (group->weight == 0)
            ? list_last(&ts_runq->groups)
            : list_prev(&group->node);
@@ -443,11 +602,13 @@ thread_sched_ts_enqueue(struct thread_ts_runq *ts_runq, struct thread *thread)
         thread->ts_ctx.work = thread_work;
     }
 
+    ts_runq->nr_threads++;
     ts_runq->weight = total_weight;
     group->weight = group_weight;
 
-    /* Insert at the front to improve interactivity */
-    list_insert(&group->threads, &thread->ts_ctx.node);
+    /* Insert at the front of the group to improve interactivity */
+    list_insert(&group->threads, &thread->ts_ctx.group_node);
+    list_insert_tail(&ts_runq->threads, &thread->ts_ctx.runq_node);
     thread->ts_ctx.ts_runq = ts_runq;
 }
 
@@ -463,16 +624,27 @@ thread_sched_ts_restart(struct thread_runq *runq)
     ts_runq->current = list_entry(node, struct thread_ts_group, node);
 
     if (runq->current->sched_class == THREAD_SCHED_CLASS_TS)
-        runq->current->flags |= THREAD_RESCHEDULE;
+        thread_set_flag(runq->current, THREAD_RESCHEDULE);
 }
 
 static void
 thread_sched_ts_add(struct thread_runq *runq, struct thread *thread)
 {
-    struct thread_ts_runq *ts_runq;
+    unsigned int total_weight;
 
-    ts_runq = runq->ts_runq_active;
-    thread_sched_ts_enqueue(ts_runq, thread);
+    if (runq->ts_weight == 0) {
+        runq->ts_runq_active->round = thread_ts_highest_round;
+        runq->ts_runq_expired->round = runq->ts_runq_active->round + 1;
+    }
+
+    total_weight = runq->ts_weight + thread->ts_ctx.weight;
+
+    /* TODO Limit the maximum number of threads to prevent this situation */
+    if (total_weight < runq->ts_weight)
+        panic("thread: weight overflow");
+
+    runq->ts_weight = total_weight;
+    thread_sched_ts_enqueue(runq->ts_runq_active, thread);
     thread_sched_ts_restart(runq);
 }
 
@@ -489,11 +661,13 @@ thread_sched_ts_dequeue(struct thread *thread)
     group = &ts_runq->group_array[thread->ts_ctx.weight - 1];
 
     thread->ts_ctx.ts_runq = NULL;
-    list_remove(&thread->ts_ctx.node);
+    list_remove(&thread->ts_ctx.runq_node);
+    list_remove(&thread->ts_ctx.group_node);
    ts_runq->work -= thread->ts_ctx.work;
     group->work -= thread->ts_ctx.work;
     ts_runq->weight -= thread->ts_ctx.weight;
     group->weight -= thread->ts_ctx.weight;
+    ts_runq->nr_threads--;
 
     if (group->weight == 0)
         list_remove(&group->node);
@@ -518,17 +692,16 @@ thread_sched_ts_dequeue(struct thread *thread)
 }
 
 static void
-thread_sched_ts_start_next_round(struct thread_runq *runq)
+thread_sched_ts_wakeup_balancer(struct thread_runq *runq)
 {
-    struct thread_ts_runq *tmp;
+    unsigned long on_rq;
 
-    tmp = runq->ts_runq_expired;
-    runq->ts_runq_expired = runq->ts_runq_active;
-    runq->ts_runq_active = tmp;
-    runq->ts_runq_expired->round = runq->ts_runq_active->round + 1;
+    on_rq = atomic_cas(&runq->balancer->on_rq, 0, 1);
 
-    if (runq->ts_runq_active->weight != 0)
-        thread_sched_ts_restart(runq);
+    if (on_rq)
+        return;
+
+    thread_runq_wakeup(runq, runq->balancer);
 }
 
 static void
@@ -536,12 +709,13 @@ thread_sched_ts_remove(struct thread_runq *runq, struct thread *thread)
 {
     struct thread_ts_runq *ts_runq;
 
+    runq->ts_weight -= thread->ts_ctx.weight;
     ts_runq = thread->ts_ctx.ts_runq;
     thread_sched_ts_dequeue(thread);
 
     if (ts_runq == runq->ts_runq_active) {
-        if (ts_runq->weight == 0)
-            thread_sched_ts_start_next_round(runq);
+        if (ts_runq->nr_threads == 0)
+            thread_sched_ts_wakeup_balancer(runq);
         else
             thread_sched_ts_restart(runq);
     }
@@ -557,8 +731,8 @@ thread_sched_ts_deactivate(struct thread_runq *runq, struct thread *thread)
     thread->ts_ctx.work -= thread->ts_ctx.weight;
     thread_sched_ts_enqueue(runq->ts_runq_expired, thread);
 
-    if (runq->ts_runq_active->weight == 0)
-        thread_sched_ts_start_next_round(runq);
+    if (runq->ts_runq_active->nr_threads == 0)
+        thread_sched_ts_wakeup_balancer(runq);
 }
 
 static void
@@ -569,7 +743,7 @@ thread_sched_ts_put_prev(struct thread_runq *runq, struct thread *thread)
 
     ts_runq = runq->ts_runq_active;
     group = &ts_runq->group_array[thread->ts_ctx.weight - 1];
-    list_insert_tail(&group->threads, &thread->ts_ctx.node);
+    list_insert_tail(&group->threads, &thread->ts_ctx.group_node);
 
     if (thread->ts_ctx.work >= thread->ts_ctx.weight)
         thread_sched_ts_deactivate(runq, thread);
@@ -606,7 +780,7 @@ thread_sched_ts_get_next(struct thread_runq *runq)
 
     ts_runq = runq->ts_runq_active;
 
-    if (ts_runq->weight == 0)
+    if (ts_runq->nr_threads == 0)
         return NULL;
 
     group = ts_runq->current;
@@ -627,8 +801,9 @@ thread_sched_ts_get_next(struct thread_runq *runq)
     }
 
     ts_runq->current = group;
-    thread = list_first_entry(&group->threads, struct thread, ts_ctx.node);
-    list_remove(&thread->ts_ctx.node);
+    node = list_first(&group->threads);
+    thread = list_entry(node, struct thread, ts_ctx.group_node);
+    list_remove(node);
     return thread;
 }
 
@@ -642,17 +817,240 @@ thread_sched_ts_tick(struct thread_runq *runq, struct thread *thread)
     ts_runq->work++;
     group = &ts_runq->group_array[thread->ts_ctx.weight - 1];
     group->work++;
-    thread->flags |= THREAD_RESCHEDULE;
+    thread_set_flag(thread, THREAD_RESCHEDULE);
     thread->ts_ctx.work++;
 }
 
 static void
+thread_sched_ts_start_next_round(struct thread_runq *runq)
+{
+    struct thread_ts_runq *tmp;
+
+    tmp = runq->ts_runq_expired;
+    runq->ts_runq_expired = runq->ts_runq_active;
+    runq->ts_runq_active = tmp;
+    runq->ts_runq_expired->round = runq->ts_runq_active->round + 1;
+
+    if (runq->ts_runq_active->nr_threads != 0) {
+        /* TODO Handle round overflows */
+        if (runq->ts_runq_active->round > thread_ts_highest_round)
+            thread_ts_highest_round = runq->ts_runq_active->round;
+
+        thread_sched_ts_restart(runq);
+    }
+}
+
+static int
+thread_sched_ts_balance_eligible(struct thread_runq *runq)
+{
+    unsigned long round, highest;
+    unsigned int nr_threads;
+
+    if (runq->current == runq->idler)
+        return 0;
+
+    round = runq->ts_runq_active->round;
+    highest = thread_ts_highest_round;
+
+    if ((round != highest) && (round != (highest - 1)))
+        return 0;
+
+    nr_threads = runq->ts_runq_active->nr_threads;
+
+    if (round != highest)
+        nr_threads += runq->ts_runq_expired->nr_threads;
+
+    if ((nr_threads == 0)
+        || ((nr_threads == 1)
+            && (runq->current->sched_class == THREAD_SCHED_CLASS_TS)))
+        return 0;
+
+    return 1;
+}
+
+/*
+ * Try to find the most suitable run queue from which to pull threads.
+ */
+static struct thread_runq *
+thread_sched_ts_balance_scan(struct thread_runq *runq)
+{
+    struct thread_runq *remote_runq;
+    unsigned int highest_weight;
+    int i, runq_id, nr_runqs, eligible;
+
+    runq_id = -1;
+    nr_runqs = cpu_count();
+    highest_weight = 0;
+
+    bitmap_for_each(thread_active_runqs, nr_runqs, i) {
+        remote_runq = &thread_runqs[i];
+
+        if (remote_runq == runq)
+            continue;
+
+        spinlock_lock(&thread_runqs[i].lock);
+
+        eligible = thread_sched_ts_balance_eligible(&thread_runqs[i]);
+
+        if (!eligible) {
+            spinlock_unlock(&thread_runqs[i].lock);
+            continue;
+        }
+
+        if (remote_runq->ts_weight > highest_weight) {
+            highest_weight = remote_runq->ts_weight;
+            runq_id = i;
+        }
+
+        spinlock_unlock(&thread_runqs[i].lock);
+    }
+
+    if (runq_id == -1)
+        return NULL;
+
+    return &thread_runqs[runq_id];
+}
+
+static unsigned int
+thread_sched_ts_balance_migrate(struct thread_runq *runq,
+                                struct thread_runq *remote_runq)
+{
+    struct thread *thread, *tmp;
+    unsigned long flags;
+    unsigned int i, nr_threads;
+    int not_highest;
+
+    thread_preempt_disable();
+    flags = cpu_intr_save();
+    thread_runq_double_lock(runq, remote_runq);
+
+    if (!thread_sched_ts_balance_eligible(remote_runq)) {
+        i = 0;
+        goto out;
+    }
+
+    nr_threads = remote_runq->ts_runq_active->nr_threads;
+
+    if (remote_runq->ts_runq_active->round == thread_ts_highest_round)
+        not_highest = 0;
+    else {
+        not_highest = 1;
+        nr_threads += remote_runq->ts_runq_expired->nr_threads;
+    }
+
+    i = 0;
+    nr_threads = nr_threads / THREAD_TS_MIGRATION_RATIO;
+
+    if (nr_threads == 0)
+        nr_threads = 1;
+    else if (nr_threads > THREAD_MAX_MIGRATIONS)
+        nr_threads = THREAD_MAX_MIGRATIONS;
+
+    list_for_each_entry_safe(&remote_runq->ts_runq_active->threads,
+                             thread, tmp, ts_ctx.runq_node) {
+        if (thread == remote_runq->current)
+            continue;
+
+        thread_runq_remove(remote_runq, thread);
+        thread->ts_ctx.round = runq->ts_runq_active->round;
+        thread_runq_add(runq, thread);
+        i++;
+
+        if (i == nr_threads)
+            goto out;
+    }
+
+    if (not_highest)
+        list_for_each_entry_safe(&remote_runq->ts_runq_expired->threads,
+                                 thread, tmp, ts_ctx.runq_node) {
+            thread_runq_remove(remote_runq, thread);
+            thread->ts_ctx.round = runq->ts_runq_active->round;
+            thread_runq_add(runq, thread);
+            i++;
+
+            if (i == nr_threads)
+                goto out;
+        }
+
+out:
+    spinlock_unlock(&runq->lock);
+    spinlock_unlock(&remote_runq->lock);
+    cpu_intr_restore(flags);
+    thread_preempt_enable();
+    return i;
+}
+
+static void
+thread_sched_ts_balance(struct thread_runq *runq)
+{
+    struct thread_runq *remote_runq;
+    unsigned long flags;
+    unsigned int nr_migrations;
+    int i, nr_runqs;
+
+    /*
+     * These values can't change while the balancer thread is running, so
+     * don't bother locking.
+     */
+    if ((runq->ts_runq_active->round == thread_ts_highest_round)
+        || (runq->ts_runq_expired->nr_threads == 0))
+        remote_runq = thread_sched_ts_balance_scan(runq);
+    else
+        remote_runq = NULL;
+
+    if (remote_runq == NULL)
+        goto no_migration;
+
+    nr_migrations = thread_sched_ts_balance_migrate(runq, remote_runq);
+
+    if (nr_migrations != 0)
+        return;
+
+    /*
+     * If no thread could be pulled from the remote run queue, it means
+     * its state has changed since the scan, and the new state has made
+     * the run queue ineligible. Make another, simpler scan, and stop as
+     * soon as some threads could be migrated successfully.
+     */
+    for (i = 0, nr_runqs = cpu_count(); i < nr_runqs; i++) {
+        remote_runq = &thread_runqs[i];
+
+        if (remote_runq == runq)
+            continue;
+
+        nr_migrations = thread_sched_ts_balance_migrate(runq, remote_runq);
+
+        if (nr_migrations != 0)
+            return;
+    }
+
+no_migration:
+    spinlock_lock_intr_save(&runq->lock, &flags);
+
+    /*
+     * No thread could be migrated. Check the active run queue, as another
+     * processor might have added threads while the balancer was running.
+     * If the run queue is still empty, switch to the next round.
+     */
+    if (runq->ts_runq_active->nr_threads == 0)
+        thread_sched_ts_start_next_round(runq);
+
+    spinlock_unlock_intr_restore(&runq->lock, flags);
+}
+
+static void
 thread_sched_idle_init_thread(struct thread *thread, unsigned short priority)
 {
     (void)thread;
     (void)priority;
 }
 
+static struct thread_runq *
+thread_sched_idle_select_runq(void)
+{
+    panic("thread: idler threads cannot be awaken");
+}
+
 static void __noreturn
 thread_sched_idle_panic(void)
 {
@@ -726,6 +1124,7 @@ thread_setup(void)
 
     ops = &thread_sched_ops[THREAD_SCHED_CLASS_RT];
     ops->init_thread = thread_sched_rt_init_thread;
+    ops->select_runq = thread_sched_rt_select_runq;
     ops->add = thread_sched_rt_add;
     ops->remove = thread_sched_rt_remove;
     ops->put_prev = thread_sched_rt_put_prev;
@@ -734,6 +1133,7 @@ thread_setup(void)
 
     ops = &thread_sched_ops[THREAD_SCHED_CLASS_TS];
     ops->init_thread = thread_sched_ts_init_thread;
+    ops->select_runq = thread_sched_ts_select_runq;
     ops->add = thread_sched_ts_add;
     ops->remove = thread_sched_ts_remove;
     ops->put_prev = thread_sched_ts_put_prev;
@@ -742,12 +1142,15 @@ thread_setup(void)
 
     ops = &thread_sched_ops[THREAD_SCHED_CLASS_IDLE];
     ops->init_thread = thread_sched_idle_init_thread;
+    ops->select_runq = thread_sched_idle_select_runq;
     ops->add = thread_sched_idle_add;
     ops->remove = thread_sched_idle_remove;
     ops->put_prev = thread_sched_idle_put_prev;
     ops->get_next = thread_sched_idle_get_next;
     ops->tick = thread_sched_idle_tick;
 
+    bitmap_zero(thread_active_runqs, MAX_CPUS);
+
     kmem_cache_init(&thread_cache, "thread", sizeof(struct thread),
                     CPU_L1_SIZE, NULL, NULL, NULL, 0);
     kmem_cache_init(&thread_stack_cache, "thread_stack", STACK_SIZE,
@@ -854,9 +1257,6 @@ thread_create(struct thread **threadp, const struct thread_attr *attr,
     }
 
     thread_init(thread, stack, attr, fn, arg);
-
-    /* TODO Multiprocessor thread dispatching */
-    thread->cpu = cpu_id();
     thread_wakeup(thread);
 
     *threadp = thread;
@@ -881,24 +1281,66 @@ thread_wakeup(struct thread *thread)
     struct thread_runq *runq;
     unsigned long on_rq, flags;
 
-    /* TODO Multiprocessor thread dispatching */
-    assert(thread->cpu == cpu_id());
-
     on_rq = atomic_cas(&thread->on_rq, 0, 1);
 
     if (on_rq)
         return;
 
-    thread->state = THREAD_RUNNING;
+    /*
+     * Disable preemption and interrupts to avoid a deadlock in case the local
+     * run queue is selected.
+     */
+    thread_preempt_disable();
+    flags = cpu_intr_save();
+
+    /* The returned run queue is locked */
+    runq = thread_sched_ops[thread->sched_class].select_runq();
+    thread_runq_wakeup(runq, thread);
+    spinlock_unlock(&runq->lock);
+    cpu_intr_restore(flags);
+    thread_preempt_enable();
+}
+
+static void
+thread_balancer(void *arg)
+{
+    struct thread_runq *runq;
+
+    runq = arg;
+
+    for (;;) {
+        thread_sleep();
+        thread_sched_ts_balance(runq);
+    }
+}
+
+static void __init
+thread_setup_balancer(void)
+{
+    char name[THREAD_NAME_SIZE];
+    struct thread_runq *runq;
+    struct thread_attr attr;
+    struct thread *balancer;
+    int error;
 
-    thread_pin();
     runq = thread_runq_local();
-    spinlock_lock_intr_save(&runq->lock, &flags);
-    thread_runq_add(runq, thread);
-    spinlock_unlock_intr_restore(&runq->lock, flags);
-    thread_unpin();
-    thread_reschedule();
+
+    /*
+     * Real-time threads are currently dispatched on the caller's run queue.
+     *
+     * TODO CPU affinity
+     */
+    snprintf(name, sizeof(name), "thread_balancer/%u", cpu_id());
+    attr.task = kernel_task;
+    attr.name = name;
+    attr.sched_policy = THREAD_SCHED_CLASS_RT;
+    attr.priority = THREAD_SCHED_RT_PRIO_MIN;
+    error = thread_create(&balancer, &attr, thread_balancer, runq);
+
+    if (error)
+        panic("thread: unable to create balancer thread");
+
+    runq->balancer = balancer;
 }
 
 static void
@@ -950,6 +1392,8 @@ thread_run(void)
 
     assert(cpu_intr_enabled());
 
+    thread_setup_balancer();
+
     /* This call disables interrupts */
     thread_setup_idler();
 
@@ -994,7 +1438,7 @@ thread_schedule(void)
         runq = thread_runq_local();
         spinlock_lock_intr_save(&runq->lock, &flags);
 
-        prev->flags &= ~THREAD_RESCHEDULE;
+        thread_clear_flag(prev, THREAD_RESCHEDULE);
         thread_runq_put_prev(runq, prev);
 
         if (prev->state != THREAD_RUNNING) {
@@ -1020,13 +1464,14 @@ thread_schedule(void)
 
         spinlock_unlock_intr_restore(&runq->lock, flags);
         thread_preempt_enable_no_resched();
-    } while (prev->flags & THREAD_RESCHEDULE);
+    } while (thread_test_flag(prev, THREAD_RESCHEDULE));
 }
 
 void
 thread_reschedule(void)
 {
-    if ((thread_self()->flags & THREAD_RESCHEDULE) && thread_preempt_enabled())
+    if (thread_test_flag(thread_self(), THREAD_RESCHEDULE)
+        && thread_preempt_enabled())
         thread_schedule();
 }
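thread_runq_double_lock() above exists so that thread_sched_ts_balance_migrate() can hold two run queue locks at once without risking an ABBA deadlock: whatever order the queues are passed in, the lock at the lower address is always acquired first. Here is a minimal user-space sketch of the same idiom, assuming pthread mutexes in place of the kernel's spinlocks; the struct and function names are hypothetical.

```c
#include <pthread.h>
#include <stdio.h>

/* Hypothetical stand-in for struct thread_runq */
struct runq {
    pthread_mutex_t lock;
    unsigned int nr_threads;
};

/*
 * Acquire both locks in increasing address order, regardless of argument
 * order, so two CPUs locking the same pair can never block each other.
 * This mirrors thread_runq_double_lock() in the diff.
 */
static void
runq_double_lock(struct runq *a, struct runq *b)
{
    if (a <= b) {
        pthread_mutex_lock(&a->lock);
        pthread_mutex_lock(&b->lock);
    } else {
        pthread_mutex_lock(&b->lock);
        pthread_mutex_lock(&a->lock);
    }
}

static void
runq_double_unlock(struct runq *a, struct runq *b)
{
    /* Unlock order is irrelevant to deadlock avoidance */
    pthread_mutex_unlock(&a->lock);
    pthread_mutex_unlock(&b->lock);
}

int
main(void)
{
    struct runq local = { PTHREAD_MUTEX_INITIALIZER, 0 };
    struct runq remote = { PTHREAD_MUTEX_INITIALIZER, 4 };

    /* Both call orders acquire the lower-addressed lock first */
    runq_double_lock(&local, &remote);
    remote.nr_threads--;        /* pretend to migrate one thread */
    local.nr_threads++;
    runq_double_unlock(&local, &remote);

    printf("local: %u, remote: %u\n", local.nr_threads, remote.nr_threads);
    return 0;
}
```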