Diffstat (limited to 'libhurd-cap-server/bucket-manage-mt.c')
-rw-r--r-- | libhurd-cap-server/bucket-manage-mt.c | 881
1 files changed, 863 insertions, 18 deletions
diff --git a/libhurd-cap-server/bucket-manage-mt.c b/libhurd-cap-server/bucket-manage-mt.c index 6369fcb..790f4d8 100644 --- a/libhurd-cap-server/bucket-manage-mt.c +++ b/libhurd-cap-server/bucket-manage-mt.c @@ -26,34 +26,879 @@ #include <errno.h> #include <stdlib.h> #include <assert.h> +#include <stdint.h> #include <pthread.h> #include "cap-server-intern.h" -/* Start managing RPCs on the bucket BUCKET. The BOOTSTRAP capability +/* FIXME: Throughout this file, for debugging the behaviour could be + relaxed to return errors to callers which would otherwise be + ignored (due to malformed requests etc). */ + + +/* FIXME: This section contains a lot of random junk that maybe should + be somewhere else (or not). */ + +/* Cancel the pending RPC of the specified thread. */ +#define HURD_CAP_MSG_LABEL_CANCEL 0x100 +#define HURD_CAP_MSG_LABEL_GET_ROOT 0x101 + +/* Some error for this. */ +#define ECAP_NOREPLY 0x10001 +#define ECAP_DIED 0x10002 + + +/* The msg labels of the reply from the worker to the manager. */ +#define _HURD_CAP_MSG_WORKER_ACCEPTED 0 +#define _HURD_CAP_MSG_WORKER_REJECTED 1 + +struct worker_info +{ + /* The bucket. */ + hurd_cap_bucket_t bucket; + + /* The manager thread. */ + l4_thread_id_t manager_tid; + + /* The timeout for the worker thread as an L4 time period. */ + l4_time_t timeout; +}; + + +static void +__attribute__((always_inline)) +reply_err (l4_thread_id_t to, error_t err) +{ +#define HURD_L4_ERROR_LABEL ((uint16_t) INT16_MIN) +#define HURD_L4_ERROR_TAG ((HURD_L4_ERROR_LABEL << 16) & 1) + l4_set_msg_tag (HURD_L4_ERROR_TAG); + l4_load_mr (1, err); + l4_reply (to); +} + + +/* Lookup the client with client ID CLIENT_ID and return it in + *R_CLIENT with one reference for the entry in the bucket. It is + *verified that the client is in fact the one with the task ID + *TASK_ID. */ +static error_t +__attribute__((always_inline)) +lookup_client (hurd_cap_bucket_t bucket, hurd_cap_client_id_t client_id, + hurd_task_id_t task_id, _hurd_cap_client_t *r_client) +{ + error_t err = 0; + _hurd_cap_client_entry_t entry; + + pthread_mutex_lock (&bucket->lock); + /* Look up the client by its ID. */ + entry = hurd_table_lookup (&bucket->clients, client_id); + if (!entry || entry->dead || entry->client->task_id != task_id) + err = ECAP_NOREPLY; + else + { + entry->refs++; + *r_client = entry->client; + } + pthread_mutex_unlock (&bucket->lock); + + return err; +} + + +/* Process the message MSG from thread FROM in worker thread WORKER of + bucket BUCKET. Return ECAP_NOREPLY if no reply should be sent. Any + other error will be replied to the user. If 0 is returned, MSG + must contain the reply message. */ +static error_t +__attribute__((always_inline)) +manage_demuxer (hurd_cap_rpc_context_t ctx, _hurd_cap_list_item_t worker) +{ + error_t err = 0; + hurd_cap_bucket_t bucket = ctx->bucket; + l4_thread_id_t from = ctx->from; + _hurd_cap_client_t client; + hurd_cap_t cap; + hurd_cap_class_t cap_class; + hurd_cap_obj_t obj; + _hurd_cap_obj_entry_t obj_entry; + struct _hurd_cap_list_item worker_client; + struct _hurd_cap_list_item worker_class; + struct _hurd_cap_list_item worker_obj; + + worker_client.thread = worker->thread; + worker_client.tid = worker->tid; + worker_client.next = NULL; + worker_client.prevp = NULL; + + worker_class = worker_client; + worker_obj = worker_client; + + if (l4_msg_label (ctx->msg) == HURD_CAP_MSG_LABEL_GET_ROOT) + { + /* This is the "get the root capability" RPC. FIXME: Needs to + be implemented. 
*/ + return ENOSYS; + } + + /* Every normal RPC must have at least one untyped word, which + contains the client and capability ID. Otherwise the message is + malformed, and thus ignored. */ + if (l4_untyped_words (l4_msg_msg_tag (ctx->msg)) < 1) + return ECAP_NOREPLY; + cap = l4_msg_word (ctx->msg, 0); + + err = lookup_client (bucket, hurd_cap_client_id (cap), ctx->sender, &client); + if (err) + return err; + + /* At this point, CLIENT_ID and CLIENT are valid, and we have one + reference for the client entry. */ + + pthread_mutex_lock (&client->lock); + while (!err && client->state != _HURD_CAP_STATE_GREEN) + { + if (client->state == _HURD_CAP_STATE_BLACK) + err = ECAP_NOREPLY; + else + err = hurd_cond_wait (&bucket->cond, &bucket->lock); + } + if (err) + { + pthread_mutex_unlock (&client->lock); + /* Either the client died, or we have been canceled. */ + _hurd_cap_client_release (bucket, client->id); + return err; + } + + { + _hurd_cap_obj_entry_t *entry; + + entry = (_hurd_cap_obj_entry_t *) hurd_table_lookup (&client->caps, + hurd_cap_id (cap)); + if (!entry) + err = ECAP_NOREPLY; + else + { + obj_entry = *entry; + + if (!obj_entry->external_refs) + err = ECAP_NOREPLY; + else if (obj_entry->dead) + err = ECAP_DIED; + else + { + obj_entry->internal_refs++; + obj = obj_entry->cap_obj; + } + } + } + if (err) + { + /* Either the capability ID is invalid, or it was revoked. */ + pthread_mutex_unlock (&client->lock); + _hurd_cap_client_release (bucket, client->id); + return err; + } + + /* At this point, CAP_ID, OBJ_ENTRY and OBJ are valid. We have one + internal reference for the capability entry. */ + + /* Add ourself to the pending_rpcs list of the client. */ + _hurd_cap_list_item_add (&client->pending_rpcs, &worker_client); + pthread_mutex_unlock (&client->lock); + + cap_class = obj->cap_class; + + pthread_mutex_lock (&cap_class->lock); + /* First, we have to check if the class is inhibited, and if it is, + we have to wait until it is uninhibited. */ + while (!err && cap_class->state != _HURD_CAP_STATE_GREEN) + err = hurd_cond_wait (&cap_class->cond, &cap_class->lock); + if (err) + { + /* Canceled. */ + pthread_mutex_unlock (&cap_class->lock); + goto client_cleanup; + } + + _hurd_cap_list_item_add (&cap_class->pending_rpcs, &worker_class); + pthread_mutex_unlock (&cap_class->lock); + + + pthread_mutex_lock (&obj->lock); + /* First, we have to check if the object is inhibited, and if it is, + we have to wait until it is uninhibited. */ + if (obj->state != _HURD_CAP_STATE_GREEN) + { + pthread_mutex_unlock (&obj->lock); + pthread_mutex_lock (&cap_class->obj_cond_lock); + pthread_mutex_lock (&obj->lock); + while (!err && obj->state != _HURD_CAP_STATE_GREEN) + { + pthread_mutex_unlock (&obj->lock); + err = hurd_cond_wait (&cap_class->obj_cond, + &cap_class->obj_cond_lock); + pthread_mutex_lock (&obj->lock); + } + pthread_mutex_unlock (&cap_class->obj_cond_lock); + } + if (err) + { + /* Canceled. */ + pthread_mutex_unlock (&obj->lock); + goto class_cleanup; + } + + /* Now check if the client still has the capability, or if it was + revoked. */ + pthread_mutex_lock (&client->lock); + if (obj_entry->dead) + err = ECAP_DIED; + pthread_mutex_unlock (&client->lock); + if (err) + { + /* The capability was revoked in the meantime. 
*/ + pthread_mutex_unlock (&obj->lock); + goto class_cleanup; + } + + _hurd_cap_list_item_add (&obj->pending_rpcs, &worker_obj); + pthread_mutex_unlock (&obj->lock); + + + /* At this point, we have looked up the capability, acquired an + internal reference for its entry in the client table (which + implicitely keeps a reference acquired for the object itself), + acquired a reference for the capability client entry in the + bucket, and have added an item to the pending_rpcs lists in the + client, class and object. With all this, we can finally start to + process the message for real. */ + + /* FIXME: Call the internal demuxer here, for things like reference + counter modification, cap passing etc. */ + + /* Invoke the class-specific demuxer. */ + ctx->client = client; + ctx->obj = obj; + (*cap_class->demuxer) (ctx); + + /* Clean up. */ + pthread_mutex_lock (&obj->lock); + _hurd_cap_list_item_remove (&worker_obj); + _hurd_cap_obj_cond_check (obj); + pthread_mutex_unlock (&obj->lock); + + class_cleanup: + pthread_mutex_lock (&cap_class->lock); + _hurd_cap_list_item_remove (&worker_class); + _hurd_cap_class_cond_check (cap_class); + pthread_mutex_unlock (&cap_class->lock); + + client_cleanup: + pthread_mutex_lock (&client->lock); + _hurd_cap_list_item_remove (&worker_client); + _hurd_cap_client_cond_check (bucket, client); + + /* You are not allowed to revoke a capability while there are + pending RPCs on it. This is the reason why we know that there + must be at least one extra internal reference. FIXME: For + cleanliness, this could still call some inline function that does + the decrement. The assert can be a hint to the compiler to + optimize the inline function expansion anyway. */ + assert (!obj_entry->dead); + assert (obj_entry->internal_refs > 1); + obj_entry->internal_refs--; + pthread_mutex_unlock (&client->lock); + + _hurd_cap_client_release (bucket, client->id); + + return err; +} + + +/* A worker thread for RPC processing. The behaviour of this function + is tightly integrated with the behaviour of the manager thread. */ +static void * +manage_mt_worker (void *arg) +{ + struct worker_info *info = (struct worker_info *) arg; + hurd_cap_bucket_t bucket = info->bucket; + struct _hurd_cap_list_item worker_item; + _hurd_cap_list_item_t worker = &worker_item; + l4_thread_id_t manager = info->manager_tid; + l4_time_t timeout = info->timeout; + l4_thread_id_t from; + l4_msg_tag_t msg_tag; + + /* Prepare the worker queue item. As we are always the current + worker thread when we are started up, we do not add ourselves to + the free list. */ + worker->thread = pthread_self (); + worker->tid = l4_myself (); + worker->next = NULL; + worker->prevp = NULL; + + /* When we are started up, we are supposed to listen as soon as + possible to the next incoming message. The first time, we do + this without a timeout. */ + from = manager; + msg_tag = l4_wait (&from); + + while (1) + { + if (__builtin_expect (l4_ipc_failed (msg_tag), 0)) + { + /* Slow path. */ + + l4_word_t err_code = l4_error_code (); + l4_word_t ipc_err = (err_code >> 1) & 0x7; + unsigned int current_worker_is_us; + + if (ipc_err == L4_IPC_CANCELED || ipc_err == L4_IPC_ABORTED) + /* We have been canceled for shutdown. */ + break; + + /* The only other error that can happen is a timeout waiting + for the message. */ + assert (ipc_err == L4_IPC_TIMEOUT); + + pthread_mutex_lock (&bucket->lock); + /* If we are not on the free queue, we are the current worker. 
*/ + if (worker->prevp == NULL) + current_worker_is_us = 1; + else + current_worker_is_us = 0; + pthread_mutex_unlock (&bucket->lock); + + /* If we are not the current worker, then we can just exit + now because of our timeout. */ + if (!current_worker_is_us) + break; + + /* If we are the current worker, we should wait here for the + next message without a timeout. */ + from = manager; + msg_tag = l4_wait (&from); + /* From here, we will loop all over to the beginning of the + while(1) block. */ + } + else + { + /* Fast path. Process the RPC. */ + error_t err = 0; + struct hurd_cap_rpc_context ctx; + bool inhibited = false; + + /* IMPORTANT NOTE: The manager thread is blocked until we + reply a message with a label MSG_ACCEPTED or + MSG_REJECTED. We are supposed to return such a message + as quickly as possible. In the accepted case, we should + then process the message, while in the rejected case we + should rapidly go into the next receive. */ + + /* Before we can work on the message, we need to copy it. + This is because the MRs holding the message might be + overridden by the pthread implementation or other + function calls we make. In particular, + pthread_mutex_lock is can mangle the message buffer. */ + l4_msg_store (msg_tag, ctx.msg); + + assert (l4_ipc_propagated ()); + assert (l4_thread_is_equal (l4_actual_sender (), manager)); + + pthread_mutex_lock (&bucket->lock); + /* We process cancellation messages irregardless of the + bucket state. */ + if (l4_msg_label (ctx.msg) == HURD_CAP_MSG_LABEL_CANCEL) + { + if (l4_untyped_words (l4_msg_msg_tag (ctx.msg)) == 1) + { + l4_thread_id_t tid = l4_msg_word (ctx.msg, 0); + + /* First verify access. Threads are only allowed to + cancel RPCs from other threads in the task. */ + if (hurd_task_id_from_thread_id (tid) + == hurd_task_id_from_thread_id (from)) + { + /* We allow cancel requests even if normal RPCs + are inhibited. */ + _hurd_cap_list_item_t pending_worker; + + pending_worker = hurd_ihash_find (&bucket->senders, + tid); + if (!pending_worker) + reply_err (from, ESRCH); + else + { + /* Found it. Cancel it. */ + pthread_cancel (pending_worker->thread); + /* Reply success. */ + reply_err (from, 0); + } + } + } + /* Set the error variable so that we return to the + manager immediately. */ + err = ECAP_NOREPLY; + } + else + { + /* Normal RPCs. */ + if (bucket->state == _HURD_CAP_STATE_BLACK) + { + /* The bucket operations have been ended, and the + manager has already been canceled. We know that + the BUCKET->senders hash is empty, so we can + quickly process the message. */ + + /* This is a normal RPC. We cancel it immediately. */ + reply_err (from, ECANCELED); + + /* Now set ERR to any error, so we return to the + manager. */ + err = ECAP_NOREPLY; /* Doesn't matter which error. */ + } + else + { + if (bucket->state != _HURD_CAP_STATE_GREEN) + { + /* If we are inhibited, we will have to wait + until we are uninhibited. */ + inhibited = true; + } + + /* FIXME: This is inefficient. ihash should support + an "add if not there" function. */ + if (hurd_ihash_find (&bucket->senders, from)) + err = EBUSY; + else + { + /* FIXME: We know intimately that pthread_self is not + _HURD_IHASH_EMPTY or _HURD_IHASH_DELETED. */ + err = hurd_ihash_add (&bucket->senders, from, worker); + } + } + } + + if (err) + { + pthread_mutex_unlock (&bucket->lock); + + /* Either we already processed the message above, or + this user thread is currently in an RPC. We don't + allow asynchronous operation for security reason + (preventing DoS attacks). 
Silently drop the + message. */ + msg_tag = l4_niltag; + l4_set_msg_label (&msg_tag, _HURD_CAP_MSG_WORKER_REJECTED); + l4_load_mr (0, msg_tag); + + /* Reply to the manager that we don't accept the message + and wait for the next message without a timeout + (because now we know we are the current worker). */ + from = manager; + msg_tag = l4_lreply_wait (manager, &from); + + /* From here, we will loop all over to the beginning of + the while(1) block. */ + } + else + { + _hurd_cap_list_item_add (inhibited ? &bucket->waiting_rpcs + : &bucket->pending_rpcs, worker); + pthread_mutex_unlock (&bucket->lock); + + msg_tag = l4_niltag; + l4_set_msg_label (&msg_tag, _HURD_CAP_MSG_WORKER_ACCEPTED); + l4_load_mr (0, msg_tag); + msg_tag = l4_reply (manager); + assert (l4_ipc_succeeded (msg_tag)); + + /* Now we are "detached" from the manager in the sense + that we are not the current worker thread + anymore. */ + + if (inhibited) + { + pthread_mutex_lock (&bucket->lock); + while (!err && bucket->state != _HURD_CAP_STATE_GREEN + && bucket->state != _HURD_CAP_STATE_BLACK) + err = hurd_cond_wait (&bucket->cond, &bucket->lock); + if (!err) + { + if (bucket->state == _HURD_CAP_STATE_BLACK) + err = ECANCELED; + else + { + /* State is _HURD_CAP_STATE_GREEN. Move + ourselves to the pending RPC list. */ + _hurd_cap_list_item_remove (worker); + _hurd_cap_list_item_add (&bucket->pending_rpcs, + worker); + } + } + pthread_mutex_unlock (&bucket->lock); + } + + if (!err) + { + /* Process the message. */ + ctx.sender = hurd_task_id_from_thread_id (from); + ctx.bucket = bucket; + ctx.from = from; + err = manage_demuxer (&ctx, worker); + } + + /* Post-processing. */ + + pthread_mutex_lock (&bucket->lock); + /* We have to add ourselves to the free list before (or + at the same time) as removing the client from the + pending hash, and before replying to the RPC (if we + reply in the worker thread at all). The former is + necessary to make sure that no new thread is created + in the race that would otherwise exist, namely after + replying and before adding ourself to the free list. + The latter is required because a client that + immediately follows up with a new message of course + can expect that to work properly. */ + + if (bucket->is_manager_waiting && !bucket->free_worker) + { + /* The manager is starving for worker threads. */ + pthread_cond_broadcast (&bucket->cond); + } + + /* Remove from pending_rpcs (or waiting_rpcs) list. */ + _hurd_cap_list_item_remove (worker); + /* The last waiting RPC may have to signal the manager. */ + if (inhibited && bucket->state == _HURD_CAP_STATE_BLACK + && !bucket->waiting_rpcs) + pthread_cond_broadcast (&bucket->cond); + _hurd_cap_list_item_add (&bucket->free_worker, worker); + + _hurd_cap_bucket_cond_check (bucket); + + /* Now that we are back on the free list it is safe to + let in the next RPC by this thread. */ + hurd_ihash_locp_remove (&bucket->senders, worker->locp); + + /* FIXME: Reap the cancellation flag here. If it was + set, we have been effectively unblocked now. From + now on, canceling us means something different than + cancelling a pending RPC (it means terminating the + worker thread). */ + + pthread_mutex_unlock (&bucket->lock); + + /* Finally, return the reply message, if appropriate. */ + if (__builtin_expect (err != ECAP_NOREPLY, 1)) + { + if (__builtin_expect (err, 0)) + reply_err (from, err); + else + { + l4_msg_load (ctx.msg); + l4_reply (from); + } + } + + /* Now listen for the next message, with a timeout. 
*/ + from = manager; + msg_tag = l4_wait_timeout (timeout, &from); + + /* From here, we will loop all over to the beginning of + the while(1) block. */ + } + } + } + + /* At this point, we have been canceled while being on the free + list, so we should go away. */ + + pthread_mutex_lock (&bucket->lock); + if (_hurd_cap_list_item_dequeued (worker)) + { + /* We are the current worker thread. We are the last worker + thread the manager thread will cancel. */ + pthread_cond_broadcast (&bucket->cond); + } + else + { + _hurd_cap_list_item_remove (worker); + if (bucket->is_manager_waiting && !bucket->free_worker) + { + /* The manager is shutting down. We are the last free + worker (except for the current worker thread) to be + canceled. */ + pthread_cond_broadcast (&bucket->cond); + } + } + pthread_mutex_unlock (&bucket->lock); + + return NULL; +} + + +/* Return the next free worker thread. If no free worker thread is + available, create a new one. If that fails, block until one + becomes free. If we are interrupted while blocking, return + l4_nilthread. */ +static l4_thread_id_t +manage_mt_get_next_worker (struct worker_info *info, pthread_t *worker_thread) +{ + hurd_cap_bucket_t bucket = info->bucket; + l4_thread_id_t worker = l4_nilthread; + _hurd_cap_list_item_t worker_item; + + pthread_mutex_lock (&bucket->lock); + + if (__builtin_expect (bucket->free_worker == NULL, 0)) + { + /* Slow path. Create a new thread and use that. */ + error_t err; + + pthread_mutex_unlock (&bucket->lock); + worker = pthread_pool_get_np (); + if (worker == l4_nilthread) + err = EAGAIN; + else + err = pthread_create_from_l4_tid_np (worker_thread, NULL, + worker, manage_mt_worker, info); + + if (!err) + { + pthread_detach (*worker_thread); + return worker; + } + else + { + pthread_mutex_lock (&bucket->lock); + if (!bucket->free_worker) + { + /* Creating a new thread failed. As a last resort, put + ourself to sleep until we are woken up by the next + free worker. Hopefully not all workers are blocking + forever. */ + + /* FIXME: To fix the case where all workers are blocking + forever, cancel one (or more? all?) (random? oldest?) + worker threads. Usually, that user will restart, but + it will nevertheless allow us to make some (although + slow) process. */ + + /* The next worker thread that adds itself to the free + list will broadcast the condition. */ + bucket->is_manager_waiting = true; + do + err = hurd_cond_wait (&bucket->cond, &bucket->lock); + while (!err && !bucket->free_worker); + + if (err) + { + pthread_mutex_unlock (&bucket->lock); + return l4_nilthread; + } + } + } + } + + /* Fast path. A worker thread is available. Remove it from the + free list and use it. */ + worker_item = bucket->free_worker; + _hurd_cap_list_item_remove (worker_item); + pthread_mutex_unlock (&bucket->lock); + + *worker_thread = worker_item->thread; + return worker_item->tid; +} + + +/* Start managing RPCs on the bucket BUCKET. The ROOT capability object, which must be unlocked and have one reference throughout the whole time this function runs, is used for bootstrapping client - connections. The BOOTSTRAP capability object must be a capability - object in one of the classes that have been added to the bucket. - The GLOBAL_TIMEOUT parameter specifies the number of seconds until - the manager times out (if there are no active users of capability - objects in precious classes). The WORKER_TIMEOUT parameter - specifies the number of seconds until each worker thread times out - (if there are no RPCs processed by the worker thread). 
+ connections. The GLOBAL_TIMEOUT parameter specifies the number of + seconds until the manager times out (if there are no active users). + The WORKER_TIMEOUT parameter specifies the number of seconds until + each worker thread times out (if there are no RPCs processed by the + worker thread). If this returns ECANCELED, then hurd_cap_bucket_end was called with - the force flag being true while there were still active users of - capability objects in precious classes. If this returns without - any error, then the timeout expired, or hurd_cap_bucket_end was - called without active users of capability objects in precious - classes. */ + the force flag being true while there were still active users. If + this returns without any error, then the timeout expired, or + hurd_cap_bucket_end was called without active users. */ error_t hurd_cap_bucket_manage_mt (hurd_cap_bucket_t bucket, - hurd_cap_obj_t bootstrap, - unsigned int global_timeout, - unsigned int worker_timeout) + hurd_cap_obj_t root, + unsigned int global_timeout_sec, + unsigned int worker_timeout_sec) { - /* FIXME: Implement me, of course. */ - return ENOSYS; + error_t err; + l4_time_t global_timeout; + pthread_t worker_thread; + l4_thread_id_t worker; + struct worker_info info; + _hurd_cap_list_item_t item; + + global_timeout = (global_timeout_sec == 0) ? L4_NEVER + : l4_time_period (UINT64_C (1000000) * global_timeout_sec); + + info.bucket = bucket; + info.manager_tid = l4_myself (); + info.timeout = (worker_timeout_sec == 0) ? L4_NEVER + : l4_time_period (UINT64_C (1000000) * worker_timeout_sec); + + /* We never accept any map or grant items. FIXME: For now, we also + do not accept any string buffer items. */ + l4_accept (L4_UNTYPED_WORDS_ACCEPTOR); + + /* Because we do not accept any string items, we do not actually + need to set the Xfer timeouts. But this is what we want to set + them to when we eventually do support string items. */ + l4_set_xfer_timeouts (l4_timeouts (L4_ZERO_TIME, L4_NEVER)); + + /* We create the first worker thread ourselves, to catch any + possible error at this stage and bail out properly if needed. */ + worker = pthread_pool_get_np (); + if (worker == l4_nilthread) + return EAGAIN; + err = pthread_create_from_l4_tid_np (&worker_thread, NULL, + worker, manage_mt_worker, &info); + if (err) + return err; + pthread_detach (worker_thread); + + pthread_mutex_lock (&bucket->lock); + bucket->manager = pthread_self (); + bucket->is_managed = true; + bucket->is_manager_waiting = false; + pthread_mutex_unlock (&bucket->lock); + + while (1) + { + l4_thread_id_t from = l4_anythread; + l4_msg_tag_t msg_tag; + + /* FIXME: Make sure we have enabled deferred cancellation, and + use an L4 ipc() stub that supports that. In fact, this must + be true for most of the IPC operations in this file. */ + msg_tag = l4_wait_timeout (global_timeout, &from); + + if (__builtin_expect (l4_ipc_failed (msg_tag), 0)) + { + l4_word_t err_code = l4_error_code (); + + /* FIXME: We need a macro or inline function for that. */ + l4_word_t ipc_err = (err_code >> 1) & 0x7; + + /* There are two possible errors, cancellation or timeout. + Any other error indicates a bug in the code. */ + if (ipc_err == L4_IPC_CANCELED || ipc_err == L4_IPC_ABORTED) + { + /* If we are canceled, then this means that our state is + now _HURD_CAP_STATE_BLACK and we should end managing + RPCs even if there are still active users. 
*/ + + pthread_mutex_lock (&bucket->lock); + assert (bucket->state == _HURD_CAP_STATE_BLACK); + err = ECANCELED; + break; + } + else + { + assert (((err_code >> 1) & 0x7) == L4_IPC_TIMEOUT); + + pthread_mutex_lock (&bucket->lock); + /* Check if we can time out safely. */ + if (bucket->state == _HURD_CAP_STATE_GREEN + && !bucket->nr_caps && !bucket->pending_rpcs + && !bucket->waiting_rpcs) + { + err = 0; + break; + } + pthread_mutex_unlock (&bucket->lock); + } + } + else + { + /* Propagate the message to the worker thread. */ + l4_set_propagation (&msg_tag); + l4_set_virtual_sender (from); + l4_set_msg_tag (msg_tag); + + /* FIXME: Make sure to use a non-cancellable l4_lcall that + does preserve any pending cancellation flag for this + thread. Alternatively, we can handle cancellation here + (reply ECANCEL to user, and enter shutdown sequence. */ + msg_tag = l4_lcall (worker); + assert (l4_ipc_succeeded (msg_tag)); + + if (__builtin_expect + (l4_label (msg_tag) == _HURD_CAP_MSG_WORKER_ACCEPTED, 1)) + { + worker = manage_mt_get_next_worker (&info, &worker_thread); + if (worker == l4_nilthread) + { + /* The manage_mt_get_next_worker thread was + canceled. In this case we have to terminate + ourselves. */ + err = hurd_cap_bucket_inhibit (bucket); + assert (!err); + hurd_cap_bucket_end (bucket, true); + + pthread_mutex_lock (&bucket->lock); + err = ECANCELED; + break; + } + } + } + } + + /* At this point, bucket->lock is held. Start the shutdown + sequence. */ + assert (!bucket->pending_rpcs); + + /* First force all the waiting rpcs onto the free list. They will + have noticed the state change to _HURD_CAP_STATE_BLACK already, + we just have to block until the last one wakes us up. */ + while (bucket->waiting_rpcs) + hurd_cond_wait (&bucket->cond, &bucket->lock); + + /* Cancel the free workers. */ + item = bucket->free_worker; + while (item) + { + pthread_cancel (item->thread); + item = item->next; + } + + bucket->is_manager_waiting = true; + + while (bucket->free_worker) + { + /* We ignore cancellations at this point, because we are already + shutting down. */ + hurd_cond_wait (&bucket->cond, &bucket->lock); + } + + /* Now cancel the current worker, except if we were canceled while + trying to get a new one (in which case there is no current + worker). */ + if (worker != l4_nilthread) + { + pthread_cancel (worker_thread); + while (bucket->free_worker) + { + /* We ignore cancellations at this point, because we are already + shutting down. */ + hurd_cond_wait (&bucket->cond, &bucket->lock); + } + } + + bucket->is_managed = false; + pthread_mutex_unlock (&bucket->lock); + + return err; } |
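The patch above replaces the old ENOSYS stub of hurd_cap_bucket_manage_mt with a full multi-threaded manager/worker implementation. As a quick orientation for callers, here is a minimal usage sketch; it is not part of the patch, the bucket and root-object setup is assumed to happen elsewhere through the library's normal creation calls, the <hurd/cap-server.h> header path is an assumption, and a timeout of 0 is taken to mean "never time out", matching the L4_NEVER mapping in the code above.

/* Hypothetical caller sketch (not part of this change).  */
#include <errno.h>
#include <hurd/cap-server.h>   /* Assumed public header of libhurd-cap-server.  */

static error_t
serve_rpcs (hurd_cap_bucket_t bucket, hurd_cap_obj_t root)
{
  /* ROOT must be unlocked and keep one reference for as long as the
     manager runs (see the function comment in the patch).  A global
     and per-worker timeout of 0 means the manager never times out.  */
  error_t err = hurd_cap_bucket_manage_mt (bucket, root, 0, 0);

  if (err == ECANCELED)
    /* hurd_cap_bucket_end was called with the force flag while there
       were still active users.  */
    return err;

  /* No error: the global timeout expired, or hurd_cap_bucket_end was
     called without active users.  */
  return 0;
}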