diff options
author | Richard Braun <syn@sceen.net> | 2007-07-09 19:46:27 +0000 |
---|---|---|
committer | Richard Braun <syn@sceen.net> | 2007-07-09 19:46:27 +0000 |
commit | b0c17c41688a85c776c701768cc319b404baec62 (patch) | |
tree | 69db488e37014cda7b49395a1ef80356b3d300c4 | |
parent | 3e367a209d32b3c7f545bd66b51bfb82d4ae0860 (diff) |
Implemented reaper thread.
-rw-r--r-- | tntd/rpc.c | 146 |
1 files changed, 126 insertions, 20 deletions
@@ -51,6 +51,7 @@ struct rpc_client { list_link_t link; pthread_mutex_t mutex; + int reap; int id; pthread_t tid; list_head_t rpc_sd_list; @@ -59,6 +60,16 @@ struct rpc_client typedef struct rpc_client * rpc_client_t; #define RPC_CLIENT_NULL ((rpc_client_t)0) +static pthread_t rpc_reaper_thread; + +static pthread_mutex_t rpc_reaper_mutex; + +static pthread_cond_t rpc_reaper_cond; + +static int rpc_can_reap; + +static int rpc_reaper_stop; + static pthread_t rpc_thread; static cache_t rpc_client_cache; @@ -66,6 +77,7 @@ static cache_t rpc_client_cache; static cache_t rpc_sd_cache; static list_head_t rpc_client_list; + static pthread_mutex_t rpc_client_mutex; static int @@ -595,6 +607,25 @@ rpc_process_close(etp_msg_t msg, rpc_client_t rpc_client) if (error == -1) saved_errno = errno; + else + { + rpc_sd_t rpc_sd; + + pthread_mutex_lock(&rpc_client->mutex); + + list_for_each(&rpc_client->rpc_sd_list, rpc_sd, link) + if (sd == rpc_sd->sd) + break; + + if (rpc_sd != RPC_SD_NULL) + { + list_remove(&rpc_client->rpc_sd_list, rpc_sd, link); + cache_free(rpc_sd_cache, rpc_sd); + } + + pthread_mutex_unlock(&rpc_client->mutex); + } + msg = etp_msg_create(TNT_CLOSE_CMD_REPLY); if (msg == ETP_MSG_NULL) @@ -771,9 +802,14 @@ rpc_handle_client(void *arg) while (1); pthread_mutex_lock(&rpc_client->mutex); - + rpc_client->reap = 1; pthread_mutex_unlock(&rpc_client->mutex); + pthread_mutex_lock(&rpc_reaper_mutex); + rpc_can_reap = 1; + pthread_cond_signal(&rpc_reaper_cond); + pthread_mutex_unlock(&rpc_reaper_mutex); + return NULL; } @@ -832,41 +868,99 @@ rpc_server(void *arg) return NULL; } -void -rpc_terminate(void) +static void * +rpc_reaper(void *arg) { - rpc_client_t rpc_client; + rpc_client_t rpc_client, next_rpc_client; rpc_sd_t rpc_sd; - pthread_mutex_lock(&rpc_client_mutex); + output("RPC reaper thread running..."); + + pthread_mutex_lock(&rpc_reaper_mutex); - while (!list_empty(&rpc_client_list)) + do { - rpc_client = list_first(&rpc_client_list, rpc_client_t); - list_remove(&rpc_client_list, rpc_client, link); + while (!rpc_can_reap) + pthread_cond_wait(&rpc_reaper_cond, &rpc_reaper_mutex); - pthread_mutex_lock(&rpc_client->mutex); + pthread_mutex_lock(&rpc_client_mutex); - list_for_each(&rpc_client->rpc_sd_list, rpc_sd, link) - tnt_close(rpc_sd->sd); + /* + * TODO: list_for_each() version able to handle modifications on the + * list. + */ + for (rpc_client = list_first(&rpc_client_list, rpc_client_t); + !list_end(rpc_client); + rpc_client = next_rpc_client) + { + next_rpc_client = list_next(rpc_client, link); - pthread_mutex_unlock(&rpc_client->mutex); + pthread_mutex_lock(&rpc_client->mutex); - pthread_join(rpc_client->tid, NULL); + if (!rpc_client->reap) + { + pthread_mutex_unlock(&rpc_client->mutex); + continue; + } - while (!list_empty(&rpc_client->rpc_sd_list)) - { - rpc_sd = list_first(&rpc_client->rpc_sd_list, rpc_sd_t); - list_remove(&rpc_client->rpc_sd_list, rpc_sd, link); - cache_free(rpc_sd_cache, rpc_sd); + list_remove(&rpc_client_list, rpc_client, link); + + pthread_mutex_unlock(&rpc_client->mutex); + + pthread_join(rpc_client->tid, NULL); + rpc_client->reap = 0; + + while (!list_empty(&rpc_client->rpc_sd_list)) + { + rpc_sd = list_first(&rpc_client->rpc_sd_list, rpc_sd_t); + list_remove(&rpc_client->rpc_sd_list, rpc_sd, link); + tnt_close(rpc_sd->sd); + cache_free(rpc_sd_cache, rpc_sd); + } + + etp_close(rpc_client->id); + cache_free(rpc_client_cache, rpc_client); } - etp_close(rpc_client->id); - cache_free(rpc_client_cache, rpc_client); + pthread_mutex_unlock(&rpc_client_mutex); + + if (rpc_reaper_stop) + break; + + rpc_can_reap = 0; + } + while (1); + + pthread_mutex_unlock(&rpc_reaper_mutex); + + output("RPC reaper thread stopping..."); + return NULL; +} + +void +rpc_terminate(void) +{ + rpc_client_t rpc_client; + + pthread_mutex_lock(&rpc_client_mutex); + + list_for_each(&rpc_client_list, rpc_client, link) + { + pthread_mutex_lock(&rpc_client->mutex); + rpc_client->reap = 1; + pthread_mutex_unlock(&rpc_client->mutex); } pthread_mutex_unlock(&rpc_client_mutex); + pthread_mutex_lock(&rpc_reaper_mutex); + rpc_can_reap = 1; + rpc_reaper_stop = 1; + pthread_cond_signal(&rpc_reaper_cond); + pthread_mutex_unlock(&rpc_reaper_mutex); + + pthread_join(rpc_reaper_thread, NULL); + cache_destroy(rpc_sd_cache); cache_destroy(rpc_client_cache); etp_terminate(); @@ -880,6 +974,7 @@ rpc_client_constructor(void *object) rpc_client = object; pthread_mutex_init(&rpc_client->mutex, NULL); + rpc_client->reap = 0; list_init(&rpc_client->rpc_sd_list); } @@ -890,6 +985,7 @@ rpc_client_destructor(void *object) rpc_client = object; pthread_mutex_destroy(&rpc_client->mutex); + assert(rpc_client->reap == 0); assert(list_empty(&rpc_client->rpc_sd_list)); } @@ -928,9 +1024,19 @@ rpc_init(void) exit(1); } + rpc_can_reap = 0; + rpc_reaper_stop = 0; list_init(&rpc_client_list); pthread_mutex_init(&rpc_client_mutex, NULL); + error = pthread_create(&rpc_reaper_thread, NULL, rpc_reaper, NULL); + + if (error) + { + output_error("unable to create RPC reaper thread, error = %d", error); + exit(1); + } + error = pthread_create(&rpc_thread, NULL, rpc_server, NULL); if (error) |