summaryrefslogtreecommitdiff
path: root/libpipe
diff options
context:
space:
mode:
authorRichard Braun <rbraun@sceen.net>2012-09-03 15:47:25 +0200
committerSamuel Thibault <samuel.thibault@ens-lyon.org>2012-11-24 18:46:46 +0100
commit7648503cd894549874ff13f7ed94a4708cf50f0d (patch)
treed31187eabbacf81debf2bc4473b014c116ecbf34 /libpipe
parentb2d57eb33ddd8f24929f372bfb08bac3a29ac6ea (diff)
Remove condition implications
There is no equivalent for these functions in libpthread. Instead of adding them as non standard extensions, rework their use. * console-client/kbd-repeat.c (kbd_repeat_key): Wake threads waiting on select_alert. (kbd_setrepeater): Remove call to condition_implies. console-client/pc-mouse.c (repeat_event): Wake threads waiting on select_alert. (setrepeater): Remove call to condition_implies. * libpipe/pipe.c (pipe_create): Initialize the `pending_selects' member. (pipe_add_select_cond): New function. (pipe_remove_select_cond): Likewise. (pipe_select_cond_broadcast): Likewise. (_pipe_no_readers): Wake threads waiting on a pending select. (_pipe_no_writers): Likewise. (pipe_send): Likewise. (pipe_recv): Likewise. (pipe_pair_select): Replace condition implications by installing a pending select on the pair of pipes. * libpipe/pipe.h (struct pipe_select_cond): New type. (struct pipe): New member `pending_selects'. * pfinet/tunnel.c (tunnel_xmit): Wake threads waiting on tdev->select_alert. (setup_tunnel_device): Remove call to condition_implies. * term/devio.c (device_write_reply_inband): Wake threads waiting on select_alert. * term/hurdio.c (hurdio_writer_loop): Likewise. * term/main.c (main): Remove calls to condition_implies. * term/ptyio.c (ptyio_init): Remove calls to condition_implies, initialize pty_select_alert. (wake_reader): Wake threads waiting on pty_select_wakeup. * term/term.h (pty_select_alert): New variable. (clear_queue): Wake threads waiting on select_alert and, if acting on the input queue, pty_select_alert, unless it's NULL. (dequeue_quote): Likewise. (enqueue_internal): Likewise. (queue_erase): Likewise. * trans/streamio.c (clear_buffer): Wake threads waiting on select_alert. (buffer_read): Likewise. (buffer_write): Likewise. (device_read_reply_inband): Likewise. (device_write_reply_inband): Likewise. (main): Remove calls to condition_implies.
Diffstat (limited to 'libpipe')
-rw-r--r--libpipe/pipe.c89
-rw-r--r--libpipe/pipe.h9
2 files changed, 84 insertions, 14 deletions
diff --git a/libpipe/pipe.c b/libpipe/pipe.c
index 914816bc..85aac0e6 100644
--- a/libpipe/pipe.c
+++ b/libpipe/pipe.c
@@ -64,6 +64,7 @@ pipe_create (struct pipe_class *class, struct pipe **pipe)
condition_init (&new->pending_read_selects);
condition_init (&new->pending_writes);
condition_init (&new->pending_write_selects);
+ new->pending_selects = NULL;
mutex_init (&new->lock);
pq_create (&new->queue);
@@ -83,6 +84,63 @@ pipe_free (struct pipe *pipe)
free (pipe);
}
+static void
+pipe_add_select_cond (struct pipe *pipe, struct pipe_select_cond *cond)
+{
+ struct pipe_select_cond *first, *last;
+
+ first = pipe->pending_selects;
+
+ if (first == NULL)
+ {
+ cond->next = cond;
+ cond->prev = cond;
+ pipe->pending_selects = cond;
+ return;
+ }
+
+ last = first->prev;
+ cond->next = first;
+ cond->prev = last;
+ first->prev = cond;
+ last->next = cond;
+}
+
+static void
+pipe_remove_select_cond (struct pipe *pipe, struct pipe_select_cond *cond)
+{
+ cond->prev->next = cond->next;
+ cond->next->prev = cond->prev;
+
+ if (pipe->pending_selects == cond)
+ {
+ if (cond->next == cond)
+ pipe->pending_selects = NULL;
+ else
+ pipe->pending_selects = cond->next;
+ }
+}
+
+static void
+pipe_select_cond_broadcast (struct pipe *pipe)
+{
+ struct pipe_select_cond *cond, *last;
+
+ cond = pipe->pending_selects;
+
+ if (cond == NULL)
+ return;
+
+ last = cond->prev;
+
+ do
+ {
+ condition_broadcast (&cond->cond);
+ cond = cond->next;
+ }
+ while (cond != last);
+}
+
/* Take any actions necessary when PIPE acquires its first writer. */
void _pipe_first_writer (struct pipe *pipe)
{
@@ -113,6 +171,7 @@ void _pipe_no_readers (struct pipe *pipe)
{
condition_broadcast (&pipe->pending_writes);
condition_broadcast (&pipe->pending_write_selects);
+ pipe_select_cond_broadcast (pipe);
}
}
mutex_unlock (&pipe->lock);
@@ -135,6 +194,7 @@ void _pipe_no_writers (struct pipe *pipe)
{
condition_broadcast (&pipe->pending_reads);
condition_broadcast (&pipe->pending_read_selects);
+ pipe_select_cond_broadcast (pipe);
}
}
mutex_unlock (&pipe->lock);
@@ -171,22 +231,22 @@ pipe_pair_select (struct pipe *rpipe, struct pipe *wpipe,
/* ugh */
{
int rpipe_blocked, wpipe_blocked;
- struct condition pending_read_write_select;
+ struct pipe_select_cond pending_select;
size_t wlimit = wpipe->write_limit;
struct mutex *lock =
(wpipe == rpipe ? &rpipe->lock : &pipe_multiple_lock);
- condition_init (&pending_read_write_select);
- condition_implies (&rpipe->pending_read_selects,
- &pending_read_write_select);
- condition_implies (&wpipe->pending_write_selects,
- &pending_read_write_select);
+ condition_init (&pending_select.cond);
mutex_lock (lock);
- if (rpipe != wpipe)
+ if (rpipe == wpipe)
+ pipe_add_select_cond (rpipe, &pending_select);
+ else
{
mutex_lock (&rpipe->lock);
mutex_lock (&wpipe->lock);
+ pipe_add_select_cond (rpipe, &pending_select);
+ pipe_add_select_cond (wpipe, &pending_select);
}
rpipe_blocked =
@@ -200,7 +260,7 @@ pipe_pair_select (struct pipe *rpipe, struct pipe *wpipe,
mutex_unlock (&rpipe->lock);
mutex_unlock (&wpipe->lock);
}
- if (hurd_condition_wait (&pending_read_write_select, lock))
+ if (hurd_condition_wait (&pending_select.cond, lock))
err = EINTR;
if (rpipe != wpipe)
{
@@ -223,17 +283,16 @@ pipe_pair_select (struct pipe *rpipe, struct pipe *wpipe,
*select_type &= ~SELECT_WRITE;
}
- if (rpipe != wpipe)
+ if (rpipe == wpipe)
+ pipe_remove_select_cond (rpipe, &pending_select);
+ else
{
+ pipe_remove_select_cond (rpipe, &pending_select);
+ pipe_remove_select_cond (wpipe, &pending_select);
mutex_unlock (&rpipe->lock);
mutex_unlock (&wpipe->lock);
}
mutex_unlock (lock);
-
- condition_unimplies (&rpipe->pending_read_selects,
- &pending_read_write_select);
- condition_unimplies (&wpipe->pending_write_selects,
- &pending_read_write_select);
}
return err;
@@ -306,6 +365,7 @@ pipe_send (struct pipe *pipe, int noblock, void *source,
if (pipe_is_readable (pipe, 0))
{
condition_broadcast (&pipe->pending_read_selects);
+ pipe_select_cond_broadcast (pipe);
/* We leave PIPE locked here, assuming the caller will soon unlock
it and allow others access. */
}
@@ -407,6 +467,7 @@ pipe_recv (struct pipe *pipe, int noblock, unsigned *flags, void **source,
if (pipe_readable (pipe, 1) < pipe->write_limit)
{
condition_broadcast (&pipe->pending_write_selects);
+ pipe_select_cond_broadcast (pipe);
/* We leave PIPE locked here, assuming the caller will soon unlock
it and allow others access. */
}
diff --git a/libpipe/pipe.h b/libpipe/pipe.h
index 96432990..a3590fc4 100644
--- a/libpipe/pipe.h
+++ b/libpipe/pipe.h
@@ -62,6 +62,13 @@ extern struct pipe_class *stream_pipe_class;
extern struct pipe_class *dgram_pipe_class;
extern struct pipe_class *seqpack_pipe_class;
+struct pipe_select_cond
+{
+ struct pipe_select_cond *next;
+ struct pipe_select_cond *prev;
+ struct condition cond;
+};
+
/* A unidirectional data pipe; it transfers data from READER to WRITER. */
struct pipe
{
@@ -88,6 +95,8 @@ struct pipe
struct condition pending_writes;
struct condition pending_write_selects;
+ struct pipe_select_cond *pending_selects;
+
/* The maximum number of characters that this pipe will hold without
further writes blocking. */
size_t write_limit;