diff options
author | Samuel Thibault <samuel.thibault@ens-lyon.org> | 2025-03-05 00:23:43 +0100 |
---|---|---|
committer | Samuel Thibault <samuel.thibault@ens-lyon.org> | 2025-03-05 00:37:08 +0100 |
commit | 9712534e7cb0e60e51448896aa89a01f680a5fac (patch) | |
tree | 51785f119b6292b59c36915cceca10cd41bc6d26 | |
parent | aad9ff97ff277fae87870faed8d0ec263079e040 (diff) |
libpipe: Do not split writes on dgrem and seqpack pipes
They really should not be split for the application, and really should
not be queued several times because otherwise we would record the source
several times and get a reference miscount.
-rw-r--r-- | libpipe/pipe.c | 51 |
1 files changed, 41 insertions, 10 deletions
diff --git a/libpipe/pipe.c b/libpipe/pipe.c index 53b761d2..55ef773f 100644 --- a/libpipe/pipe.c +++ b/libpipe/pipe.c @@ -21,6 +21,7 @@ #include <string.h> /* For memset() */ #include <assert-backtrace.h> #include <stdlib.h> +#include <sys/socket.h> #include <mach/time_value.h> #include <mach/mach_host.h> @@ -352,7 +353,8 @@ pipe_send (struct pipe *pipe, int noblock, void *source, size_t left = pipe->write_limit - pipe_readable (pipe, 1); if (left < data_len) { - if (data_len <= pipe->write_atomic) + if (pipe->class->sock_type != SOCK_STREAM + || data_len <= pipe->write_atomic) return EWOULDBLOCK; else data_len = left; @@ -389,19 +391,52 @@ pipe_send (struct pipe *pipe, int noblock, void *source, done = 0; do { - size_t todo = data_len - done; - size_t left = pipe->write_limit - pipe_readable (pipe, 1); + size_t todo = data_len - done, piece; size_t partial_amount; - if (todo > left) - todo = left; + while(1) + { + size_t left = pipe->write_limit - pipe_readable (pipe, 1); + + piece = todo; - err = (*pipe->class->write)(pipe->queue, source, data + done, todo, + if (piece > left) + { + if (pipe->class->sock_type == SOCK_STREAM) + /* Can't write it all, split. */ + piece = left; + else + /* Can't write it in a single packet, wait. */ + piece = 0; + } + if (piece) + /* Ok, can progress. */ + break; + + if (!noblock) + { + /* No room, wait for people to consume enough. */ + size_t amount = pipe->class->sock_type == SOCK_STREAM ? 1 : todo; + err = pipe_wait_writable_amount (pipe, 0, amount); + if (err) + break; + } + } + if (err) + break; + + err = (*pipe->class->write)(pipe->queue, source, data + done, piece, &partial_amount); if (!err) { done += partial_amount; + if (done < data_len) + /* We are not supposed to record source in a split case, otherwise + we would record it several times and get spurious reference + releases. */ + assert_backtrace (!source); + timestamp (&pipe->write_time); /* And wakeup anyone that might be interested in it. */ @@ -415,10 +450,6 @@ pipe_send (struct pipe *pipe, int noblock, void *source, pthread_cond_broadcast (&pipe->pending_read_selects); pipe_select_cond_broadcast (pipe); } - - if (!noblock && done < data_len) - /* And wait for them to consume. */ - err = pipe_wait_writable (pipe, 0); } } while (!noblock && !err && done < data_len); |