diff options
author | Richard Braun <rbraun@sceen.net> | 2017-08-27 16:40:12 +0200 |
---|---|---|
committer | Richard Braun <rbraun@sceen.net> | 2017-08-27 16:40:12 +0200 |
commit | d3d0b5245942055aa7478d2adb20f1359ef772f7 (patch) | |
tree | cbd8f35488f62e0942e21e3c74edb46a3954b5ad | |
parent | 000c3defddf008c495f089cb8194b99a10e599cd (diff) |
kern/sleepq: implement timed waits
This change introduces the sleepq_broadcast and sleepq_wakeup functions
because it's now impossible for a condition variable implementation to
implement broadcasting along with timed waits on top of the existing
interface.
-rw-r--r-- | kern/sleepq.c | 147 | ||||
-rw-r--r-- | kern/sleepq.h | 31 |
2 files changed, 160 insertions, 18 deletions
diff --git a/kern/sleepq.c b/kern/sleepq.c index 99282d97..170b1a9f 100644 --- a/kern/sleepq.c +++ b/kern/sleepq.c @@ -38,19 +38,28 @@ struct sleepq_bucket { struct list list; }; +struct sleepq_waiter { + struct list node; + struct thread *thread; + bool pending_wakeup; +}; + +/* + * Waiters are queued in FIFO order and inserted at the head of the + * list of waiters. The pointer to the "oldest" waiter is used as + * a marker between threads waiting for a signal/broadcast (from the + * beginning up to and including the oldest waiter) and threads pending + * for wake-up (all the following threads up to the end of the list). + */ struct sleepq { - struct sleepq_bucket *bucket; + alignas(CPU_L1_SIZE) struct sleepq_bucket *bucket; struct list node; const void *sync_obj; struct list waiters; + struct sleepq_waiter *oldest_waiter; struct sleepq *next_free; }; -struct sleepq_waiter { - struct list node; - struct thread *thread; -}; - #define SLEEPQ_HTABLE_SIZE 128 #define SLEEPQ_COND_HTABLE_SIZE 64 @@ -76,11 +85,28 @@ static void sleepq_waiter_init(struct sleepq_waiter *waiter, struct thread *thread) { waiter->thread = thread; + waiter->pending_wakeup = false; +} + +static bool +sleepq_waiter_pending_wakeup(const struct sleepq_waiter *waiter) +{ + return waiter->pending_wakeup; +} + +static void +sleepq_waiter_set_pending_wakeup(struct sleepq_waiter *waiter) +{ + waiter->pending_wakeup = true; } static void sleepq_waiter_wakeup(struct sleepq_waiter *waiter) { + if (!sleepq_waiter_pending_wakeup(waiter)) { + return; + } + thread_wakeup(waiter->thread); } @@ -90,6 +116,7 @@ sleepq_assert_init_state(const struct sleepq *sleepq) assert(sleepq->bucket == NULL); assert(sleepq->sync_obj == NULL); assert(list_empty(&sleepq->waiters)); + assert(sleepq->oldest_waiter == NULL); assert(sleepq->next_free == NULL); } @@ -190,6 +217,7 @@ sleepq_ctor(void *ptr) sleepq->bucket = NULL; sleepq->sync_obj = NULL; list_init(&sleepq->waiters); + sleepq->oldest_waiter = NULL; sleepq->next_free = NULL; } @@ -367,15 +395,38 @@ sleepq_return(struct sleepq *sleepq, unsigned long flags) } static void +sleepq_shift_oldest_waiter(struct sleepq *sleepq) +{ + struct list *node; + + assert(sleepq->oldest_waiter != NULL); + + node = list_prev(&sleepq->oldest_waiter->node); + + if (list_end(&sleepq->waiters, node)) { + sleepq->oldest_waiter = NULL; + } else { + sleepq->oldest_waiter = list_entry(node, struct sleepq_waiter, node); + } +} + +static void sleepq_add_waiter(struct sleepq *sleepq, struct sleepq_waiter *waiter) { list_insert_head(&sleepq->waiters, &waiter->node); + + if (sleepq->oldest_waiter == NULL) { + sleepq->oldest_waiter = waiter; + } } static void sleepq_remove_waiter(struct sleepq *sleepq, struct sleepq_waiter *waiter) { - (void)sleepq; + if (sleepq->oldest_waiter == waiter) { + sleepq_shift_oldest_waiter(sleepq); + } + list_remove(&waiter->node); } @@ -385,19 +436,48 @@ sleepq_empty(const struct sleepq *sleepq) return list_empty(&sleepq->waiters); } -void -sleepq_wait(struct sleepq *sleepq, const char *wchan) +static int +sleepq_wait_common(struct sleepq *sleepq, const char *wchan, + bool timed, uint64_t ticks) { struct sleepq_waiter waiter; struct thread *thread; + int error; thread = thread_self(); sleepq_waiter_init(&waiter, thread); sleepq_add_waiter(sleepq, &waiter); - thread_sleep(&sleepq->bucket->lock, sleepq->sync_obj, wchan); + if (!timed) { + thread_sleep(&sleepq->bucket->lock, sleepq->sync_obj, wchan); + error = 0; + } else { + error = thread_timedsleep(&sleepq->bucket->lock, sleepq->sync_obj, + wchan, ticks); + + if (error && sleepq_waiter_pending_wakeup(&waiter)) { + error = 0; + } + } sleepq_remove_waiter(sleepq, &waiter); + + return error; +} + +void +sleepq_wait(struct sleepq *sleepq, const char *wchan) +{ + int error; + + error = sleepq_wait_common(sleepq, wchan, false, 0); + assert(!error); +} + +int +sleepq_timedwait(struct sleepq *sleepq, const char *wchan, uint64_t ticks) +{ + return sleepq_wait_common(sleepq, wchan, true, ticks); } void @@ -405,10 +485,55 @@ sleepq_signal(struct sleepq *sleepq) { struct sleepq_waiter *waiter; - if (sleepq_empty(sleepq)) { + if (list_empty(&sleepq->waiters)) { return; } waiter = list_last_entry(&sleepq->waiters, struct sleepq_waiter, node); + sleepq_waiter_set_pending_wakeup(waiter); + sleepq_waiter_wakeup(waiter); +} + +static void +sleepq_wakeup_common(struct sleepq *sleepq) +{ + struct sleepq_waiter *waiter; + + assert(!list_empty(&sleepq->waiters)); + + waiter = list_last_entry(&sleepq->waiters, struct sleepq_waiter, node); sleepq_waiter_wakeup(waiter); } + +void +sleepq_broadcast(struct sleepq *sleepq) +{ + struct sleepq_waiter *waiter; + + if (sleepq->oldest_waiter == NULL) { + goto out; + } + + list_for_each_entry(&sleepq->waiters, waiter, node) { + sleepq_waiter_set_pending_wakeup(waiter); + + if (waiter == sleepq->oldest_waiter) { + break; + } + } + + sleepq->oldest_waiter = NULL; + +out: + sleepq_wakeup_common(sleepq); +} + +void +sleepq_wakeup(struct sleepq *sleepq) +{ + if (list_empty(&sleepq->waiters)) { + return; + } + + sleepq_wakeup_common(sleepq); +} diff --git a/kern/sleepq.h b/kern/sleepq.h index 3de765a3..827d0436 100644 --- a/kern/sleepq.h +++ b/kern/sleepq.h @@ -26,17 +26,13 @@ * the associated mutex, at which point two sleep queues are locked. * Handling condition variable sleep queues slightly differently * allows preventing deadlocks while keeping overall complexity low. - * - * In addition, despite being used to implement condition variables, - * this implementation doesn't provide a broadcast call. The rationale - * is to force users to implement "chained waking" in order to avoid - * the thundering herd effect. */ #ifndef _KERN_SLEEPQ_H #define _KERN_SLEEPQ_H #include <stdbool.h> +#include <stdint.h> #include <kern/init.h> @@ -98,7 +94,7 @@ void sleepq_return(struct sleepq *sleepq, unsigned long flags); bool sleepq_empty(const struct sleepq *sleepq); /* - * Wait for a wake up on the given sleep queue. + * Wait for a wake-up on the given sleep queue. * * The sleep queue must be lent when calling this function. It is * released and later reacquired before returning from this function. @@ -110,8 +106,13 @@ bool sleepq_empty(const struct sleepq *sleepq); * the queue, the queue is not immediately considered empty. * * Threads are queued in FIFO order. + * + * When bounding the duration of the wait, the caller must pass an absolute + * time in ticks, and ERROR_TIMEDOUT is returned if that time is reached + * before the sleep queue is signalled. */ void sleepq_wait(struct sleepq *sleepq, const char *wchan); +int sleepq_timedwait(struct sleepq *sleepq, const char *wchan, uint64_t ticks); /* * Wake up a thread waiting on the given sleep queue, if any. @@ -125,10 +126,26 @@ void sleepq_wait(struct sleepq *sleepq, const char *wchan); * * Threads are queued in FIFO order, which means signalling a sleep * queue multiple times always awakens the same thread, regardless - * of new waiters, as long as that first thread didn't reacquire the + * of new waiters, as long as that thread didn't reacquire the * sleep queue. + * + * A broadcast differs only by also making all currently waiting threads + * pending for wake-up. As with sleepq_signal, a single thread may be + * awaken. The rationale is to force users to implement "chained waking" + * in order to avoid the thundering herd effect. */ void sleepq_signal(struct sleepq *sleepq); +void sleepq_broadcast(struct sleepq *sleepq); + +/* + * Wake up a pending thread. + * + * This function may only wake up a thread pending for wake-up after a + * broadcast. It is used to chain wake-ups to avoid the thundering herd + * effect. If there are no threads pending for wake-up, this function + * does nothing. + */ +void sleepq_wakeup(struct sleepq *sleepq); /* * This init operation provides : |