wip - making hcl-x code more reusable
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
hyung-hwan 2024-04-20 12:02:22 +09:00
parent f9bf37f8bf
commit 340f1d8a44
2 changed files with 327 additions and 157 deletions

View File

@ -33,7 +33,7 @@
#define HCL_SERVER_TOKEN_NAME_ALIGN 64 #define HCL_SERVER_TOKEN_NAME_ALIGN 64
#define HCL_SERVER_WID_MAP_ALIGN 512 #define HCL_SERVER_WID_MAP_ALIGN 512
#define HCL_SERVER_PROTO_REPLY_BUF_SIZE 1300 #define HCL_XPROTO_REPLY_BUF_SIZE 1300
#if defined(_WIN32) #if defined(_WIN32)
# include <windows.h> # include <windows.h>
@ -91,7 +91,7 @@ typedef struct bb_t bb_t;
struct worker_hcl_xtn_t struct worker_hcl_xtn_t
{ {
hcl_server_proto_t* proto; hcl_server_worker_t* worker;
int vm_running; int vm_running;
}; };
typedef struct worker_hcl_xtn_t worker_hcl_xtn_t; typedef struct worker_hcl_xtn_t worker_hcl_xtn_t;
@ -104,23 +104,21 @@ typedef struct server_hcl_xtn_t server_hcl_xtn_t;
/* ---------------------------------- */ /* ---------------------------------- */
enum hcl_server_proto_rcv_state_t enum hcl_xproto_rcv_state_t
{ {
HCL_SERVER_PROTO_RCV_HEADER, HCL_XPROTO_RCV_HEADER,
HCL_SERVER_PROTO_RCV_PAYLOAD HCL_XPROTO_RCV_PAYLOAD
}; };
typedef enum hcl_server_proto_rcv_state_t hcl_server_proto_rcv_state_t; typedef enum hcl_xproto_rcv_state_t hcl_xproto_rcv_state_t;
struct hcl_server_proto_t struct hcl_xproto_t
{ {
hcl_server_worker_t* worker; hcl_server_worker_t* worker;
hcl_t* hcl;
hcl_tmr_index_t exec_runtime_event_index; hcl_tmr_index_t exec_runtime_event_index;
struct struct
{ {
hcl_server_proto_rcv_state_t state; hcl_xproto_rcv_state_t state;
hcl_oow_t len_needed; hcl_oow_t len_needed;
unsigned int eof: 1; unsigned int eof: 1;
@ -136,6 +134,8 @@ struct hcl_server_proto_t
} snd; } snd;
}; };
/* ---------------------------------- */
enum hcl_server_worker_state_t enum hcl_server_worker_state_t
{ {
HCL_SERVER_WORKER_STATE_DEAD = 0, HCL_SERVER_WORKER_STATE_DEAD = 0,
@ -168,7 +168,8 @@ struct hcl_server_worker_t
hcl_ntime_t alloc_time; hcl_ntime_t alloc_time;
hcl_server_worker_state_t state; hcl_server_worker_state_t state;
hcl_server_worker_opstate_t opstate; hcl_server_worker_opstate_t opstate;
hcl_server_proto_t* proto; hcl_xproto_t* proto;
hcl_t* hcl;
hcl_server_t* server; hcl_server_t* server;
hcl_server_worker_t* prev_worker; hcl_server_worker_t* prev_worker;
@ -268,10 +269,10 @@ struct hcl_server_t
}; };
/* ========================================================================= */ /* ========================================================================= */
static int send_stdout_bytes (hcl_server_proto_t* proto, const hcl_bch_t* data, hcl_oow_t len); static int send_stdout_bytes (hcl_xproto_t* proto, const hcl_bch_t* data, hcl_oow_t len);
#if defined(HCL_OOCH_IS_UCH) #if defined(HCL_OOCH_IS_UCH)
static int send_stdout_chars (hcl_server_proto_t* proto, const hcl_ooch_t* data, hcl_oow_t len); static int send_stdout_chars (hcl_xproto_t* proto, const hcl_ooch_t* data, hcl_oow_t len);
#else #else
#define send_stdout_chars(proto,data,len) send_stdout_bytes(proto,data,len) #define send_stdout_chars(proto,data,len) send_stdout_bytes(proto,data,len)
#endif #endif
@ -297,7 +298,7 @@ static HCL_INLINE int open_read_stream (hcl_t* hcl, hcl_io_cciarg_t* arg)
bb_t* bb = HCL_NULL; bb_t* bb = HCL_NULL;
hcl_server_t* server; hcl_server_t* server;
server = xtn->proto->worker->server; server = xtn->worker->server;
if (arg->includer) if (arg->includer)
{ {
@ -370,7 +371,7 @@ static HCL_INLINE int open_read_stream (hcl_t* hcl, hcl_io_cciarg_t* arg)
bb->fn = (hcl_bch_t*)(bb + 1); bb->fn = (hcl_bch_t*)(bb + 1);
hcl_copy_bcstr (bb->fn, pathlen + 1, ""); hcl_copy_bcstr (bb->fn, pathlen + 1, "");
bb->fd = xtn->proto->worker->sck; bb->fd = xtn->worker->sck;
} }
HCL_ASSERT (hcl, bb->fd >= 0); HCL_ASSERT (hcl, bb->fd >= 0);
@ -381,7 +382,7 @@ static HCL_INLINE int open_read_stream (hcl_t* hcl, hcl_io_cciarg_t* arg)
oops: oops:
if (bb) if (bb)
{ {
if (bb->fd >= 0 && bb->fd != xtn->proto->worker->sck) close (bb->fd); if (bb->fd >= 0 && bb->fd != xtn->worker->sck) close (bb->fd);
hcl_freemem (hcl, bb); hcl_freemem (hcl, bb);
} }
return -1; return -1;
@ -395,7 +396,7 @@ static HCL_INLINE int close_read_stream (hcl_t* hcl, hcl_io_cciarg_t* arg)
bb = (bb_t*)arg->handle; bb = (bb_t*)arg->handle;
HCL_ASSERT (hcl, bb != HCL_NULL && bb->fd >= 0); HCL_ASSERT (hcl, bb != HCL_NULL && bb->fd >= 0);
if (bb->fd != xtn->proto->worker->sck) close (bb->fd); if (bb->fd != xtn->worker->sck) close (bb->fd);
hcl_freemem (hcl, bb); hcl_freemem (hcl, bb);
arg->handle = HCL_NULL; arg->handle = HCL_NULL;
@ -414,7 +415,7 @@ static HCL_INLINE int read_read_stream (hcl_t* hcl, hcl_io_cciarg_t* arg)
bb = (bb_t*)arg->handle; bb = (bb_t*)arg->handle;
HCL_ASSERT (hcl, bb != HCL_NULL && bb->fd >= 0); HCL_ASSERT (hcl, bb != HCL_NULL && bb->fd >= 0);
worker = xtn->proto->worker; worker = xtn->worker;
start_over: start_over:
if (arg->includer) if (arg->includer)
@ -551,7 +552,7 @@ static int scan_handler (hcl_t* hcl, hcl_io_cmd_t cmd, void* arg)
hcl_io_udiarg_t* inarg = (hcl_io_udiarg_t*)arg; hcl_io_udiarg_t* inarg = (hcl_io_udiarg_t*)arg;
// what if it writes a request to require more input?? // what if it writes a request to require more input??
if (hcl_server_proto_handle_incoming(xtn->proto) <= -1) if (hcl_xproto_handle_incoming(xtn->proto) <= -1)
{ {
} }
} }
@ -583,7 +584,7 @@ printf ("IO CLOSE SOMETHING...........\n");
hcl_io_udoarg_t* outarg = (hcl_io_udoarg_t*)arg; hcl_io_udoarg_t* outarg = (hcl_io_udoarg_t*)arg;
printf ("IO WRITE SOMETHING...........\n"); printf ("IO WRITE SOMETHING...........\n");
if (send_stdout_chars(xtn->proto, outarg->ptr, outarg->len) <= -1) if (send_stdout_chars(xtn->worker->proto, outarg->ptr, outarg->len) <= -1)
{ {
/* TODO: change error code and message. propagage the errormessage from proto */ /* TODO: change error code and message. propagage the errormessage from proto */
hcl_seterrbfmt (hcl, HCL_EIOERR, "failed to write message via proto"); hcl_seterrbfmt (hcl, HCL_EIOERR, "failed to write message via proto");
@ -603,7 +604,7 @@ printf ("IO WRITE SOMETHING...........\n");
hcl_io_udoarg_t* outarg = (hcl_io_udoarg_t*)arg; hcl_io_udoarg_t* outarg = (hcl_io_udoarg_t*)arg;
printf ("IO WRITE SOMETHING BYTES...........\n"); printf ("IO WRITE SOMETHING BYTES...........\n");
if (send_stdout_bytes(xtn->proto, outarg->ptr, outarg->len) <= -1) if (send_stdout_bytes(xtn->worker->proto, outarg->ptr, outarg->len) <= -1)
{ {
/* TODO: change error code and message. propagage the errormessage from proto */ /* TODO: change error code and message. propagage the errormessage from proto */
hcl_seterrbfmt (hcl, HCL_EIOERR, "failed to write message via proto"); hcl_seterrbfmt (hcl, HCL_EIOERR, "failed to write message via proto");
@ -630,9 +631,9 @@ static void server_log_write (hcl_t* hcl, hcl_bitmask_t mask, const hcl_ooch_t*
worker_hcl_xtn_t* xtn = (worker_hcl_xtn_t*)hcl_getxtn(hcl); worker_hcl_xtn_t* xtn = (worker_hcl_xtn_t*)hcl_getxtn(hcl);
hcl_server_t* server; hcl_server_t* server;
server = xtn->proto->worker->server; server = xtn->worker->server;
pthread_mutex_lock (&server->log_mutex); pthread_mutex_lock (&server->log_mutex);
server->prim.log_write (server, xtn->proto->worker->wid, mask, msg, len); server->prim.log_write (server, xtn->worker->wid, mask, msg, len);
pthread_mutex_unlock (&server->log_mutex); pthread_mutex_unlock (&server->log_mutex);
} }
@ -665,7 +666,7 @@ static void vm_cleanup (hcl_t* hcl)
static void vm_checkbc (hcl_t* hcl, hcl_oob_t bcode) static void vm_checkbc (hcl_t* hcl, hcl_oob_t bcode)
{ {
worker_hcl_xtn_t* xtn = (worker_hcl_xtn_t*)hcl_getxtn(hcl); worker_hcl_xtn_t* xtn = (worker_hcl_xtn_t*)hcl_getxtn(hcl);
if (xtn->proto->worker->server->stopreq) hcl_abort(hcl); if (xtn->worker->server->stopreq) hcl_abort(hcl);
} }
/* /*
@ -687,7 +688,7 @@ static void fini_hcl (hcl_t* hcl)
static int on_fed_cnode (hcl_t* hcl, hcl_cnode_t* obj) static int on_fed_cnode (hcl_t* hcl, hcl_cnode_t* obj)
{ {
/*worker_hcl_xtn_t* xtn = (worker_hcl_xtn_t*)hcl_getxtn(hcl);*/ /*worker_hcl_xtn_t* xtn = (worker_hcl_xtn_t*)hcl_getxtn(hcl);*/
/*hcl_server_proto_t* proto = xtn->proto;*/ /*hcl_xproto_t* proto = xtn->proto;*/
int flags = 0; int flags = 0;
printf ("on_fed_cnode......\n"); printf ("on_fed_cnode......\n");
@ -711,20 +712,21 @@ hcl_logbfmt (hcl, HCL_LOG_STDERR, "COMPILER ERROR - %js\n", hcl_geterrmsg(hcl));
return 0; return 0;
} }
hcl_server_proto_t* hcl_server_proto_open (hcl_oow_t xtnsize, hcl_server_worker_t* worker) #if 0
hcl_xproto_t* hcl_xproto_open (hcl_oow_t xtnsize, hcl_server_worker_t* worker)
{ {
hcl_server_proto_t* proto; hcl_xproto_t* proto;
hcl_cb_t hclcb; hcl_cb_t hclcb;
worker_hcl_xtn_t* xtn; worker_hcl_xtn_t* xtn;
hcl_bitmask_t trait; hcl_bitmask_t trait;
proto = (hcl_server_proto_t*)hcl_server_allocmem(worker->server, HCL_SIZEOF(*proto)); proto = (hcl_xproto_t*)hcl_server_allocmem(worker->server, HCL_SIZEOF(*proto));
if (HCL_UNLIKELY(!proto)) return HCL_NULL; if (HCL_UNLIKELY(!proto)) return HCL_NULL;
HCL_MEMSET (proto, 0, HCL_SIZEOF(*proto)); HCL_MEMSET (proto, 0, HCL_SIZEOF(*proto));
proto->worker = worker; proto->worker = worker;
proto->exec_runtime_event_index = HCL_TMR_INVALID_INDEX; proto->exec_runtime_event_index = HCL_TMR_INVALID_INDEX;
proto->rcv.state = HCL_SERVER_PROTO_RCV_HEADER; proto->rcv.state = HCL_XPROTO_RCV_HEADER;
proto->rcv.len_needed = HCL_SIZEOF(proto->rcv.hdr); proto->rcv.len_needed = HCL_SIZEOF(proto->rcv.hdr);
proto->rcv.eof = 0; proto->rcv.eof = 0;
@ -776,23 +778,28 @@ oops:
return HCL_NULL; return HCL_NULL;
} }
void hcl_server_proto_close (hcl_server_proto_t* proto) void hcl_xproto_close (hcl_xproto_t* proto)
{ {
hcl_endfeed(proto->hcl); hcl_endfeed(proto->hcl);
hcl_close (proto->hcl); hcl_close (proto->hcl);
hcl_server_freemem (proto->worker->server, proto); hcl_server_freemem (proto->worker->server, proto);
} }
#endif
static void exec_runtime_handler (hcl_tmr_t* tmr, const hcl_ntime_t* now, hcl_tmr_event_t* evt) static void exec_runtime_handler (hcl_tmr_t* tmr, const hcl_ntime_t* now, hcl_tmr_event_t* evt)
{ {
/* [NOTE] this handler is executed in the main server thread /* [NOTE] this handler is executed in the main server thread
* when it calls hcl_tmr_fire(). */ * when it calls hcl_tmr_fire(). */
hcl_server_proto_t* proto; hcl_xproto_t* proto;
proto = (hcl_server_proto_t*)evt->ctx; hcl_server_worker_t* worker;
HCL_LOG1 (proto->worker->server->dummy_hcl, SERVER_LOGMASK_INFO, "Aborting script execution for max_actor_runtime exceeded [%zu]\n", proto->worker->wid); proto = (hcl_xproto_t*)evt->ctx;
hcl_abort (proto->hcl); worker = proto->worker;
/* TODO: can we use worker->hcl for logging before abort?? */
HCL_LOG1 (worker->server->dummy_hcl, SERVER_LOGMASK_INFO, "Aborting script execution for max_actor_runtime exceeded [%zu]\n", proto->worker->wid);
hcl_abort (worker->hcl);
} }
static void exec_runtime_updater (hcl_tmr_t* tmr, hcl_tmr_index_t old_index, hcl_tmr_index_t new_index, hcl_tmr_event_t* evt) static void exec_runtime_updater (hcl_tmr_t* tmr, hcl_tmr_index_t old_index, hcl_tmr_index_t new_index, hcl_tmr_event_t* evt)
@ -800,30 +807,35 @@ static void exec_runtime_updater (hcl_tmr_t* tmr, hcl_tmr_index_t old_index, hcl
/* [NOTE] this handler is executed in the main server thread /* [NOTE] this handler is executed in the main server thread
* when it calls hcl_tmr_fire() */ * when it calls hcl_tmr_fire() */
hcl_server_proto_t* proto; hcl_xproto_t* proto;
proto = (hcl_server_proto_t*)evt->ctx; hcl_server_worker_t* worker;
HCL_ASSERT (proto->hcl, proto->exec_runtime_event_index == old_index);
proto = (hcl_xproto_t*)evt->ctx;
worker = proto->worker;
HCL_ASSERT (worker->hcl, proto->exec_runtime_event_index == old_index);
/* the event is being removed by hcl_tmr_fire() or by hcl_tmr_delete() /* the event is being removed by hcl_tmr_fire() or by hcl_tmr_delete()
* if new_index is HCL_TMR_INVALID_INDEX. it's being updated if not. */ * if new_index is HCL_TMR_INVALID_INDEX. it's being updated if not. */
proto->exec_runtime_event_index = new_index; proto->exec_runtime_event_index = new_index;
} }
static int insert_exec_timer (hcl_server_proto_t* proto, const hcl_ntime_t* tmout) static int insert_exec_timer (hcl_xproto_t* proto, const hcl_ntime_t* tmout)
{ {
/* [NOTE] this is executed in the worker thread */ /* [NOTE] this is executed in the worker thread */
hcl_tmr_event_t event; hcl_tmr_event_t event;
hcl_tmr_index_t index; hcl_tmr_index_t index;
hcl_server_worker_t* worker;
hcl_server_t* server; hcl_server_t* server;
HCL_ASSERT (proto->hcl, proto->exec_runtime_event_index == HCL_TMR_INVALID_INDEX); worker = proto->worker;
server = worker->server;
server = proto->worker->server; HCL_ASSERT (worker->hcl, proto->exec_runtime_event_index == HCL_TMR_INVALID_INDEX);
HCL_MEMSET (&event, 0, HCL_SIZEOF(event)); HCL_MEMSET (&event, 0, HCL_SIZEOF(event));
event.ctx = proto; event.ctx = proto;
proto->hcl->vmprim.vm_gettime (proto->hcl, &event.when); worker->hcl->vmprim.vm_gettime (worker->hcl, &event.when);
HCL_ADD_NTIME (&event.when, &event.when, tmout); HCL_ADD_NTIME (&event.when, &event.when, tmout);
event.handler = exec_runtime_handler; event.handler = exec_runtime_handler;
event.updater = exec_runtime_updater; event.updater = exec_runtime_updater;
@ -841,14 +853,16 @@ static int insert_exec_timer (hcl_server_proto_t* proto, const hcl_ntime_t* tmou
return (index == HCL_TMR_INVALID_INDEX)? -1: 0; return (index == HCL_TMR_INVALID_INDEX)? -1: 0;
} }
static void delete_exec_timer (hcl_server_proto_t* proto) static void delete_exec_timer (hcl_xproto_t* proto)
{ {
/* [NOTE] this is executed in the worker thread. if the event has been fired /* [NOTE] this is executed in the worker thread. if the event has been fired
* in the server thread, proto->exec_runtime_event_index should be * in the server thread, proto->exec_runtime_event_index should be
* HCL_TMR_INVALID_INDEX as set by exec_runtime_handler */ * HCL_TMR_INVALID_INDEX as set by exec_runtime_handler */
hcl_server_worker_t* worker;
hcl_server_t* server; hcl_server_t* server;
server = proto->worker->server; worker = proto->worker;
server = worker->server;
pthread_mutex_lock (&server->tmr_mutex); pthread_mutex_lock (&server->tmr_mutex);
if (proto->exec_runtime_event_index != HCL_TMR_INVALID_INDEX) if (proto->exec_runtime_event_index != HCL_TMR_INVALID_INDEX)
@ -857,48 +871,50 @@ static void delete_exec_timer (hcl_server_proto_t* proto)
* if it has been fired, the index it shall be HCL_TMR_INVALID_INDEX already */ * if it has been fired, the index it shall be HCL_TMR_INVALID_INDEX already */
hcl_tmr_delete (server->tmr, proto->exec_runtime_event_index); hcl_tmr_delete (server->tmr, proto->exec_runtime_event_index);
HCL_ASSERT (proto->hcl, proto->exec_runtime_event_index == HCL_TMR_INVALID_INDEX); HCL_ASSERT (worker->hcl, proto->exec_runtime_event_index == HCL_TMR_INVALID_INDEX);
/*proto->exec_runtime_event_index = HCL_TMR_INVALID_INDEX; */ /*proto->exec_runtime_event_index = HCL_TMR_INVALID_INDEX; */
} }
pthread_mutex_unlock (&server->tmr_mutex); pthread_mutex_unlock (&server->tmr_mutex);
} }
static int execute_script (hcl_server_proto_t* proto, const hcl_bch_t* trigger) static int execute_script (hcl_xproto_t* proto, const hcl_bch_t* trigger)
{ {
hcl_oop_t obj; hcl_oop_t obj;
const hcl_ooch_t* failmsg = HCL_NULL; const hcl_ooch_t* failmsg = HCL_NULL;
hcl_server_worker_t* worker;
hcl_server_t* server; hcl_server_t* server;
server = proto->worker->server; worker = proto->worker;
server = worker->server;
#if 0 #if 0
hcl_server_proto_start_reply (proto); hcl_xproto_start_reply (proto);
#endif #endif
if (server->cfg.actor_max_runtime.sec <= 0 && server->cfg.actor_max_runtime.sec <= 0) if (server->cfg.actor_max_runtime.sec <= 0 && server->cfg.actor_max_runtime.sec <= 0)
{ {
obj = hcl_execute(proto->hcl); obj = hcl_execute(worker->hcl);
if (!obj) failmsg = hcl_geterrmsg(proto->hcl); if (!obj) failmsg = hcl_geterrmsg(worker->hcl);
} }
else else
{ {
if (insert_exec_timer(proto, &server->cfg.actor_max_runtime) <= -1) if (insert_exec_timer(proto, &server->cfg.actor_max_runtime) <= -1)
{ {
HCL_LOG0 (proto->hcl, SERVER_LOGMASK_ERROR, "Cannot start execution timer\n"); HCL_LOG0 (worker->hcl, SERVER_LOGMASK_ERROR, "Cannot start execution timer\n");
hcl_seterrbfmt (proto->hcl, HCL_ESYSMEM, "cannot start execution timer"); /* i do this just to compose the error message */ hcl_seterrbfmt (worker->hcl, HCL_ESYSMEM, "cannot start execution timer"); /* i do this just to compose the error message */
failmsg = hcl_geterrmsg(proto->hcl); failmsg = hcl_geterrmsg(worker->hcl);
} }
else else
{ {
obj = hcl_execute(proto->hcl); obj = hcl_execute(worker->hcl);
if (!obj) failmsg = hcl_geterrmsg(proto->hcl); if (!obj) failmsg = hcl_geterrmsg(worker->hcl);
delete_exec_timer (proto); delete_exec_timer (proto);
} }
} }
#if 0 #if 0
if (hcl_server_proto_end_reply(proto, failmsg) <= -1) if (hcl_xproto_end_reply(proto, failmsg) <= -1)
{ {
HCL_LOG1 (proto->hcl, SERVER_LOGMASK_ERROR, "Cannot finalize reply for %hs\n", trigger); HCL_LOG1 (worker->hcl, SERVER_LOGMASK_ERROR, "Cannot finalize reply for %hs\n", trigger);
return -1; return -1;
} }
#endif #endif
@ -907,11 +923,11 @@ static int execute_script (hcl_server_proto_t* proto, const hcl_bch_t* trigger)
} }
static void send_error_message (hcl_server_proto_t* proto, const hcl_ooch_t* errmsg) static void send_error_message (hcl_xproto_t* proto, const hcl_ooch_t* errmsg)
{ {
#if 0 #if 0
hcl_server_proto_start_reply (proto); hcl_xproto_start_reply (proto);
if (hcl_server_proto_end_reply(proto, errmsg) <= -1) if (hcl_xproto_end_reply(proto, errmsg) <= -1)
{ {
HCL_LOG1 (proto->hcl, SERVER_LOGMASK_ERROR, "Unable to send error message - %s\n", errmsg); HCL_LOG1 (proto->hcl, SERVER_LOGMASK_ERROR, "Unable to send error message - %s\n", errmsg);
} }
@ -939,13 +955,14 @@ static void reformat_synerr (hcl_t* hcl)
); );
} }
static void send_proto_hcl_error (hcl_server_proto_t* proto) static void send_proto_hcl_error (hcl_xproto_t* proto)
{ {
if (HCL_ERRNUM(proto->hcl) == HCL_ESYNERR) reformat_synerr (proto->hcl); hcl_server_worker_t* worker = proto->worker;
send_error_message (proto, hcl_geterrmsg(proto->hcl)); if (HCL_ERRNUM(worker->hcl) == HCL_ESYNERR) reformat_synerr (worker->hcl);
send_error_message (proto, hcl_geterrmsg(worker->hcl));
} }
static void show_server_workers (hcl_server_proto_t* proto) static void show_server_workers (hcl_xproto_t* proto)
{ {
hcl_server_t* server; hcl_server_t* server;
hcl_server_worker_t* w; hcl_server_worker_t* w;
@ -955,12 +972,12 @@ static void show_server_workers (hcl_server_proto_t* proto)
for (w = server->worker_list[HCL_SERVER_WORKER_STATE_ALIVE].head; w; w = w->next_worker) for (w = server->worker_list[HCL_SERVER_WORKER_STATE_ALIVE].head; w; w = w->next_worker)
{ {
/* TODO: implement this better... */ /* TODO: implement this better... */
hcl_prbfmt (proto->hcl, "%zu %d %d\n", w->wid, w->sck, 1000); hcl_prbfmt (proto->worker->hcl, "%zu %d %d\n", w->wid, w->sck, 1000);
} }
pthread_mutex_unlock (&server->worker_mutex); pthread_mutex_unlock (&server->worker_mutex);
} }
static int kill_server_worker (hcl_server_proto_t* proto, hcl_oow_t wid) static int kill_server_worker (hcl_xproto_t* proto, hcl_oow_t wid)
{ {
hcl_server_t* server; hcl_server_t* server;
int xret = 0; int xret = 0;
@ -992,7 +1009,7 @@ static int kill_server_worker (hcl_server_proto_t* proto, hcl_oow_t wid)
else else
{ {
if (worker->sck) shutdown (worker->sck, SHUT_RDWR); if (worker->sck) shutdown (worker->sck, SHUT_RDWR);
if (worker->proto) hcl_abort (worker->proto->hcl); if (worker->hcl) hcl_abort (worker->hcl);
} }
} }
} }
@ -1000,9 +1017,9 @@ static int kill_server_worker (hcl_server_proto_t* proto, hcl_oow_t wid)
return xret; return xret;
} }
static int handle_packet (hcl_server_proto_t* proto, hcl_xpkt_type_t type, void* data, hcl_oow_t len) static int handle_packet (hcl_xproto_t* proto, hcl_xpkt_type_t type, void* data, hcl_oow_t len)
{ {
hcl_t* hcl = proto->hcl; hcl_t* hcl = proto->worker->hcl;
switch (type) switch (type)
{ {
@ -1044,6 +1061,7 @@ printf ("EXECUTING hcl_executing......\n");
case HCL_XPKT_KILL_WORKER: case HCL_XPKT_KILL_WORKER:
break; break;
case HCL_XPKT_DISCONNECT: case HCL_XPKT_DISCONNECT:
return 0; /* disconnect received */ return 0; /* disconnect received */
@ -1060,77 +1078,6 @@ oops:
return -1; return -1;
} }
static int handle_received_data (hcl_server_proto_t* proto)
{
hcl_server_worker_t* worker = proto->worker;
hcl_t* hcl = proto->hcl;
int n;
switch (proto->rcv.state)
{
case HCL_SERVER_PROTO_RCV_HEADER:
if (proto->rcv.len < HCL_SIZEOF(proto->rcv.hdr)) goto carry_on; /* need more data */
HCL_MEMCPY (&proto->rcv.hdr, proto->rcv.buf, HCL_SIZEOF(proto->rcv.hdr));
/*proto->rcv.hdr.len = hcl_ntoh16(proto->rcv.hdr.len);*/ /* keep this in the host byte order */
/* consume the header */
HCL_MEMMOVE (proto->rcv.buf, &proto->rcv.buf[HCL_SIZEOF(proto->rcv.hdr)], proto->rcv.len - HCL_SIZEOF(proto->rcv.hdr));
proto->rcv.len -= HCL_SIZEOF(proto->rcv.hdr);
/* switch to the payload mode */
if (proto->rcv.hdr.len > 0)
{
proto->rcv.state = HCL_SERVER_PROTO_RCV_PAYLOAD;
proto->rcv.len_needed = proto->rcv.hdr.len;
}
else
{
/* take shortcut */
n = handle_packet(proto, proto->rcv.hdr.type, proto->rcv.buf, proto->rcv.hdr.len);
if (n <= -1) goto fail_with_errmsg;
if (n == 0) return 0;
}
break;
case HCL_SERVER_PROTO_RCV_PAYLOAD:
if (proto->rcv.len < proto->rcv.hdr.len) goto carry_on; /* need more payload data */
n = handle_packet(proto, proto->rcv.hdr.type, proto->rcv.buf, proto->rcv.hdr.len);
/* TODO: minimize the use of HCL_MEMOVE... use the buffer */
/* switch to the header mode */
if (proto->rcv.hdr.len > 0)
{
HCL_MEMMOVE (proto->rcv.buf, &proto->rcv.buf[proto->rcv.hdr.len], proto->rcv.len - proto->rcv.hdr.len);
proto->rcv.len -= proto->rcv.hdr.len;
}
proto->rcv.state = HCL_SERVER_PROTO_RCV_HEADER;
proto->rcv.len_needed = HCL_SIZEOF(proto->rcv.hdr);
if (n <= -1) goto fail_with_errmsg;
if (n == 0) return 0;
break;
default:
hcl_seterrbfmt (hcl, HCL_EINTERN, "invalid request state %d", (int)proto->rcv.state);
goto fail_with_errmsg;
}
carry_on:
return 1;
fail_with_errmsg:
// TODO: proper error handling
//send_proto_hcl_error (proto);
//HCL_LOG1 (hcl, SERVER_LOGMASK_ERROR, "Unable to compile .SCRIPT contents - %js\n", hcl_geterrmsg(proto->hcl));
return -1;
}
static int send_iov (int sck, struct iovec* iov, int count) static int send_iov (int sck, struct iovec* iov, int count)
{ {
int index = 0; int index = 0;
@ -1165,7 +1112,7 @@ static int send_iov (int sck, struct iovec* iov, int count)
return 0; return 0;
} }
static int send_stdout_bytes (hcl_server_proto_t* proto, const hcl_bch_t* data, hcl_oow_t len) static int send_stdout_bytes (hcl_xproto_t* proto, const hcl_bch_t* data, hcl_oow_t len)
{ {
hcl_xpkt_hdr_t hdr; hcl_xpkt_hdr_t hdr;
struct iovec iov[2]; struct iovec iov[2];
@ -1201,8 +1148,9 @@ printf ("SENDING BYTES [%.*s]\n", (int)len, data);
} }
#if defined(HCL_OOCH_IS_UCH) #if defined(HCL_OOCH_IS_UCH)
static int send_stdout_chars (hcl_server_proto_t* proto, const hcl_ooch_t* data, hcl_oow_t len) static int send_stdout_chars (hcl_xproto_t* proto, const hcl_ooch_t* data, hcl_oow_t len)
{ {
hcl_server_worker_t* worker = proto->worker;
const hcl_ooch_t* ptr, * end; const hcl_ooch_t* ptr, * end;
hcl_bch_t tmp[256]; hcl_bch_t tmp[256];
hcl_oow_t tln, pln; hcl_oow_t tln, pln;
@ -1215,7 +1163,7 @@ static int send_stdout_chars (hcl_server_proto_t* proto, const hcl_ooch_t* data,
{ {
pln = end - ptr; pln = end - ptr;
tln = HCL_COUNTOF(tmp); tln = HCL_COUNTOF(tmp);
n = hcl_convutobchars(proto->hcl, ptr, &pln, tmp, &tln); n = hcl_convutobchars(worker->hcl, ptr, &pln, tmp, &tln);
if (n <= -1 && n != -2) return -1; if (n <= -1 && n != -2) return -1;
if (send_stdout_bytes(proto, tmp, tln) <= -1) return -1; if (send_stdout_bytes(proto, tmp, tln) <= -1) return -1;
@ -1227,6 +1175,114 @@ static int send_stdout_chars (hcl_server_proto_t* proto, const hcl_ooch_t* data,
} }
#endif #endif
/* ========================================================================= */
hcl_xproto_t* hcl_xproto_open (hcl_oow_t xtnsize, hcl_server_worker_t* worker)
{
hcl_xproto_t* proto;
proto = (hcl_xproto_t*)hcl_server_allocmem(worker->server, HCL_SIZEOF(*proto));
if (HCL_UNLIKELY(!proto)) return HCL_NULL;
HCL_MEMSET (proto, 0, HCL_SIZEOF(*proto));
proto->worker = worker;
proto->exec_runtime_event_index = HCL_TMR_INVALID_INDEX;
proto->rcv.state = HCL_XPROTO_RCV_HEADER;
proto->rcv.len_needed = HCL_SIZEOF(proto->rcv.hdr);
proto->rcv.eof = 0;
return proto;
}
void hcl_xproto_close (hcl_xproto_t* proto)
{
hcl_server_freemem (proto->worker->server, proto);
}
int hcl_xproto_ready (hcl_xproto_t* proto)
{
/* has it received suffient data for processing? */
return proto->rcv.len >= proto->rcv.len_needed;
}
static int hcl_xproto_process (hcl_xproto_t* proto)
{
/*
hcl_xproto_t* proto = worker->proto;
hcl_server_
hcl_t* hcl = worker->hcl;
*/
int n;
switch (proto->rcv.state)
{
case HCL_XPROTO_RCV_HEADER:
if (proto->rcv.len < HCL_SIZEOF(proto->rcv.hdr)) goto carry_on; /* need more data */
HCL_MEMCPY (&proto->rcv.hdr, proto->rcv.buf, HCL_SIZEOF(proto->rcv.hdr));
/*proto->rcv.hdr.len = hcl_ntoh16(proto->rcv.hdr.len);*/ /* keep this in the host byte order */
/* consume the header */
HCL_MEMMOVE (proto->rcv.buf, &proto->rcv.buf[HCL_SIZEOF(proto->rcv.hdr)], proto->rcv.len - HCL_SIZEOF(proto->rcv.hdr));
proto->rcv.len -= HCL_SIZEOF(proto->rcv.hdr);
/* switch to the payload mode */
if (proto->rcv.hdr.len > 0)
{
proto->rcv.state = HCL_XPROTO_RCV_PAYLOAD;
proto->rcv.len_needed = proto->rcv.hdr.len;
}
else
{
/* take shortcut */
/* TODO: convert handle_packet as call back */
n = handle_packet(proto, proto->rcv.hdr.type, proto->rcv.buf, proto->rcv.hdr.len);
if (n <= -1) goto fail_with_errmsg;
if (n == 0) return 0;
}
break;
case HCL_XPROTO_RCV_PAYLOAD:
if (proto->rcv.len < proto->rcv.hdr.len) goto carry_on; /* need more payload data */
/* TODO: convert handle_packet as call back */
n = handle_packet(proto, proto->rcv.hdr.type, proto->rcv.buf, proto->rcv.hdr.len);
/* TODO: minimize the use of HCL_MEMOVE... use the buffer */
/* switch to the header mode */
if (proto->rcv.hdr.len > 0)
{
HCL_MEMMOVE (proto->rcv.buf, &proto->rcv.buf[proto->rcv.hdr.len], proto->rcv.len - proto->rcv.hdr.len);
proto->rcv.len -= proto->rcv.hdr.len;
}
proto->rcv.state = HCL_XPROTO_RCV_HEADER;
proto->rcv.len_needed = HCL_SIZEOF(proto->rcv.hdr);
if (n <= -1) goto fail_with_errmsg;
if (n == 0) return 0;
break;
default:
/*
hcl_seterrbfmt (hcl, HCL_EINTERN, "invalid request state %d", (int)proto->rcv.state);
*/
/* TODO: call back */
goto fail_with_errmsg;
}
carry_on:
return 1;
fail_with_errmsg:
// TODO: proper error handling
//send_proto_hcl_error (proto);
//HCL_LOG1 (hcl, SERVER_LOGMASK_ERROR, "Unable to compile .SCRIPT contents - %js\n", hcl_geterrmsg(worker->hcl));
return -1;
}
/* ========================================================================= */ /* ========================================================================= */
@ -1473,13 +1529,13 @@ static hcl_server_worker_t* alloc_worker (hcl_server_t* server, int cli_sck, con
return worker; return worker;
} }
static void close_worker_socket (hcl_server_worker_t* worker) static void fini_worker_socket (hcl_server_worker_t* worker)
{ {
if (worker->sck >= 0) if (worker->sck >= 0)
{ {
if (worker->proto) if (worker->hcl)
{ {
HCL_LOG2 (worker->proto->hcl, SERVER_LOGMASK_INFO, "Closing worker socket %d [%zu]\n", worker->sck, worker->wid); HCL_LOG2 (worker->hcl, SERVER_LOGMASK_INFO, "Closing worker socket %d [%zu]\n", worker->sck, worker->wid);
} }
else else
{ {
@ -1493,11 +1549,11 @@ static void close_worker_socket (hcl_server_worker_t* worker)
static void free_worker (hcl_server_worker_t* worker) static void free_worker (hcl_server_worker_t* worker)
{ {
close_worker_socket (worker); fini_worker_socket (worker);
if (worker->proto) if (worker->hcl)
{ {
HCL_LOG1 (worker->proto->hcl, SERVER_LOGMASK_INFO, "Killing worker [%zu]\n", worker->wid); HCL_LOG1 (worker->hcl, SERVER_LOGMASK_INFO, "Killing worker [%zu]\n", worker->wid);
} }
else else
{ {
@ -1553,9 +1609,9 @@ static void zap_worker_in_server (hcl_server_t* server, hcl_server_worker_t* wor
static int worker_step (hcl_server_worker_t* worker) static int worker_step (hcl_server_worker_t* worker)
{ {
hcl_server_proto_t* proto = worker->proto; hcl_xproto_t* proto = worker->proto;
hcl_server_t* server = worker->server; hcl_server_t* server = worker->server;
hcl_t* hcl = proto->hcl; hcl_t* hcl = worker->hcl;
struct pollfd pfd; struct pollfd pfd;
int tmout, actual_tmout; int tmout, actual_tmout;
ssize_t x; ssize_t x;
@ -1628,9 +1684,9 @@ static int worker_step (hcl_server_worker_t* worker)
} }
/* the receiver buffer has enough data */ /* the receiver buffer has enough data */
while (/*proto->rcv.len > 0 && */proto->rcv.len >= proto->rcv.len_needed) while (hcl_xproto_ready(worker->proto))
{ {
if ((n = handle_received_data(proto)) <= -1) if ((n = hcl_xproto_process(worker->proto)) <= -1)
{ {
/* TODO: proper error message */ /* TODO: proper error message */
return -1; return -1;
@ -1646,20 +1702,115 @@ carry_on:
return 1; /* carry on */ return 1; /* carry on */
} }
static int init_worker_hcl (hcl_server_worker_t* worker)
{
hcl_server_t* server = worker->server;
hcl_t* hcl;
worker_hcl_xtn_t* xtn;
hcl_bitmask_t trait;
hcl_cb_t hclcb;
hcl = hcl_openstdwithmmgr(hcl_server_getmmgr(server), HCL_SIZEOF(*xtn), HCL_NULL);
if (HCL_UNLIKELY(!hcl)) goto oops;
/* replace the vmprim.log_write function */
hcl->vmprim.log_write = server_log_write;
xtn = (worker_hcl_xtn_t*)hcl_getxtn(hcl);
xtn->worker = worker;
hcl_setoption (hcl, HCL_MOD_INCTX, &server->cfg.module_inctx);
hcl_setoption (hcl, HCL_LOG_MASK, &server->cfg.logmask);
hcl_setcmgr (hcl, hcl_server_getcmgr(server));
hcl_getoption (hcl, HCL_TRAIT, &trait);
#if defined(HCL_BUILD_DEBUG)
if (server->cfg.trait & HCL_SERVER_TRAIT_DEBUG_GC) trait |= HCL_TRAIT_DEBUG_GC;
if (server->cfg.trait & HCL_SERVER_TRAIT_DEBUG_BIGINT) trait |= HCL_TRAIT_DEBUG_BIGINT;
#endif
trait |= HCL_TRAIT_LANG_ENABLE_BLOCK;
trait |= HCL_TRAIT_LANG_ENABLE_EOL;
hcl_setoption (hcl, HCL_TRAIT, &trait);
HCL_MEMSET (&hclcb, 0, HCL_SIZEOF(hclcb));
/*hclcb.fini = fini_hcl;
hclcb.gc = gc_hcl;*/
hclcb.vm_startup = vm_startup;
hclcb.vm_cleanup = vm_cleanup;
hclcb.vm_checkbc = vm_checkbc;
hcl_regcb (hcl, &hclcb);
if (hcl_ignite(hcl, server->cfg.actor_heap_size) <= -1) goto oops;
if (hcl_addbuiltinprims(hcl) <= -1) goto oops;
if (hcl_attachccio(hcl, read_handler) <= -1) goto oops;
if (hcl_attachudio(hcl, scan_handler, print_handler) <= -1) goto oops;
if (hcl_beginfeed(hcl, on_fed_cnode) <= -1) goto oops;
worker->hcl = hcl;
return 0;
oops:
if (hcl) hcl_close (hcl);
return -1;
}
static void fini_worker_hcl (hcl_server_worker_t* worker)
{
if (HCL_LIKELY(worker->hcl))
{
hcl_endfeed (worker->hcl);
hcl_close (worker->hcl);
worker->hcl = HCL_NULL;
}
}
static int init_worker_proto (hcl_server_worker_t* worker)
{
hcl_xproto_t* proto;
proto = hcl_xproto_open(0, worker);
if (HCL_UNLIKELY(!proto)) return -1;
worker->proto = proto;
return 0;
}
static void fini_worker_proto (hcl_server_worker_t* worker)
{
if (HCL_LIKELY(worker->proto))
{
hcl_xproto_close (worker->proto);
worker->proto = HCL_NULL;
}
}
static void* worker_main (void* ctx) static void* worker_main (void* ctx)
{ {
hcl_server_worker_t* worker = (hcl_server_worker_t*)ctx; hcl_server_worker_t* worker = (hcl_server_worker_t*)ctx;
hcl_server_t* server = worker->server; hcl_server_t* server = worker->server;
sigset_t set; sigset_t set;
int n;
sigfillset (&set); sigfillset (&set);
pthread_sigmask (SIG_BLOCK, &set, HCL_NULL); pthread_sigmask (SIG_BLOCK, &set, HCL_NULL);
worker->thr = pthread_self(); worker->thr = pthread_self();
worker->proto = hcl_server_proto_open(0, worker);
if (!worker->proto) n = init_worker_hcl(worker);
if (HCL_UNLIKELY(n <= -1))
{ {
free_worker (worker); /* TODO: capture error ... */
return HCL_NULL;
}
n = init_worker_proto(worker);
if (HCL_UNLIKELY(n <= -1))
{
fini_worker_hcl (worker);
return HCL_NULL; return HCL_NULL;
} }
@ -1680,11 +1831,13 @@ static void* worker_main (void* ctx)
} }
} }
hcl_server_proto_close (worker->proto); hcl_xproto_close (worker->proto);
worker->proto = HCL_NULL; worker->proto = HCL_NULL;
fini_worker_hcl (worker);
pthread_mutex_lock (&server->worker_mutex); pthread_mutex_lock (&server->worker_mutex);
close_worker_socket (worker); fini_worker_socket (worker);
if (!worker->claimed) if (!worker->claimed)
{ {
zap_worker_in_server (server, worker); zap_worker_in_server (server, worker);
@ -2590,3 +2743,6 @@ void hcl_client_freemem (hcl_client_t* client, void* ptr)
{ {
HCL_MMGR_FREE (client->_mmgr, ptr); HCL_MMGR_FREE (client->_mmgr, ptr);
} }

View File

@ -52,6 +52,9 @@ struct hcl_xpkt_hdr_t
}; };
typedef struct hcl_xpkt_hdr_t hcl_xpkt_hdr_t; typedef struct hcl_xpkt_hdr_t hcl_xpkt_hdr_t;
/* ---------------------------------------------------------------------- */
typedef struct hcl_xproto_t hcl_xproto_t;
/* ---------------------------------------------------------------------- */ /* ---------------------------------------------------------------------- */
@ -380,6 +383,17 @@ HCL_EXPORT void hcl_client_freemem (
void* ptr void* ptr
); );
/* ---------------------------------------------------------------------- */
HCL_EXPORT hcl_xproto_t* hcl_xproto_open (
hcl_oow_t xtnsize,
hcl_server_worker_t* worker
);
HCL_EXPORT void hcl_xproto_close (
hcl_xproto_t* proto
);
#if defined(__cplusplus) #if defined(__cplusplus)
} }
#endif #endif