/*
* Copyright (c) 2013-2014 Richard Braun.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*
* TODO Per-processor pools.
*/
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
/*
 * Scheduling priorities for worker threads, mapped onto the
 * time-sharing scheduler's priority range.
 */
#define WORK_PRIO_NORMAL THREAD_SCHED_TS_PRIO_DEFAULT
#define WORK_PRIO_HIGH THREAD_SCHED_TS_PRIO_MAX

/*
 * Keep at least that many threads alive when a work pool is idle.
 *
 * TODO Use time instead of a raw value to keep threads available.
 */
#define WORK_THREADS_SPARE 4

/*
 * When computing the maximum number of worker threads, start with multiplying
 * the number of processors by the ratio below. If the result is greater than
 * the threshold, retry by decreasing the ratio until either the result is
 * less than the threshold or the ratio is 1.
 */
#define WORK_THREADS_RATIO 4
#define WORK_THREADS_THRESHOLD 512

/* Maximum length of a pool name, including the terminating NUL */
#define WORK_NAME_SIZE 16

/*
 * Work pool flags.
 */
#define WORK_PF_HIGHPRIO 0x1 /* High priority worker threads */
/*
 * Worker thread descriptor.
 */
struct work_thread {
    struct list node;           /* Node in the pool's available_threads list */
    unsigned long long id;      /* Unique ID allocated from the pool's tree */
    struct thread *thread;      /* Underlying kernel thread */
    struct work_pool *pool;     /* Pool this worker belongs to */
};
/*
 * Pool of threads and works.
 *
 * Interrupts must be disabled when acquiring the pool lock.
 *
 * The radix tree is only used to allocate worker IDs. It doesn't store
 * anything relevant. The limit placed on the number of worker threads per
 * pool prevents the allocation of many nodes, which keeps memory waste low.
 * TODO The tree implementation could be improved to use nodes of reduced
 * size, storing only allocation bitmaps and not actual pointers.
 */
struct work_pool {
    struct spinlock lock;                   /* Protects all fields below except the tree */
    int flags;                              /* WORK_PF_* flags */
    struct work_queue queue;                /* Pending works */
    struct work_thread *manager;            /* Thread waiting for work, NULL if none */
    unsigned int nr_threads;                /* Total worker threads, including the manager */
    unsigned int nr_available_threads;      /* Threads parked on available_threads */
    struct list available_threads;          /* Idle threads waiting to become manager */
    struct mutex tree_lock;                 /* Serializes worker ID allocation */
    struct rdxtree tree;                    /* Worker ID allocator */
    char name[WORK_NAME_SIZE];              /* Pool name, used in thread names */
};
static int work_thread_create(struct work_pool *pool);
static void work_thread_destroy(struct work_thread *worker);

/* Global pools: one for normal priority works, one for high priority */
static struct work_pool work_pool_main;
static struct work_pool work_pool_highprio;

/* Cache of work_thread structures */
static struct kmem_cache work_thread_cache;

/* Per-pool worker thread limit, computed once during setup */
static unsigned int work_max_threads __read_mostly;
/*
 * Allocate a unique worker ID from the pool's radix tree.
 *
 * On success, store the new ID in *idp and return 0, otherwise return
 * an error code from the tree insertion. The tree stores the worker
 * pointer but it is never looked up; only the ID matters.
 */
static int
work_pool_alloc_id(struct work_pool *pool, struct work_thread *worker,
                   unsigned long long *idp)
{
    int ret;

    mutex_lock(&pool->tree_lock);
    ret = rdxtree_insert_alloc(&pool->tree, worker, idp);
    mutex_unlock(&pool->tree_lock);

    return ret;
}
/*
 * Release a worker ID previously obtained with work_pool_alloc_id().
 */
static void
work_pool_free_id(struct work_pool *pool, unsigned long long id)
{
    mutex_lock(&pool->tree_lock);
    rdxtree_remove(&pool->tree, id);
    mutex_unlock(&pool->tree_lock);
}
/*
 * Initialize a work pool and create its initial worker thread.
 *
 * The thread is accounted for in nr_threads before it is actually
 * created, matching the accounting done in work_process() when growing
 * the pool. Panics if the initial worker thread can't be created, since
 * a pool without any worker is unusable.
 */
static void
work_pool_init(struct work_pool *pool, const char *name, int flags)
{
    int error;

    spinlock_init(&pool->lock);
    pool->flags = flags;
    work_queue_init(&pool->queue);
    pool->manager = NULL;

    /* Account for the initial worker thread created below */
    pool->nr_threads = 1;
    pool->nr_available_threads = 0;
    list_init(&pool->available_threads);
    mutex_init(&pool->tree_lock);
    rdxtree_init(&pool->tree);
    strlcpy(pool->name, name, sizeof(pool->name));

    error = work_thread_create(pool);

    /*
     * The original code funneled this through a goto label just to
     * panic; a direct check is clearer and behaves identically.
     */
    if (error)
        panic("work: unable to create initial worker thread");
}
static void
work_pool_wakeup_manager(struct work_pool *pool)
{
if (pool->queue.nr_works == 0)
return;
if ((pool->manager != NULL) && (pool->manager->thread != thread_self()))
thread_wakeup(pool->manager->thread);
}
/*
 * Return the global pool matching the given scheduling flags.
 */
static inline struct work_pool *
work_pool_select(int flags)
{
    if (flags & WORK_HIGHPRIO)
        return &work_pool_highprio;

    return &work_pool_main;
}
/*
 * Worker thread entry point.
 *
 * A worker alternates between two roles: at most one thread per pool is
 * the "manager", sleeping until work is queued; the others park on the
 * available_threads list until the manager slot frees up. After popping
 * a work, a worker may wake a parked thread or grow the pool so that
 * remaining works are processed concurrently while it runs its own work.
 * Workers exit when the pool is idle and more than WORK_THREADS_SPARE
 * threads exist.
 */
static void
work_process(void *arg)
{
    struct work_thread *self, *worker;
    struct work_pool *pool;
    struct work *work;
    unsigned long flags;
    int error;

    self = arg;
    pool = self->pool;

    for (;;) {
        spinlock_lock_intr_save(&pool->lock, &flags);

        /*
         * Another thread already manages the pool: park on the available
         * list until the manager slot becomes free.
         */
        if (pool->manager != NULL) {
            list_insert_tail(&pool->available_threads, &self->node);
            pool->nr_available_threads++;

            do
                thread_sleep(&pool->lock);
            while (pool->manager != NULL);

            list_remove(&self->node);
            pool->nr_available_threads--;
        }

        if (pool->queue.nr_works == 0) {
            /* No work and enough spare threads: let this one exit */
            if (pool->nr_threads > WORK_THREADS_SPARE)
                break;

            /* Become the manager and wait for work to be scheduled */
            pool->manager = self;

            do
                thread_sleep(&pool->lock);
            while (pool->queue.nr_works == 0);

            pool->manager = NULL;
        }

        work = work_queue_pop(&pool->queue);

        /*
         * More work remains: hand it off by waking a parked thread, or
         * grow the pool if none is parked and the limit allows it.
         */
        if (pool->queue.nr_works != 0) {
            if (pool->nr_available_threads != 0) {
                worker = list_first_entry(&pool->available_threads,
                                          struct work_thread, node);
                thread_wakeup(worker->thread);
            } else if (pool->nr_threads < work_max_threads) {
                /*
                 * Reserve the slot first, then drop the lock around
                 * thread creation, which may block.
                 */
                pool->nr_threads++;
                spinlock_unlock_intr_restore(&pool->lock, flags);
                error = work_thread_create(pool);
                spinlock_lock_intr_save(&pool->lock, &flags);

                if (error) {
                    pool->nr_threads--;
                    printk("work: warning: unable to create worker thread\n");
                }
            }
        }

        spinlock_unlock_intr_restore(&pool->lock, flags);

        /* Run the work with interrupts enabled and no lock held */
        work->fn(work);
    }

    /* Only reached by breaking out of the loop with the pool lock held */
    pool->nr_threads--;
    spinlock_unlock_intr_restore(&pool->lock, flags);
    work_thread_destroy(self);
}
/*
 * Create and start a worker thread for the given pool.
 *
 * The caller is responsible for accounting the new thread in the pool's
 * nr_threads counter. The thread name embeds the pool name and the
 * worker ID. Returns 0 on success, ERROR_NOMEM or an error from ID
 * allocation or thread creation otherwise, in which case all partially
 * acquired resources are released.
 */
static int
work_thread_create(struct work_pool *pool)
{
    char name[THREAD_NAME_SIZE];
    struct thread_attr attr;
    struct work_thread *worker;
    unsigned short priority;
    int error;

    worker = kmem_cache_alloc(&work_thread_cache);

    if (worker == NULL)
        return ERROR_NOMEM;

    error = work_pool_alloc_id(pool, worker, &worker->id);

    if (error)
        goto out_free_worker;

    worker->pool = pool;

    snprintf(name, sizeof(name), "x15_work_process:%s:%llu",
             pool->name, worker->id);

    if (pool->flags & WORK_PF_HIGHPRIO)
        priority = WORK_PRIO_HIGH;
    else
        priority = WORK_PRIO_NORMAL;

    thread_attr_init(&attr, name);
    thread_attr_set_priority(&attr, priority);
    error = thread_create(&worker->thread, &attr, work_process, worker);

    if (error)
        goto out_free_id;

    return 0;

out_free_id:
    work_pool_free_id(pool, worker->id);
out_free_worker:
    kmem_cache_free(&work_thread_cache, worker);
    return error;
}
/*
 * Release all resources held by a worker thread descriptor.
 *
 * The ID is released first since the descriptor itself is freed last.
 */
static void
work_thread_destroy(struct work_thread *worker)
{
    struct work_pool *pool;

    pool = worker->pool;
    work_pool_free_id(pool, worker->id);
    kmem_cache_free(&work_thread_cache, worker);
}
/*
 * Compute the per-pool worker thread limit.
 *
 * Start from cpu_count() * WORK_THREADS_RATIO and lower the ratio until
 * the product drops below WORK_THREADS_THRESHOLD or the ratio reaches 1,
 * whichever comes first.
 */
static void
work_compute_max_threads(void)
{
    unsigned int nr_cpus, ratio, max_threads;

    nr_cpus = cpu_count();

    for (ratio = WORK_THREADS_RATIO; ; ratio--) {
        max_threads = nr_cpus * ratio;

        if ((ratio == 1) || (max_threads <= WORK_THREADS_THRESHOLD))
            break;
    }

    work_max_threads = max_threads;
    printk("work: threads per pool (spare/limit): %u/%u\n",
           WORK_THREADS_SPARE, max_threads);
}
/*
 * Set up the work module.
 *
 * The worker thread cache must be initialized before the pools, since
 * pool initialization allocates the initial worker threads from it;
 * likewise the thread limit must be computed before any pool may grow.
 */
void
work_setup(void)
{
    kmem_cache_init(&work_thread_cache, "work_thread",
                    sizeof(struct work_thread), 0, NULL, NULL, NULL, 0);

    work_compute_max_threads();

    work_pool_init(&work_pool_main, "main", 0);
    work_pool_init(&work_pool_highprio, "highprio", WORK_PF_HIGHPRIO);
}
/*
 * Queue a single work on the global pool selected by flags and notify
 * the pool's manager thread. Interrupts are saved and disabled around
 * the pool lock, so this may be called from interrupt context.
 */
void
work_schedule(struct work *work, int flags)
{
    struct work_pool *pool;
    unsigned long saved_flags;

    pool = work_pool_select(flags);

    spinlock_lock_intr_save(&pool->lock, &saved_flags);
    work_queue_push(&pool->queue, work);
    work_pool_wakeup_manager(pool);
    spinlock_unlock_intr_restore(&pool->lock, saved_flags);
}
/*
 * Queue a whole batch of works on the global pool selected by flags and
 * notify the pool's manager thread. Interrupts are saved and disabled
 * around the pool lock, so this may be called from interrupt context.
 */
void
work_queue_schedule(struct work_queue *queue, int flags)
{
    struct work_pool *pool;
    unsigned long saved_flags;

    pool = work_pool_select(flags);

    spinlock_lock_intr_save(&pool->lock, &saved_flags);
    work_queue_concat(&pool->queue, queue);
    work_pool_wakeup_manager(pool);
    spinlock_unlock_intr_restore(&pool->lock, saved_flags);
}