/*
* Copyright (c) 2013-2014 Richard Braun.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see .
*/
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#define WORK_PRIO_NORMAL THREAD_SCHED_FS_PRIO_DEFAULT
#define WORK_PRIO_HIGH THREAD_SCHED_FS_PRIO_MAX
#define WORK_INVALID_CPU ((unsigned int)-1)
/*
* Keep at least that many threads alive when a work pool is idle.
*/
#define WORK_THREADS_SPARE 4
/*
* When computing the maximum number of worker threads, start with multiplying
* the number of processors by the ratio below. If the result is greater than
* the threshold, retry by decreasing the ratio until either the result is
* less than the threshold or the ratio is 1.
*/
#define WORK_THREADS_RATIO 4
#define WORK_THREADS_THRESHOLD 512
#define WORK_MAX_THREADS MAX(CONFIG_MAX_CPUS, WORK_THREADS_THRESHOLD)
/*
* Work pool flags.
*/
#define WORK_PF_GLOBAL 0x1 /* System-wide work queue */
#define WORK_PF_HIGHPRIO 0x2 /* High priority worker threads */
struct work_thread {
struct list node;
struct thread *thread;
struct work_pool *pool;
unsigned int id;
};
/*
* Pool of threads and works.
*
* Interrupts must be disabled when accessing a work pool. Holding the
* lock is required for global pools only, whereas exclusive access on
* per-processor pools is achieved by disabling preemption.
*
* There are two internal queues of pending works. When first scheduling
* a work, it is inserted into queue0. After a periodic event, works still
* present in queue0 are moved to queue1. If these works are still present
* in queue1 at the next periodic event, it means they couldn't be processed
* for a complete period between two periodic events, at which point it is
* assumed that processing works on the same processor they were queued on
* becomes less relevant. As a result, periodic events also trigger the
* transfer of works from queue1 to the matching global pool. Global pools
* only use one queue.
*/
struct work_pool {
alignas(CPU_L1_SIZE) struct spinlock lock;
int flags;
struct work_queue queue0;
struct work_queue queue1;
struct work_thread *manager;
struct syscnt sc_transfers;
unsigned int cpu;
unsigned int max_threads;
unsigned int nr_threads;
unsigned int nr_available_threads;
struct list available_threads;
struct list dead_threads;
BITMAP_DECLARE(bitmap, WORK_MAX_THREADS);
};
static int work_thread_create(struct work_pool *pool, unsigned int id);
static struct work_pool work_pool_cpu_main __percpu;
static struct work_pool work_pool_cpu_highprio __percpu;
static struct work_pool work_pool_main;
static struct work_pool work_pool_highprio;
static struct kmem_cache work_thread_cache;
static unsigned int
work_pool_alloc_id(struct work_pool *pool)
{
int bit;
assert(pool->nr_threads < pool->max_threads);
pool->nr_threads++;
bit = bitmap_find_first_zero(pool->bitmap, pool->max_threads);
assert(bit >= 0);
bitmap_set(pool->bitmap, bit);
return bit;
}
static void
work_pool_free_id(struct work_pool *pool, unsigned int id)
{
assert(pool->nr_threads != 0);
pool->nr_threads--;
bitmap_clear(pool->bitmap, id);
}
static unsigned int
work_pool_cpu_id(const struct work_pool *pool)
{
assert(!(pool->flags & WORK_PF_GLOBAL));
return pool->cpu;
}
static unsigned int
work_pool_compute_max_threads(unsigned int nr_cpus)
{
unsigned int max_threads, ratio;
ratio = WORK_THREADS_RATIO;
max_threads = nr_cpus * ratio;
while ((ratio > 1) && (max_threads > WORK_THREADS_THRESHOLD)) {
ratio--;
max_threads = nr_cpus * ratio;
}
assert(max_threads != 0);
assert(max_threads <= WORK_MAX_THREADS);
return max_threads;
}
static void __init
work_pool_init(struct work_pool *pool)
{
spinlock_init(&pool->lock);
work_queue_init(&pool->queue0);
work_queue_init(&pool->queue1);
pool->manager = NULL;
}
static void __init
work_pool_build(struct work_pool *pool, unsigned int cpu, int flags)
{
char name[SYSCNT_NAME_SIZE];
const char *suffix;
unsigned int id, nr_cpus, max_threads;
int error;
pool->flags = flags;
if (flags & WORK_PF_GLOBAL) {
nr_cpus = cpu_count();
pool->cpu = WORK_INVALID_CPU;
} else {
nr_cpus = 1;
suffix = (flags & WORK_PF_HIGHPRIO) ? "h" : "";
snprintf(name, sizeof(name), "work_transfers/%u%s", cpu, suffix);
syscnt_register(&pool->sc_transfers, name);
pool->cpu = cpu;
}
max_threads = work_pool_compute_max_threads(nr_cpus);
pool->max_threads = max_threads;
pool->nr_threads = 0;
pool->nr_available_threads = 0;
list_init(&pool->available_threads);
list_init(&pool->dead_threads);
bitmap_zero(pool->bitmap, WORK_MAX_THREADS);
id = work_pool_alloc_id(pool);
error = work_thread_create(pool, id);
if (error) {
goto error_thread;
}
return;
error_thread:
panic("work: unable to create initial worker thread");
}
static struct work_pool *
work_pool_cpu_select(int flags)
{
return (flags & WORK_HIGHPRIO)
? cpu_local_ptr(work_pool_cpu_highprio)
: cpu_local_ptr(work_pool_cpu_main);
}
static void
work_pool_acquire(struct work_pool *pool, unsigned long *flags)
{
if (pool->flags & WORK_PF_GLOBAL) {
spinlock_lock_intr_save(&pool->lock, flags);
} else {
thread_preempt_disable_intr_save(flags);
}
}
static void
work_pool_release(struct work_pool *pool, unsigned long flags)
{
if (pool->flags & WORK_PF_GLOBAL) {
spinlock_unlock_intr_restore(&pool->lock, flags);
} else {
thread_preempt_enable_intr_restore(flags);
}
}
static int
work_pool_nr_works(const struct work_pool *pool)
{
return (work_queue_nr_works(&pool->queue0)
+ work_queue_nr_works(&pool->queue1));
}
static struct work *
work_pool_pop_work(struct work_pool *pool)
{
if (!(pool->flags & WORK_PF_GLOBAL)) {
if (work_queue_nr_works(&pool->queue1) != 0) {
return work_queue_pop(&pool->queue1);
}
}
return work_queue_pop(&pool->queue0);
}
static void
work_pool_wakeup_manager(struct work_pool *pool)
{
if (work_pool_nr_works(pool) == 0) {
return;
}
if (pool->manager != NULL) {
thread_wakeup(pool->manager->thread);
}
}
static void
work_pool_shift_queues(struct work_pool *pool, struct work_queue *old_queue)
{
assert(!(pool->flags & WORK_PF_GLOBAL));
work_queue_transfer(old_queue, &pool->queue1);
work_queue_transfer(&pool->queue1, &pool->queue0);
work_queue_init(&pool->queue0);
if (work_queue_nr_works(old_queue) != 0) {
syscnt_inc(&pool->sc_transfers);
}
}
static void
work_pool_push_work(struct work_pool *pool, struct work *work)
{
work_queue_push(&pool->queue0, work);
work_pool_wakeup_manager(pool);
}
static void
work_pool_concat_queue(struct work_pool *pool, struct work_queue *queue)
{
work_queue_concat(&pool->queue0, queue);
work_pool_wakeup_manager(pool);
}
static void
work_thread_destroy(struct work_thread *worker)
{
thread_join(worker->thread);
kmem_cache_free(&work_thread_cache, worker);
}
static void
work_process(void *arg)
{
struct work_thread *self, *worker;
struct work_pool *pool;
struct work *work;
struct spinlock *lock;
unsigned long flags;
unsigned int id;
int error;
self = arg;
pool = self->pool;
lock = (pool->flags & WORK_PF_GLOBAL) ? &pool->lock : NULL;
work_pool_acquire(pool, &flags);
for (;;) {
if (pool->manager != NULL) {
list_insert_tail(&pool->available_threads, &self->node);
pool->nr_available_threads++;
do {
thread_sleep(lock, pool, "work_spr");
} while (pool->manager != NULL);
list_remove(&self->node);
pool->nr_available_threads--;
}
if (!list_empty(&pool->dead_threads)) {
worker = list_first_entry(&pool->dead_threads,
struct work_thread, node);
list_remove(&worker->node);
work_pool_release(pool, flags);
id = worker->id;
work_thread_destroy(worker);
/*
* Release worker ID last so that, if the pool is full, no new
* worker can be created unless all the resources of the worker
* being destroyed have been freed. This is important to enforce
* a strict boundary on the total amount of resources allocated
* for a pool at any time.
*/
work_pool_acquire(pool, &flags);
work_pool_free_id(pool, id);
continue;
}
if (work_pool_nr_works(pool) == 0) {
if (pool->nr_threads > WORK_THREADS_SPARE) {
break;
}
pool->manager = self;
do {
thread_sleep(lock, pool, "work_mgr");
} while (work_pool_nr_works(pool) == 0);
pool->manager = NULL;
}
work = work_pool_pop_work(pool);
if (work_pool_nr_works(pool) != 0) {
if (pool->nr_available_threads != 0) {
worker = list_first_entry(&pool->available_threads,
struct work_thread, node);
thread_wakeup(worker->thread);
} else if (pool->nr_threads < pool->max_threads) {
id = work_pool_alloc_id(pool);
work_pool_release(pool, flags);
error = work_thread_create(pool, id);
work_pool_acquire(pool, &flags);
if (error) {
work_pool_free_id(pool, id);
log_warning("work: unable to create worker thread");
}
}
}
work_pool_release(pool, flags);
work->fn(work);
work_pool_acquire(pool, &flags);
}
list_insert_tail(&pool->dead_threads, &self->node);
work_pool_release(pool, flags);
}
static int
work_thread_create(struct work_pool *pool, unsigned int id)
{
char name[THREAD_NAME_SIZE];
struct thread_attr attr;
struct cpumap *cpumap;
struct work_thread *worker;
const char *suffix;
unsigned short priority;
int error;
worker = kmem_cache_alloc(&work_thread_cache);
if (worker == NULL) {
return ENOMEM;
}
worker->pool = pool;
worker->id = id;
if (pool->flags & WORK_PF_HIGHPRIO) {
suffix = "h";
priority = WORK_PRIO_HIGH;
} else {
suffix = "";
priority = WORK_PRIO_NORMAL;
}
if (pool->flags & WORK_PF_GLOBAL) {
cpumap = NULL;
snprintf(name, sizeof(name),
THREAD_KERNEL_PREFIX "work_process/g:%u%s",
worker->id, suffix);
} else {
unsigned int pool_id;
error = cpumap_create(&cpumap);
if (error) {
goto error_cpumap;
}
pool_id = work_pool_cpu_id(pool);
cpumap_zero(cpumap);
cpumap_set(cpumap, pool_id);
snprintf(name, sizeof(name),
THREAD_KERNEL_PREFIX "work_process/%u:%u%s",
pool_id, worker->id, suffix);
}
thread_attr_init(&attr, name);
thread_attr_set_priority(&attr, priority);
if (cpumap != NULL) {
thread_attr_set_cpumap(&attr, cpumap);
}
error = thread_create(&worker->thread, &attr, work_process, worker);
if (cpumap != NULL) {
cpumap_destroy(cpumap);
}
if (error) {
goto error_thread;
}
return 0;
error_thread:
error_cpumap:
kmem_cache_free(&work_thread_cache, worker);
return error;
}
static int __init
work_bootstrap(void)
{
work_pool_init(cpu_local_ptr(work_pool_cpu_main));
work_pool_init(cpu_local_ptr(work_pool_cpu_highprio));
return 0;
}
INIT_OP_DEFINE(work_bootstrap,
INIT_OP_DEP(cpu_setup, true),
INIT_OP_DEP(spinlock_setup, true),
INIT_OP_DEP(thread_bootstrap, true));
static int __init
work_setup(void)
{
kmem_cache_init(&work_thread_cache, "work_thread",
sizeof(struct work_thread), 0, NULL, 0);
for (unsigned int i = 1; i < cpu_count(); i++) {
work_pool_init(percpu_ptr(work_pool_cpu_main, i));
work_pool_init(percpu_ptr(work_pool_cpu_highprio, i));
}
work_pool_init(&work_pool_main);
work_pool_init(&work_pool_highprio);
for (unsigned int i = 0; i < cpu_count(); i++) {
work_pool_build(percpu_ptr(work_pool_cpu_main, i), i, 0);
work_pool_build(percpu_ptr(work_pool_cpu_highprio, i), i,
WORK_PF_HIGHPRIO);
}
work_pool_build(&work_pool_main, WORK_INVALID_CPU, WORK_PF_GLOBAL);
work_pool_build(&work_pool_highprio, WORK_INVALID_CPU,
WORK_PF_GLOBAL | WORK_PF_HIGHPRIO);
log_info("work: threads per pool (per-cpu/global): %u/%u, spare: %u",
percpu_var(work_pool_cpu_main.max_threads, 0),
work_pool_main.max_threads, WORK_THREADS_SPARE);
return 0;
}
INIT_OP_DEFINE(work_setup,
INIT_OP_DEP(cpu_mp_probe, true),
INIT_OP_DEP(cpumap_setup, true),
INIT_OP_DEP(kmem_setup, true),
INIT_OP_DEP(log_setup, true),
INIT_OP_DEP(panic_setup, true),
INIT_OP_DEP(spinlock_setup, true),
INIT_OP_DEP(syscnt_setup, true),
INIT_OP_DEP(thread_setup, true),
INIT_OP_DEP(work_bootstrap, true));
void
work_schedule(struct work *work, int flags)
{
struct work_pool *pool;
unsigned long cpu_flags;
thread_pin();
pool = work_pool_cpu_select(flags);
work_pool_acquire(pool, &cpu_flags);
work_pool_push_work(pool, work);
work_pool_release(pool, cpu_flags);
thread_unpin();
}
void
work_queue_schedule(struct work_queue *queue, int flags)
{
struct work_pool *pool;
unsigned long cpu_flags;
thread_pin();
pool = work_pool_cpu_select(flags);
work_pool_acquire(pool, &cpu_flags);
work_pool_concat_queue(pool, queue);
work_pool_release(pool, cpu_flags);
thread_unpin();
}
void
work_report_periodic_event(void)
{
struct work_queue queue, highprio_queue;
assert(thread_check_intr_context());
work_pool_shift_queues(cpu_local_ptr(work_pool_cpu_main), &queue);
work_pool_shift_queues(cpu_local_ptr(work_pool_cpu_highprio),
&highprio_queue);
if (work_queue_nr_works(&queue) != 0) {
spinlock_lock(&work_pool_main.lock);
work_pool_concat_queue(&work_pool_main, &queue);
spinlock_unlock(&work_pool_main.lock);
}
if (work_queue_nr_works(&highprio_queue) != 0) {
spinlock_lock(&work_pool_highprio.lock);
work_pool_concat_queue(&work_pool_highprio, &highprio_queue);
spinlock_unlock(&work_pool_highprio.lock);
}
}