summaryrefslogtreecommitdiff
path: root/libpipe/pipe.c
diff options
context:
space:
mode:
authorSamuel Thibault <samuel.thibault@ens-lyon.org>2025-03-05 00:23:43 +0100
committerSamuel Thibault <samuel.thibault@ens-lyon.org>2025-03-05 00:37:08 +0100
commit9712534e7cb0e60e51448896aa89a01f680a5fac (patch)
tree51785f119b6292b59c36915cceca10cd41bc6d26 /libpipe/pipe.c
parentaad9ff97ff277fae87870faed8d0ec263079e040 (diff)
libpipe: Do not split writes on dgrem and seqpack pipes
They really should not be split for the application, and really should not be queued several times because otherwise we would record the source several times and get a reference miscount.
Diffstat (limited to 'libpipe/pipe.c')
-rw-r--r--libpipe/pipe.c51
1 files changed, 41 insertions, 10 deletions
diff --git a/libpipe/pipe.c b/libpipe/pipe.c
index 53b761d2..55ef773f 100644
--- a/libpipe/pipe.c
+++ b/libpipe/pipe.c
@@ -21,6 +21,7 @@
#include <string.h> /* For memset() */
#include <assert-backtrace.h>
#include <stdlib.h>
+#include <sys/socket.h>
#include <mach/time_value.h>
#include <mach/mach_host.h>
@@ -352,7 +353,8 @@ pipe_send (struct pipe *pipe, int noblock, void *source,
size_t left = pipe->write_limit - pipe_readable (pipe, 1);
if (left < data_len)
{
- if (data_len <= pipe->write_atomic)
+ if (pipe->class->sock_type != SOCK_STREAM
+ || data_len <= pipe->write_atomic)
return EWOULDBLOCK;
else
data_len = left;
@@ -389,19 +391,52 @@ pipe_send (struct pipe *pipe, int noblock, void *source,
done = 0;
do
{
- size_t todo = data_len - done;
- size_t left = pipe->write_limit - pipe_readable (pipe, 1);
+ size_t todo = data_len - done, piece;
size_t partial_amount;
- if (todo > left)
- todo = left;
+ while(1)
+ {
+ size_t left = pipe->write_limit - pipe_readable (pipe, 1);
+
+ piece = todo;
- err = (*pipe->class->write)(pipe->queue, source, data + done, todo,
+ if (piece > left)
+ {
+ if (pipe->class->sock_type == SOCK_STREAM)
+ /* Can't write it all, split. */
+ piece = left;
+ else
+ /* Can't write it in a single packet, wait. */
+ piece = 0;
+ }
+ if (piece)
+ /* Ok, can progress. */
+ break;
+
+ if (!noblock)
+ {
+ /* No room, wait for people to consume enough. */
+ size_t amount = pipe->class->sock_type == SOCK_STREAM ? 1 : todo;
+ err = pipe_wait_writable_amount (pipe, 0, amount);
+ if (err)
+ break;
+ }
+ }
+ if (err)
+ break;
+
+ err = (*pipe->class->write)(pipe->queue, source, data + done, piece,
&partial_amount);
if (!err)
{
done += partial_amount;
+ if (done < data_len)
+ /* We are not supposed to record source in a split case, otherwise
+ we would record it several times and get spurious reference
+ releases. */
+ assert_backtrace (!source);
+
timestamp (&pipe->write_time);
/* And wakeup anyone that might be interested in it. */
@@ -415,10 +450,6 @@ pipe_send (struct pipe *pipe, int noblock, void *source,
pthread_cond_broadcast (&pipe->pending_read_selects);
pipe_select_cond_broadcast (pipe);
}
-
- if (!noblock && done < data_len)
- /* And wait for them to consume. */
- err = pipe_wait_writable (pipe, 0);
}
}
while (!noblock && !err && done < data_len);