enhanced the hcl server implementation further while fixing a couple bugs too

This commit is contained in:
hyung-hwan 2018-03-15 15:23:51 +00:00
parent 3228386c2c
commit 5dcb376907
2 changed files with 188 additions and 108 deletions

View File

@ -268,6 +268,8 @@ struct hcl_server_t
hcl_server_worker_t* tail; hcl_server_worker_t* tail;
} worker_list[2]; } worker_list[2];
int mux_pipe[2]; /* pipe to break the blocking multiplexer in the main server loop */
pthread_mutex_t worker_mutex; pthread_mutex_t worker_mutex;
pthread_mutex_t tmr_mutex; pthread_mutex_t tmr_mutex;
pthread_mutex_t log_mutex; pthread_mutex_t log_mutex;
@ -893,11 +895,7 @@ static void vm_cleanup (hcl_t* hcl)
static void vm_checkbc (hcl_t* hcl, hcl_oob_t bcode) static void vm_checkbc (hcl_t* hcl, hcl_oob_t bcode)
{ {
worker_hcl_xtn_t* xtn = (worker_hcl_xtn_t*)hcl_getxtn(hcl); worker_hcl_xtn_t* xtn = (worker_hcl_xtn_t*)hcl_getxtn(hcl);
if (xtn->proto->worker->server->stopreq) hcl_abort(hcl); if (xtn->proto->worker->server->stopreq) hcl_abort(hcl);
/* TODO: check how to this vm has been running. too long? abort it */
/* check agains xtn->proto->worker->server->cfg.actor_max_runtime */
} }
/* /*
@ -906,7 +904,6 @@ static void gc_hcl (hcl_t* hcl)
worker_hcl_xtn_t* xtn = (worker_hcl_xtn_t*)hcl_getxtn(hcl); worker_hcl_xtn_t* xtn = (worker_hcl_xtn_t*)hcl_getxtn(hcl);
} }
static void fini_hcl (hcl_t* hcl) static void fini_hcl (hcl_t* hcl)
{ {
worker_hcl_xtn_t* xtn = (worker_hcl_xtn_t*)hcl_getxtn(hcl); worker_hcl_xtn_t* xtn = (worker_hcl_xtn_t*)hcl_getxtn(hcl);
@ -1064,7 +1061,7 @@ static int write_reply_chunk (hcl_server_proto_t* proto)
{ {
/* error occurred inside the worker thread shouldn't affect the error information /* error occurred inside the worker thread shouldn't affect the error information
* in the server object. so here, i just log a message */ * in the server object. so here, i just log a message */
hcl_logbfmt (proto->hcl, SERVER_LOGMASK_ERROR, "sendmsg failure on %d - %hs\n", proto->worker->sck, strerror(errno)); HCL_LOG2 (proto->hcl, SERVER_LOGMASK_ERROR, "sendmsg failure on %d - %hs\n", proto->worker->sck, strerror(errno));
return -1; return -1;
} }
@ -1263,7 +1260,7 @@ static HCL_INLINE int add_token_char (hcl_server_proto_t* proto, hcl_ooch_t c)
tmp = (hcl_ooch_t*)HCL_MMGR_REALLOC(proto->worker->server->mmgr, proto->tok.ptr, capa * HCL_SIZEOF(*tmp)); tmp = (hcl_ooch_t*)HCL_MMGR_REALLOC(proto->worker->server->mmgr, proto->tok.ptr, capa * HCL_SIZEOF(*tmp));
if (!tmp) if (!tmp)
{ {
hcl_logbfmt (proto->hcl, SERVER_LOGMASK_ERROR, "out of memory in allocating a token buffer\n"); HCL_LOG0 (proto->hcl, SERVER_LOGMASK_ERROR, "out of memory in allocating a token buffer\n");
return -1; return -1;
} }
@ -1340,7 +1337,7 @@ static int get_token (hcl_server_proto_t* proto)
GET_CHAR_TO(proto, c); GET_CHAR_TO(proto, c);
if (!is_alphachar(c)) if (!is_alphachar(c))
{ {
hcl_logbfmt (proto->hcl, SERVER_LOGMASK_ERROR, "alphabetic character expected after a period\n"); HCL_LOG0 (proto->hcl, SERVER_LOGMASK_ERROR, "alphabetic character expected after a period\n");
return -1; return -1;
} }
@ -1357,7 +1354,7 @@ static int get_token (hcl_server_proto_t* proto)
break; break;
default: default:
hcl_logbfmt (proto->hcl, SERVER_LOGMASK_ERROR, "unrecognized character - [%jc]\n", c); HCL_LOG1 (proto->hcl, SERVER_LOGMASK_ERROR, "unrecognized character - [%jc]\n", c);
return -1; return -1;
} }
return 0; return 0;
@ -1365,27 +1362,42 @@ static int get_token (hcl_server_proto_t* proto)
static void exec_runtime_handler (hcl_tmr_t* tmr, const hcl_ntime_t* now, hcl_tmr_event_t* evt) static void exec_runtime_handler (hcl_tmr_t* tmr, const hcl_ntime_t* now, hcl_tmr_event_t* evt)
{ {
/* [NOTE] this handler is executed in the main server thread
* when it calls hcl_tmr_fire() */
hcl_server_proto_t* proto; hcl_server_proto_t* proto;
proto = (hcl_server_proto_t*)evt->ctx; proto = (hcl_server_proto_t*)evt->ctx;
//printf ("aborting hcl for runtime handler timeout...\n");
HCL_LOG0 (proto->worker->server->dummy_hcl, SERVER_LOGMASK_INFO, "Aborting script execution for max_actor_runtime exceeded\n"); /* TODO: include worker id into the message.. */
hcl_abort (proto->hcl); hcl_abort (proto->hcl);
} }
static void exec_runtime_updater (hcl_tmr_t* tmr, hcl_tmr_index_t old_index, hcl_tmr_index_t new_index, hcl_tmr_event_t* evt) static void exec_runtime_updater (hcl_tmr_t* tmr, hcl_tmr_index_t old_index, hcl_tmr_index_t new_index, hcl_tmr_event_t* evt)
{ {
/* [NOTE] this handler is executed in the main server thread
* when it calls hcl_tmr_fire() */
hcl_server_proto_t* proto; hcl_server_proto_t* proto;
proto = (hcl_server_proto_t*)evt->ctx; proto = (hcl_server_proto_t*)evt->ctx;
HCL_ASSERT (proto->hcl, proto->exec_runtime_event_index == old_index); HCL_ASSERT (proto->hcl, proto->exec_runtime_event_index == old_index);
/* the event is being removed by hcl_tmr_fire() or by hcl_tmr_delete()
* if new_index is HCL_TMR_INVALID_INDEX. it's being updated if not. */
proto->exec_runtime_event_index = new_index; proto->exec_runtime_event_index = new_index;
} }
static int insert_exec_timer (hcl_server_proto_t* proto, const hcl_ntime_t* tmout) static int insert_exec_timer (hcl_server_proto_t* proto, const hcl_ntime_t* tmout)
{ {
/* [NOTE] this is executed in the worker thread */
hcl_tmr_event_t event; hcl_tmr_event_t event;
hcl_tmr_index_t index; hcl_tmr_index_t index;
hcl_server_t* server;
HCL_ASSERT (proto->hcl, proto->exec_runtime_event_index == HCL_TMR_INVALID_INDEX); HCL_ASSERT (proto->hcl, proto->exec_runtime_event_index == HCL_TMR_INVALID_INDEX);
server = proto->worker->server;
HCL_MEMSET (&event, 0, HCL_SIZEOF(event)); HCL_MEMSET (&event, 0, HCL_SIZEOF(event));
event.ctx = proto; event.ctx = proto;
proto->hcl->vmprim.vm_gettime (proto->hcl, &event.when); proto->hcl->vmprim.vm_gettime (proto->hcl, &event.when);
@ -1393,144 +1405,44 @@ static int insert_exec_timer (hcl_server_proto_t* proto, const hcl_ntime_t* tmou
event.handler = exec_runtime_handler; event.handler = exec_runtime_handler;
event.updater = exec_runtime_updater; event.updater = exec_runtime_updater;
pthread_mutex_lock (&proto->worker->server->tmr_mutex); pthread_mutex_lock (&server->tmr_mutex);
index = hcl_tmr_insert(proto->worker->server->tmr, &event); index = hcl_tmr_insert(server->tmr, &event);
pthread_mutex_unlock (&proto->worker->server->tmr_mutex);
if (index == HCL_TMR_INVALID_INDEX) return -1;
proto->exec_runtime_event_index = index; proto->exec_runtime_event_index = index;
return 0; if (index != HCL_TMR_INVALID_INDEX)
{
/* inform the server of timer event change */
write (server->mux_pipe[1], "X", 1); /* don't care even if it fails */
}
pthread_mutex_unlock (&server->tmr_mutex);
return (index == HCL_TMR_INVALID_INDEX)? -1: 0;
} }
static void delete_exec_timer (hcl_server_proto_t* proto) static void delete_exec_timer (hcl_server_proto_t* proto)
{ {
/* [NOTE] this is executed in the worker thread. if the event has been fired
* in the server thread, proto->exec_runtime_event_index should be
* HCL_TMR_INVALID_INDEX as set by exec_runtime_handler */
hcl_server_t* server;
server = proto->worker->server;
pthread_mutex_lock (&server->tmr_mutex);
if (proto->exec_runtime_event_index != HCL_TMR_INVALID_INDEX) if (proto->exec_runtime_event_index != HCL_TMR_INVALID_INDEX)
{ {
//printf ("deleted exec timer..........\n"); /* the event has not been fired yet. let's delete it
pthread_mutex_lock (&proto->worker->server->tmr_mutex); * if it has been fired, the index it shall be HCL_TMR_INVALID_INDEX already */
hcl_tmr_delete (proto->worker->server->tmr, proto->exec_runtime_event_index);
pthread_mutex_unlock (&proto->worker->server->tmr_mutex); hcl_tmr_delete (server->tmr, proto->exec_runtime_event_index);
proto->exec_runtime_event_index = HCL_TMR_INVALID_INDEX; HCL_ASSERT (proto->hcl, proto->exec_runtime_event_index == HCL_TMR_INVALID_INDEX);
/*proto->exec_runtime_event_index = HCL_TMR_INVALID_INDEX; */
} }
pthread_mutex_unlock (&server->tmr_mutex);
} }
int hcl_server_proto_handle_request (hcl_server_proto_t* proto) static int execute_script (hcl_server_proto_t* proto, const hcl_bch_t* trigger)
{
if (get_token(proto) <= -1) return -1;
switch (proto->tok.type)
{
case HCL_SERVER_PROTO_TOKEN_NL:
/* ignore new lines */
break;
case HCL_SERVER_PROTO_TOKEN_EOF:
if (proto->req.state != HCL_SERVER_PROTO_REQ_IN_TOP_LEVEL)
{
hcl_logbfmt (proto->hcl, SERVER_LOGMASK_ERROR, "unexpected EOF without .END\n");
return -1;
}
/* drop connection silently */
return 0;
case HCL_SERVER_PROTO_TOKEN_EXIT:
if (proto->req.state != HCL_SERVER_PROTO_REQ_IN_TOP_LEVEL)
{
hcl_logbfmt (proto->hcl, SERVER_LOGMASK_ERROR, ".EXIT allowed in the top level only\n");
return -1;
}
if (get_token(proto) <= -1) return -1;
if (proto->tok.type != HCL_SERVER_PROTO_TOKEN_NL)
{
hcl_logbfmt (proto->hcl, SERVER_LOGMASK_ERROR, "no new line after .EXIT\n");
return -1;
}
hcl_server_proto_start_reply (proto);
if (hcl_server_proto_end_reply(proto, HCL_NULL) <= -1)
{
hcl_logbfmt (proto->hcl, SERVER_LOGMASK_ERROR, "cannot finalize reply for .EXIT\n");
return -1;
}
return 0;
case HCL_SERVER_PROTO_TOKEN_BEGIN:
if (proto->req.state != HCL_SERVER_PROTO_REQ_IN_TOP_LEVEL)
{
hcl_logbfmt (proto->hcl, SERVER_LOGMASK_ERROR, ".BEGIN not allowed to be nested\n");
return -1;
}
if (get_token(proto) <= -1) return -1;
if (proto->tok.type != HCL_SERVER_PROTO_TOKEN_NL)
{
hcl_logbfmt (proto->hcl, SERVER_LOGMASK_ERROR, "no new line after .BEGIN\n");
return -1;
}
proto->req.state = HCL_SERVER_PROTO_REQ_IN_BLOCK_LEVEL;
hcl_reset (proto->hcl);
break;
case HCL_SERVER_PROTO_TOKEN_END:
{ {
hcl_oop_t obj; hcl_oop_t obj;
if (proto->req.state != HCL_SERVER_PROTO_REQ_IN_BLOCK_LEVEL)
{
hcl_logbfmt (proto->hcl, SERVER_LOGMASK_ERROR, ".END without opening .BEGIN\n");
return -1;
}
if (get_token(proto) <= -1) return -1;
if (proto->tok.type != HCL_SERVER_PROTO_TOKEN_NL)
{
hcl_logbfmt (proto->hcl, SERVER_LOGMASK_ERROR, "no new line after .BEGIN\n");
return -1;
}
hcl_server_proto_start_reply (proto);
obj = (hcl_getbclen(proto->hcl) > 0)? hcl_execute(proto->hcl): proto->hcl->_nil;
if (hcl_server_proto_end_reply(proto, (obj? HCL_NULL: hcl_geterrmsg(proto->hcl))) <= -1)
{
hcl_logbfmt (proto->hcl, SERVER_LOGMASK_ERROR, "cannot finalize reply for .END\n");
return -1;
}
proto->req.state = HCL_SERVER_PROTO_REQ_IN_TOP_LEVEL;
break;
}
case HCL_SERVER_PROTO_TOKEN_SCRIPT:
{
hcl_oop_t obj;
if (proto->req.state == HCL_SERVER_PROTO_REQ_IN_TOP_LEVEL) hcl_reset(proto->hcl);
obj = hcl_read(proto->hcl);
if (!obj)
{
hcl_logbfmt (proto->hcl, SERVER_LOGMASK_ERROR, "cannot read .SCRIPT contents - %js\n", hcl_geterrmsg(proto->hcl));
return -1;
}
if (get_token(proto) <= -1) return -1;
if (proto->tok.type != HCL_SERVER_PROTO_TOKEN_NL)
{
hcl_logbfmt (proto->hcl, SERVER_LOGMASK_ERROR, "no new line after .SCRIPT contest\n");
return -1;
}
if (hcl_compile(proto->hcl, obj) <= -1)
{
hcl_logbfmt (proto->hcl, SERVER_LOGMASK_ERROR, "cannot compile .SCRIPT contents - %js\n", hcl_geterrmsg(proto->hcl));
return -1;
}
if (proto->req.state == HCL_SERVER_PROTO_REQ_IN_TOP_LEVEL)
{
const hcl_ooch_t* failmsg = HCL_NULL; const hcl_ooch_t* failmsg = HCL_NULL;
hcl_server_t* server; hcl_server_t* server;
@ -1546,29 +1458,136 @@ int hcl_server_proto_handle_request (hcl_server_proto_t* proto)
{ {
if (insert_exec_timer(proto, &server->cfg.actor_max_runtime) <= -1) if (insert_exec_timer(proto, &server->cfg.actor_max_runtime) <= -1)
{ {
hcl_logbfmt (proto->hcl, SERVER_LOGMASK_ERROR, "cannot start execution timer\n"); HCL_LOG0 (proto->hcl, SERVER_LOGMASK_ERROR, "Cannot start execution timer\n");
/* use proto->hcl's error formatting instead of server's to avoid using mutex over the shared server->dummy_hcl */ hcl_seterrbfmt (proto->hcl, HCL_ESYSMEM, "cannot start execution timer"); /* i do this just to compose the error message */
hcl_seterrbfmt (proto->hcl, HCL_ESYSMEM, "cannot start execution timer");
failmsg = hcl_geterrmsg(proto->hcl); failmsg = hcl_geterrmsg(proto->hcl);
} }
else else
{ {
//printf ("inserted exec timer..........\n");
obj = hcl_execute(proto->hcl); obj = hcl_execute(proto->hcl);
if (!obj) failmsg = hcl_geterrmsg(proto->hcl); if (!obj) failmsg = hcl_geterrmsg(proto->hcl);
delete_exec_timer (proto); delete_exec_timer (proto);
} }
} }
if (hcl_server_proto_end_reply(proto, failmsg) <= -1) if (hcl_server_proto_end_reply(proto, failmsg) <= -1)
{ {
hcl_logbfmt (proto->hcl, SERVER_LOGMASK_ERROR, "cannot finalize reply for .SCRIPT\n"); HCL_LOG1 (proto->hcl, SERVER_LOGMASK_ERROR, "Cannot finalize reply for %hs\n", trigger);
return -1; return -1;
} }
return 0;
} }
int hcl_server_proto_handle_request (hcl_server_proto_t* proto)
{
if (get_token(proto) <= -1) return -1;
switch (proto->tok.type)
{
case HCL_SERVER_PROTO_TOKEN_NL:
/* ignore new lines */
break; break;
case HCL_SERVER_PROTO_TOKEN_EOF:
if (proto->req.state != HCL_SERVER_PROTO_REQ_IN_TOP_LEVEL)
{
HCL_LOG0 (proto->hcl, SERVER_LOGMASK_ERROR, "Unexpected EOF without .END\n");
return -1;
}
/* drop connection silently */
return 0;
case HCL_SERVER_PROTO_TOKEN_EXIT:
if (proto->req.state != HCL_SERVER_PROTO_REQ_IN_TOP_LEVEL)
{
HCL_LOG0 (proto->hcl, SERVER_LOGMASK_ERROR, ".EXIT allowed in the top level only\n");
return -1;
}
if (get_token(proto) <= -1) return -1;
if (proto->tok.type != HCL_SERVER_PROTO_TOKEN_NL)
{
HCL_LOG0 (proto->hcl, SERVER_LOGMASK_ERROR, "No new line after .EXIT\n");
return -1;
}
hcl_server_proto_start_reply (proto);
if (hcl_server_proto_end_reply(proto, HCL_NULL) <= -1)
{
HCL_LOG0 (proto->hcl, SERVER_LOGMASK_ERROR, "Unable to finalize reply for .EXIT\n");
return -1;
}
return 0;
case HCL_SERVER_PROTO_TOKEN_BEGIN:
if (proto->req.state != HCL_SERVER_PROTO_REQ_IN_TOP_LEVEL)
{
HCL_LOG0 (proto->hcl, SERVER_LOGMASK_ERROR, ".BEGIN not allowed to be nested\n");
return -1;
}
if (get_token(proto) <= -1) return -1;
if (proto->tok.type != HCL_SERVER_PROTO_TOKEN_NL)
{
HCL_LOG0 (proto->hcl, SERVER_LOGMASK_ERROR, "No new line after .BEGIN\n");
return -1;
}
proto->req.state = HCL_SERVER_PROTO_REQ_IN_BLOCK_LEVEL;
hcl_reset (proto->hcl);
break;
case HCL_SERVER_PROTO_TOKEN_END:
if (proto->req.state != HCL_SERVER_PROTO_REQ_IN_BLOCK_LEVEL)
{
HCL_LOG0 (proto->hcl, SERVER_LOGMASK_ERROR, ".END without opening .BEGIN\n");
return -1;
}
if (get_token(proto) <= -1) return -1;
if (proto->tok.type != HCL_SERVER_PROTO_TOKEN_NL)
{
HCL_LOG0 (proto->hcl, SERVER_LOGMASK_ERROR, "No new line after .BEGIN\n");
return -1;
}
if (execute_script(proto, ".END") <= -1) return -1;
proto->req.state = HCL_SERVER_PROTO_REQ_IN_TOP_LEVEL;
break;
case HCL_SERVER_PROTO_TOKEN_SCRIPT:
{
hcl_oop_t obj;
if (proto->req.state == HCL_SERVER_PROTO_REQ_IN_TOP_LEVEL) hcl_reset(proto->hcl);
obj = hcl_read(proto->hcl);
if (!obj)
{
HCL_LOG1 (proto->hcl, SERVER_LOGMASK_ERROR, "Unable to read .SCRIPT contents - %js\n", hcl_geterrmsg(proto->hcl));
return -1;
}
if (get_token(proto) <= -1) return -1;
if (proto->tok.type != HCL_SERVER_PROTO_TOKEN_NL)
{
HCL_LOG0 (proto->hcl, SERVER_LOGMASK_ERROR, "No new line after .SCRIPT contest\n");
return -1;
}
if (hcl_compile(proto->hcl, obj) <= -1)
{
HCL_LOG1 (proto->hcl, SERVER_LOGMASK_ERROR, "Unable to compile .SCRIPT contents - %js\n", hcl_geterrmsg(proto->hcl));
return -1;
}
if (proto->req.state == HCL_SERVER_PROTO_REQ_IN_TOP_LEVEL)
{
if (execute_script(proto, ".SCRIPT") <= -1) return -1;
}
break;
} }
case HCL_SERVER_PROTO_TOKEN_SHOW_WORKERS: case HCL_SERVER_PROTO_TOKEN_SHOW_WORKERS:
@ -1580,7 +1599,7 @@ int hcl_server_proto_handle_request (hcl_server_proto_t* proto)
if (hcl_server_proto_end_reply(proto, HCL_NULL) <= -1) if (hcl_server_proto_end_reply(proto, HCL_NULL) <= -1)
{ {
hcl_logbfmt (proto->hcl, SERVER_LOGMASK_ERROR, "cannot finalize reply for .SHOW-WORKERS\n"); HCL_LOG0 (proto->hcl, SERVER_LOGMASK_ERROR, "Unable to finalize reply for .SHOW-WORKERS\n");
return -1; return -1;
} }
@ -1595,13 +1614,13 @@ int hcl_server_proto_handle_request (hcl_server_proto_t* proto)
if (hcl_server_proto_end_reply(proto, HCL_NULL) <= -1) if (hcl_server_proto_end_reply(proto, HCL_NULL) <= -1)
{ {
hcl_logbfmt (proto->hcl, SERVER_LOGMASK_ERROR, "cannot finalize reply for .KILL-WORKER\n"); HCL_LOG0 (proto->hcl, SERVER_LOGMASK_ERROR, "Unable to finalize reply for .KILL-WORKER\n");
return -1; return -1;
} }
break; break;
default: default:
hcl_logbfmt (proto->hcl, SERVER_LOGMASK_ERROR, "unknown token - %d - %.*js\n", (int)proto->tok.type, proto->tok.len, proto->tok.ptr); HCL_LOG3 (proto->hcl, SERVER_LOGMASK_ERROR, "Unknown token - %d - %.*js\n", (int)proto->tok.type, proto->tok.len, proto->tok.ptr);
return -1; return -1;
} }
@ -1617,6 +1636,7 @@ hcl_server_t* hcl_server_open (hcl_mmgr_t* mmgr, hcl_oow_t xtnsize, hcl_server_p
hcl_vmprim_t vmprim; hcl_vmprim_t vmprim;
hcl_tmr_t* tmr; hcl_tmr_t* tmr;
dummy_hcl_xtn_t* xtn; dummy_hcl_xtn_t* xtn;
int pfd[2], fcv;
server = (hcl_server_t*)HCL_MMGR_ALLOC(mmgr, HCL_SIZEOF(*server) + xtnsize); server = (hcl_server_t*)HCL_MMGR_ALLOC(mmgr, HCL_SIZEOF(*server) + xtnsize);
if (!server) if (!server)
@ -1646,8 +1666,30 @@ hcl_server_t* hcl_server_open (hcl_mmgr_t* mmgr, hcl_oow_t xtnsize, hcl_server_p
{ {
hcl_close (hcl); hcl_close (hcl);
HCL_MMGR_FREE (mmgr, server); HCL_MMGR_FREE (mmgr, server);
return HCL_NULL;
} }
if (pipe(pfd) <= -1)
{
hcl_tmr_close (tmr);
hcl_close (hcl);
HCL_MMGR_FREE (mmgr, server);
return HCL_NULL;
}
#if defined(O_CLOEXEC)
fcv = fcntl(pfd[0], F_GETFD, 0);
if (fcv >= 0) fcntl(pfd[0], F_SETFD, fcv | O_CLOEXEC);
fcv = fcntl(pfd[1], F_GETFD, 0);
if (fcv >= 0) fcntl(pfd[1], F_SETFD, fcv | O_CLOEXEC);
#endif
#if defined(O_NONBLOCK)
fcv = fcntl(pfd[0], F_GETFL, 0);
if (fcv >= 0) fcntl(pfd[0], F_SETFL, fcv | O_NONBLOCK);
fcv = fcntl(pfd[1], F_GETFL, 0);
if (fcv >= 0) fcntl(pfd[1], F_SETFL, fcv | O_NONBLOCK);
#endif
xtn = (dummy_hcl_xtn_t*)hcl_getxtn(hcl); xtn = (dummy_hcl_xtn_t*)hcl_getxtn(hcl);
xtn->server = server; xtn->server = server;
@ -1665,6 +1707,9 @@ hcl_server_t* hcl_server_open (hcl_mmgr_t* mmgr, hcl_oow_t xtnsize, hcl_server_p
HCL_INITNTIME (&server->cfg.worker_idle_timeout, 0, 0); HCL_INITNTIME (&server->cfg.worker_idle_timeout, 0, 0);
HCL_INITNTIME (&server->cfg.actor_max_runtime, 0, 0); HCL_INITNTIME (&server->cfg.actor_max_runtime, 0, 0);
server->mux_pipe[0] = pfd[0];
server->mux_pipe[1] = pfd[1];
pthread_mutex_init (&server->worker_mutex, HCL_NULL); pthread_mutex_init (&server->worker_mutex, HCL_NULL);
pthread_mutex_init (&server->tmr_mutex, HCL_NULL); pthread_mutex_init (&server->tmr_mutex, HCL_NULL);
pthread_mutex_init (&server->log_mutex, HCL_NULL); pthread_mutex_init (&server->log_mutex, HCL_NULL);
@ -1678,6 +1723,8 @@ void hcl_server_close (hcl_server_t* server)
pthread_mutex_destroy (&server->tmr_mutex); pthread_mutex_destroy (&server->tmr_mutex);
pthread_mutex_destroy (&server->worker_mutex); pthread_mutex_destroy (&server->worker_mutex);
close (server->mux_pipe[0]);
close (server->mux_pipe[1]);
hcl_tmr_close (server->tmr); hcl_tmr_close (server->tmr);
hcl_close (server->dummy_hcl); hcl_close (server->dummy_hcl);
HCL_MMGR_FREE (server->mmgr, server); HCL_MMGR_FREE (server->mmgr, server);
@ -1702,11 +1749,11 @@ static void close_worker_socket (hcl_server_worker_t* worker)
{ {
if (worker->proto) if (worker->proto)
{ {
hcl_logbfmt (worker->proto->hcl, SERVER_LOGMASK_INFO, "closing worker socket %d\n", worker->sck); HCL_LOG1 (worker->proto->hcl, SERVER_LOGMASK_INFO, "Closing worker socket %d\n", worker->sck);
} }
else else
{ {
hcl_server_logbfmt (worker->server, SERVER_LOGMASK_INFO, "closing worker socket %d\n", worker->sck); HCL_LOG1 (worker->server->dummy_hcl, SERVER_LOGMASK_INFO, "Closing worker socket %d\n", worker->sck);
} }
close (worker->sck); close (worker->sck);
worker->sck = -1; worker->sck = -1;
@ -1948,19 +1995,24 @@ int hcl_server_start (hcl_server_t* server, const hcl_bch_t* addrs)
pthread_t thr; pthread_t thr;
hcl_ntime_t tmout; hcl_ntime_t tmout;
hcl_server_worker_t* worker; hcl_server_worker_t* worker;
struct pollfd pfd[1]; struct pollfd pfd[2];
int n; int n, pc;
purge_all_workers (server, HCL_SERVER_WORKER_STATE_DEAD); purge_all_workers (server, HCL_SERVER_WORKER_STATE_DEAD);
pthread_mutex_lock (&server->tmr_mutex); pthread_mutex_lock (&server->tmr_mutex);
n = hcl_tmr_gettmout(server->tmr, HCL_NULL, &tmout); n = hcl_tmr_gettmout(server->tmr, HCL_NULL, &tmout);
pthread_mutex_unlock (&server->tmr_mutex); pthread_mutex_unlock (&server->tmr_mutex);
if (n <= -1) HCL_INITNTIME (&tmout, 1, 0); if (n <= -1) HCL_INITNTIME (&tmout, 10, 0);
pfd[0].fd = srv_fd; /* TODO: swtich to faster multiplexer like epoll or kqueue */
pfd[0].events = POLLIN | POLLERR; pc = 0;
n = poll(pfd, 1, HCL_SECNSEC_TO_MSEC(tmout.sec, tmout.nsec)); pfd[pc].fd = server->mux_pipe[0];
pfd[pc++].events = POLLIN | POLLERR;
pfd[pc].fd = srv_fd;
pfd[pc++].events = POLLIN | POLLERR;
n = poll(pfd, pc, HCL_SECNSEC_TO_MSEC(tmout.sec, tmout.nsec));
if (n <= -1) if (n <= -1)
{ {
if (server->stopreq) break; /* normal termination requested */ if (server->stopreq) break; /* normal termination requested */
@ -1975,11 +2027,16 @@ int hcl_server_start (hcl_server_t* server, const hcl_bch_t* addrs)
hcl_tmr_fire (server->tmr, HCL_NULL, HCL_NULL); hcl_tmr_fire (server->tmr, HCL_NULL, HCL_NULL);
pthread_mutex_unlock (&server->tmr_mutex); pthread_mutex_unlock (&server->tmr_mutex);
while (n > 0) for (n = 0; n < pc; n++)
{ {
--n; if (!pfd[n].revents /*& (POLLIN | POLLHUP | POLLERR) */) continue;
if (pfd[n].revents /*& (POLLIN | POLLHUP | POLLERR) */) if (pfd[n].fd == server->mux_pipe[0])
{
char tmp[128];
while (read(server->mux_pipe[0], tmp, HCL_SIZEOF(tmp)) > 0) /* nothing */;
}
else if (pfd[n].fd == srv_fd)
{ {
cli_len = HCL_SIZEOF(cli_addr); cli_len = HCL_SIZEOF(cli_addr);
cli_fd = accept(pfd[n].fd, (struct sockaddr*)&cli_addr, &cli_len); cli_fd = accept(pfd[n].fd, (struct sockaddr*)&cli_addr, &cli_len);
@ -1993,7 +2050,7 @@ int hcl_server_start (hcl_server_t* server, const hcl_bch_t* addrs)
break; break;
} }
hcl_server_logbfmt (server, SERVER_LOGMASK_INFO, "accepted worker - socket %d\n", cli_fd); HCL_LOG1 (server->dummy_hcl, SERVER_LOGMASK_INFO, "Accepted worker - socket %d\n", cli_fd);
worker = alloc_worker(server, cli_fd); worker = alloc_worker(server, cli_fd);
if (pthread_create(&thr, &thr_attr, worker_main, worker) != 0) if (pthread_create(&thr, &thr_attr, worker_main, worker) != 0)
@ -2008,12 +2065,15 @@ int hcl_server_start (hcl_server_t* server, const hcl_bch_t* addrs)
purge_all_workers (server, HCL_SERVER_WORKER_STATE_DEAD); purge_all_workers (server, HCL_SERVER_WORKER_STATE_DEAD);
pthread_attr_destroy (&thr_attr); pthread_attr_destroy (&thr_attr);
close (srv_fd);
return xret; return xret;
} }
void hcl_server_stop (hcl_server_t* server) void hcl_server_stop (hcl_server_t* server)
{ {
server->stopreq = 1; server->stopreq = 1;
write (server->mux_pipe[1], "Q", 1); /* don't care about failure */
} }
int hcl_server_setoption (hcl_server_t* server, hcl_server_option_t id, const void* value) int hcl_server_setoption (hcl_server_t* server, hcl_server_option_t id, const void* value)

View File

@ -402,7 +402,8 @@ static int handle_dbgopt (hcl_server_t* server, const char* str)
/* ========================================================================= */ /* ========================================================================= */
#define MIN_MEMSIZE 512000ul #define MIN_WORKER_STACK_SIZE 512000ul
#define MIN_ACTOR_HEAP_SIZE 512000ul
int main (int argc, char* argv[]) int main (int argc, char* argv[])
{ {
@ -410,8 +411,11 @@ int main (int argc, char* argv[])
static hcl_bopt_lng_t lopt[] = static hcl_bopt_lng_t lopt[] =
{ {
{ ":log", 'l' }, { ":log", 'l' },
{ ":memsize", 'm' },
{ "large-pages", '\0' }, { "large-pages", '\0' },
{ ":worker-stack-size", '\0' },
{ ":worker-idle-timeout",'\0' },
{ ":actor-heap-size", 'm' },
{ ":actor-max-runtime", '\0' },
#if defined(HCL_BUILD_DEBUG) #if defined(HCL_BUILD_DEBUG)
{ ":debug", '\0' }, /* NOTE: there is no short option for --debug */ { ":debug", '\0' }, /* NOTE: there is no short option for --debug */
#endif #endif
@ -430,8 +434,10 @@ int main (int argc, char* argv[])
const char* logopt = HCL_NULL; const char* logopt = HCL_NULL;
const char* dbgopt = HCL_NULL; const char* dbgopt = HCL_NULL;
hcl_oow_t memsize = MIN_MEMSIZE; hcl_oow_t worker_stack_size = MIN_ACTOR_HEAP_SIZE;
hcl_ntime_t tmout = { 0, 0 }; hcl_ntime_t worker_idle_timeout = { 0, 0 };
hcl_oow_t actor_heap_size = MIN_ACTOR_HEAP_SIZE;
hcl_ntime_t actor_max_runtime = { 0, 0 };
int large_pages = 0; int large_pages = 0;
unsigned int trait; unsigned int trait;
@ -453,8 +459,8 @@ int main (int argc, char* argv[])
break; break;
case 'm': case 'm':
memsize = strtoul(opt.arg, HCL_NULL, 0); actor_heap_size = strtoul(opt.arg, HCL_NULL, 0);
if (memsize <= MIN_MEMSIZE) memsize = MIN_MEMSIZE; if (actor_heap_size <= MIN_ACTOR_HEAP_SIZE) actor_heap_size = MIN_ACTOR_HEAP_SIZE;
break; break;
case '\0': case '\0':
@ -463,6 +469,21 @@ int main (int argc, char* argv[])
large_pages = 1; large_pages = 1;
break; break;
} }
else if (hcl_compbcstr(opt.lngopt, "worker-stack-size") == 0)
{
worker_stack_size = strtoul(opt.arg, HCL_NULL, 0);
if (worker_stack_size <= MIN_WORKER_STACK_SIZE) actor_heap_size = MIN_WORKER_STACK_SIZE;
}
else if (hcl_compbcstr(opt.lngopt, "worker-idle-timeout") == 0)
{
worker_idle_timeout.sec = strtoul(opt.arg, HCL_NULL, 0);
break;
}
else if (hcl_compbcstr(opt.lngopt, "actor-max-runtime") == 0)
{
actor_max_runtime.sec = strtoul(opt.arg, HCL_NULL, 0);
break;
}
#if defined(HCL_BUILD_DEBUG) #if defined(HCL_BUILD_DEBUG)
else if (hcl_compbcstr(opt.lngopt, "debug") == 0) else if (hcl_compbcstr(opt.lngopt, "debug") == 0)
{ {
@ -521,11 +542,10 @@ int main (int argc, char* argv[])
else trait &= ~HCL_SERVER_TRAIT_USE_LARGE_PAGES; else trait &= ~HCL_SERVER_TRAIT_USE_LARGE_PAGES;
hcl_server_setoption (server, HCL_SERVER_TRAIT, &trait); hcl_server_setoption (server, HCL_SERVER_TRAIT, &trait);
/*hcl_server_setoption (server, HCL_SERVER_WORKER_STACK_SIZE, ???);*/ hcl_server_setoption (server, HCL_SERVER_WORKER_STACK_SIZE, &worker_stack_size);
hcl_server_setoption (server, HCL_SERVER_ACTOR_HEAP_SIZE, &memsize); hcl_server_setoption (server, HCL_SERVER_WORKER_IDLE_TIMEOUT, &worker_idle_timeout);
hcl_server_setoption (server, HCL_SERVER_ACTOR_HEAP_SIZE, &actor_heap_size);
HCL_INITNTIME (&tmout, 5, 0); hcl_server_setoption (server, HCL_SERVER_ACTOR_MAX_RUNTIME, &actor_max_runtime);
hcl_server_setoption (server, HCL_SERVER_ACTOR_MAX_RUNTIME, &tmout);
g_server = server; g_server = server;
set_signal (SIGINT, handle_sigint); set_signal (SIGINT, handle_sigint);