summaryrefslogtreecommitdiff
path: root/libpager/demuxer.c
diff options
context:
space:
mode:
Diffstat (limited to 'libpager/demuxer.c')
-rw-r--r--libpager/demuxer.c287
1 files changed, 276 insertions, 11 deletions
diff --git a/libpager/demuxer.c b/libpager/demuxer.c
index b4d40541..efdf2853 100644
--- a/libpager/demuxer.c
+++ b/libpager/demuxer.c
@@ -1,5 +1,5 @@
/* Demuxer for pager library
- Copyright (C) 1994, 1995, 2002, 2011 Free Software Foundation
+ Copyright (C) 1993-2014 Free Software Foundation
This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License as
@@ -15,26 +15,291 @@
along with this program; if not, write to the Free Software
Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. */
+#include <error.h>
+#include <mach/mig_errors.h>
+#include <pthread.h>
+#include <string.h>
+
#include "priv.h"
#include "memory_object_S.h"
#include "notify_S.h"
+#include "queue.h"
+
+/*
+ Worker pool for the server functions.
+
+ A single thread receives messages from the port bucket and puts them
+ into a queue. A fixed number of consumers actually execute the
+ server functions and send the reply.
+
+ The requests to an object O have to be processed in the order they
+ were received. To this end, each worker has a local queue and a
+ tag. If a thread processes a request to O, it sets its tag to a
+ unique identifier representing O. If another thread now dequeues a
+ second request to O, it enqueues it to the first workers queue.
+
+ At least one worker thread is necessary.
+*/
+#define WORKER_COUNT 1
+
+/* An request contains the message received from the port set. */
+struct request
+{
+ struct item item;
+ mig_routine_t routine;
+ mach_msg_header_t *inp;
+ mach_msg_header_t *outp;
+};
+
+/* A worker. */
+struct worker
+{
+ struct requests *requests; /* our pagers request queue */
+ struct queue queue; /* other workers may delegate requests to us */
+ unsigned long tag; /* tag of the object we are working on */
+};
+
+/* This is the queue for incoming requests. A single thread receives
+ messages from the port set, looks the service routine up, and
+ enqueues the request here. */
+struct requests
+{
+ struct port_bucket *bucket;
+ struct queue queue;
+ int asleep;
+ pthread_cond_t wakeup;
+ pthread_mutex_t lock;
+ struct worker workers[WORKER_COUNT];
+};
/* Demultiplex a single message directed at a pager port; INP is the
message received; fill OUTP with the reply. */
-int
-pager_demuxer (mach_msg_header_t *inp,
+static int
+pager_demuxer (struct requests *requests,
+ mach_msg_header_t *inp,
mach_msg_header_t *outp)
{
+ error_t err = MIG_NO_REPLY;
+
+ /* The maximum size of the reply is 2048 bytes. See the MIG source
+ for details. */
+ const mach_msg_size_t max_size = 2048;
+
mig_routine_t routine;
- if ((routine = _pager_seqnos_memory_object_server_routine (inp)) ||
- (routine = _pager_seqnos_notify_server_routine (inp)))
+ if (! ((routine = _pager_seqnos_memory_object_server_routine (inp)) ||
+ (routine = _pager_seqnos_notify_server_routine (inp))))
+ return FALSE;
+
+#define MASK (8u - 1u)
+ mach_msg_size_t padded_size = (inp->msgh_size + MASK) & ~MASK;
+#undef MASK
+
+ struct request *r = malloc (sizeof *r + padded_size + max_size);
+ if (r == NULL)
+ {
+ err = ENOMEM;
+ goto out;
+ }
+
+ r->routine = routine;
+ r->inp = (mach_msg_header_t *) ((char *) r + sizeof *r);
+ memcpy (r->inp, inp, inp->msgh_size);
+
+ r->outp = (mach_msg_header_t *) ((char *) r + sizeof *r + padded_size);
+ memcpy (r->outp, outp, sizeof *outp);
+
+ pthread_mutex_lock (&requests->lock);
+
+ queue_enqueue (&requests->queue, &r->item);
+
+ /* Awake worker. */
+ if (requests->asleep > 0)
+ pthread_cond_signal (&requests->wakeup);
+
+ pthread_mutex_unlock (&requests->lock);
+
+ /* A worker thread will reply. */
+ err = MIG_NO_REPLY;
+
+ out:
+ ((mig_reply_header_t *) outp)->RetCode = err;
+ return TRUE;
+}
+
+/* Consumes requests from the queue. */
+static void *
+worker_func (void *arg)
+{
+ struct worker *self = (struct worker *) arg;
+ struct requests *requests = self->requests;
+ struct request *r = NULL;
+
+ while (1)
+ {
+ int i;
+ mach_msg_return_t mr;
+
+ /* Free previous message. */
+ free (r);
+
+ pthread_mutex_lock (&requests->lock);
+
+ /* First, look in our queue for more requests to the object we
+ have been working on lately. Some other thread might have
+ delegated them to us. */
+ r = queue_dequeue (&self->queue);
+ if (r != NULL)
+ goto got_one;
+
+ /* Nope. Clear our tag and... */
+ self->tag = 0;
+
+ get_request_locked:
+ /* ... get a request from the global queue instead. */
+ while ((r = queue_dequeue (&requests->queue)) == NULL)
+ {
+ requests->asleep += 1;
+ pthread_cond_wait (&requests->wakeup, &requests->lock);
+ requests->asleep -= 1;
+ }
+
+ for (i = 0; i < WORKER_COUNT; i++)
+ if (requests->workers[i].tag
+ == (unsigned long) r->inp->msgh_local_port)
+ {
+ /* Some other thread is working on that object. Delegate
+ the request to that worker. */
+ queue_enqueue (&requests->workers[i].queue, &r->item);
+ goto get_request_locked;
+ }
+
+ /* Claim responsibility for this object by setting our tag. */
+ self->tag = (unsigned long) r->inp->msgh_local_port;
+
+ got_one:
+ pthread_mutex_unlock (&requests->lock);
+
+ /* Call the server routine. */
+ (*r->routine) (r->inp, r->outp);
+
+ /* What follows is basically the second part of
+ mach_msg_server_timeout. */
+ mig_reply_header_t *request = (mig_reply_header_t *) r->inp;
+ mig_reply_header_t *reply = (mig_reply_header_t *) r->outp;
+
+ switch (reply->RetCode)
+ {
+ case KERN_SUCCESS:
+ /* Hunky dory. */
+ break;
+
+ case MIG_NO_REPLY:
+ /* The server function wanted no reply sent.
+ Loop for another request. */
+ continue;
+
+ default:
+ /* Some error; destroy the request message to release any
+ port rights or VM it holds. Don't destroy the reply port
+ right, so we can send an error message. */
+ request->Head.msgh_remote_port = MACH_PORT_NULL;
+ mach_msg_destroy (&request->Head);
+ break;
+ }
+
+ if (reply->Head.msgh_remote_port == MACH_PORT_NULL)
+ {
+ /* No reply port, so destroy the reply. */
+ if (reply->Head.msgh_bits & MACH_MSGH_BITS_COMPLEX)
+ mach_msg_destroy (&reply->Head);
+ continue;
+ }
+
+ /* Send the reply. */
+ mr = mach_msg (&reply->Head,
+ MACH_SEND_MSG,
+ reply->Head.msgh_size,
+ 0,
+ MACH_PORT_NULL,
+ 0,
+ MACH_PORT_NULL);
+
+ switch (mr)
+ {
+ case MACH_SEND_INVALID_DEST:
+ /* The reply can't be delivered, so destroy it. This error
+ indicates only that the requester went away, so we
+ continue and get the next request. */
+ mach_msg_destroy (&reply->Head);
+ break;
+
+ default:
+ /* Some other form of lossage; there is not much we can
+ do here. */
+ error (0, mr, "mach_msg");
+ }
+ }
+
+ /* Not reached. */
+ return NULL;
+}
+
+/* A top-level function for the paging thread that just services paging
+ requests. */
+static void *
+service_paging_requests (void *arg)
+{
+ struct requests *requests = arg;
+
+ int demuxer (mach_msg_header_t *inp,
+ mach_msg_header_t *outp)
+ {
+ return pager_demuxer (requests, inp, outp);
+ }
+
+ ports_manage_port_operations_one_thread (requests->bucket,
+ demuxer,
+ 0);
+ /* Not reached. */
+ return NULL;
+}
+
+/* Start the worker threads libpager uses to service requests. */
+error_t
+pager_start_workers (struct port_bucket *pager_bucket)
+{
+ error_t err;
+ int i;
+ pthread_t t;
+ struct requests *requests;
+
+ requests = malloc (sizeof *requests);
+ if (requests == NULL)
+ return ENOMEM;
+
+ requests->bucket = pager_bucket;
+ requests->asleep = 0;
+ queue_init (&requests->queue);
+ pthread_cond_init (&requests->wakeup, NULL);
+ pthread_mutex_init (&requests->lock, NULL);
+
+ /* Make a thread to service paging requests. */
+ err = pthread_create (&t, NULL, service_paging_requests, requests);
+ if (err)
+ return err;
+ pthread_detach (t);
+
+ for (i = 0; i < WORKER_COUNT; i++)
{
- (*routine) (inp, outp);
- return TRUE;
+ requests->workers[i].requests = requests;
+ requests->workers[i].tag = 0;
+ queue_init (&requests->workers[i].queue);
+
+ err = pthread_create (&t, NULL, &worker_func, &requests->workers[i]);
+ if (err)
+ return err;
+ pthread_detach (t);
}
- /* Synchronize our bookkeeping of the port's seqno with the one
- consumed by this bogus message. */
- _pager_update_seqno (inp->msgh_local_port, inp->msgh_seqno);
- return FALSE;
+ return err;
}