summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRichard Braun <rbraun@sceen.net>2014-06-17 23:30:24 +0200
committerRichard Braun <rbraun@sceen.net>2014-06-17 23:30:24 +0200
commitbbbc53a1f0893703d1a5cc0293f6bc5ae6181fea (patch)
tree0560467446ddd2718d62fffe3681f29e56f7c865
parent718be3a92dd91dacaf7f629561aac62eacb9744e (diff)
kern/work: per-processor work pools
-rw-r--r--kern/thread.c2
-rw-r--r--kern/work.c297
-rw-r--r--kern/work.h10
3 files changed, 248 insertions, 61 deletions
diff --git a/kern/thread.c b/kern/thread.c
index 345d01e6..b660a15d 100644
--- a/kern/thread.c
+++ b/kern/thread.c
@@ -100,6 +100,7 @@
#include <kern/string.h>
#include <kern/task.h>
#include <kern/thread.h>
+#include <kern/work.h>
#include <machine/atomic.h>
#include <machine/cpu.h>
#include <machine/mb.h>
@@ -2005,6 +2006,7 @@ thread_tick_intr(void)
runq = thread_runq_local();
evcnt_inc(&runq->ev_tick_intr);
llsync_report_periodic_event();
+ work_report_periodic_event();
thread = thread_self();
spinlock_lock(&runq->lock);
diff --git a/kern/work.c b/kern/work.c
index 1d41069e..d74fc8bd 100644
--- a/kern/work.c
+++ b/kern/work.c
@@ -13,14 +13,12 @@
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
- *
- *
- * TODO Per-processor pools.
*/
#include <kern/assert.h>
#include <kern/bitmap.h>
#include <kern/error.h>
+#include <kern/evcnt.h>
#include <kern/kmem.h>
#include <kern/list.h>
#include <kern/macros.h>
@@ -52,12 +50,13 @@
#define WORK_THREADS_THRESHOLD 512
#define WORK_MAX_THREADS MAX(MAX_CPUS, WORK_THREADS_THRESHOLD)
-#define WORK_NAME_SIZE 16
+#define WORK_POOL_NAME_SIZE 16
/*
* Work pool flags.
*/
-#define WORK_PF_HIGHPRIO 0x1 /* High priority worker threads */
+#define WORK_PF_GLOBAL 0x1 /* System-wide work queue */
+#define WORK_PF_HIGHPRIO 0x2 /* High priority worker threads */
struct work_thread {
struct list node;
@@ -70,37 +69,57 @@ struct work_thread {
* Pool of threads and works.
*
* Interrupts must be disabled when acquiring the pool lock.
+ *
+ * There are two internal queues of pending works. When first scheduling
+ * a work, it is inserted into queue0. After a periodic event, works still
+ * present in queue0 are moved to queue1. If these works are still present
+ * in queue1 at the next periodic event, it means they couldn't be processed
+ * for a complete period between two periodic events, at which point it is
+ * assumed that processing works on the same processor they were queued on
+ * becomes less relevant. As a result, periodic events also trigger the
+ * transfer of works from queue1 to the matching global pool. Global pools
+ * only use one queue.
+ *
+ * TODO While it's not strictly necessary to hold the lock when accessing a
+ * per-processor pool, since disabling interrupts and preemption could be
+ * used instead, it's currently enforced by the programming model of the
+ * thread module. The thread_sleep() function could be changed to accept a
+ * NULL interlock but this requires clearly defining constraints for safe
+ * usage.
*/
struct work_pool {
struct spinlock lock;
int flags;
- struct work_queue queue;
+ struct work_queue queue0;
+ struct work_queue queue1;
struct work_thread *manager;
+ struct evcnt ev_transfer;
+ unsigned int max_threads;
unsigned int nr_threads;
unsigned int nr_available_threads;
struct list available_threads;
BITMAP_DECLARE(bitmap, WORK_MAX_THREADS);
- char name[WORK_NAME_SIZE];
-};
+ char name[WORK_POOL_NAME_SIZE];
+} __aligned(CPU_L1_SIZE);
static int work_thread_create(struct work_pool *pool, unsigned int id);
static void work_thread_destroy(struct work_thread *worker);
+static struct work_pool work_pool_cpu_main[MAX_CPUS];
+static struct work_pool work_pool_cpu_highprio[MAX_CPUS];
static struct work_pool work_pool_main;
static struct work_pool work_pool_highprio;
static struct kmem_cache work_thread_cache;
-static unsigned int work_max_threads __read_mostly;
-
static unsigned int
work_pool_alloc_id(struct work_pool *pool)
{
int bit;
- assert(pool->nr_threads < work_max_threads);
+ assert(pool->nr_threads < pool->max_threads);
pool->nr_threads++;
- bit = bitmap_find_first_zero(pool->bitmap, work_max_threads);
+ bit = bitmap_find_first_zero(pool->bitmap, pool->max_threads);
assert(bit >= 0);
bitmap_set(pool->bitmap, bit);
return bit;
@@ -114,20 +133,69 @@ work_pool_free_id(struct work_pool *pool, unsigned int id)
bitmap_clear(pool->bitmap, id);
}
+static unsigned int
+work_pool_cpu_id(const struct work_pool *pool)
+{
+ const struct work_pool *array;
+
+ assert(!(pool->flags & WORK_PF_GLOBAL));
+
+ array = (pool->flags & WORK_PF_HIGHPRIO)
+ ? work_pool_cpu_highprio
+ : work_pool_cpu_main;
+ return pool - array;
+}
+
+static unsigned int
+work_pool_compute_max_threads(unsigned int nr_cpus)
+{
+ unsigned int max_threads, ratio;
+
+ ratio = WORK_THREADS_RATIO;
+ max_threads = nr_cpus * ratio;
+
+ while ((ratio > 1) && (max_threads > WORK_THREADS_THRESHOLD)) {
+ ratio--;
+ max_threads = nr_cpus * ratio;
+ }
+
+ assert(max_threads != 0);
+ assert(max_threads <= WORK_MAX_THREADS);
+ return max_threads;
+}
+
static void
work_pool_init(struct work_pool *pool, const char *name, int flags)
{
- unsigned int id;
+ char ev_name[EVCNT_NAME_SIZE];
+ const char *suffix;
+ unsigned int id, nr_cpus, pool_id, max_threads;
int error;
- spinlock_init(&pool->lock);
pool->flags = flags;
- work_queue_init(&pool->queue);
+
+ if (flags & WORK_PF_GLOBAL)
+ nr_cpus = cpu_count();
+ else {
+ nr_cpus = 1;
+ suffix = (flags & WORK_PF_HIGHPRIO) ? "h" : "";
+ pool_id = work_pool_cpu_id(pool);
+ snprintf(ev_name, sizeof(ev_name), "work_transfer/%u%s",
+ pool_id, suffix);
+ evcnt_register(&pool->ev_transfer, ev_name);
+ }
+
+ max_threads = work_pool_compute_max_threads(nr_cpus);
+
+ spinlock_init(&pool->lock);
+ work_queue_init(&pool->queue0);
+ work_queue_init(&pool->queue1);
pool->manager = NULL;
+ pool->max_threads = max_threads;
pool->nr_threads = 0;
pool->nr_available_threads = 0;
list_init(&pool->available_threads);
- bitmap_zero(pool->bitmap, work_max_threads);
+ bitmap_zero(pool->bitmap, WORK_MAX_THREADS);
strlcpy(pool->name, name, sizeof(pool->name));
id = work_pool_alloc_id(pool);
@@ -142,20 +210,70 @@ error_thread:
panic("work: unable to create initial worker thread");
}
+static struct work_pool *
+work_pool_cpu_select(int flags)
+{
+ unsigned int cpu;
+
+ cpu = cpu_id();
+ return (flags & WORK_HIGHPRIO)
+ ? &work_pool_cpu_highprio[cpu]
+ : &work_pool_cpu_main[cpu];
+}
+
+static int
+work_pool_nr_works(const struct work_pool *pool)
+{
+ return (work_queue_nr_works(&pool->queue0)
+ + work_queue_nr_works(&pool->queue1));
+}
+
+static struct work *
+work_pool_pop_work(struct work_pool *pool)
+{
+ if (!(pool->flags & WORK_PF_GLOBAL)) {
+ if (work_queue_nr_works(&pool->queue1) != 0)
+ return work_queue_pop(&pool->queue1);
+ }
+
+ return work_queue_pop(&pool->queue0);
+}
+
static void
work_pool_wakeup_manager(struct work_pool *pool)
{
- if (pool->queue.nr_works == 0)
+ if (work_pool_nr_works(pool) == 0)
return;
if ((pool->manager != NULL) && (pool->manager->thread != thread_self()))
thread_wakeup(pool->manager->thread);
}
-static inline struct work_pool *
-work_pool_select(int flags)
+static void
+work_pool_shift_queues(struct work_pool *pool, struct work_queue *old_queue)
+{
+ assert(!(pool->flags & WORK_PF_GLOBAL));
+
+ work_queue_transfer(old_queue, &pool->queue1);
+ work_queue_transfer(&pool->queue1, &pool->queue0);
+ work_queue_init(&pool->queue0);
+
+ if (work_queue_nr_works(old_queue) != 0)
+ evcnt_inc(&pool->ev_transfer);
+}
+
+static void
+work_pool_push_work(struct work_pool *pool, struct work *work)
{
- return (flags & WORK_HIGHPRIO) ? &work_pool_highprio : &work_pool_main;
+ work_queue_push(&pool->queue0, work);
+ work_pool_wakeup_manager(pool);
+}
+
+static void
+work_pool_concat_queue(struct work_pool *pool, struct work_queue *queue)
+{
+ work_queue_concat(&pool->queue0, queue);
+ work_pool_wakeup_manager(pool);
}
static void
@@ -186,7 +304,7 @@ work_process(void *arg)
pool->nr_available_threads--;
}
- if (pool->queue.nr_works == 0) {
+ if (work_pool_nr_works(pool) == 0) {
if (pool->nr_threads > WORK_THREADS_SPARE)
break;
@@ -194,19 +312,19 @@ work_process(void *arg)
do
thread_sleep(&pool->lock);
- while (pool->queue.nr_works == 0);
+ while (work_pool_nr_works(pool) == 0);
pool->manager = NULL;
}
- work = work_queue_pop(&pool->queue);
+ work = work_pool_pop_work(pool);
- if (pool->queue.nr_works != 0) {
+ if (work_pool_nr_works(pool) != 0) {
if (pool->nr_available_threads != 0) {
worker = list_first_entry(&pool->available_threads,
struct work_thread, node);
thread_wakeup(worker->thread);
- } else if (pool->nr_threads < work_max_threads) {
+ } else if (pool->nr_threads < pool->max_threads) {
id = work_pool_alloc_id(pool);
spinlock_unlock_intr_restore(&pool->lock, flags);
@@ -237,7 +355,9 @@ work_thread_create(struct work_pool *pool, unsigned int id)
{
char name[THREAD_NAME_SIZE];
struct thread_attr attr;
+ struct cpumap *cpumap;
struct work_thread *worker;
+ const char *suffix;
unsigned short priority;
int error;
@@ -249,21 +369,51 @@ work_thread_create(struct work_pool *pool, unsigned int id)
worker->pool = pool;
worker->id = id;
- snprintf(name, sizeof(name), "x15_work_process:%s:%u", pool->name,
- worker->id);
- priority = (pool->flags & WORK_PF_HIGHPRIO)
- ? WORK_PRIO_HIGH
- : WORK_PRIO_NORMAL;
+ if (pool->flags & WORK_PF_HIGHPRIO) {
+ suffix = "h";
+ priority = WORK_PRIO_HIGH;
+ } else {
+ suffix = "";
+ priority = WORK_PRIO_NORMAL;
+ }
+
+ if (pool->flags & WORK_PF_GLOBAL) {
+ cpumap = NULL;
+ snprintf(name, sizeof(name), "x15_work_process/g:%u%s",
+ worker->id, suffix);
+ } else {
+ unsigned int pool_id;
+
+ error = cpumap_create(&cpumap);
+
+ if (error)
+ goto error_cpumap;
+
+ pool_id = work_pool_cpu_id(pool);
+ cpumap_zero(cpumap);
+ cpumap_set(cpumap, pool_id);
+ snprintf(name, sizeof(name), "x15_work_process/%u:%u%s",
+ pool_id, worker->id, suffix);
+ }
+
thread_attr_init(&attr, name);
thread_attr_set_priority(&attr, priority);
+
+ if (cpumap != NULL)
+ thread_attr_set_cpumap(&attr, cpumap);
+
error = thread_create(&worker->thread, &attr, work_process, worker);
+ if (cpumap != NULL)
+ cpumap_destroy(cpumap);
+
if (error)
goto error_thread;
return 0;
error_thread:
+error_cpumap:
kmem_cache_free(&work_thread_cache, worker);
return error;
}
@@ -274,36 +424,29 @@ work_thread_destroy(struct work_thread *worker)
kmem_cache_free(&work_thread_cache, worker);
}
-static void
-work_compute_max_threads(void)
-{
- unsigned int max_threads, nr_cpus, ratio;
-
- nr_cpus = cpu_count();
- ratio = WORK_THREADS_RATIO;
- max_threads = nr_cpus * ratio;
-
- while ((ratio > 1) && (max_threads > WORK_THREADS_THRESHOLD)) {
- ratio--;
- max_threads = nr_cpus * ratio;
- }
-
- assert(max_threads <= WORK_MAX_THREADS);
- work_max_threads = max_threads;
- printk("work: threads per pool (spare/limit): %u/%u\n",
- WORK_THREADS_SPARE, max_threads);
-}
-
void
work_setup(void)
{
+ char name[WORK_POOL_NAME_SIZE];
+ unsigned int i;
+
kmem_cache_init(&work_thread_cache, "work_thread",
sizeof(struct work_thread), 0, NULL, NULL, NULL, 0);
- work_compute_max_threads();
+ for (i = 0; i < cpu_count(); i++) {
+ snprintf(name, sizeof(name), "main%u", i);
+ work_pool_init(&work_pool_cpu_main[i], name, 0);
+ snprintf(name, sizeof(name), "highprio%u", i);
+ work_pool_init(&work_pool_cpu_highprio[i], name, WORK_PF_HIGHPRIO);
+ }
+
+ work_pool_init(&work_pool_main, "main", WORK_PF_GLOBAL);
+ work_pool_init(&work_pool_highprio, "highprio",
+ WORK_PF_GLOBAL | WORK_PF_HIGHPRIO);
- work_pool_init(&work_pool_main, "main", 0);
- work_pool_init(&work_pool_highprio, "highprio", WORK_PF_HIGHPRIO);
+ printk("work: threads per pool (per-cpu/global): %u/%u, spare: %u\n",
+ work_pool_cpu_main[0].max_threads, work_pool_main.max_threads,
+ WORK_THREADS_SPARE);
}
void
@@ -312,12 +455,12 @@ work_schedule(struct work *work, int flags)
struct work_pool *pool;
unsigned long lock_flags;
- pool = work_pool_select(flags);
-
+ thread_pin();
+ pool = work_pool_cpu_select(flags);
spinlock_lock_intr_save(&pool->lock, &lock_flags);
- work_queue_push(&pool->queue, work);
- work_pool_wakeup_manager(pool);
+ work_pool_push_work(pool, work);
spinlock_unlock_intr_restore(&pool->lock, lock_flags);
+ thread_unpin();
}
void
@@ -326,10 +469,42 @@ work_queue_schedule(struct work_queue *queue, int flags)
struct work_pool *pool;
unsigned long lock_flags;
- pool = work_pool_select(flags);
-
+ thread_pin();
+ pool = work_pool_cpu_select(flags);
spinlock_lock_intr_save(&pool->lock, &lock_flags);
- work_queue_concat(&pool->queue, queue);
- work_pool_wakeup_manager(pool);
+ work_pool_concat_queue(pool, queue);
spinlock_unlock_intr_restore(&pool->lock, lock_flags);
+ thread_unpin();
+}
+
+void
+work_report_periodic_event(void)
+{
+ struct work_queue queue, highprio_queue;
+ unsigned int cpu;
+
+ assert(!cpu_intr_enabled());
+ assert(!thread_preempt_enabled());
+
+ cpu = cpu_id();
+
+ spinlock_lock(&work_pool_cpu_main[cpu].lock);
+ work_pool_shift_queues(&work_pool_cpu_main[cpu], &queue);
+ spinlock_unlock(&work_pool_cpu_main[cpu].lock);
+
+ spinlock_lock(&work_pool_cpu_highprio[cpu].lock);
+ work_pool_shift_queues(&work_pool_cpu_highprio[cpu], &highprio_queue);
+ spinlock_unlock(&work_pool_cpu_highprio[cpu].lock);
+
+ if (work_queue_nr_works(&queue) != 0) {
+ spinlock_lock(&work_pool_main.lock);
+ work_pool_concat_queue(&work_pool_main, &queue);
+ spinlock_unlock(&work_pool_main.lock);
+ }
+
+ if (work_queue_nr_works(&highprio_queue) != 0) {
+ spinlock_lock(&work_pool_highprio.lock);
+ work_pool_concat_queue(&work_pool_highprio, &highprio_queue);
+ spinlock_unlock(&work_pool_highprio.lock);
+ }
}
diff --git a/kern/work.h b/kern/work.h
index b3668df9..fb22db90 100644
--- a/kern/work.h
+++ b/kern/work.h
@@ -143,4 +143,14 @@ void work_setup(void);
void work_schedule(struct work *work, int flags);
void work_queue_schedule(struct work_queue *queue, int flags);
+/*
+ * Report a periodic event (normally the periodic timer interrupt) on the
+ * current processor.
+ *
+ * Periodic events are used internally for optimizations.
+ *
+ * Interrupts and preemption must be disabled when calling this function.
+ */
+void work_report_periodic_event(void);
+
#endif /* _KERN_WORK_H */