summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRichard Braun <syn@sceen.net>2007-07-09 19:46:27 +0000
committerRichard Braun <syn@sceen.net>2007-07-09 19:46:27 +0000
commitb0c17c41688a85c776c701768cc319b404baec62 (patch)
tree69db488e37014cda7b49395a1ef80356b3d300c4
parent3e367a209d32b3c7f545bd66b51bfb82d4ae0860 (diff)
Implemented reaper thread.
-rw-r--r--tntd/rpc.c146
1 files changed, 126 insertions, 20 deletions
diff --git a/tntd/rpc.c b/tntd/rpc.c
index 755cea9..247d64e 100644
--- a/tntd/rpc.c
+++ b/tntd/rpc.c
@@ -51,6 +51,7 @@ struct rpc_client
{
list_link_t link;
pthread_mutex_t mutex;
+ int reap;
int id;
pthread_t tid;
list_head_t rpc_sd_list;
@@ -59,6 +60,16 @@ struct rpc_client
typedef struct rpc_client * rpc_client_t;
#define RPC_CLIENT_NULL ((rpc_client_t)0)
+static pthread_t rpc_reaper_thread;
+
+static pthread_mutex_t rpc_reaper_mutex;
+
+static pthread_cond_t rpc_reaper_cond;
+
+static int rpc_can_reap;
+
+static int rpc_reaper_stop;
+
static pthread_t rpc_thread;
static cache_t rpc_client_cache;
@@ -66,6 +77,7 @@ static cache_t rpc_client_cache;
static cache_t rpc_sd_cache;
static list_head_t rpc_client_list;
+
static pthread_mutex_t rpc_client_mutex;
static int
@@ -595,6 +607,25 @@ rpc_process_close(etp_msg_t msg, rpc_client_t rpc_client)
if (error == -1)
saved_errno = errno;
+ else
+ {
+ rpc_sd_t rpc_sd;
+
+ pthread_mutex_lock(&rpc_client->mutex);
+
+ list_for_each(&rpc_client->rpc_sd_list, rpc_sd, link)
+ if (sd == rpc_sd->sd)
+ break;
+
+ if (rpc_sd != RPC_SD_NULL)
+ {
+ list_remove(&rpc_client->rpc_sd_list, rpc_sd, link);
+ cache_free(rpc_sd_cache, rpc_sd);
+ }
+
+ pthread_mutex_unlock(&rpc_client->mutex);
+ }
+
msg = etp_msg_create(TNT_CLOSE_CMD_REPLY);
if (msg == ETP_MSG_NULL)
@@ -771,9 +802,14 @@ rpc_handle_client(void *arg)
while (1);
pthread_mutex_lock(&rpc_client->mutex);
-
+ rpc_client->reap = 1;
pthread_mutex_unlock(&rpc_client->mutex);
+ pthread_mutex_lock(&rpc_reaper_mutex);
+ rpc_can_reap = 1;
+ pthread_cond_signal(&rpc_reaper_cond);
+ pthread_mutex_unlock(&rpc_reaper_mutex);
+
return NULL;
}
@@ -832,41 +868,99 @@ rpc_server(void *arg)
return NULL;
}
-void
-rpc_terminate(void)
+static void *
+rpc_reaper(void *arg)
{
- rpc_client_t rpc_client;
+ rpc_client_t rpc_client, next_rpc_client;
rpc_sd_t rpc_sd;
- pthread_mutex_lock(&rpc_client_mutex);
+ output("RPC reaper thread running...");
+
+ pthread_mutex_lock(&rpc_reaper_mutex);
- while (!list_empty(&rpc_client_list))
+ do
{
- rpc_client = list_first(&rpc_client_list, rpc_client_t);
- list_remove(&rpc_client_list, rpc_client, link);
+ while (!rpc_can_reap)
+ pthread_cond_wait(&rpc_reaper_cond, &rpc_reaper_mutex);
- pthread_mutex_lock(&rpc_client->mutex);
+ pthread_mutex_lock(&rpc_client_mutex);
- list_for_each(&rpc_client->rpc_sd_list, rpc_sd, link)
- tnt_close(rpc_sd->sd);
+ /*
+ * TODO: list_for_each() version able to handle modifications on the
+ * list.
+ */
+ for (rpc_client = list_first(&rpc_client_list, rpc_client_t);
+ !list_end(rpc_client);
+ rpc_client = next_rpc_client)
+ {
+ next_rpc_client = list_next(rpc_client, link);
- pthread_mutex_unlock(&rpc_client->mutex);
+ pthread_mutex_lock(&rpc_client->mutex);
- pthread_join(rpc_client->tid, NULL);
+ if (!rpc_client->reap)
+ {
+ pthread_mutex_unlock(&rpc_client->mutex);
+ continue;
+ }
- while (!list_empty(&rpc_client->rpc_sd_list))
- {
- rpc_sd = list_first(&rpc_client->rpc_sd_list, rpc_sd_t);
- list_remove(&rpc_client->rpc_sd_list, rpc_sd, link);
- cache_free(rpc_sd_cache, rpc_sd);
+ list_remove(&rpc_client_list, rpc_client, link);
+
+ pthread_mutex_unlock(&rpc_client->mutex);
+
+ pthread_join(rpc_client->tid, NULL);
+ rpc_client->reap = 0;
+
+ while (!list_empty(&rpc_client->rpc_sd_list))
+ {
+ rpc_sd = list_first(&rpc_client->rpc_sd_list, rpc_sd_t);
+ list_remove(&rpc_client->rpc_sd_list, rpc_sd, link);
+ tnt_close(rpc_sd->sd);
+ cache_free(rpc_sd_cache, rpc_sd);
+ }
+
+ etp_close(rpc_client->id);
+ cache_free(rpc_client_cache, rpc_client);
}
- etp_close(rpc_client->id);
- cache_free(rpc_client_cache, rpc_client);
+ pthread_mutex_unlock(&rpc_client_mutex);
+
+ if (rpc_reaper_stop)
+ break;
+
+ rpc_can_reap = 0;
+ }
+ while (1);
+
+ pthread_mutex_unlock(&rpc_reaper_mutex);
+
+ output("RPC reaper thread stopping...");
+ return NULL;
+}
+
+void
+rpc_terminate(void)
+{
+ rpc_client_t rpc_client;
+
+ pthread_mutex_lock(&rpc_client_mutex);
+
+ list_for_each(&rpc_client_list, rpc_client, link)
+ {
+ pthread_mutex_lock(&rpc_client->mutex);
+ rpc_client->reap = 1;
+ pthread_mutex_unlock(&rpc_client->mutex);
}
pthread_mutex_unlock(&rpc_client_mutex);
+ pthread_mutex_lock(&rpc_reaper_mutex);
+ rpc_can_reap = 1;
+ rpc_reaper_stop = 1;
+ pthread_cond_signal(&rpc_reaper_cond);
+ pthread_mutex_unlock(&rpc_reaper_mutex);
+
+ pthread_join(rpc_reaper_thread, NULL);
+
cache_destroy(rpc_sd_cache);
cache_destroy(rpc_client_cache);
etp_terminate();
@@ -880,6 +974,7 @@ rpc_client_constructor(void *object)
rpc_client = object;
pthread_mutex_init(&rpc_client->mutex, NULL);
+ rpc_client->reap = 0;
list_init(&rpc_client->rpc_sd_list);
}
@@ -890,6 +985,7 @@ rpc_client_destructor(void *object)
rpc_client = object;
pthread_mutex_destroy(&rpc_client->mutex);
+ assert(rpc_client->reap == 0);
assert(list_empty(&rpc_client->rpc_sd_list));
}
@@ -928,9 +1024,19 @@ rpc_init(void)
exit(1);
}
+ rpc_can_reap = 0;
+ rpc_reaper_stop = 0;
list_init(&rpc_client_list);
pthread_mutex_init(&rpc_client_mutex, NULL);
+ error = pthread_create(&rpc_reaper_thread, NULL, rpc_reaper, NULL);
+
+ if (error)
+ {
+ output_error("unable to create RPC reaper thread, error = %d", error);
+ exit(1);
+ }
+
error = pthread_create(&rpc_thread, NULL, rpc_server, NULL);
if (error)