Diffstat (limited to 'kern/thread.c')
-rw-r--r--  kern/thread.c | 561
1 file changed, 503 insertions(+), 58 deletions(-)
diff --git a/kern/thread.c b/kern/thread.c
index 7a9a55b3..7eb49097 100644
--- a/kern/thread.c
+++ b/kern/thread.c
@@ -19,6 +19,7 @@
*/
#include <kern/assert.h>
+#include <kern/bitmap.h>
#include <kern/error.h>
#include <kern/init.h>
#include <kern/kmem.h>
@@ -35,6 +36,7 @@
#include <kern/thread.h>
#include <machine/atomic.h>
#include <machine/cpu.h>
+#include <machine/mb.h>
#include <machine/pmap.h>
#include <machine/tcb.h>
#include <vm/vm_map.h>
@@ -45,6 +47,12 @@
#define THREAD_DEFAULT_RR_TIME_SLICE (HZ / 10)
/*
+ * Maximum number of threads that can be pulled from a remote run queue
+ * in a single pass while interrupts are disabled.
+ */
+#define THREAD_MAX_MIGRATIONS 16
+
+/*
* Run queue properties for real-time threads.
*/
struct thread_rt_runq {
@@ -53,6 +61,13 @@ struct thread_rt_runq {
};
/*
+ * When pulling threads from a run queue, the number of threads to pull
+ * is determined by dividing the number of eligible threads by this value.
+ */
+#define THREAD_TS_MIGRATION_RATIO 2
+
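
Taken together with THREAD_MAX_MIGRATIONS above, the pull count for one balancing pass works out to eligible / THREAD_TS_MIGRATION_RATIO, clamped to at least 1 and at most THREAD_MAX_MIGRATIONS (the clamping itself appears in thread_sched_ts_balance_migrate further down). A minimal standalone sketch of that arithmetic; migration_count is a hypothetical helper, not a function from the patch:

    #include <stdio.h>

    #define THREAD_MAX_MIGRATIONS     16
    #define THREAD_TS_MIGRATION_RATIO 2

    /* Compute how many threads one balancing pass will pull. */
    static unsigned int
    migration_count(unsigned int nr_eligible)
    {
        unsigned int nr = nr_eligible / THREAD_TS_MIGRATION_RATIO;

        if (nr == 0)
            nr = 1;                      /* always make some progress */
        else if (nr > THREAD_MAX_MIGRATIONS)
            nr = THREAD_MAX_MIGRATIONS;  /* bound time spent with interrupts off */

        return nr;
    }

    int
    main(void)
    {
        /* 7 eligible -> 3 pulled, 1 -> 1, 100 -> capped at 16 */
        printf("%u %u %u\n", migration_count(7), migration_count(1),
               migration_count(100));
        return 0;
    }
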
+/*
* Group of threads sharing the same weight.
*/
struct thread_ts_group {
@@ -72,7 +87,9 @@ struct thread_ts_runq {
unsigned long round;
struct thread_ts_group group_array[THREAD_SCHED_TS_PRIO_MAX + 1];
struct list groups;
+ struct list threads;
struct thread_ts_group *current;
+ unsigned int nr_threads;
unsigned int weight;
unsigned int work;
};
@@ -83,11 +100,14 @@ struct thread_ts_runq {
struct thread_runq {
struct spinlock lock;
struct thread *current;
+ unsigned int nr_threads;
struct thread_rt_runq rt_runq;
+ unsigned int ts_weight;
struct thread_ts_runq ts_runqs[2];
struct thread_ts_runq *ts_runq_active;
struct thread_ts_runq *ts_runq_expired;
struct thread *idler;
+ struct thread *balancer;
} __aligned(CPU_L1_SIZE);
/*
@@ -95,6 +115,7 @@ struct thread_runq {
*/
struct thread_sched_ops {
void (*init_thread)(struct thread *thread, unsigned short priority);
+ struct thread_runq * (*select_runq)(void);
void (*add)(struct thread_runq *runq, struct thread *thread);
void (*remove)(struct thread_runq *runq, struct thread *thread);
void (*put_prev)(struct thread_runq *runq, struct thread *thread);
@@ -134,6 +155,19 @@ static struct thread_attr thread_default_attr = {
THREAD_SCHED_TS_PRIO_DEFAULT
};
+BITMAP_DECLARE(thread_active_runqs, MAX_CPUS);
+
+/*
+ * System-wide value of the current highest round.
+ *
+ * This word can bounce moderately between processor caches, so give it
+ * its own cache line.
+ */
+static struct {
+ volatile unsigned long value __aligned(CPU_L1_SIZE);
+} thread_ts_highest_round_struct;
+
+#define thread_ts_highest_round (thread_ts_highest_round_struct.value)
+
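
The wrapper struct is the usual anti-false-sharing idiom: without it, the hot round counter could share a cache line with adjacent globals and bounce between processors on unrelated writes. A user-space sketch of the same idiom, assuming a 64-byte line (the kernel gets its line size as CPU_L1_SIZE from its machine headers):

    #include <stdalign.h>
    #include <stdio.h>

    /* Assumed line size for this sketch. */
    #define CACHE_LINE_SIZE 64

    /* Same idiom as thread_ts_highest_round_struct above: wrap the hot
     * global in a cache-line-aligned struct so writers on other CPUs do
     * not false-share it with neighboring data. */
    static struct {
        alignas(CACHE_LINE_SIZE) volatile unsigned long value;
    } highest_round;

    #define ts_highest_round (highest_round.value)

    int
    main(void)
    {
        ts_highest_round = 1;
        printf("round = %lu\n", ts_highest_round);
        return 0;
    }
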
static void __init
thread_runq_init_rt(struct thread_runq *runq)
{
@@ -166,6 +200,8 @@ thread_ts_runq_init(struct thread_ts_runq *ts_runq, unsigned long round)
thread_ts_group_init(&ts_runq->group_array[i]);
list_init(&ts_runq->groups);
+ list_init(&ts_runq->threads);
+ ts_runq->nr_threads = 0;
ts_runq->weight = 0;
ts_runq->work = 0;
}
@@ -173,6 +209,7 @@ thread_ts_runq_init(struct thread_ts_runq *ts_runq, unsigned long round)
static void __init
thread_runq_init_ts(struct thread_runq *runq)
{
+ runq->ts_weight = 0;
runq->ts_runq_active = &runq->ts_runqs[0];
runq->ts_runq_expired = &runq->ts_runqs[1];
thread_ts_runq_init(runq->ts_runq_active, 0);
@@ -198,30 +235,72 @@ static void __init
thread_runq_init(struct thread_runq *runq)
{
spinlock_init(&runq->lock);
+ runq->nr_threads = 0;
thread_runq_init_rt(runq);
thread_runq_init_ts(runq);
thread_runq_init_idle(runq);
runq->current = runq->idler;
+ runq->balancer = NULL;
+}
+
+static inline int
+thread_runq_id(struct thread_runq *runq)
+{
+ return runq - thread_runqs;
+}
+
+static inline struct thread_runq *
+thread_runq_local(void)
+{
+ assert(!thread_preempt_enabled() || thread_pinned());
+ return &thread_runqs[cpu_id()];
+}
+
+static inline void
+thread_set_flag(struct thread *thread, unsigned long flag)
+{
+ atomic_or(&thread->flags, flag);
+}
+
+static inline void
+thread_clear_flag(struct thread *thread, unsigned long flag)
+{
+ atomic_and(&thread->flags, ~flag);
+}
+
+static inline int
+thread_test_flag(struct thread *thread, unsigned long flag)
+{
+ barrier();
+ return ((thread->flags & flag) != 0);
}
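
These helpers replace the direct flag manipulations removed throughout the rest of this diff. A rough C11-atomics equivalent, as a sketch only: the kernel's atomic_or/atomic_and come from machine/atomic.h, and its barrier() is a compiler barrier that merely forces a fresh load, not a hardware fence.

    #include <stdatomic.h>
    #include <stdio.h>

    #define THREAD_RESCHEDULE 0x1UL

    static atomic_ulong flags;

    /* set/clear are atomic read-modify-write operations; the test is a
     * relaxed load, standing in for the barrier() + plain read above. */
    static void
    set_flag(unsigned long flag)
    {
        atomic_fetch_or(&flags, flag);
    }

    static void
    clear_flag(unsigned long flag)
    {
        atomic_fetch_and(&flags, ~flag);
    }

    static int
    test_flag(unsigned long flag)
    {
        return (atomic_load_explicit(&flags, memory_order_relaxed) & flag) != 0;
    }

    int
    main(void)
    {
        set_flag(THREAD_RESCHEDULE);
        printf("%d ", test_flag(THREAD_RESCHEDULE));  /* 1 */
        clear_flag(THREAD_RESCHEDULE);
        printf("%d\n", test_flag(THREAD_RESCHEDULE)); /* 0 */
        return 0;
    }
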
static void
thread_runq_add(struct thread_runq *runq, struct thread *thread)
{
- assert(!cpu_intr_enabled());
spinlock_assert_locked(&runq->lock);
thread_sched_ops[thread->sched_class].add(runq, thread);
+ if (runq->nr_threads == 0)
+ bitmap_set_atomic(thread_active_runqs, thread_runq_id(runq));
+
+ runq->nr_threads++;
+
if (thread->sched_class < runq->current->sched_class)
- runq->current->flags |= THREAD_RESCHEDULE;
+ thread_set_flag(runq->current, THREAD_RESCHEDULE);
}
static void
thread_runq_remove(struct thread_runq *runq, struct thread *thread)
{
- assert(!cpu_intr_enabled());
spinlock_assert_locked(&runq->lock);
+ runq->nr_threads--;
+
+ if (runq->nr_threads == 0)
+ bitmap_clear_atomic(thread_active_runqs, thread_runq_id(runq));
+
thread_sched_ops[thread->sched_class].remove(runq, thread);
}
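
The bitmap bookkeeping only touches the empty/non-empty transitions, so the shared thread_active_runqs word is written at most once per transition rather than on every enqueue. A single-threaded model of the same counting (the real code uses atomic bitmap operations under the run queue lock):

    #include <stdint.h>
    #include <stdio.h>

    #define MAX_CPUS 64

    static uint64_t active_runqs;           /* bit i set => run queue i non-empty */
    static unsigned int nr_threads[MAX_CPUS];

    /* Mirror of the thread_runq_add/thread_runq_remove bookkeeping. */
    static void
    runq_add(unsigned int cpu)
    {
        if (nr_threads[cpu]++ == 0)
            active_runqs |= UINT64_C(1) << cpu;
    }

    static void
    runq_remove(unsigned int cpu)
    {
        if (--nr_threads[cpu] == 0)
            active_runqs &= ~(UINT64_C(1) << cpu);
    }

    int
    main(void)
    {
        runq_add(2);
        runq_add(2);
        runq_remove(2);
        printf("bit 2 still set: %d\n", (int)((active_runqs >> 2) & 1));
        return 0;
    }
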
@@ -256,11 +335,45 @@ thread_runq_get_next(struct thread_runq *runq)
panic("thread: unable to find next thread");
}
-static inline struct thread_runq *
-thread_runq_local(void)
+static void
+thread_runq_wakeup(struct thread_runq *runq, struct thread *thread)
{
- assert(!thread_preempt_enabled() || thread_pinned());
- return &thread_runqs[cpu_id()];
+ spinlock_assert_locked(&runq->lock);
+ assert(thread->on_rq);
+
+ thread->state = THREAD_RUNNING;
+ thread_runq_add(runq, thread);
+
+ if (runq != thread_runq_local()) {
+ /*
+ * Make the new state and flags globally visible so that a remote
+ * rescheduling operation sees the correct values.
+ *
+ * Although scheduling implies a load memory barrier before testing
+ * the state of a thread (because of the spin lock acquire semantics),
+ * this isn't the case with thread flags. They are set atomically,
+ * but not ordered. As a result, reenabling preemption may miss a
+ * rescheduling request. But interrupts imply full memory barriers
+ * so the request won't be missed when the rescheduling IPI is
+ * received by the remote processor.
+ */
+ mb_store();
+
+ if (thread_test_flag(runq->current, THREAD_RESCHEDULE))
+ tcb_send_reschedule(thread_runq_id(runq));
+ }
+}
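
A user-space sketch of this publish-then-test pattern, with a C11 seq_cst fence standing in for mb_store() (stronger than a pure store barrier, but portable); all names here are hypothetical stand-ins for the kernel objects:

    #include <stdatomic.h>

    #define THREAD_RESCHEDULE 0x1

    static atomic_int thread_state;   /* stand-in for thread->state */
    static atomic_int current_flags;  /* stand-in for runq->current->flags */

    /* stand-in for tcb_send_reschedule() */
    static void send_reschedule_ipi(int runq_id) { (void)runq_id; }

    /* Publish the woken thread's state, then test whether the remote
     * processor has a pending reschedule request that warrants an IPI. */
    static void
    wakeup_remote(int runq_id)
    {
        atomic_store_explicit(&thread_state, 1, memory_order_relaxed);
        atomic_thread_fence(memory_order_seq_cst);  /* mb_store() in the patch */

        if (atomic_load_explicit(&current_flags, memory_order_relaxed)
            & THREAD_RESCHEDULE)
            send_reschedule_ipi(runq_id);
    }

    int
    main(void)
    {
        wakeup_remote(0);
        return 0;
    }
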
+
+static void
+thread_runq_double_lock(struct thread_runq *a, struct thread_runq *b)
+{
+ if (a <= b) {
+ spinlock_lock(&a->lock);
+ spinlock_lock(&b->lock);
+ } else {
+ spinlock_lock(&b->lock);
+ spinlock_lock(&a->lock);
+ }
}
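
This is lock ordering by address: whichever argument order two processors use for the same pair of run queues, both take the lower-addressed lock first, which rules out ABBA deadlock. Note the function assumes a != b (locking the same queue twice would self-deadlock); its caller, thread_sched_ts_balance_migrate below, guarantees that. A user-space sketch with pthread mutexes standing in for the spinlocks:

    #include <pthread.h>
    #include <stdio.h>

    /* Stand-in for struct thread_runq. */
    struct runq {
        pthread_mutex_t lock;
    };

    /* Always acquire the lock at the lower address first. */
    static void
    runq_double_lock(struct runq *a, struct runq *b)
    {
        if (a <= b) {
            pthread_mutex_lock(&a->lock);
            pthread_mutex_lock(&b->lock);
        } else {
            pthread_mutex_lock(&b->lock);
            pthread_mutex_lock(&a->lock);
        }
    }

    int
    main(void)
    {
        struct runq r[2] = {
            { PTHREAD_MUTEX_INITIALIZER },
            { PTHREAD_MUTEX_INITIALIZER },
        };

        /* Both argument orders take r[0].lock first. */
        runq_double_lock(&r[0], &r[1]);
        pthread_mutex_unlock(&r[1].lock);
        pthread_mutex_unlock(&r[0].lock);
        puts("locked and unlocked in address order");
        return 0;
    }
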
static void
@@ -271,6 +384,16 @@ thread_sched_rt_init_thread(struct thread *thread, unsigned short priority)
thread->rt_ctx.time_slice = THREAD_DEFAULT_RR_TIME_SLICE;
}
+static struct thread_runq *
+thread_sched_rt_select_runq(void)
+{
+ struct thread_runq *runq;
+
+ runq = thread_runq_local();
+ spinlock_lock(&runq->lock);
+ return runq;
+}
+
static void
thread_sched_rt_add(struct thread_runq *runq, struct thread *thread)
{
@@ -286,7 +409,7 @@ thread_sched_rt_add(struct thread_runq *runq, struct thread *thread)
if ((thread->sched_class == runq->current->sched_class)
&& (thread->rt_ctx.priority > runq->current->rt_ctx.priority))
- runq->current->flags |= THREAD_RESCHEDULE;
+ thread_set_flag(runq->current, THREAD_RESCHEDULE);
}
static void
@@ -344,7 +467,7 @@ thread_sched_rt_tick(struct thread_runq *runq, struct thread *thread)
return;
thread->rt_ctx.time_slice = THREAD_DEFAULT_RR_TIME_SLICE;
- thread->flags |= THREAD_RESCHEDULE;
+ thread_set_flag(thread, THREAD_RESCHEDULE);
}
static void
@@ -357,6 +480,51 @@ thread_sched_ts_init_thread(struct thread *thread, unsigned short priority)
thread->ts_ctx.work = 0;
}
+static struct thread_runq *
+thread_sched_ts_select_runq(void)
+{
+ struct thread_runq *runq;
+ unsigned long highest_round;
+ unsigned int lowest_weight;
+ int i, runq_id, nr_runqs;
+
+ nr_runqs = cpu_count();
+ i = bitmap_find_first_zero(thread_active_runqs, nr_runqs);
+
+ if (i != -1) {
+ runq_id = i;
+ goto out;
+ }
+
+ runq_id = 0;
+ highest_round = 0;
+ lowest_weight = (unsigned int)-1;
+
+ for (i = 0; i < nr_runqs; i++) {
+ runq = &thread_runqs[i];
+
+ spinlock_lock(&runq->lock);
+
+ /* The run queue may have become idle */
+ if (runq->current == runq->idler)
+ return runq;
+
+ if ((runq->ts_runq_active->round >= highest_round)
+ && (runq->ts_weight < lowest_weight)) {
+ highest_round = runq->ts_runq_active->round;
+ lowest_weight = runq->ts_weight;
+ runq_id = i;
+ }
+
+ spinlock_unlock(&runq->lock);
+ }
+
+out:
+ runq = &thread_runqs[runq_id];
+ spinlock_lock(&runq->lock);
+ return runq;
+}
+
static unsigned int
thread_sched_ts_enqueue_scale(unsigned int work, unsigned int old_weight,
unsigned int new_weight)
@@ -382,16 +550,7 @@ thread_sched_ts_enqueue(struct thread_ts_runq *ts_runq, struct thread *thread)
group = &ts_runq->group_array[thread->ts_ctx.weight - 1];
group_weight = group->weight + thread->ts_ctx.weight;
-
- /* TODO Limit the maximum number of threads to prevent this situation */
- if (group_weight < group->weight)
- panic("thread: weight overflow");
-
total_weight = ts_runq->weight + thread->ts_ctx.weight;
-
- if (total_weight < ts_runq->weight)
- panic("thread: weight overflow");
-
node = (group->weight == 0)
? list_last(&ts_runq->groups)
: list_prev(&group->node);
@@ -443,11 +602,13 @@ thread_sched_ts_enqueue(struct thread_ts_runq *ts_runq, struct thread *thread)
thread->ts_ctx.work = thread_work;
}
+ ts_runq->nr_threads++;
ts_runq->weight = total_weight;
group->weight = group_weight;
- /* Insert at the front to improve interactivity */
- list_insert(&group->threads, &thread->ts_ctx.node);
+ /* Insert at the front of the group to improve interactivity */
+ list_insert(&group->threads, &thread->ts_ctx.group_node);
+ list_insert_tail(&ts_runq->threads, &thread->ts_ctx.runq_node);
thread->ts_ctx.ts_runq = ts_runq;
}
@@ -463,16 +624,27 @@ thread_sched_ts_restart(struct thread_runq *runq)
ts_runq->current = list_entry(node, struct thread_ts_group, node);
if (runq->current->sched_class == THREAD_SCHED_CLASS_TS)
- runq->current->flags |= THREAD_RESCHEDULE;
+ thread_set_flag(runq->current, THREAD_RESCHEDULE);
}
static void
thread_sched_ts_add(struct thread_runq *runq, struct thread *thread)
{
- struct thread_ts_runq *ts_runq;
+ unsigned int total_weight;
- ts_runq = runq->ts_runq_active;
- thread_sched_ts_enqueue(ts_runq, thread);
+ if (runq->ts_weight == 0) {
+ runq->ts_runq_active->round = thread_ts_highest_round;
+ runq->ts_runq_expired->round = runq->ts_runq_active->round + 1;
+ }
+
+ total_weight = runq->ts_weight + thread->ts_ctx.weight;
+
+ /* TODO Limit the maximum number of threads to prevent this situation */
+ if (total_weight < runq->ts_weight)
+ panic("thread: weight overflow");
+
+ runq->ts_weight = total_weight;
+ thread_sched_ts_enqueue(runq->ts_runq_active, thread);
thread_sched_ts_restart(runq);
}
@@ -489,11 +661,13 @@ thread_sched_ts_dequeue(struct thread *thread)
group = &ts_runq->group_array[thread->ts_ctx.weight - 1];
thread->ts_ctx.ts_runq = NULL;
- list_remove(&thread->ts_ctx.node);
+ list_remove(&thread->ts_ctx.runq_node);
+ list_remove(&thread->ts_ctx.group_node);
ts_runq->work -= thread->ts_ctx.work;
group->work -= thread->ts_ctx.work;
ts_runq->weight -= thread->ts_ctx.weight;
group->weight -= thread->ts_ctx.weight;
+ ts_runq->nr_threads--;
if (group->weight == 0)
list_remove(&group->node);
@@ -518,17 +692,16 @@ thread_sched_ts_dequeue(struct thread *thread)
}
static void
-thread_sched_ts_start_next_round(struct thread_runq *runq)
+thread_sched_ts_wakeup_balancer(struct thread_runq *runq)
{
- struct thread_ts_runq *tmp;
+ unsigned long on_rq;
- tmp = runq->ts_runq_expired;
- runq->ts_runq_expired = runq->ts_runq_active;
- runq->ts_runq_active = tmp;
- runq->ts_runq_expired->round = runq->ts_runq_active->round + 1;
+ on_rq = atomic_cas(&runq->balancer->on_rq, 0, 1);
- if (runq->ts_runq_active->weight != 0)
- thread_sched_ts_restart(runq);
+ if (on_rq)
+ return;
+
+ thread_runq_wakeup(runq, runq->balancer);
}
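
The 0 -> 1 compare-and-swap on on_rq makes the wakeup idempotent: if several processors notice the empty active queue at once, exactly one of them enqueues the balancer. The same protocol guards thread_wakeup further down. A C11 sketch of the idea, under hypothetical names:

    #include <stdatomic.h>
    #include <stdio.h>

    static atomic_ulong balancer_on_rq;

    /* Only the caller that wins the 0 -> 1 exchange performs the enqueue;
     * later callers see the balancer already runnable and back off. */
    static int
    wakeup_balancer(void)
    {
        unsigned long expected = 0;

        if (!atomic_compare_exchange_strong(&balancer_on_rq, &expected, 1))
            return 0;  /* already on a run queue, nothing to do */

        /* ... thread_runq_wakeup(runq, runq->balancer) would go here ... */
        return 1;
    }

    int
    main(void)
    {
        printf("%d %d\n", wakeup_balancer(), wakeup_balancer());  /* 1 0 */
        return 0;
    }
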
static void
@@ -536,12 +709,13 @@ thread_sched_ts_remove(struct thread_runq *runq, struct thread *thread)
{
struct thread_ts_runq *ts_runq;
+ runq->ts_weight -= thread->ts_ctx.weight;
ts_runq = thread->ts_ctx.ts_runq;
thread_sched_ts_dequeue(thread);
if (ts_runq == runq->ts_runq_active) {
- if (ts_runq->weight == 0)
- thread_sched_ts_start_next_round(runq);
+ if (ts_runq->nr_threads == 0)
+ thread_sched_ts_wakeup_balancer(runq);
else
thread_sched_ts_restart(runq);
}
@@ -557,8 +731,8 @@ thread_sched_ts_deactivate(struct thread_runq *runq, struct thread *thread)
thread->ts_ctx.work -= thread->ts_ctx.weight;
thread_sched_ts_enqueue(runq->ts_runq_expired, thread);
- if (runq->ts_runq_active->weight == 0)
- thread_sched_ts_start_next_round(runq);
+ if (runq->ts_runq_active->nr_threads == 0)
+ thread_sched_ts_wakeup_balancer(runq);
}
static void
@@ -569,7 +743,7 @@ thread_sched_ts_put_prev(struct thread_runq *runq, struct thread *thread)
ts_runq = runq->ts_runq_active;
group = &ts_runq->group_array[thread->ts_ctx.weight - 1];
- list_insert_tail(&group->threads, &thread->ts_ctx.node);
+ list_insert_tail(&group->threads, &thread->ts_ctx.group_node);
if (thread->ts_ctx.work >= thread->ts_ctx.weight)
thread_sched_ts_deactivate(runq, thread);
@@ -606,7 +780,7 @@ thread_sched_ts_get_next(struct thread_runq *runq)
ts_runq = runq->ts_runq_active;
- if (ts_runq->weight == 0)
+ if (ts_runq->nr_threads == 0)
return NULL;
group = ts_runq->current;
@@ -627,8 +801,9 @@ thread_sched_ts_get_next(struct thread_runq *runq)
}
ts_runq->current = group;
- thread = list_first_entry(&group->threads, struct thread, ts_ctx.node);
- list_remove(&thread->ts_ctx.node);
+ node = list_first(&group->threads);
+ thread = list_entry(node, struct thread, ts_ctx.group_node);
+ list_remove(node);
return thread;
}
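
The node renames in this and the surrounding hunks are the heart of the data-structure change: a time-sharing thread is now linked on two lists at once, its weight group's list (used by the scheduler proper) and the run queue's flat thread list (walked by the balancer when migrating threads). A stripped-down sketch of the layout; struct list here is a hypothetical minimal stand-in for the kernel's:

    #include <stdio.h>

    /* Minimal intrusive list node. */
    struct list {
        struct list *prev, *next;
    };

    /* A thread embeds one node per list it can appear on. */
    struct thread_ts_ctx {
        struct list group_node;  /* on thread_ts_group.threads */
        struct list runq_node;   /* on thread_ts_runq.threads */
    };

    int
    main(void)
    {
        printf("two nodes, %zu bytes each\n", sizeof(struct list));
        return 0;
    }
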
@@ -642,17 +817,240 @@ thread_sched_ts_tick(struct thread_runq *runq, struct thread *thread)
ts_runq->work++;
group = &ts_runq->group_array[thread->ts_ctx.weight - 1];
group->work++;
- thread->flags |= THREAD_RESCHEDULE;
+ thread_set_flag(thread, THREAD_RESCHEDULE);
thread->ts_ctx.work++;
}
static void
+thread_sched_ts_start_next_round(struct thread_runq *runq)
+{
+ struct thread_ts_runq *tmp;
+
+ tmp = runq->ts_runq_expired;
+ runq->ts_runq_expired = runq->ts_runq_active;
+ runq->ts_runq_active = tmp;
+ runq->ts_runq_expired->round = runq->ts_runq_active->round + 1;
+
+ if (runq->ts_runq_active->nr_threads != 0) {
+ /* TODO Handle round overflows */
+ if (runq->ts_runq_active->round > thread_ts_highest_round)
+ thread_ts_highest_round = runq->ts_runq_active->round;
+
+ thread_sched_ts_restart(runq);
+ }
+}
+
+static int
+thread_sched_ts_balance_eligible(struct thread_runq *runq)
+{
+ unsigned long round, highest;
+ unsigned int nr_threads;
+
+ if (runq->current == runq->idler)
+ return 0;
+
+ round = runq->ts_runq_active->round;
+ highest = thread_ts_highest_round;
+
+ if ((round != highest) && (round != (highest - 1)))
+ return 0;
+
+ nr_threads = runq->ts_runq_active->nr_threads;
+
+ if (round != highest)
+ nr_threads += runq->ts_runq_expired->nr_threads;
+
+ if ((nr_threads == 0)
+ || ((nr_threads == 1)
+ && (runq->current->sched_class == THREAD_SCHED_CLASS_TS)))
+ return 0;
+
+ return 1;
+}
+
+/*
+ * Try to find the most suitable run queue from which to pull threads.
+ */
+static struct thread_runq *
+thread_sched_ts_balance_scan(struct thread_runq *runq)
+{
+ struct thread_runq *remote_runq;
+ unsigned int highest_weight;
+ int i, runq_id, nr_runqs, eligible;
+
+ runq_id = -1;
+ nr_runqs = cpu_count();
+ highest_weight = 0;
+
+ bitmap_for_each(thread_active_runqs, nr_runqs, i) {
+ remote_runq = &thread_runqs[i];
+
+ if (remote_runq == runq)
+ continue;
+
+ spinlock_lock(&thread_runqs[i].lock);
+
+ eligible = thread_sched_ts_balance_eligible(&thread_runqs[i]);
+
+ if (!eligible) {
+ spinlock_unlock(&thread_runqs[i].lock);
+ continue;
+ }
+
+ if (remote_runq->ts_weight > highest_weight) {
+ highest_weight = remote_runq->ts_weight;
+ runq_id = i;
+ }
+
+ spinlock_unlock(&thread_runqs[i].lock);
+ }
+
+ if (runq_id == -1)
+ return NULL;
+
+ return &thread_runqs[runq_id];
+}
+
+static unsigned int
+thread_sched_ts_balance_migrate(struct thread_runq *runq,
+ struct thread_runq *remote_runq)
+{
+ struct thread *thread, *tmp;
+ unsigned long flags;
+ unsigned int i, nr_threads;
+ int not_highest;
+
+ thread_preempt_disable();
+ flags = cpu_intr_save();
+ thread_runq_double_lock(runq, remote_runq);
+
+ if (!thread_sched_ts_balance_eligible(remote_runq)) {
+ i = 0;
+ goto out;
+ }
+
+ nr_threads = remote_runq->ts_runq_active->nr_threads;
+
+ if (remote_runq->ts_runq_active->round == thread_ts_highest_round)
+ not_highest = 0;
+ else {
+ not_highest = 1;
+ nr_threads += remote_runq->ts_runq_expired->nr_threads;
+ }
+
+ i = 0;
+ nr_threads = nr_threads / THREAD_TS_MIGRATION_RATIO;
+
+ if (nr_threads == 0)
+ nr_threads = 1;
+ else if (nr_threads > THREAD_MAX_MIGRATIONS)
+ nr_threads = THREAD_MAX_MIGRATIONS;
+
+ list_for_each_entry_safe(&remote_runq->ts_runq_active->threads,
+ thread, tmp, ts_ctx.runq_node) {
+ if (thread == remote_runq->current)
+ continue;
+
+ thread_runq_remove(remote_runq, thread);
+ thread->ts_ctx.round = runq->ts_runq_active->round;
+ thread_runq_add(runq, thread);
+ i++;
+
+ if (i == nr_threads)
+ goto out;
+ }
+
+ if (not_highest)
+ list_for_each_entry_safe(&remote_runq->ts_runq_expired->threads,
+ thread, tmp, ts_ctx.runq_node) {
+ thread_runq_remove(remote_runq, thread);
+ thread->ts_ctx.round = runq->ts_runq_active->round;
+ thread_runq_add(runq, thread);
+ i++;
+
+ if (i == nr_threads)
+ goto out;
+ }
+
+out:
+ spinlock_unlock(&runq->lock);
+ spinlock_unlock(&remote_runq->lock);
+ cpu_intr_restore(flags);
+ thread_preempt_enable();
+ return i;
+}
+
+static void
+thread_sched_ts_balance(struct thread_runq *runq)
+{
+ struct thread_runq *remote_runq;
+ unsigned long flags;
+ unsigned int nr_migrations;
+ int i, nr_runqs;
+
+ /*
+ * These values can't change while the balancer thread is running, so
+ * don't bother locking.
+ */
+ if ((runq->ts_runq_active->round == thread_ts_highest_round)
+ || (runq->ts_runq_expired->nr_threads == 0))
+ remote_runq = thread_sched_ts_balance_scan(runq);
+ else
+ remote_runq = NULL;
+
+ if (remote_runq == NULL)
+ goto no_migration;
+
+ nr_migrations = thread_sched_ts_balance_migrate(runq, remote_runq);
+
+ if (nr_migrations != 0)
+ return;
+
+ /*
+ * If no thread could be pulled from the remote run queue, it means
+ * its state has changed since the scan, and the new state has made
+ * the run queue ineligible. Make another, simpler scan, and stop as
+ * soon as some threads can be migrated successfully.
+ */
+ for (i = 0, nr_runqs = cpu_count(); i < nr_runqs; i++) {
+ remote_runq = &thread_runqs[i];
+
+ if (remote_runq == runq)
+ continue;
+
+ nr_migrations = thread_sched_ts_balance_migrate(runq, remote_runq);
+
+ if (nr_migrations != 0)
+ return;
+ }
+
+no_migration:
+ spinlock_lock_intr_save(&runq->lock, &flags);
+
+ /*
+ * No thread could be migrated. Check the active run queue, as another
+ * processor might have added threads while the balancer was running.
+ * If the run queue is still empty, switch to the next round.
+ */
+ if (runq->ts_runq_active->nr_threads == 0)
+ thread_sched_ts_start_next_round(runq);
+
+ spinlock_unlock_intr_restore(&runq->lock, flags);
+}
+
+static void
thread_sched_idle_init_thread(struct thread *thread, unsigned short priority)
{
(void)thread;
(void)priority;
}
+static struct thread_runq *
+thread_sched_idle_select_runq(void)
+{
+ panic("thread: idler threads cannot be awaken");
+}
+
static void __noreturn
thread_sched_idle_panic(void)
{
@@ -726,6 +1124,7 @@ thread_setup(void)
ops = &thread_sched_ops[THREAD_SCHED_CLASS_RT];
ops->init_thread = thread_sched_rt_init_thread;
+ ops->select_runq = thread_sched_rt_select_runq;
ops->add = thread_sched_rt_add;
ops->remove = thread_sched_rt_remove;
ops->put_prev = thread_sched_rt_put_prev;
@@ -734,6 +1133,7 @@ thread_setup(void)
ops = &thread_sched_ops[THREAD_SCHED_CLASS_TS];
ops->init_thread = thread_sched_ts_init_thread;
+ ops->select_runq = thread_sched_ts_select_runq;
ops->add = thread_sched_ts_add;
ops->remove = thread_sched_ts_remove;
ops->put_prev = thread_sched_ts_put_prev;
@@ -742,12 +1142,15 @@ thread_setup(void)
ops = &thread_sched_ops[THREAD_SCHED_CLASS_IDLE];
ops->init_thread = thread_sched_idle_init_thread;
+ ops->select_runq = thread_sched_idle_select_runq;
ops->add = thread_sched_idle_add;
ops->remove = thread_sched_idle_remove;
ops->put_prev = thread_sched_idle_put_prev;
ops->get_next = thread_sched_idle_get_next;
ops->tick = thread_sched_idle_tick;
+ bitmap_zero(thread_active_runqs, MAX_CPUS);
+
kmem_cache_init(&thread_cache, "thread", sizeof(struct thread),
CPU_L1_SIZE, NULL, NULL, NULL, 0);
kmem_cache_init(&thread_stack_cache, "thread_stack", STACK_SIZE,
@@ -854,9 +1257,6 @@ thread_create(struct thread **threadp, const struct thread_attr *attr,
}
thread_init(thread, stack, attr, fn, arg);
-
- /* TODO Multiprocessor thread dispatching */
- thread->cpu = cpu_id();
thread_wakeup(thread);
*threadp = thread;
@@ -881,24 +1281,66 @@ thread_wakeup(struct thread *thread)
struct thread_runq *runq;
unsigned long on_rq, flags;
- /* TODO Multiprocessor thread dispatching */
- assert(thread->cpu == cpu_id());
-
on_rq = atomic_cas(&thread->on_rq, 0, 1);
if (on_rq)
return;
- thread->state = THREAD_RUNNING;
+ /*
+ * Disable preemption and interrupts to avoid a deadlock in case the local
+ * run queue is selected.
+ */
+ thread_preempt_disable();
+ flags = cpu_intr_save();
+
+ /* The returned run queue is locked */
+ runq = thread_sched_ops[thread->sched_class].select_runq();
+ thread_runq_wakeup(runq, thread);
+ spinlock_unlock(&runq->lock);
+ cpu_intr_restore(flags);
+ thread_preempt_enable();
+}
+
+static void
+thread_balancer(void *arg)
+{
+ struct thread_runq *runq;
+
+ runq = arg;
+
+ for (;;) {
+ thread_sleep();
+ thread_sched_ts_balance(runq);
+ }
+}
+
+static void __init
+thread_setup_balancer(void)
+{
+ char name[THREAD_NAME_SIZE];
+ struct thread_runq *runq;
+ struct thread_attr attr;
+ struct thread *balancer;
+ int error;
- thread_pin();
runq = thread_runq_local();
- spinlock_lock_intr_save(&runq->lock, &flags);
- thread_runq_add(runq, thread);
- spinlock_unlock_intr_restore(&runq->lock, flags);
- thread_unpin();
- thread_reschedule();
+ /*
+ * Real-time threads are currently dispatched on the caller's run queue.
+ *
+ * TODO CPU affinity
+ */
+ snprintf(name, sizeof(name), "thread_balancer/%u", cpu_id());
+ attr.task = kernel_task;
+ attr.name = name;
+ attr.sched_policy = THREAD_SCHED_CLASS_RT;
+ attr.priority = THREAD_SCHED_RT_PRIO_MIN;
+ error = thread_create(&balancer, &attr, thread_balancer, runq);
+
+ if (error)
+ panic("thread: unable to create balancer thread");
+
+ runq->balancer = balancer;
}
static void
@@ -950,6 +1392,8 @@ thread_run(void)
assert(cpu_intr_enabled());
+ thread_setup_balancer();
+
/* This call disables interrupts */
thread_setup_idler();
@@ -994,7 +1438,7 @@ thread_schedule(void)
runq = thread_runq_local();
spinlock_lock_intr_save(&runq->lock, &flags);
- prev->flags &= ~THREAD_RESCHEDULE;
+ thread_clear_flag(prev, THREAD_RESCHEDULE);
thread_runq_put_prev(runq, prev);
if (prev->state != THREAD_RUNNING) {
@@ -1020,13 +1464,14 @@ thread_schedule(void)
spinlock_unlock_intr_restore(&runq->lock, flags);
thread_preempt_enable_no_resched();
- } while (prev->flags & THREAD_RESCHEDULE);
+ } while (thread_test_flag(prev, THREAD_RESCHEDULE));
}
void
thread_reschedule(void)
{
- if ((thread_self()->flags & THREAD_RESCHEDULE) && thread_preempt_enabled())
+ if (thread_test_flag(thread_self(), THREAD_RESCHEDULE)
+ && thread_preempt_enabled())
thread_schedule();
}