diff options
author | David S. Miller <davem@davemloft.net> | 2023-01-07 23:10:33 +0000 |
---|---|---|
committer | David S. Miller <davem@davemloft.net> | 2023-01-07 23:10:33 +0000 |
commit | 571f3dd0d01b62ec63a4039320dbdbcd54ae8fb0 (patch) | |
tree | f5f0cbce0c74c7e91821d087b892bc2bbd4644ff /net/rxrpc/call_object.c | |
parent | b4e9b8763e417db31c7088103cc557d55cb7a8f5 (diff) | |
parent | 42f229c350f57a8e825f7591e17cbc5c87e50235 (diff) |
Merge tag 'rxrpc-fixes-20230107' of git://git.kernel.org/pub/scm/linux/kernel/git/dhowells/linux-fs
David Howells says:
====================
rxrpc: Fix race between call connection, data transmit and call disconnect
Here are patches to fix an oops[1] caused by a race between call
connection, initial packet transmission and call disconnection which
results in something like:
kernel BUG at net/rxrpc/peer_object.c:413!
when the syzbot test is run. The problem is that the connection procedure
is effectively split across two threads and can get expanded by taking an
interrupt, thereby adding the call to the peer error distribution list
*after* it has been disconnected (say by the rxrpc socket shutting down).
The easiest solution is to look at the fourth set of I/O thread
conversion/SACK table expansion patches that didn't get applied[2] and take
from it those patches that move call connection and disconnection into the
I/O thread. Moving these things into the I/O thread means that the
sequencing is managed by all being done in the same thread - and the race
can no longer happen.
This is preferable to introducing an extra lock as adding an extra lock
would make the I/O thread have to wait for the app thread in yet another
place.
The changes can be considered as a number of logical parts:
(1) Move all of the call state changes into the I/O thread.
(2) Make client connection ID space per-local endpoint so that the I/O
thread doesn't need locks to access it.
(3) Move actual abort generation into the I/O thread and clean it up. If
sendmsg or recvmsg want to cause an abort, they have to delegate it.
(4) Offload the setting up of the security context on a connection to the
thread of one of the apps that's starting a call. We don't want to be
doing any sort of crypto in the I/O thread.
(5) Connect calls (ie. assign them to channel slots on connections) in the
I/O thread. Calls are set up by sendmsg/kafs and passed to the I/O
thread to connect. Connections are allocated in the I/O thread after
this.
(6) Disconnect calls in the I/O thread.
I've also added a patch for an unrelated bug that cropped up during
testing, whereby a race can occur between an incoming call and socket
shutdown.
Note that whilst this fixes the original syzbot bug, another bug may get
triggered if this one is fixed:
INFO: rcu detected stall in corrupted
rcu: INFO: rcu_preempt detected expedited stalls on CPUs/tasks: { P5792 } 2657 jiffies s: 2825 root: 0x0/T
rcu: blocking rcu_node structures (internal RCU debug):
It doesn't look this should be anything to do with rxrpc, though, as I've
tested an additional patch[3] that removes practically all the RCU usage
from rxrpc and it still occurs. It seems likely that it is being caused by
something in the tunnelling setup that the syzbot test does, but there's
not enough info to go on. It also seems unlikely to be anything to do with
the afs driver as the test doesn't use that.
====================
Signed-off-by: David S. Miller <davem@davemloft.net>
Diffstat (limited to 'net/rxrpc/call_object.c')
-rw-r--r-- | net/rxrpc/call_object.c | 116 |
1 files changed, 70 insertions, 46 deletions
diff --git a/net/rxrpc/call_object.c b/net/rxrpc/call_object.c index 89dcf60b11587..3ded5a24627c5 100644 --- a/net/rxrpc/call_object.c +++ b/net/rxrpc/call_object.c @@ -50,7 +50,7 @@ void rxrpc_poke_call(struct rxrpc_call *call, enum rxrpc_call_poke_trace what) struct rxrpc_local *local = call->local; bool busy; - if (call->state < RXRPC_CALL_COMPLETE) { + if (!test_bit(RXRPC_CALL_DISCONNECTED, &call->flags)) { spin_lock_bh(&local->lock); busy = !list_empty(&call->attend_link); trace_rxrpc_poke_call(call, busy, what); @@ -69,7 +69,7 @@ static void rxrpc_call_timer_expired(struct timer_list *t) _enter("%d", call->debug_id); - if (call->state < RXRPC_CALL_COMPLETE) { + if (!__rxrpc_call_is_complete(call)) { trace_rxrpc_timer_expired(call, jiffies); rxrpc_poke_call(call, rxrpc_call_poke_timer); } @@ -150,7 +150,7 @@ struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_sock *rx, gfp_t gfp, timer_setup(&call->timer, rxrpc_call_timer_expired, 0); INIT_WORK(&call->destroyer, rxrpc_destroy_call); INIT_LIST_HEAD(&call->link); - INIT_LIST_HEAD(&call->chan_wait_link); + INIT_LIST_HEAD(&call->wait_link); INIT_LIST_HEAD(&call->accept_link); INIT_LIST_HEAD(&call->recvmsg_link); INIT_LIST_HEAD(&call->sock_link); @@ -162,7 +162,6 @@ struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_sock *rx, gfp_t gfp, init_waitqueue_head(&call->waitq); spin_lock_init(&call->notify_lock); spin_lock_init(&call->tx_lock); - rwlock_init(&call->state_lock); refcount_set(&call->ref, 1); call->debug_id = debug_id; call->tx_total_len = -1; @@ -211,7 +210,6 @@ static struct rxrpc_call *rxrpc_alloc_client_call(struct rxrpc_sock *rx, now = ktime_get_real(); call->acks_latest_ts = now; call->cong_tstamp = now; - call->state = RXRPC_CALL_CLIENT_AWAIT_CONN; call->dest_srx = *srx; call->interruptibility = p->interruptibility; call->tx_total_len = p->tx_total_len; @@ -227,11 +225,13 @@ static struct rxrpc_call *rxrpc_alloc_client_call(struct rxrpc_sock *rx, ret = rxrpc_init_client_call_security(call); if (ret < 0) { - __rxrpc_set_call_completion(call, RXRPC_CALL_LOCAL_ERROR, 0, ret); + rxrpc_prefail_call(call, RXRPC_CALL_LOCAL_ERROR, ret); rxrpc_put_call(call, rxrpc_call_put_discard_error); return ERR_PTR(ret); } + rxrpc_set_call_state(call, RXRPC_CALL_CLIENT_AWAIT_CONN); + trace_rxrpc_call(call->debug_id, refcount_read(&call->ref), p->user_call_ID, rxrpc_call_new_client); @@ -242,7 +242,7 @@ static struct rxrpc_call *rxrpc_alloc_client_call(struct rxrpc_sock *rx, /* * Initiate the call ack/resend/expiry timer. */ -static void rxrpc_start_call_timer(struct rxrpc_call *call) +void rxrpc_start_call_timer(struct rxrpc_call *call) { unsigned long now = jiffies; unsigned long j = now + MAX_JIFFY_OFFSET; @@ -287,6 +287,39 @@ static void rxrpc_put_call_slot(struct rxrpc_call *call) } /* + * Start the process of connecting a call. We obtain a peer and a connection + * bundle, but the actual association of a call with a connection is offloaded + * to the I/O thread to simplify locking. + */ +static int rxrpc_connect_call(struct rxrpc_call *call, gfp_t gfp) +{ + struct rxrpc_local *local = call->local; + int ret = 0; + + _enter("{%d,%lx},", call->debug_id, call->user_call_ID); + + call->peer = rxrpc_lookup_peer(local, &call->dest_srx, gfp); + if (!call->peer) + goto error; + + ret = rxrpc_look_up_bundle(call, gfp); + if (ret < 0) + goto error; + + trace_rxrpc_client(NULL, -1, rxrpc_client_queue_new_call); + rxrpc_get_call(call, rxrpc_call_get_io_thread); + spin_lock(&local->client_call_lock); + list_add_tail(&call->wait_link, &local->new_client_calls); + spin_unlock(&local->client_call_lock); + rxrpc_wake_up_io_thread(local); + return 0; + +error: + __set_bit(RXRPC_CALL_DISCONNECTED, &call->flags); + return ret; +} + +/* * Set up a call for the given parameters. * - Called with the socket lock held, which it must release. * - If it returns a call, the call's lock will need releasing by the caller. @@ -365,14 +398,10 @@ struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *rx, /* Set up or get a connection record and set the protocol parameters, * including channel number and call ID. */ - ret = rxrpc_connect_call(rx, call, cp, srx, gfp); + ret = rxrpc_connect_call(call, gfp); if (ret < 0) goto error_attached_to_socket; - rxrpc_see_call(call, rxrpc_call_see_connected); - - rxrpc_start_call_timer(call); - _leave(" = %p [new]", call); return call; @@ -384,27 +413,23 @@ struct rxrpc_call *rxrpc_new_client_call(struct rxrpc_sock *rx, error_dup_user_ID: write_unlock(&rx->call_lock); release_sock(&rx->sk); - __rxrpc_set_call_completion(call, RXRPC_CALL_LOCAL_ERROR, - RX_CALL_DEAD, -EEXIST); + rxrpc_prefail_call(call, RXRPC_CALL_LOCAL_ERROR, -EEXIST); trace_rxrpc_call(call->debug_id, refcount_read(&call->ref), 0, rxrpc_call_see_userid_exists); - rxrpc_release_call(rx, call); mutex_unlock(&call->user_mutex); rxrpc_put_call(call, rxrpc_call_put_userid_exists); _leave(" = -EEXIST"); return ERR_PTR(-EEXIST); /* We got an error, but the call is attached to the socket and is in - * need of release. However, we might now race with recvmsg() when - * completing the call queues it. Return 0 from sys_sendmsg() and + * need of release. However, we might now race with recvmsg() when it + * completion notifies the socket. Return 0 from sys_sendmsg() and * leave the error to recvmsg() to deal with. */ error_attached_to_socket: trace_rxrpc_call(call->debug_id, refcount_read(&call->ref), ret, rxrpc_call_see_connect_failed); - set_bit(RXRPC_CALL_DISCONNECTED, &call->flags); - __rxrpc_set_call_completion(call, RXRPC_CALL_LOCAL_ERROR, - RX_CALL_DEAD, ret); + rxrpc_set_call_completion(call, RXRPC_CALL_LOCAL_ERROR, 0, ret); _leave(" = c=%08x [err]", call->debug_id); return call; } @@ -427,32 +452,32 @@ void rxrpc_incoming_call(struct rxrpc_sock *rx, call->call_id = sp->hdr.callNumber; call->dest_srx.srx_service = sp->hdr.serviceId; call->cid = sp->hdr.cid; - call->state = RXRPC_CALL_SERVER_SECURING; call->cong_tstamp = skb->tstamp; + __set_bit(RXRPC_CALL_EXPOSED, &call->flags); + rxrpc_set_call_state(call, RXRPC_CALL_SERVER_SECURING); + spin_lock(&conn->state_lock); switch (conn->state) { case RXRPC_CONN_SERVICE_UNSECURED: case RXRPC_CONN_SERVICE_CHALLENGING: - call->state = RXRPC_CALL_SERVER_SECURING; + rxrpc_set_call_state(call, RXRPC_CALL_SERVER_SECURING); break; case RXRPC_CONN_SERVICE: - call->state = RXRPC_CALL_SERVER_RECV_REQUEST; + rxrpc_set_call_state(call, RXRPC_CALL_SERVER_RECV_REQUEST); break; - case RXRPC_CONN_REMOTELY_ABORTED: - __rxrpc_set_call_completion(call, RXRPC_CALL_REMOTELY_ABORTED, - conn->abort_code, conn->error); - break; - case RXRPC_CONN_LOCALLY_ABORTED: - __rxrpc_abort_call("CON", call, 1, - conn->abort_code, conn->error); + case RXRPC_CONN_ABORTED: + rxrpc_set_call_completion(call, conn->completion, + conn->abort_code, conn->error); break; default: BUG(); } + rxrpc_get_call(call, rxrpc_call_get_io_thread); + /* Set the channel for this call. We don't get channel_lock as we're * only defending against the data_ready handler (which we're called * from) and the RESPONSE packet parser (which is only really @@ -462,7 +487,7 @@ void rxrpc_incoming_call(struct rxrpc_sock *rx, chan = sp->hdr.cid & RXRPC_CHANNELMASK; conn->channels[chan].call_counter = call->call_id; conn->channels[chan].call_id = call->call_id; - rcu_assign_pointer(conn->channels[chan].call, call); + conn->channels[chan].call = call; spin_unlock(&conn->state_lock); spin_lock(&conn->peer->lock); @@ -522,20 +547,17 @@ static void rxrpc_cleanup_ring(struct rxrpc_call *call) void rxrpc_release_call(struct rxrpc_sock *rx, struct rxrpc_call *call) { struct rxrpc_connection *conn = call->conn; - bool put = false; + bool put = false, putu = false; _enter("{%d,%d}", call->debug_id, refcount_read(&call->ref)); trace_rxrpc_call(call->debug_id, refcount_read(&call->ref), call->flags, rxrpc_call_see_release); - ASSERTCMP(call->state, ==, RXRPC_CALL_COMPLETE); - if (test_and_set_bit(RXRPC_CALL_RELEASED, &call->flags)) BUG(); rxrpc_put_call_slot(call); - del_timer_sync(&call->timer); /* Make sure we don't get any more notifications */ write_lock(&rx->recvmsg_lock); @@ -560,7 +582,7 @@ void rxrpc_release_call(struct rxrpc_sock *rx, struct rxrpc_call *call) if (test_and_clear_bit(RXRPC_CALL_HAS_USERID, &call->flags)) { rb_erase(&call->sock_node, &rx->calls); memset(&call->sock_node, 0xdd, sizeof(call->sock_node)); - rxrpc_put_call(call, rxrpc_call_put_userid_exists); + putu = true; } list_del(&call->sock_link); @@ -568,10 +590,9 @@ void rxrpc_release_call(struct rxrpc_sock *rx, struct rxrpc_call *call) _debug("RELEASE CALL %p (%d CONN %p)", call, call->debug_id, conn); - if (conn && !test_bit(RXRPC_CALL_DISCONNECTED, &call->flags)) - rxrpc_disconnect_call(call); - if (call->security) - call->security->free_call_crypto(call); + if (putu) + rxrpc_put_call(call, rxrpc_call_put_userid); + _leave(""); } @@ -588,7 +609,8 @@ void rxrpc_release_calls_on_socket(struct rxrpc_sock *rx) call = list_entry(rx->to_be_accepted.next, struct rxrpc_call, accept_link); list_del(&call->accept_link); - rxrpc_abort_call("SKR", call, 0, RX_CALL_DEAD, -ECONNRESET); + rxrpc_propose_abort(call, RX_CALL_DEAD, -ECONNRESET, + rxrpc_abort_call_sock_release_tba); rxrpc_put_call(call, rxrpc_call_put_release_sock_tba); } @@ -596,8 +618,8 @@ void rxrpc_release_calls_on_socket(struct rxrpc_sock *rx) call = list_entry(rx->sock_calls.next, struct rxrpc_call, sock_link); rxrpc_get_call(call, rxrpc_call_get_release_sock); - rxrpc_abort_call("SKT", call, 0, RX_CALL_DEAD, -ECONNRESET); - rxrpc_send_abort_packet(call); + rxrpc_propose_abort(call, RX_CALL_DEAD, -ECONNRESET, + rxrpc_abort_call_sock_release); rxrpc_release_call(rx, call); rxrpc_put_call(call, rxrpc_call_put_release_sock); } @@ -620,7 +642,7 @@ void rxrpc_put_call(struct rxrpc_call *call, enum rxrpc_call_trace why) dead = __refcount_dec_and_test(&call->ref, &r); trace_rxrpc_call(debug_id, r - 1, 0, why); if (dead) { - ASSERTCMP(call->state, ==, RXRPC_CALL_COMPLETE); + ASSERTCMP(__rxrpc_call_state(call), ==, RXRPC_CALL_COMPLETE); if (!list_empty(&call->link)) { spin_lock(&rxnet->call_lock); @@ -669,6 +691,8 @@ static void rxrpc_destroy_call(struct work_struct *work) rxrpc_put_txbuf(call->tx_pending, rxrpc_txbuf_put_cleaned); rxrpc_put_connection(call->conn, rxrpc_conn_put_call); + rxrpc_deactivate_bundle(call->bundle); + rxrpc_put_bundle(call->bundle, rxrpc_bundle_put_call); rxrpc_put_peer(call->peer, rxrpc_peer_put_call); rxrpc_put_local(call->local, rxrpc_local_put_call); call_rcu(&call->rcu, rxrpc_rcu_free_call); @@ -681,7 +705,7 @@ void rxrpc_cleanup_call(struct rxrpc_call *call) { memset(&call->sock_node, 0xcd, sizeof(call->sock_node)); - ASSERTCMP(call->state, ==, RXRPC_CALL_COMPLETE); + ASSERTCMP(__rxrpc_call_state(call), ==, RXRPC_CALL_COMPLETE); ASSERT(test_bit(RXRPC_CALL_RELEASED, &call->flags)); del_timer(&call->timer); @@ -719,7 +743,7 @@ void rxrpc_destroy_all_calls(struct rxrpc_net *rxnet) pr_err("Call %p still in use (%d,%s,%lx,%lx)!\n", call, refcount_read(&call->ref), - rxrpc_call_states[call->state], + rxrpc_call_states[__rxrpc_call_state(call)], call->flags, call->events); spin_unlock(&rxnet->call_lock); |