diff options
Diffstat (limited to 'libpipe/pipe.c')
-rw-r--r-- | libpipe/pipe.c | 51 |
1 files changed, 41 insertions, 10 deletions
diff --git a/libpipe/pipe.c b/libpipe/pipe.c index 53b761d2..55ef773f 100644 --- a/libpipe/pipe.c +++ b/libpipe/pipe.c @@ -21,6 +21,7 @@ #include <string.h> /* For memset() */ #include <assert-backtrace.h> #include <stdlib.h> +#include <sys/socket.h> #include <mach/time_value.h> #include <mach/mach_host.h> @@ -352,7 +353,8 @@ pipe_send (struct pipe *pipe, int noblock, void *source, size_t left = pipe->write_limit - pipe_readable (pipe, 1); if (left < data_len) { - if (data_len <= pipe->write_atomic) + if (pipe->class->sock_type != SOCK_STREAM + || data_len <= pipe->write_atomic) return EWOULDBLOCK; else data_len = left; @@ -389,19 +391,52 @@ pipe_send (struct pipe *pipe, int noblock, void *source, done = 0; do { - size_t todo = data_len - done; - size_t left = pipe->write_limit - pipe_readable (pipe, 1); + size_t todo = data_len - done, piece; size_t partial_amount; - if (todo > left) - todo = left; + while(1) + { + size_t left = pipe->write_limit - pipe_readable (pipe, 1); + + piece = todo; - err = (*pipe->class->write)(pipe->queue, source, data + done, todo, + if (piece > left) + { + if (pipe->class->sock_type == SOCK_STREAM) + /* Can't write it all, split. */ + piece = left; + else + /* Can't write it in a single packet, wait. */ + piece = 0; + } + if (piece) + /* Ok, can progress. */ + break; + + if (!noblock) + { + /* No room, wait for people to consume enough. */ + size_t amount = pipe->class->sock_type == SOCK_STREAM ? 1 : todo; + err = pipe_wait_writable_amount (pipe, 0, amount); + if (err) + break; + } + } + if (err) + break; + + err = (*pipe->class->write)(pipe->queue, source, data + done, piece, &partial_amount); if (!err) { done += partial_amount; + if (done < data_len) + /* We are not supposed to record source in a split case, otherwise + we would record it several times and get spurious reference + releases. */ + assert_backtrace (!source); + timestamp (&pipe->write_time); /* And wakeup anyone that might be interested in it. */ @@ -415,10 +450,6 @@ pipe_send (struct pipe *pipe, int noblock, void *source, pthread_cond_broadcast (&pipe->pending_read_selects); pipe_select_cond_broadcast (pipe); } - - if (!noblock && done < data_len) - /* And wait for them to consume. */ - err = pipe_wait_writable (pipe, 0); } } while (!noblock && !err && done < data_len); |