implemented actor_max_runtime primitively using timer events
This commit is contained in:
parent
6f69e018c4
commit
3228386c2c
146
lib/hcl-s.c
146
lib/hcl-s.c
@ -26,6 +26,7 @@
|
||||
|
||||
#include "hcl-s.h"
|
||||
#include "hcl-prv.h"
|
||||
#include "hcl-tmr.h"
|
||||
|
||||
#include <stdio.h>
|
||||
#include <string.h>
|
||||
@ -195,6 +196,7 @@ struct hcl_server_proto_t
|
||||
hcl_t* hcl;
|
||||
hcl_iolxc_t* lxc;
|
||||
hcl_server_proto_token_t tok;
|
||||
hcl_tmr_index_t exec_runtime_event_index;
|
||||
|
||||
struct
|
||||
{
|
||||
@ -240,6 +242,7 @@ struct hcl_server_t
|
||||
hcl_cmgr_t* cmgr;
|
||||
hcl_server_prim_t prim;
|
||||
hcl_t* dummy_hcl;
|
||||
hcl_tmr_t* tmr;
|
||||
|
||||
hcl_errnum_t errnum;
|
||||
struct
|
||||
@ -266,6 +269,7 @@ struct hcl_server_t
|
||||
} worker_list[2];
|
||||
|
||||
pthread_mutex_t worker_mutex;
|
||||
pthread_mutex_t tmr_mutex;
|
||||
pthread_mutex_t log_mutex;
|
||||
};
|
||||
|
||||
@ -602,7 +606,7 @@ static void syserrstrb (hcl_t* hcl, int syserr, hcl_bch_t* buf, hcl_oow_t len)
|
||||
#if defined(HAVE_STRERROR_R)
|
||||
strerror_r (syserr, buf, len);
|
||||
#else
|
||||
/* this is not thread safe */
|
||||
/* this may not be thread safe */
|
||||
hcl_copybcstr (buf, len, strerror(syserr));
|
||||
#endif
|
||||
}
|
||||
@ -977,6 +981,7 @@ hcl_server_proto_t* hcl_server_proto_open (hcl_oow_t xtnsize, hcl_server_worker_
|
||||
|
||||
HCL_MEMSET (proto, 0, HCL_SIZEOF(*proto));
|
||||
proto->worker = worker;
|
||||
proto->exec_runtime_event_index = HCL_TMR_INVALID_INDEX;
|
||||
|
||||
proto->hcl = hcl_open(proto->worker->server->mmgr, HCL_SIZEOF(*xtn), worker->server->cfg.actor_heap_size, &vmprim, HCL_NULL);
|
||||
if (!proto->hcl) goto oops;
|
||||
@ -1358,6 +1363,57 @@ static int get_token (hcl_server_proto_t* proto)
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void exec_runtime_handler (hcl_tmr_t* tmr, const hcl_ntime_t* now, hcl_tmr_event_t* evt)
|
||||
{
|
||||
hcl_server_proto_t* proto;
|
||||
proto = (hcl_server_proto_t*)evt->ctx;
|
||||
//printf ("aborting hcl for runtime handler timeout...\n");
|
||||
hcl_abort (proto->hcl);
|
||||
}
|
||||
|
||||
static void exec_runtime_updater (hcl_tmr_t* tmr, hcl_tmr_index_t old_index, hcl_tmr_index_t new_index, hcl_tmr_event_t* evt)
|
||||
{
|
||||
hcl_server_proto_t* proto;
|
||||
proto = (hcl_server_proto_t*)evt->ctx;
|
||||
HCL_ASSERT (proto->hcl, proto->exec_runtime_event_index == old_index);
|
||||
proto->exec_runtime_event_index = new_index;
|
||||
}
|
||||
|
||||
static int insert_exec_timer (hcl_server_proto_t* proto, const hcl_ntime_t* tmout)
|
||||
{
|
||||
hcl_tmr_event_t event;
|
||||
hcl_tmr_index_t index;
|
||||
|
||||
HCL_ASSERT (proto->hcl, proto->exec_runtime_event_index == HCL_TMR_INVALID_INDEX);
|
||||
|
||||
HCL_MEMSET (&event, 0, HCL_SIZEOF(event));
|
||||
event.ctx = proto;
|
||||
proto->hcl->vmprim.vm_gettime (proto->hcl, &event.when);
|
||||
HCL_ADDNTIME (&event.when, &event.when, tmout);
|
||||
event.handler = exec_runtime_handler;
|
||||
event.updater = exec_runtime_updater;
|
||||
|
||||
pthread_mutex_lock (&proto->worker->server->tmr_mutex);
|
||||
index = hcl_tmr_insert(proto->worker->server->tmr, &event);
|
||||
pthread_mutex_unlock (&proto->worker->server->tmr_mutex);
|
||||
if (index == HCL_TMR_INVALID_INDEX) return -1;
|
||||
|
||||
proto->exec_runtime_event_index = index;
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void delete_exec_timer (hcl_server_proto_t* proto)
|
||||
{
|
||||
if (proto->exec_runtime_event_index != HCL_TMR_INVALID_INDEX)
|
||||
{
|
||||
//printf ("deleted exec timer..........\n");
|
||||
pthread_mutex_lock (&proto->worker->server->tmr_mutex);
|
||||
hcl_tmr_delete (proto->worker->server->tmr, proto->exec_runtime_event_index);
|
||||
pthread_mutex_unlock (&proto->worker->server->tmr_mutex);
|
||||
proto->exec_runtime_event_index = HCL_TMR_INVALID_INDEX;
|
||||
}
|
||||
}
|
||||
|
||||
int hcl_server_proto_handle_request (hcl_server_proto_t* proto)
|
||||
{
|
||||
if (get_token(proto) <= -1) return -1;
|
||||
@ -1475,9 +1531,37 @@ int hcl_server_proto_handle_request (hcl_server_proto_t* proto)
|
||||
|
||||
if (proto->req.state == HCL_SERVER_PROTO_REQ_IN_TOP_LEVEL)
|
||||
{
|
||||
const hcl_ooch_t* failmsg = HCL_NULL;
|
||||
hcl_server_t* server;
|
||||
|
||||
server = proto->worker->server;
|
||||
|
||||
hcl_server_proto_start_reply (proto);
|
||||
if (server->cfg.actor_max_runtime.sec <= 0 && server->cfg.actor_max_runtime.sec <= 0)
|
||||
{
|
||||
obj = hcl_execute(proto->hcl);
|
||||
if (hcl_server_proto_end_reply(proto, (obj? HCL_NULL: hcl_geterrmsg(proto->hcl))) <= -1)
|
||||
if (!obj) failmsg = hcl_geterrmsg(proto->hcl);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (insert_exec_timer(proto, &server->cfg.actor_max_runtime) <= -1)
|
||||
{
|
||||
hcl_logbfmt (proto->hcl, SERVER_LOGMASK_ERROR, "cannot start execution timer\n");
|
||||
/* use proto->hcl's error formatting instead of server's to avoid using mutex over the shared server->dummy_hcl */
|
||||
hcl_seterrbfmt (proto->hcl, HCL_ESYSMEM, "cannot start execution timer");
|
||||
failmsg = hcl_geterrmsg(proto->hcl);
|
||||
}
|
||||
else
|
||||
{
|
||||
//printf ("inserted exec timer..........\n");
|
||||
obj = hcl_execute(proto->hcl);
|
||||
if (!obj) failmsg = hcl_geterrmsg(proto->hcl);
|
||||
|
||||
delete_exec_timer (proto);
|
||||
}
|
||||
}
|
||||
|
||||
if (hcl_server_proto_end_reply(proto, failmsg) <= -1)
|
||||
{
|
||||
hcl_logbfmt (proto->hcl, SERVER_LOGMASK_ERROR, "cannot finalize reply for .SCRIPT\n");
|
||||
return -1;
|
||||
@ -1531,6 +1615,7 @@ hcl_server_t* hcl_server_open (hcl_mmgr_t* mmgr, hcl_oow_t xtnsize, hcl_server_p
|
||||
hcl_server_t* server;
|
||||
hcl_t* hcl;
|
||||
hcl_vmprim_t vmprim;
|
||||
hcl_tmr_t* tmr;
|
||||
dummy_hcl_xtn_t* xtn;
|
||||
|
||||
server = (hcl_server_t*)HCL_MMGR_ALLOC(mmgr, HCL_SIZEOF(*server) + xtnsize);
|
||||
@ -1549,13 +1634,20 @@ hcl_server_t* hcl_server_open (hcl_mmgr_t* mmgr, hcl_oow_t xtnsize, hcl_server_p
|
||||
vmprim.vm_gettime = vm_gettime;
|
||||
vmprim.vm_sleep = vm_sleep;
|
||||
|
||||
hcl = hcl_open(mmgr, HCL_SIZEOF(*xtn), 10240, &vmprim, errnum);
|
||||
hcl = hcl_open(mmgr, HCL_SIZEOF(*xtn), 2048, &vmprim, errnum);
|
||||
if (!hcl)
|
||||
{
|
||||
HCL_MMGR_FREE (mmgr, server);
|
||||
return HCL_NULL;
|
||||
}
|
||||
|
||||
tmr = hcl_tmr_open(hcl, 0, 1024); /* TOOD: make the timer's default size configurable */
|
||||
if (!tmr)
|
||||
{
|
||||
hcl_close (hcl);
|
||||
HCL_MMGR_FREE (mmgr, server);
|
||||
}
|
||||
|
||||
xtn = (dummy_hcl_xtn_t*)hcl_getxtn(hcl);
|
||||
xtn->server = server;
|
||||
|
||||
@ -1564,14 +1656,18 @@ hcl_server_t* hcl_server_open (hcl_mmgr_t* mmgr, hcl_oow_t xtnsize, hcl_server_p
|
||||
server->cmgr = hcl_get_utf8_cmgr();
|
||||
server->prim = *prim;
|
||||
server->dummy_hcl = hcl;
|
||||
server->tmr = tmr;
|
||||
|
||||
server->cfg.logmask = ~0u;
|
||||
server->cfg.worker_stack_size = 512000UL;
|
||||
server->cfg.actor_heap_size = 512000UL;
|
||||
|
||||
pthread_mutex_init (&server->worker_mutex, HCL_NULL);
|
||||
pthread_mutex_init (&server->log_mutex, HCL_NULL);
|
||||
HCL_INITNTIME (&server->cfg.worker_idle_timeout, 0, 0);
|
||||
HCL_INITNTIME (&server->cfg.actor_max_runtime, 0, 0);
|
||||
|
||||
pthread_mutex_init (&server->worker_mutex, HCL_NULL);
|
||||
pthread_mutex_init (&server->tmr_mutex, HCL_NULL);
|
||||
pthread_mutex_init (&server->log_mutex, HCL_NULL);
|
||||
|
||||
return server;
|
||||
}
|
||||
@ -1579,8 +1675,10 @@ hcl_server_t* hcl_server_open (hcl_mmgr_t* mmgr, hcl_oow_t xtnsize, hcl_server_p
|
||||
void hcl_server_close (hcl_server_t* server)
|
||||
{
|
||||
pthread_mutex_destroy (&server->log_mutex);
|
||||
pthread_mutex_destroy (&server->tmr_mutex);
|
||||
pthread_mutex_destroy (&server->worker_mutex);
|
||||
|
||||
hcl_tmr_close (server->tmr);
|
||||
hcl_close (server->dummy_hcl);
|
||||
HCL_MMGR_FREE (server->mmgr, server);
|
||||
}
|
||||
@ -1848,19 +1946,49 @@ int hcl_server_start (hcl_server_t* server, const hcl_bch_t* addrs)
|
||||
int cli_fd;
|
||||
socklen_t cli_len;
|
||||
pthread_t thr;
|
||||
|
||||
hcl_ntime_t tmout;
|
||||
hcl_server_worker_t* worker;
|
||||
struct pollfd pfd[1];
|
||||
int n;
|
||||
|
||||
purge_all_workers (server, HCL_SERVER_WORKER_STATE_DEAD);
|
||||
|
||||
pthread_mutex_lock (&server->tmr_mutex);
|
||||
n = hcl_tmr_gettmout(server->tmr, HCL_NULL, &tmout);
|
||||
pthread_mutex_unlock (&server->tmr_mutex);
|
||||
if (n <= -1) HCL_INITNTIME (&tmout, 1, 0);
|
||||
|
||||
pfd[0].fd = srv_fd;
|
||||
pfd[0].events = POLLIN | POLLERR;
|
||||
n = poll(pfd, 1, HCL_SECNSEC_TO_MSEC(tmout.sec, tmout.nsec));
|
||||
if (n <= -1)
|
||||
{
|
||||
if (server->stopreq) break; /* normal termination requested */
|
||||
if (errno == EINTR) continue; /* interrupted but not termination requested */
|
||||
|
||||
set_err_with_syserr (server, errno, "unable to poll for events in server");
|
||||
xret = -1;
|
||||
break;
|
||||
}
|
||||
|
||||
pthread_mutex_lock (&server->tmr_mutex);
|
||||
hcl_tmr_fire (server->tmr, HCL_NULL, HCL_NULL);
|
||||
pthread_mutex_unlock (&server->tmr_mutex);
|
||||
|
||||
while (n > 0)
|
||||
{
|
||||
--n;
|
||||
|
||||
if (pfd[n].revents /*& (POLLIN | POLLHUP | POLLERR) */)
|
||||
{
|
||||
cli_len = HCL_SIZEOF(cli_addr);
|
||||
cli_fd = accept(srv_fd, (struct sockaddr*)&cli_addr, &cli_len);
|
||||
cli_fd = accept(pfd[n].fd, (struct sockaddr*)&cli_addr, &cli_len);
|
||||
if (cli_fd == -1)
|
||||
{
|
||||
if (server->stopreq) break; /* normal termination requested */
|
||||
if (errno == EINTR) continue; /* interrupted but not termination requested */
|
||||
|
||||
set_err_with_syserr (server, errno, "unable to accept worker on socket %d", srv_fd);
|
||||
set_err_with_syserr (server, errno, "unable to accept worker on socket %d", pfd[n]);
|
||||
xret = -1;
|
||||
break;
|
||||
}
|
||||
@ -1873,6 +2001,8 @@ int hcl_server_start (hcl_server_t* server, const hcl_bch_t* addrs)
|
||||
free_worker (worker);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
purge_all_workers (server, HCL_SERVER_WORKER_STATE_ALIVE);
|
||||
purge_all_workers (server, HCL_SERVER_WORKER_STATE_DEAD);
|
||||
|
@ -111,8 +111,6 @@ int hcl_init (hcl_t* hcl, hcl_mmgr_t* mmgr, hcl_oow_t heapsz, const hcl_vmprim_t
|
||||
if (!hcl->vmprim.free_heap) hcl->vmprim.free_heap = free_heap;
|
||||
|
||||
hcl->option.log_mask = ~0u;
|
||||
hcl->option.log_mask &= ~HCL_LOG_PREFER_JSON;
|
||||
|
||||
hcl->option.log_maxcapa = HCL_DFL_LOG_MAXCAPA;
|
||||
hcl->option.dfl_symtab_size = HCL_DFL_SYMTAB_SIZE;
|
||||
hcl->option.dfl_sysdic_size = HCL_DFL_SYSDIC_SIZE;
|
||||
|
@ -664,7 +664,7 @@ static void syserrstrb (hcl_t* hcl, int syserr, hcl_bch_t* buf, hcl_oow_t len)
|
||||
#if defined(HAVE_STRERROR_R)
|
||||
strerror_r (syserr, buf, len);
|
||||
#else
|
||||
/* this is not thread safe */
|
||||
/* this may not be thread safe */
|
||||
hcl_copybcstr (buf, len, strerror(syserr));
|
||||
#endif
|
||||
}
|
||||
|
@ -431,6 +431,7 @@ int main (int argc, char* argv[])
|
||||
const char* logopt = HCL_NULL;
|
||||
const char* dbgopt = HCL_NULL;
|
||||
hcl_oow_t memsize = MIN_MEMSIZE;
|
||||
hcl_ntime_t tmout = { 0, 0 };
|
||||
int large_pages = 0;
|
||||
unsigned int trait;
|
||||
|
||||
@ -523,6 +524,9 @@ int main (int argc, char* argv[])
|
||||
/*hcl_server_setoption (server, HCL_SERVER_WORKER_STACK_SIZE, ???);*/
|
||||
hcl_server_setoption (server, HCL_SERVER_ACTOR_HEAP_SIZE, &memsize);
|
||||
|
||||
HCL_INITNTIME (&tmout, 5, 0);
|
||||
hcl_server_setoption (server, HCL_SERVER_ACTOR_MAX_RUNTIME, &tmout);
|
||||
|
||||
g_server = server;
|
||||
set_signal (SIGINT, handle_sigint);
|
||||
set_signal_to_ignore (SIGPIPE);
|
||||
|
16
lib/tmr.c
16
lib/tmr.c
@ -66,7 +66,7 @@ int hcl_tmr_init (hcl_tmr_t* tmr, hcl_t* hcl, hcl_oow_t capa)
|
||||
|
||||
if (capa <= 0) capa = 1;
|
||||
|
||||
tmp = HCL_MMGR_ALLOC (hcl->mmgr, capa * HCL_SIZEOF(*tmp));
|
||||
tmp = HCL_MMGR_ALLOC(hcl->mmgr, capa * HCL_SIZEOF(*tmp));
|
||||
if (tmp == HCL_NULL) return -1;
|
||||
|
||||
tmr->hcl = hcl;
|
||||
@ -208,25 +208,31 @@ hcl_tmr_index_t hcl_tmr_insert (hcl_tmr_t* tmr, const hcl_tmr_event_t* event)
|
||||
|
||||
HCL_ASSERT (tmr->hcl, tmr->capa >= 1);
|
||||
new_capa = tmr->capa * 2;
|
||||
tmp = HCL_MMGR_REALLOC (tmr->hcl->mmgr, tmr->event, new_capa * HCL_SIZEOF(*tmp));
|
||||
tmp = HCL_MMGR_REALLOC(tmr->hcl->mmgr, tmr->event, new_capa * HCL_SIZEOF(*tmp));
|
||||
if (tmp == HCL_NULL) return HCL_TMR_INVALID_INDEX;
|
||||
|
||||
tmr->event = tmp;
|
||||
tmr->capa = new_capa;
|
||||
}
|
||||
|
||||
HCL_ASSERT (tmr->hcl, event->handler != HCL_NULL);
|
||||
HCL_ASSERT (tmr->hcl, event->updater != HCL_NULL);
|
||||
|
||||
tmr->size = tmr->size + 1;
|
||||
tmr->event[index] = *event;
|
||||
return sift_up (tmr, index, 0);
|
||||
return sift_up(tmr, index, 0);
|
||||
}
|
||||
|
||||
hcl_tmr_index_t hcl_tmr_update (hcl_tmr_t* tmr, hcl_oow_t index, const hcl_tmr_event_t* event)
|
||||
{
|
||||
hcl_tmr_event_t item;
|
||||
|
||||
HCL_ASSERT (tmr->hcl, event->handler != HCL_NULL);
|
||||
HCL_ASSERT (tmr->hcl, event->updater != HCL_NULL);
|
||||
|
||||
item = tmr->event[index];
|
||||
tmr->event[index] = *event;
|
||||
return YOUNGER_THAN(event, &item)? sift_up (tmr, index, 0): sift_down (tmr, index, 0);
|
||||
return YOUNGER_THAN(event, &item)? sift_up(tmr, index, 0): sift_down(tmr, index, 0);
|
||||
}
|
||||
|
||||
int hcl_tmr_fire (hcl_tmr_t* tmr, const hcl_ntime_t* tm, hcl_oow_t* firecnt)
|
||||
@ -267,7 +273,7 @@ int hcl_tmr_gettmout (hcl_tmr_t* tmr, const hcl_ntime_t* tm, hcl_ntime_t* tmout)
|
||||
/*else if (hcl_gettime(&now) <= -1) return -1;*/
|
||||
tmr->hcl->vmprim.vm_gettime (tmr->hcl, &now);
|
||||
|
||||
HCL_SUBNTIME (&tmr->event[0].when, &now, tmout);
|
||||
HCL_SUBNTIME (tmout, &tmr->event[0].when, &now);
|
||||
if (tmout->sec < 0) HCL_CLEARNTIME (tmout);
|
||||
|
||||
return 0;
|
||||
|
Loading…
Reference in New Issue
Block a user