From 3228386c2c9738d81c6e3a0a288b1a17b3d183d1 Mon Sep 17 00:00:00 2001 From: hyung-hwan Date: Thu, 15 Mar 2018 10:30:06 +0000 Subject: [PATCH] implemented actor_max_runtime primitively using timer events --- lib/hcl-s.c | 162 ++++++++++++++++++++++++++++++++++++++++++++++------ lib/hcl.c | 2 - lib/main.c | 2 +- lib/main2.c | 4 ++ lib/tmr.c | 16 ++++-- 5 files changed, 162 insertions(+), 24 deletions(-) diff --git a/lib/hcl-s.c b/lib/hcl-s.c index 1ed7b8e..0a12e8a 100644 --- a/lib/hcl-s.c +++ b/lib/hcl-s.c @@ -26,6 +26,7 @@ #include "hcl-s.h" #include "hcl-prv.h" +#include "hcl-tmr.h" #include #include @@ -195,6 +196,7 @@ struct hcl_server_proto_t hcl_t* hcl; hcl_iolxc_t* lxc; hcl_server_proto_token_t tok; + hcl_tmr_index_t exec_runtime_event_index; struct { @@ -240,6 +242,7 @@ struct hcl_server_t hcl_cmgr_t* cmgr; hcl_server_prim_t prim; hcl_t* dummy_hcl; + hcl_tmr_t* tmr; hcl_errnum_t errnum; struct @@ -266,6 +269,7 @@ struct hcl_server_t } worker_list[2]; pthread_mutex_t worker_mutex; + pthread_mutex_t tmr_mutex; pthread_mutex_t log_mutex; }; @@ -602,7 +606,7 @@ static void syserrstrb (hcl_t* hcl, int syserr, hcl_bch_t* buf, hcl_oow_t len) #if defined(HAVE_STRERROR_R) strerror_r (syserr, buf, len); #else - /* this is not thread safe */ + /* this may not be thread safe */ hcl_copybcstr (buf, len, strerror(syserr)); #endif } @@ -977,6 +981,7 @@ hcl_server_proto_t* hcl_server_proto_open (hcl_oow_t xtnsize, hcl_server_worker_ HCL_MEMSET (proto, 0, HCL_SIZEOF(*proto)); proto->worker = worker; + proto->exec_runtime_event_index = HCL_TMR_INVALID_INDEX; proto->hcl = hcl_open(proto->worker->server->mmgr, HCL_SIZEOF(*xtn), worker->server->cfg.actor_heap_size, &vmprim, HCL_NULL); if (!proto->hcl) goto oops; @@ -1358,6 +1363,57 @@ static int get_token (hcl_server_proto_t* proto) return 0; } +static void exec_runtime_handler (hcl_tmr_t* tmr, const hcl_ntime_t* now, hcl_tmr_event_t* evt) +{ + hcl_server_proto_t* proto; + proto = (hcl_server_proto_t*)evt->ctx; +//printf ("aborting hcl for runtime handler timeout...\n"); + hcl_abort (proto->hcl); +} + +static void exec_runtime_updater (hcl_tmr_t* tmr, hcl_tmr_index_t old_index, hcl_tmr_index_t new_index, hcl_tmr_event_t* evt) +{ + hcl_server_proto_t* proto; + proto = (hcl_server_proto_t*)evt->ctx; + HCL_ASSERT (proto->hcl, proto->exec_runtime_event_index == old_index); + proto->exec_runtime_event_index = new_index; +} + +static int insert_exec_timer (hcl_server_proto_t* proto, const hcl_ntime_t* tmout) +{ + hcl_tmr_event_t event; + hcl_tmr_index_t index; + + HCL_ASSERT (proto->hcl, proto->exec_runtime_event_index == HCL_TMR_INVALID_INDEX); + + HCL_MEMSET (&event, 0, HCL_SIZEOF(event)); + event.ctx = proto; + proto->hcl->vmprim.vm_gettime (proto->hcl, &event.when); + HCL_ADDNTIME (&event.when, &event.when, tmout); + event.handler = exec_runtime_handler; + event.updater = exec_runtime_updater; + + pthread_mutex_lock (&proto->worker->server->tmr_mutex); + index = hcl_tmr_insert(proto->worker->server->tmr, &event); + pthread_mutex_unlock (&proto->worker->server->tmr_mutex); + if (index == HCL_TMR_INVALID_INDEX) return -1; + + proto->exec_runtime_event_index = index; + return 0; +} + +static void delete_exec_timer (hcl_server_proto_t* proto) +{ + if (proto->exec_runtime_event_index != HCL_TMR_INVALID_INDEX) + { +//printf ("deleted exec timer..........\n"); + pthread_mutex_lock (&proto->worker->server->tmr_mutex); + hcl_tmr_delete (proto->worker->server->tmr, proto->exec_runtime_event_index); + pthread_mutex_unlock (&proto->worker->server->tmr_mutex); + proto->exec_runtime_event_index = HCL_TMR_INVALID_INDEX; + } +} + int hcl_server_proto_handle_request (hcl_server_proto_t* proto) { if (get_token(proto) <= -1) return -1; @@ -1475,9 +1531,37 @@ int hcl_server_proto_handle_request (hcl_server_proto_t* proto) if (proto->req.state == HCL_SERVER_PROTO_REQ_IN_TOP_LEVEL) { + const hcl_ooch_t* failmsg = HCL_NULL; + hcl_server_t* server; + + server = proto->worker->server; + hcl_server_proto_start_reply (proto); - obj = hcl_execute(proto->hcl); - if (hcl_server_proto_end_reply(proto, (obj? HCL_NULL: hcl_geterrmsg(proto->hcl))) <= -1) + if (server->cfg.actor_max_runtime.sec <= 0 && server->cfg.actor_max_runtime.sec <= 0) + { + obj = hcl_execute(proto->hcl); + if (!obj) failmsg = hcl_geterrmsg(proto->hcl); + } + else + { + if (insert_exec_timer(proto, &server->cfg.actor_max_runtime) <= -1) + { + hcl_logbfmt (proto->hcl, SERVER_LOGMASK_ERROR, "cannot start execution timer\n"); + /* use proto->hcl's error formatting instead of server's to avoid using mutex over the shared server->dummy_hcl */ + hcl_seterrbfmt (proto->hcl, HCL_ESYSMEM, "cannot start execution timer"); + failmsg = hcl_geterrmsg(proto->hcl); + } + else + { +//printf ("inserted exec timer..........\n"); + obj = hcl_execute(proto->hcl); + if (!obj) failmsg = hcl_geterrmsg(proto->hcl); + + delete_exec_timer (proto); + } + } + + if (hcl_server_proto_end_reply(proto, failmsg) <= -1) { hcl_logbfmt (proto->hcl, SERVER_LOGMASK_ERROR, "cannot finalize reply for .SCRIPT\n"); return -1; @@ -1531,6 +1615,7 @@ hcl_server_t* hcl_server_open (hcl_mmgr_t* mmgr, hcl_oow_t xtnsize, hcl_server_p hcl_server_t* server; hcl_t* hcl; hcl_vmprim_t vmprim; + hcl_tmr_t* tmr; dummy_hcl_xtn_t* xtn; server = (hcl_server_t*)HCL_MMGR_ALLOC(mmgr, HCL_SIZEOF(*server) + xtnsize); @@ -1549,13 +1634,20 @@ hcl_server_t* hcl_server_open (hcl_mmgr_t* mmgr, hcl_oow_t xtnsize, hcl_server_p vmprim.vm_gettime = vm_gettime; vmprim.vm_sleep = vm_sleep; - hcl = hcl_open(mmgr, HCL_SIZEOF(*xtn), 10240, &vmprim, errnum); + hcl = hcl_open(mmgr, HCL_SIZEOF(*xtn), 2048, &vmprim, errnum); if (!hcl) { HCL_MMGR_FREE (mmgr, server); return HCL_NULL; } + tmr = hcl_tmr_open(hcl, 0, 1024); /* TOOD: make the timer's default size configurable */ + if (!tmr) + { + hcl_close (hcl); + HCL_MMGR_FREE (mmgr, server); + } + xtn = (dummy_hcl_xtn_t*)hcl_getxtn(hcl); xtn->server = server; @@ -1564,14 +1656,18 @@ hcl_server_t* hcl_server_open (hcl_mmgr_t* mmgr, hcl_oow_t xtnsize, hcl_server_p server->cmgr = hcl_get_utf8_cmgr(); server->prim = *prim; server->dummy_hcl = hcl; + server->tmr = tmr; server->cfg.logmask = ~0u; server->cfg.worker_stack_size = 512000UL; server->cfg.actor_heap_size = 512000UL; - pthread_mutex_init (&server->worker_mutex, HCL_NULL); - pthread_mutex_init (&server->log_mutex, HCL_NULL); + HCL_INITNTIME (&server->cfg.worker_idle_timeout, 0, 0); + HCL_INITNTIME (&server->cfg.actor_max_runtime, 0, 0); + pthread_mutex_init (&server->worker_mutex, HCL_NULL); + pthread_mutex_init (&server->tmr_mutex, HCL_NULL); + pthread_mutex_init (&server->log_mutex, HCL_NULL); return server; } @@ -1579,8 +1675,10 @@ hcl_server_t* hcl_server_open (hcl_mmgr_t* mmgr, hcl_oow_t xtnsize, hcl_server_p void hcl_server_close (hcl_server_t* server) { pthread_mutex_destroy (&server->log_mutex); + pthread_mutex_destroy (&server->tmr_mutex); pthread_mutex_destroy (&server->worker_mutex); + hcl_tmr_close (server->tmr); hcl_close (server->dummy_hcl); HCL_MMGR_FREE (server->mmgr, server); } @@ -1588,7 +1686,7 @@ void hcl_server_close (hcl_server_t* server) static hcl_server_worker_t* alloc_worker (hcl_server_t* server, int cli_sck) { hcl_server_worker_t* worker; - + worker = (hcl_server_worker_t*)HCL_MMGR_ALLOC (server->mmgr, HCL_SIZEOF(*worker)); if (!worker) return HCL_NULL; @@ -1848,29 +1946,61 @@ int hcl_server_start (hcl_server_t* server, const hcl_bch_t* addrs) int cli_fd; socklen_t cli_len; pthread_t thr; - + hcl_ntime_t tmout; hcl_server_worker_t* worker; + struct pollfd pfd[1]; + int n; purge_all_workers (server, HCL_SERVER_WORKER_STATE_DEAD); - cli_len = HCL_SIZEOF(cli_addr); - cli_fd = accept(srv_fd, (struct sockaddr*)&cli_addr, &cli_len); - if (cli_fd == -1) + pthread_mutex_lock (&server->tmr_mutex); + n = hcl_tmr_gettmout(server->tmr, HCL_NULL, &tmout); + pthread_mutex_unlock (&server->tmr_mutex); + if (n <= -1) HCL_INITNTIME (&tmout, 1, 0); + + pfd[0].fd = srv_fd; + pfd[0].events = POLLIN | POLLERR; + n = poll(pfd, 1, HCL_SECNSEC_TO_MSEC(tmout.sec, tmout.nsec)); + if (n <= -1) { if (server->stopreq) break; /* normal termination requested */ if (errno == EINTR) continue; /* interrupted but not termination requested */ - set_err_with_syserr (server, errno, "unable to accept worker on socket %d", srv_fd); + set_err_with_syserr (server, errno, "unable to poll for events in server"); xret = -1; break; } - hcl_server_logbfmt (server, SERVER_LOGMASK_INFO, "accepted worker - socket %d\n", cli_fd); + pthread_mutex_lock (&server->tmr_mutex); + hcl_tmr_fire (server->tmr, HCL_NULL, HCL_NULL); + pthread_mutex_unlock (&server->tmr_mutex); - worker = alloc_worker(server, cli_fd); - if (pthread_create(&thr, &thr_attr, worker_main, worker) != 0) + while (n > 0) { - free_worker (worker); + --n; + + if (pfd[n].revents /*& (POLLIN | POLLHUP | POLLERR) */) + { + cli_len = HCL_SIZEOF(cli_addr); + cli_fd = accept(pfd[n].fd, (struct sockaddr*)&cli_addr, &cli_len); + if (cli_fd == -1) + { + if (server->stopreq) break; /* normal termination requested */ + if (errno == EINTR) continue; /* interrupted but not termination requested */ + + set_err_with_syserr (server, errno, "unable to accept worker on socket %d", pfd[n]); + xret = -1; + break; + } + + hcl_server_logbfmt (server, SERVER_LOGMASK_INFO, "accepted worker - socket %d\n", cli_fd); + + worker = alloc_worker(server, cli_fd); + if (pthread_create(&thr, &thr_attr, worker_main, worker) != 0) + { + free_worker (worker); + } + } } } diff --git a/lib/hcl.c b/lib/hcl.c index a1119ec..93bee83 100644 --- a/lib/hcl.c +++ b/lib/hcl.c @@ -111,8 +111,6 @@ int hcl_init (hcl_t* hcl, hcl_mmgr_t* mmgr, hcl_oow_t heapsz, const hcl_vmprim_t if (!hcl->vmprim.free_heap) hcl->vmprim.free_heap = free_heap; hcl->option.log_mask = ~0u; - hcl->option.log_mask &= ~HCL_LOG_PREFER_JSON; - hcl->option.log_maxcapa = HCL_DFL_LOG_MAXCAPA; hcl->option.dfl_symtab_size = HCL_DFL_SYMTAB_SIZE; hcl->option.dfl_sysdic_size = HCL_DFL_SYSDIC_SIZE; diff --git a/lib/main.c b/lib/main.c index 9e5c993..487691b 100644 --- a/lib/main.c +++ b/lib/main.c @@ -664,7 +664,7 @@ static void syserrstrb (hcl_t* hcl, int syserr, hcl_bch_t* buf, hcl_oow_t len) #if defined(HAVE_STRERROR_R) strerror_r (syserr, buf, len); #else - /* this is not thread safe */ + /* this may not be thread safe */ hcl_copybcstr (buf, len, strerror(syserr)); #endif } diff --git a/lib/main2.c b/lib/main2.c index a1ea020..03a7133 100644 --- a/lib/main2.c +++ b/lib/main2.c @@ -431,6 +431,7 @@ int main (int argc, char* argv[]) const char* logopt = HCL_NULL; const char* dbgopt = HCL_NULL; hcl_oow_t memsize = MIN_MEMSIZE; + hcl_ntime_t tmout = { 0, 0 }; int large_pages = 0; unsigned int trait; @@ -523,6 +524,9 @@ int main (int argc, char* argv[]) /*hcl_server_setoption (server, HCL_SERVER_WORKER_STACK_SIZE, ???);*/ hcl_server_setoption (server, HCL_SERVER_ACTOR_HEAP_SIZE, &memsize); + HCL_INITNTIME (&tmout, 5, 0); + hcl_server_setoption (server, HCL_SERVER_ACTOR_MAX_RUNTIME, &tmout); + g_server = server; set_signal (SIGINT, handle_sigint); set_signal_to_ignore (SIGPIPE); diff --git a/lib/tmr.c b/lib/tmr.c index 8717863..44b94a1 100644 --- a/lib/tmr.c +++ b/lib/tmr.c @@ -66,7 +66,7 @@ int hcl_tmr_init (hcl_tmr_t* tmr, hcl_t* hcl, hcl_oow_t capa) if (capa <= 0) capa = 1; - tmp = HCL_MMGR_ALLOC (hcl->mmgr, capa * HCL_SIZEOF(*tmp)); + tmp = HCL_MMGR_ALLOC(hcl->mmgr, capa * HCL_SIZEOF(*tmp)); if (tmp == HCL_NULL) return -1; tmr->hcl = hcl; @@ -208,25 +208,31 @@ hcl_tmr_index_t hcl_tmr_insert (hcl_tmr_t* tmr, const hcl_tmr_event_t* event) HCL_ASSERT (tmr->hcl, tmr->capa >= 1); new_capa = tmr->capa * 2; - tmp = HCL_MMGR_REALLOC (tmr->hcl->mmgr, tmr->event, new_capa * HCL_SIZEOF(*tmp)); + tmp = HCL_MMGR_REALLOC(tmr->hcl->mmgr, tmr->event, new_capa * HCL_SIZEOF(*tmp)); if (tmp == HCL_NULL) return HCL_TMR_INVALID_INDEX; tmr->event = tmp; tmr->capa = new_capa; } + HCL_ASSERT (tmr->hcl, event->handler != HCL_NULL); + HCL_ASSERT (tmr->hcl, event->updater != HCL_NULL); + tmr->size = tmr->size + 1; tmr->event[index] = *event; - return sift_up (tmr, index, 0); + return sift_up(tmr, index, 0); } hcl_tmr_index_t hcl_tmr_update (hcl_tmr_t* tmr, hcl_oow_t index, const hcl_tmr_event_t* event) { hcl_tmr_event_t item; + HCL_ASSERT (tmr->hcl, event->handler != HCL_NULL); + HCL_ASSERT (tmr->hcl, event->updater != HCL_NULL); + item = tmr->event[index]; tmr->event[index] = *event; - return YOUNGER_THAN(event, &item)? sift_up (tmr, index, 0): sift_down (tmr, index, 0); + return YOUNGER_THAN(event, &item)? sift_up(tmr, index, 0): sift_down(tmr, index, 0); } int hcl_tmr_fire (hcl_tmr_t* tmr, const hcl_ntime_t* tm, hcl_oow_t* firecnt) @@ -267,7 +273,7 @@ int hcl_tmr_gettmout (hcl_tmr_t* tmr, const hcl_ntime_t* tm, hcl_ntime_t* tmout) /*else if (hcl_gettime(&now) <= -1) return -1;*/ tmr->hcl->vmprim.vm_gettime (tmr->hcl, &now); - HCL_SUBNTIME (&tmr->event[0].when, &now, tmout); + HCL_SUBNTIME (tmout, &tmr->event[0].when, &now); if (tmout->sec < 0) HCL_CLEARNTIME (tmout); return 0;