Added HCL_SERVER_WORKER_MAX_COUNT

changed the main server loop to purge dead clients as soon as the multiplex is finished
This commit is contained in:
2018-03-16 14:57:34 +00:00
parent 70ef4b6299
commit 0b7acc1fd8
5 changed files with 81 additions and 37 deletions

View File

@ -32,6 +32,9 @@
#include <string.h>
#include <errno.h>
#define TOKEN_NAME_ALIGN 64
#define WID_MAP_ALIGN 512
#if defined(_WIN32)
# include <windows.h>
# include <tchar.h>
@ -266,6 +269,7 @@ struct hcl_server_t
unsigned int trait;
unsigned int logmask;
hcl_oow_t worker_stack_size;
hcl_oow_t worker_max_count;
hcl_ntime_t worker_idle_timeout;
hcl_oow_t actor_heap_size;
hcl_ntime_t actor_max_runtime;
@ -275,6 +279,7 @@ struct hcl_server_t
{
hcl_server_worker_t* head;
hcl_server_worker_t* tail;
hcl_oow_t count;
} worker_list[2];
struct
@ -283,7 +288,7 @@ struct hcl_server_t
hcl_oow_t capa;
hcl_oow_t free_first;
hcl_oow_t free_last;
} wid_map;
} wid_map; /* worker's id map */
int mux_pipe[2]; /* pipe to break the blocking multiplexer in the main server loop */
@ -1273,7 +1278,7 @@ static HCL_INLINE int add_token_char (hcl_server_proto_t* proto, hcl_ooch_t c)
hcl_ooch_t* tmp;
hcl_oow_t capa;
capa = HCL_ALIGN_POW2(proto->tok.len + 1, 64);
capa = HCL_ALIGN_POW2(proto->tok.len + 1, TOKEN_NAME_ALIGN);
tmp = (hcl_ooch_t*)HCL_MMGR_REALLOC(proto->worker->server->mmgr, proto->tok.ptr, capa * HCL_SIZEOF(*tmp));
if (!tmp)
{
@ -1385,7 +1390,7 @@ static void exec_runtime_handler (hcl_tmr_t* tmr, const hcl_ntime_t* now, hcl_tm
hcl_server_proto_t* proto;
proto = (hcl_server_proto_t*)evt->ctx;
HCL_LOG0 (proto->worker->server->dummy_hcl, SERVER_LOGMASK_INFO, "Aborting script execution for max_actor_runtime exceeded\n"); /* TODO: include worker id into the message.. */
HCL_LOG1 (proto->worker->server->dummy_hcl, SERVER_LOGMASK_INFO, "Aborting script execution for max_actor_runtime exceeded [%zu]\n", proto->worker->wid);
hcl_abort (proto->hcl);
}
@ -1780,15 +1785,12 @@ static HCL_INLINE int prepare_to_acquire_wid (hcl_server_t* server)
HCL_ASSERT (server->dummy_hcl, server->wid_map.free_first == HCL_SERVER_WID_INVALID);
HCL_ASSERT (server->dummy_hcl, server->wid_map.free_last == HCL_SERVER_WID_INVALID);
new_capa = server->wid_map.capa + 128; /* TODO: adjust this incremental size ? */
new_capa = HCL_ALIGN_POW2(server->wid_map.capa + 1, WID_MAP_ALIGN);
if (new_capa > HCL_SERVER_WID_MAX)
{
if (server->wid_map.capa >= HCL_SERVER_WID_MAX)
{
/* too many workers??? */
/* TODO: error handling */
//hcl_seterrbfmt (hcl, HCL_EPFULL, "maximum number(%zd) of processes reached", HCL_SMOOI_MAX);
printf ("too many workers??? \n");
hcl_server_seterrnum (server, HCL_EFLOOD);
return -1;
}
@ -1798,8 +1800,7 @@ printf ("too many workers??? \n");
tmp = (hcl_server_wid_map_data_t*)HCL_MMGR_REALLOC(server->mmgr, server->wid_map.ptr, HCL_SIZEOF(*tmp) * new_capa);
if (!tmp)
{
// TODO: error handling ....
printf ("cannot reallocate wid map...\n");
hcl_server_seterrnum (server, HCL_ESYSERR);
return -1;
}
@ -1879,11 +1880,11 @@ static void close_worker_socket (hcl_server_worker_t* worker)
{
if (worker->proto)
{
HCL_LOG1 (worker->proto->hcl, SERVER_LOGMASK_INFO, "Closing worker socket %d\n", worker->sck);
HCL_LOG2 (worker->proto->hcl, SERVER_LOGMASK_INFO, "Closing worker socket %d [%zu]\n", worker->sck, worker->wid);
}
else
{
HCL_LOG1 (worker->server->dummy_hcl, SERVER_LOGMASK_INFO, "Closing worker socket %d\n", worker->sck);
HCL_LOG2 (worker->server->dummy_hcl, SERVER_LOGMASK_INFO, "Closing worker socket %d [%zu]\n", worker->sck, worker->wid);
}
close (worker->sck);
worker->sck = -1;
@ -1893,11 +1894,21 @@ static void close_worker_socket (hcl_server_worker_t* worker)
static void free_worker (hcl_server_worker_t* worker)
{
close_worker_socket (worker);
if (worker->proto)
{
HCL_LOG1 (worker->proto->hcl, SERVER_LOGMASK_INFO, "Killing worker [%zu]\n", worker->wid);
}
else
{
HCL_LOG1 (worker->server->dummy_hcl, SERVER_LOGMASK_INFO, "Killing worker [%zu]\n", worker->wid);
}
release_wid (worker->server, worker);
HCL_MMGR_FREE (worker->server->mmgr, worker);
}
static void add_hcl_server_worker_to_server (hcl_server_t* server, hcl_server_worker_state_t wstate, hcl_server_worker_t* worker)
static void add_worker_to_server (hcl_server_t* server, hcl_server_worker_state_t wstate, hcl_server_worker_t* worker)
{
HCL_ASSERT (server->dummy_hcl, worker->server == server);
@ -1916,6 +1927,7 @@ static void add_hcl_server_worker_to_server (hcl_server_t* server, hcl_server_wo
worker->next_worker = HCL_NULL;
}
server->worker_list[wstate].count++;
worker->state = wstate;
}
@ -1931,6 +1943,8 @@ static void zap_worker_in_server (hcl_server_t* server, hcl_server_worker_t* wor
if (worker->next_worker) worker->next_worker->prev_worker = worker->prev_worker;
else server->worker_list[wstate].tail = worker->prev_worker;
HCL_ASSERT (server->dummy_hcl, server->worker_list[wstate].count > 0);
server->worker_list[wstate].count--;
worker->prev_worker = HCL_NULL;
worker->next_worker = HCL_NULL;
}
@ -1953,7 +1967,7 @@ static void* worker_main (void* ctx)
}
pthread_mutex_lock (&server->worker_mutex);
add_hcl_server_worker_to_server (server, HCL_SERVER_WORKER_STATE_ALIVE, worker);
add_worker_to_server (server, HCL_SERVER_WORKER_STATE_ALIVE, worker);
pthread_mutex_unlock (&server->worker_mutex);
while (!server->stopreq)
@ -1969,7 +1983,7 @@ static void* worker_main (void* ctx)
if (!worker->claimed)
{
zap_worker_in_server (server, worker);
add_hcl_server_worker_to_server (server, HCL_SERVER_WORKER_STATE_DEAD, worker);
add_worker_to_server (server, HCL_SERVER_WORKER_STATE_DEAD, worker);
}
pthread_mutex_unlock (&server->worker_mutex);
@ -2129,8 +2143,6 @@ int hcl_server_start (hcl_server_t* server, const hcl_bch_t* addrs)
struct pollfd pfd[2];
int n, pc;
purge_all_workers (server, HCL_SERVER_WORKER_STATE_DEAD);
pthread_mutex_lock (&server->tmr_mutex);
n = hcl_tmr_gettmout(server->tmr, HCL_NULL, &tmout);
pthread_mutex_unlock (&server->tmr_mutex);
@ -2144,6 +2156,7 @@ int hcl_server_start (hcl_server_t* server, const hcl_bch_t* addrs)
pfd[pc++].events = POLLIN | POLLERR;
n = poll(pfd, pc, HCL_SECNSEC_TO_MSEC(tmout.sec, tmout.nsec));
purge_all_workers (server, HCL_SERVER_WORKER_STATE_DEAD);
if (n <= -1)
{
if (server->stopreq) break; /* normal termination requested */
@ -2176,22 +2189,30 @@ int hcl_server_start (hcl_server_t* server, const hcl_bch_t* addrs)
if (server->stopreq) break; /* normal termination requested */
if (errno == EINTR) continue; /* interrupted but not termination requested */
set_err_with_syserr (server, errno, "unable to accept worker on socket %d", pfd[n]);
set_err_with_syserr (server, errno, "unable to accept worker on server socket %d", pfd[n]);
xret = -1;
break;
}
HCL_LOG1 (server->dummy_hcl, SERVER_LOGMASK_INFO, "Accepted worker - socket %d\n", cli_fd);
if (server->cfg.worker_max_count > 0)
{
int flood;
pthread_mutex_lock (&server->worker_mutex);
flood = (server->worker_list[HCL_SERVER_WORKER_STATE_ALIVE].count >= server->cfg.worker_max_count);
pthread_mutex_unlock (&server->worker_mutex);
if (flood) goto unable_to_accomodate;
}
worker = alloc_worker(server, cli_fd);
if (!worker)
{
HCL_LOG1 (server->dummy_hcl, SERVER_LOGMASK_ERROR, "Unable to accomodate accepted worker - socket %d\n", cli_fd);
unable_to_accomodate:
HCL_LOG1 (server->dummy_hcl, SERVER_LOGMASK_ERROR, "Unable to accomodate worker - socket %d\n", cli_fd);
close (cli_fd);
}
else
{
HCL_LOG2 (server->dummy_hcl, SERVER_LOGMASK_INFO, "Assigned WID(%zd) to the accepted server socket %d\n", worker->wid, cli_fd);
HCL_LOG2 (server->dummy_hcl, SERVER_LOGMASK_INFO, "Accomodated worker [%zu] - socket %d\n", worker->wid, cli_fd);
if (pthread_create(&thr, &thr_attr, worker_main, worker) != 0)
{
free_worker (worker);
@ -2251,6 +2272,10 @@ int hcl_server_setoption (hcl_server_t* server, hcl_server_option_t id, const vo
}
return 0;
case HCL_SERVER_WORKER_MAX_COUNT:
server->cfg.worker_max_count = *(hcl_oow_t*)value;
return 0;
case HCL_SERVER_WORKER_STACK_SIZE:
server->cfg.worker_stack_size = *(hcl_oow_t*)value;
return 0;
@ -2284,6 +2309,10 @@ int hcl_server_getoption (hcl_server_t* server, hcl_server_option_t id, void* va
*(unsigned int*)value = server->cfg.logmask;
return 0;
case HCL_SERVER_WORKER_MAX_COUNT:
*(hcl_oow_t*)value = server->cfg.worker_max_count;
return 0;
case HCL_SERVER_WORKER_STACK_SIZE:
*(hcl_oow_t*)value = server->cfg.worker_stack_size;
return 0;