summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--libpipe/pipe.c51
1 files changed, 41 insertions, 10 deletions
diff --git a/libpipe/pipe.c b/libpipe/pipe.c
index 53b761d2..55ef773f 100644
--- a/libpipe/pipe.c
+++ b/libpipe/pipe.c
@@ -21,6 +21,7 @@
#include <string.h> /* For memset() */
#include <assert-backtrace.h>
#include <stdlib.h>
+#include <sys/socket.h>
#include <mach/time_value.h>
#include <mach/mach_host.h>
@@ -352,7 +353,8 @@ pipe_send (struct pipe *pipe, int noblock, void *source,
size_t left = pipe->write_limit - pipe_readable (pipe, 1);
if (left < data_len)
{
- if (data_len <= pipe->write_atomic)
+ if (pipe->class->sock_type != SOCK_STREAM
+ || data_len <= pipe->write_atomic)
return EWOULDBLOCK;
else
data_len = left;
@@ -389,19 +391,52 @@ pipe_send (struct pipe *pipe, int noblock, void *source,
done = 0;
do
{
- size_t todo = data_len - done;
- size_t left = pipe->write_limit - pipe_readable (pipe, 1);
+ size_t todo = data_len - done, piece;
size_t partial_amount;
- if (todo > left)
- todo = left;
+ while(1)
+ {
+ size_t left = pipe->write_limit - pipe_readable (pipe, 1);
+
+ piece = todo;
- err = (*pipe->class->write)(pipe->queue, source, data + done, todo,
+ if (piece > left)
+ {
+ if (pipe->class->sock_type == SOCK_STREAM)
+ /* Can't write it all, split. */
+ piece = left;
+ else
+ /* Can't write it in a single packet, wait. */
+ piece = 0;
+ }
+ if (piece)
+ /* Ok, can progress. */
+ break;
+
+ if (!noblock)
+ {
+ /* No room, wait for people to consume enough. */
+ size_t amount = pipe->class->sock_type == SOCK_STREAM ? 1 : todo;
+ err = pipe_wait_writable_amount (pipe, 0, amount);
+ if (err)
+ break;
+ }
+ }
+ if (err)
+ break;
+
+ err = (*pipe->class->write)(pipe->queue, source, data + done, piece,
&partial_amount);
if (!err)
{
done += partial_amount;
+ if (done < data_len)
+ /* We are not supposed to record source in a split case, otherwise
+ we would record it several times and get spurious reference
+ releases. */
+ assert_backtrace (!source);
+
timestamp (&pipe->write_time);
/* And wakeup anyone that might be interested in it. */
@@ -415,10 +450,6 @@ pipe_send (struct pipe *pipe, int noblock, void *source,
pthread_cond_broadcast (&pipe->pending_read_selects);
pipe_select_cond_broadcast (pipe);
}
-
- if (!noblock && done < data_len)
- /* And wait for them to consume. */
- err = pipe_wait_writable (pipe, 0);
}
}
while (!noblock && !err && done < data_len);