summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRichard Braun <rbraun@sceen.net>2017-08-27 16:40:12 +0200
committerRichard Braun <rbraun@sceen.net>2017-08-27 16:40:12 +0200
commitd3d0b5245942055aa7478d2adb20f1359ef772f7 (patch)
treecbd8f35488f62e0942e21e3c74edb46a3954b5ad
parent000c3defddf008c495f089cb8194b99a10e599cd (diff)
kern/sleepq: implement timed waits
This change introduces the sleepq_broadcast and sleepq_wakeup functions because it's now impossible for a condition variable implementation to implement broadcasting along with timed waits on top of the existing interface.
-rw-r--r--kern/sleepq.c147
-rw-r--r--kern/sleepq.h31
2 files changed, 160 insertions, 18 deletions
diff --git a/kern/sleepq.c b/kern/sleepq.c
index 99282d97..170b1a9f 100644
--- a/kern/sleepq.c
+++ b/kern/sleepq.c
@@ -38,19 +38,28 @@ struct sleepq_bucket {
struct list list;
};
+struct sleepq_waiter {
+ struct list node;
+ struct thread *thread;
+ bool pending_wakeup;
+};
+
+/*
+ * Waiters are queued in FIFO order and inserted at the head of the
+ * list of waiters. The pointer to the "oldest" waiter is used as
+ * a marker between threads waiting for a signal/broadcast (from the
+ * beginning up to and including the oldest waiter) and threads pending
+ * for wake-up (all the following threads up to the end of the list).
+ */
struct sleepq {
- struct sleepq_bucket *bucket;
+ alignas(CPU_L1_SIZE) struct sleepq_bucket *bucket;
struct list node;
const void *sync_obj;
struct list waiters;
+ struct sleepq_waiter *oldest_waiter;
struct sleepq *next_free;
};
-struct sleepq_waiter {
- struct list node;
- struct thread *thread;
-};
-
#define SLEEPQ_HTABLE_SIZE 128
#define SLEEPQ_COND_HTABLE_SIZE 64
@@ -76,11 +85,28 @@ static void
sleepq_waiter_init(struct sleepq_waiter *waiter, struct thread *thread)
{
waiter->thread = thread;
+ waiter->pending_wakeup = false;
+}
+
+static bool
+sleepq_waiter_pending_wakeup(const struct sleepq_waiter *waiter)
+{
+ return waiter->pending_wakeup;
+}
+
+static void
+sleepq_waiter_set_pending_wakeup(struct sleepq_waiter *waiter)
+{
+ waiter->pending_wakeup = true;
}
static void
sleepq_waiter_wakeup(struct sleepq_waiter *waiter)
{
+ if (!sleepq_waiter_pending_wakeup(waiter)) {
+ return;
+ }
+
thread_wakeup(waiter->thread);
}
@@ -90,6 +116,7 @@ sleepq_assert_init_state(const struct sleepq *sleepq)
assert(sleepq->bucket == NULL);
assert(sleepq->sync_obj == NULL);
assert(list_empty(&sleepq->waiters));
+ assert(sleepq->oldest_waiter == NULL);
assert(sleepq->next_free == NULL);
}
@@ -190,6 +217,7 @@ sleepq_ctor(void *ptr)
sleepq->bucket = NULL;
sleepq->sync_obj = NULL;
list_init(&sleepq->waiters);
+ sleepq->oldest_waiter = NULL;
sleepq->next_free = NULL;
}
@@ -367,15 +395,38 @@ sleepq_return(struct sleepq *sleepq, unsigned long flags)
}
static void
+sleepq_shift_oldest_waiter(struct sleepq *sleepq)
+{
+ struct list *node;
+
+ assert(sleepq->oldest_waiter != NULL);
+
+ node = list_prev(&sleepq->oldest_waiter->node);
+
+ if (list_end(&sleepq->waiters, node)) {
+ sleepq->oldest_waiter = NULL;
+ } else {
+ sleepq->oldest_waiter = list_entry(node, struct sleepq_waiter, node);
+ }
+}
+
+static void
sleepq_add_waiter(struct sleepq *sleepq, struct sleepq_waiter *waiter)
{
list_insert_head(&sleepq->waiters, &waiter->node);
+
+ if (sleepq->oldest_waiter == NULL) {
+ sleepq->oldest_waiter = waiter;
+ }
}
static void
sleepq_remove_waiter(struct sleepq *sleepq, struct sleepq_waiter *waiter)
{
- (void)sleepq;
+ if (sleepq->oldest_waiter == waiter) {
+ sleepq_shift_oldest_waiter(sleepq);
+ }
+
list_remove(&waiter->node);
}
@@ -385,19 +436,48 @@ sleepq_empty(const struct sleepq *sleepq)
return list_empty(&sleepq->waiters);
}
-void
-sleepq_wait(struct sleepq *sleepq, const char *wchan)
+static int
+sleepq_wait_common(struct sleepq *sleepq, const char *wchan,
+ bool timed, uint64_t ticks)
{
struct sleepq_waiter waiter;
struct thread *thread;
+ int error;
thread = thread_self();
sleepq_waiter_init(&waiter, thread);
sleepq_add_waiter(sleepq, &waiter);
- thread_sleep(&sleepq->bucket->lock, sleepq->sync_obj, wchan);
+ if (!timed) {
+ thread_sleep(&sleepq->bucket->lock, sleepq->sync_obj, wchan);
+ error = 0;
+ } else {
+ error = thread_timedsleep(&sleepq->bucket->lock, sleepq->sync_obj,
+ wchan, ticks);
+
+ if (error && sleepq_waiter_pending_wakeup(&waiter)) {
+ error = 0;
+ }
+ }
sleepq_remove_waiter(sleepq, &waiter);
+
+ return error;
+}
+
+void
+sleepq_wait(struct sleepq *sleepq, const char *wchan)
+{
+ int error;
+
+ error = sleepq_wait_common(sleepq, wchan, false, 0);
+ assert(!error);
+}
+
+int
+sleepq_timedwait(struct sleepq *sleepq, const char *wchan, uint64_t ticks)
+{
+ return sleepq_wait_common(sleepq, wchan, true, ticks);
}
void
@@ -405,10 +485,55 @@ sleepq_signal(struct sleepq *sleepq)
{
struct sleepq_waiter *waiter;
- if (sleepq_empty(sleepq)) {
+ if (list_empty(&sleepq->waiters)) {
return;
}
waiter = list_last_entry(&sleepq->waiters, struct sleepq_waiter, node);
+ sleepq_waiter_set_pending_wakeup(waiter);
+ sleepq_waiter_wakeup(waiter);
+}
+
+static void
+sleepq_wakeup_common(struct sleepq *sleepq)
+{
+ struct sleepq_waiter *waiter;
+
+ assert(!list_empty(&sleepq->waiters));
+
+ waiter = list_last_entry(&sleepq->waiters, struct sleepq_waiter, node);
sleepq_waiter_wakeup(waiter);
}
+
+void
+sleepq_broadcast(struct sleepq *sleepq)
+{
+ struct sleepq_waiter *waiter;
+
+ if (sleepq->oldest_waiter == NULL) {
+ goto out;
+ }
+
+ list_for_each_entry(&sleepq->waiters, waiter, node) {
+ sleepq_waiter_set_pending_wakeup(waiter);
+
+ if (waiter == sleepq->oldest_waiter) {
+ break;
+ }
+ }
+
+ sleepq->oldest_waiter = NULL;
+
+out:
+ sleepq_wakeup_common(sleepq);
+}
+
+void
+sleepq_wakeup(struct sleepq *sleepq)
+{
+ if (list_empty(&sleepq->waiters)) {
+ return;
+ }
+
+ sleepq_wakeup_common(sleepq);
+}
diff --git a/kern/sleepq.h b/kern/sleepq.h
index 3de765a3..827d0436 100644
--- a/kern/sleepq.h
+++ b/kern/sleepq.h
@@ -26,17 +26,13 @@
* the associated mutex, at which point two sleep queues are locked.
* Handling condition variable sleep queues slightly differently
* allows preventing deadlocks while keeping overall complexity low.
- *
- * In addition, despite being used to implement condition variables,
- * this implementation doesn't provide a broadcast call. The rationale
- * is to force users to implement "chained waking" in order to avoid
- * the thundering herd effect.
*/
#ifndef _KERN_SLEEPQ_H
#define _KERN_SLEEPQ_H
#include <stdbool.h>
+#include <stdint.h>
#include <kern/init.h>
@@ -98,7 +94,7 @@ void sleepq_return(struct sleepq *sleepq, unsigned long flags);
bool sleepq_empty(const struct sleepq *sleepq);
/*
- * Wait for a wake up on the given sleep queue.
+ * Wait for a wake-up on the given sleep queue.
*
* The sleep queue must be lent when calling this function. It is
* released and later reacquired before returning from this function.
@@ -110,8 +106,13 @@ bool sleepq_empty(const struct sleepq *sleepq);
* the queue, the queue is not immediately considered empty.
*
* Threads are queued in FIFO order.
+ *
+ * When bounding the duration of the wait, the caller must pass an absolute
+ * time in ticks, and ERROR_TIMEDOUT is returned if that time is reached
+ * before the sleep queue is signalled.
*/
void sleepq_wait(struct sleepq *sleepq, const char *wchan);
+int sleepq_timedwait(struct sleepq *sleepq, const char *wchan, uint64_t ticks);
/*
* Wake up a thread waiting on the given sleep queue, if any.
@@ -125,10 +126,26 @@ void sleepq_wait(struct sleepq *sleepq, const char *wchan);
*
* Threads are queued in FIFO order, which means signalling a sleep
* queue multiple times always awakens the same thread, regardless
- * of new waiters, as long as that first thread didn't reacquire the
+ * of new waiters, as long as that thread didn't reacquire the
* sleep queue.
+ *
+ * A broadcast differs only by also making all currently waiting threads
+ * pending for wake-up. As with sleepq_signal, a single thread may be
+ * awaken. The rationale is to force users to implement "chained waking"
+ * in order to avoid the thundering herd effect.
*/
void sleepq_signal(struct sleepq *sleepq);
+void sleepq_broadcast(struct sleepq *sleepq);
+
+/*
+ * Wake up a pending thread.
+ *
+ * This function may only wake up a thread pending for wake-up after a
+ * broadcast. It is used to chain wake-ups to avoid the thundering herd
+ * effect. If there are no threads pending for wake-up, this function
+ * does nothing.
+ */
+void sleepq_wakeup(struct sleepq *sleepq);
/*
* This init operation provides :