diff --git a/lib/hcl-x.c b/lib/hcl-x.c index 0e14f28..72f5ea0 100644 --- a/lib/hcl-x.c +++ b/lib/hcl-x.c @@ -33,7 +33,7 @@ #define HCL_SERVER_TOKEN_NAME_ALIGN 64 #define HCL_SERVER_WID_MAP_ALIGN 512 -#define HCL_SERVER_PROTO_REPLY_BUF_SIZE 1300 +#define HCL_XPROTO_REPLY_BUF_SIZE 1300 #if defined(_WIN32) # include @@ -91,7 +91,7 @@ typedef struct bb_t bb_t; struct worker_hcl_xtn_t { - hcl_server_proto_t* proto; + hcl_server_worker_t* worker; int vm_running; }; typedef struct worker_hcl_xtn_t worker_hcl_xtn_t; @@ -104,23 +104,21 @@ typedef struct server_hcl_xtn_t server_hcl_xtn_t; /* ---------------------------------- */ -enum hcl_server_proto_rcv_state_t +enum hcl_xproto_rcv_state_t { - HCL_SERVER_PROTO_RCV_HEADER, - HCL_SERVER_PROTO_RCV_PAYLOAD + HCL_XPROTO_RCV_HEADER, + HCL_XPROTO_RCV_PAYLOAD }; -typedef enum hcl_server_proto_rcv_state_t hcl_server_proto_rcv_state_t; +typedef enum hcl_xproto_rcv_state_t hcl_xproto_rcv_state_t; -struct hcl_server_proto_t +struct hcl_xproto_t { hcl_server_worker_t* worker; - hcl_t* hcl; hcl_tmr_index_t exec_runtime_event_index; - struct { - hcl_server_proto_rcv_state_t state; + hcl_xproto_rcv_state_t state; hcl_oow_t len_needed; unsigned int eof: 1; @@ -136,6 +134,8 @@ struct hcl_server_proto_t } snd; }; +/* ---------------------------------- */ + enum hcl_server_worker_state_t { HCL_SERVER_WORKER_STATE_DEAD = 0, @@ -168,7 +168,8 @@ struct hcl_server_worker_t hcl_ntime_t alloc_time; hcl_server_worker_state_t state; hcl_server_worker_opstate_t opstate; - hcl_server_proto_t* proto; + hcl_xproto_t* proto; + hcl_t* hcl; hcl_server_t* server; hcl_server_worker_t* prev_worker; @@ -268,10 +269,10 @@ struct hcl_server_t }; /* ========================================================================= */ -static int send_stdout_bytes (hcl_server_proto_t* proto, const hcl_bch_t* data, hcl_oow_t len); +static int send_stdout_bytes (hcl_xproto_t* proto, const hcl_bch_t* data, hcl_oow_t len); #if defined(HCL_OOCH_IS_UCH) -static int send_stdout_chars (hcl_server_proto_t* proto, const hcl_ooch_t* data, hcl_oow_t len); +static int send_stdout_chars (hcl_xproto_t* proto, const hcl_ooch_t* data, hcl_oow_t len); #else #define send_stdout_chars(proto,data,len) send_stdout_bytes(proto,data,len) #endif @@ -297,7 +298,7 @@ static HCL_INLINE int open_read_stream (hcl_t* hcl, hcl_io_cciarg_t* arg) bb_t* bb = HCL_NULL; hcl_server_t* server; - server = xtn->proto->worker->server; + server = xtn->worker->server; if (arg->includer) { @@ -370,7 +371,7 @@ static HCL_INLINE int open_read_stream (hcl_t* hcl, hcl_io_cciarg_t* arg) bb->fn = (hcl_bch_t*)(bb + 1); hcl_copy_bcstr (bb->fn, pathlen + 1, ""); - bb->fd = xtn->proto->worker->sck; + bb->fd = xtn->worker->sck; } HCL_ASSERT (hcl, bb->fd >= 0); @@ -381,7 +382,7 @@ static HCL_INLINE int open_read_stream (hcl_t* hcl, hcl_io_cciarg_t* arg) oops: if (bb) { - if (bb->fd >= 0 && bb->fd != xtn->proto->worker->sck) close (bb->fd); + if (bb->fd >= 0 && bb->fd != xtn->worker->sck) close (bb->fd); hcl_freemem (hcl, bb); } return -1; @@ -395,7 +396,7 @@ static HCL_INLINE int close_read_stream (hcl_t* hcl, hcl_io_cciarg_t* arg) bb = (bb_t*)arg->handle; HCL_ASSERT (hcl, bb != HCL_NULL && bb->fd >= 0); - if (bb->fd != xtn->proto->worker->sck) close (bb->fd); + if (bb->fd != xtn->worker->sck) close (bb->fd); hcl_freemem (hcl, bb); arg->handle = HCL_NULL; @@ -414,7 +415,7 @@ static HCL_INLINE int read_read_stream (hcl_t* hcl, hcl_io_cciarg_t* arg) bb = (bb_t*)arg->handle; HCL_ASSERT (hcl, bb != HCL_NULL && bb->fd >= 0); - worker = xtn->proto->worker; + worker = xtn->worker; start_over: if (arg->includer) @@ -551,7 +552,7 @@ static int scan_handler (hcl_t* hcl, hcl_io_cmd_t cmd, void* arg) hcl_io_udiarg_t* inarg = (hcl_io_udiarg_t*)arg; // what if it writes a request to require more input?? - if (hcl_server_proto_handle_incoming(xtn->proto) <= -1) + if (hcl_xproto_handle_incoming(xtn->proto) <= -1) { } } @@ -583,7 +584,7 @@ printf ("IO CLOSE SOMETHING...........\n"); hcl_io_udoarg_t* outarg = (hcl_io_udoarg_t*)arg; printf ("IO WRITE SOMETHING...........\n"); - if (send_stdout_chars(xtn->proto, outarg->ptr, outarg->len) <= -1) + if (send_stdout_chars(xtn->worker->proto, outarg->ptr, outarg->len) <= -1) { /* TODO: change error code and message. propagage the errormessage from proto */ hcl_seterrbfmt (hcl, HCL_EIOERR, "failed to write message via proto"); @@ -603,7 +604,7 @@ printf ("IO WRITE SOMETHING...........\n"); hcl_io_udoarg_t* outarg = (hcl_io_udoarg_t*)arg; printf ("IO WRITE SOMETHING BYTES...........\n"); - if (send_stdout_bytes(xtn->proto, outarg->ptr, outarg->len) <= -1) + if (send_stdout_bytes(xtn->worker->proto, outarg->ptr, outarg->len) <= -1) { /* TODO: change error code and message. propagage the errormessage from proto */ hcl_seterrbfmt (hcl, HCL_EIOERR, "failed to write message via proto"); @@ -630,9 +631,9 @@ static void server_log_write (hcl_t* hcl, hcl_bitmask_t mask, const hcl_ooch_t* worker_hcl_xtn_t* xtn = (worker_hcl_xtn_t*)hcl_getxtn(hcl); hcl_server_t* server; - server = xtn->proto->worker->server; + server = xtn->worker->server; pthread_mutex_lock (&server->log_mutex); - server->prim.log_write (server, xtn->proto->worker->wid, mask, msg, len); + server->prim.log_write (server, xtn->worker->wid, mask, msg, len); pthread_mutex_unlock (&server->log_mutex); } @@ -665,7 +666,7 @@ static void vm_cleanup (hcl_t* hcl) static void vm_checkbc (hcl_t* hcl, hcl_oob_t bcode) { worker_hcl_xtn_t* xtn = (worker_hcl_xtn_t*)hcl_getxtn(hcl); - if (xtn->proto->worker->server->stopreq) hcl_abort(hcl); + if (xtn->worker->server->stopreq) hcl_abort(hcl); } /* @@ -687,7 +688,7 @@ static void fini_hcl (hcl_t* hcl) static int on_fed_cnode (hcl_t* hcl, hcl_cnode_t* obj) { /*worker_hcl_xtn_t* xtn = (worker_hcl_xtn_t*)hcl_getxtn(hcl);*/ - /*hcl_server_proto_t* proto = xtn->proto;*/ + /*hcl_xproto_t* proto = xtn->proto;*/ int flags = 0; printf ("on_fed_cnode......\n"); @@ -711,20 +712,21 @@ hcl_logbfmt (hcl, HCL_LOG_STDERR, "COMPILER ERROR - %js\n", hcl_geterrmsg(hcl)); return 0; } -hcl_server_proto_t* hcl_server_proto_open (hcl_oow_t xtnsize, hcl_server_worker_t* worker) +#if 0 +hcl_xproto_t* hcl_xproto_open (hcl_oow_t xtnsize, hcl_server_worker_t* worker) { - hcl_server_proto_t* proto; + hcl_xproto_t* proto; hcl_cb_t hclcb; worker_hcl_xtn_t* xtn; hcl_bitmask_t trait; - proto = (hcl_server_proto_t*)hcl_server_allocmem(worker->server, HCL_SIZEOF(*proto)); + proto = (hcl_xproto_t*)hcl_server_allocmem(worker->server, HCL_SIZEOF(*proto)); if (HCL_UNLIKELY(!proto)) return HCL_NULL; HCL_MEMSET (proto, 0, HCL_SIZEOF(*proto)); proto->worker = worker; proto->exec_runtime_event_index = HCL_TMR_INVALID_INDEX; - proto->rcv.state = HCL_SERVER_PROTO_RCV_HEADER; + proto->rcv.state = HCL_XPROTO_RCV_HEADER; proto->rcv.len_needed = HCL_SIZEOF(proto->rcv.hdr); proto->rcv.eof = 0; @@ -776,23 +778,28 @@ oops: return HCL_NULL; } -void hcl_server_proto_close (hcl_server_proto_t* proto) +void hcl_xproto_close (hcl_xproto_t* proto) { hcl_endfeed(proto->hcl); hcl_close (proto->hcl); hcl_server_freemem (proto->worker->server, proto); } +#endif static void exec_runtime_handler (hcl_tmr_t* tmr, const hcl_ntime_t* now, hcl_tmr_event_t* evt) { /* [NOTE] this handler is executed in the main server thread * when it calls hcl_tmr_fire(). */ - hcl_server_proto_t* proto; - proto = (hcl_server_proto_t*)evt->ctx; + hcl_xproto_t* proto; + hcl_server_worker_t* worker; - HCL_LOG1 (proto->worker->server->dummy_hcl, SERVER_LOGMASK_INFO, "Aborting script execution for max_actor_runtime exceeded [%zu]\n", proto->worker->wid); - hcl_abort (proto->hcl); + proto = (hcl_xproto_t*)evt->ctx; + worker = proto->worker; + +/* TODO: can we use worker->hcl for logging before abort?? */ + HCL_LOG1 (worker->server->dummy_hcl, SERVER_LOGMASK_INFO, "Aborting script execution for max_actor_runtime exceeded [%zu]\n", proto->worker->wid); + hcl_abort (worker->hcl); } static void exec_runtime_updater (hcl_tmr_t* tmr, hcl_tmr_index_t old_index, hcl_tmr_index_t new_index, hcl_tmr_event_t* evt) @@ -800,30 +807,35 @@ static void exec_runtime_updater (hcl_tmr_t* tmr, hcl_tmr_index_t old_index, hcl /* [NOTE] this handler is executed in the main server thread * when it calls hcl_tmr_fire() */ - hcl_server_proto_t* proto; - proto = (hcl_server_proto_t*)evt->ctx; - HCL_ASSERT (proto->hcl, proto->exec_runtime_event_index == old_index); + hcl_xproto_t* proto; + hcl_server_worker_t* worker; + + proto = (hcl_xproto_t*)evt->ctx; + worker = proto->worker; + HCL_ASSERT (worker->hcl, proto->exec_runtime_event_index == old_index); /* the event is being removed by hcl_tmr_fire() or by hcl_tmr_delete() * if new_index is HCL_TMR_INVALID_INDEX. it's being updated if not. */ proto->exec_runtime_event_index = new_index; } -static int insert_exec_timer (hcl_server_proto_t* proto, const hcl_ntime_t* tmout) +static int insert_exec_timer (hcl_xproto_t* proto, const hcl_ntime_t* tmout) { /* [NOTE] this is executed in the worker thread */ hcl_tmr_event_t event; hcl_tmr_index_t index; + hcl_server_worker_t* worker; hcl_server_t* server; - HCL_ASSERT (proto->hcl, proto->exec_runtime_event_index == HCL_TMR_INVALID_INDEX); + worker = proto->worker; + server = worker->server; - server = proto->worker->server; + HCL_ASSERT (worker->hcl, proto->exec_runtime_event_index == HCL_TMR_INVALID_INDEX); HCL_MEMSET (&event, 0, HCL_SIZEOF(event)); event.ctx = proto; - proto->hcl->vmprim.vm_gettime (proto->hcl, &event.when); + worker->hcl->vmprim.vm_gettime (worker->hcl, &event.when); HCL_ADD_NTIME (&event.when, &event.when, tmout); event.handler = exec_runtime_handler; event.updater = exec_runtime_updater; @@ -841,14 +853,16 @@ static int insert_exec_timer (hcl_server_proto_t* proto, const hcl_ntime_t* tmou return (index == HCL_TMR_INVALID_INDEX)? -1: 0; } -static void delete_exec_timer (hcl_server_proto_t* proto) +static void delete_exec_timer (hcl_xproto_t* proto) { /* [NOTE] this is executed in the worker thread. if the event has been fired * in the server thread, proto->exec_runtime_event_index should be * HCL_TMR_INVALID_INDEX as set by exec_runtime_handler */ + hcl_server_worker_t* worker; hcl_server_t* server; - server = proto->worker->server; + worker = proto->worker; + server = worker->server; pthread_mutex_lock (&server->tmr_mutex); if (proto->exec_runtime_event_index != HCL_TMR_INVALID_INDEX) @@ -857,48 +871,50 @@ static void delete_exec_timer (hcl_server_proto_t* proto) * if it has been fired, the index it shall be HCL_TMR_INVALID_INDEX already */ hcl_tmr_delete (server->tmr, proto->exec_runtime_event_index); - HCL_ASSERT (proto->hcl, proto->exec_runtime_event_index == HCL_TMR_INVALID_INDEX); + HCL_ASSERT (worker->hcl, proto->exec_runtime_event_index == HCL_TMR_INVALID_INDEX); /*proto->exec_runtime_event_index = HCL_TMR_INVALID_INDEX; */ } pthread_mutex_unlock (&server->tmr_mutex); } -static int execute_script (hcl_server_proto_t* proto, const hcl_bch_t* trigger) +static int execute_script (hcl_xproto_t* proto, const hcl_bch_t* trigger) { hcl_oop_t obj; const hcl_ooch_t* failmsg = HCL_NULL; + hcl_server_worker_t* worker; hcl_server_t* server; - server = proto->worker->server; + worker = proto->worker; + server = worker->server; #if 0 - hcl_server_proto_start_reply (proto); + hcl_xproto_start_reply (proto); #endif if (server->cfg.actor_max_runtime.sec <= 0 && server->cfg.actor_max_runtime.sec <= 0) { - obj = hcl_execute(proto->hcl); - if (!obj) failmsg = hcl_geterrmsg(proto->hcl); + obj = hcl_execute(worker->hcl); + if (!obj) failmsg = hcl_geterrmsg(worker->hcl); } else { if (insert_exec_timer(proto, &server->cfg.actor_max_runtime) <= -1) { - HCL_LOG0 (proto->hcl, SERVER_LOGMASK_ERROR, "Cannot start execution timer\n"); - hcl_seterrbfmt (proto->hcl, HCL_ESYSMEM, "cannot start execution timer"); /* i do this just to compose the error message */ - failmsg = hcl_geterrmsg(proto->hcl); + HCL_LOG0 (worker->hcl, SERVER_LOGMASK_ERROR, "Cannot start execution timer\n"); + hcl_seterrbfmt (worker->hcl, HCL_ESYSMEM, "cannot start execution timer"); /* i do this just to compose the error message */ + failmsg = hcl_geterrmsg(worker->hcl); } else { - obj = hcl_execute(proto->hcl); - if (!obj) failmsg = hcl_geterrmsg(proto->hcl); + obj = hcl_execute(worker->hcl); + if (!obj) failmsg = hcl_geterrmsg(worker->hcl); delete_exec_timer (proto); } } #if 0 - if (hcl_server_proto_end_reply(proto, failmsg) <= -1) + if (hcl_xproto_end_reply(proto, failmsg) <= -1) { - HCL_LOG1 (proto->hcl, SERVER_LOGMASK_ERROR, "Cannot finalize reply for %hs\n", trigger); + HCL_LOG1 (worker->hcl, SERVER_LOGMASK_ERROR, "Cannot finalize reply for %hs\n", trigger); return -1; } #endif @@ -907,11 +923,11 @@ static int execute_script (hcl_server_proto_t* proto, const hcl_bch_t* trigger) } -static void send_error_message (hcl_server_proto_t* proto, const hcl_ooch_t* errmsg) +static void send_error_message (hcl_xproto_t* proto, const hcl_ooch_t* errmsg) { #if 0 - hcl_server_proto_start_reply (proto); - if (hcl_server_proto_end_reply(proto, errmsg) <= -1) + hcl_xproto_start_reply (proto); + if (hcl_xproto_end_reply(proto, errmsg) <= -1) { HCL_LOG1 (proto->hcl, SERVER_LOGMASK_ERROR, "Unable to send error message - %s\n", errmsg); } @@ -939,13 +955,14 @@ static void reformat_synerr (hcl_t* hcl) ); } -static void send_proto_hcl_error (hcl_server_proto_t* proto) +static void send_proto_hcl_error (hcl_xproto_t* proto) { - if (HCL_ERRNUM(proto->hcl) == HCL_ESYNERR) reformat_synerr (proto->hcl); - send_error_message (proto, hcl_geterrmsg(proto->hcl)); + hcl_server_worker_t* worker = proto->worker; + if (HCL_ERRNUM(worker->hcl) == HCL_ESYNERR) reformat_synerr (worker->hcl); + send_error_message (proto, hcl_geterrmsg(worker->hcl)); } -static void show_server_workers (hcl_server_proto_t* proto) +static void show_server_workers (hcl_xproto_t* proto) { hcl_server_t* server; hcl_server_worker_t* w; @@ -955,12 +972,12 @@ static void show_server_workers (hcl_server_proto_t* proto) for (w = server->worker_list[HCL_SERVER_WORKER_STATE_ALIVE].head; w; w = w->next_worker) { /* TODO: implement this better... */ - hcl_prbfmt (proto->hcl, "%zu %d %d\n", w->wid, w->sck, 1000); + hcl_prbfmt (proto->worker->hcl, "%zu %d %d\n", w->wid, w->sck, 1000); } pthread_mutex_unlock (&server->worker_mutex); } -static int kill_server_worker (hcl_server_proto_t* proto, hcl_oow_t wid) +static int kill_server_worker (hcl_xproto_t* proto, hcl_oow_t wid) { hcl_server_t* server; int xret = 0; @@ -992,7 +1009,7 @@ static int kill_server_worker (hcl_server_proto_t* proto, hcl_oow_t wid) else { if (worker->sck) shutdown (worker->sck, SHUT_RDWR); - if (worker->proto) hcl_abort (worker->proto->hcl); + if (worker->hcl) hcl_abort (worker->hcl); } } } @@ -1000,9 +1017,9 @@ static int kill_server_worker (hcl_server_proto_t* proto, hcl_oow_t wid) return xret; } -static int handle_packet (hcl_server_proto_t* proto, hcl_xpkt_type_t type, void* data, hcl_oow_t len) +static int handle_packet (hcl_xproto_t* proto, hcl_xpkt_type_t type, void* data, hcl_oow_t len) { - hcl_t* hcl = proto->hcl; + hcl_t* hcl = proto->worker->hcl; switch (type) { @@ -1044,6 +1061,7 @@ printf ("EXECUTING hcl_executing......\n"); case HCL_XPKT_KILL_WORKER: break; + case HCL_XPKT_DISCONNECT: return 0; /* disconnect received */ @@ -1060,77 +1078,6 @@ oops: return -1; } - -static int handle_received_data (hcl_server_proto_t* proto) -{ - hcl_server_worker_t* worker = proto->worker; - hcl_t* hcl = proto->hcl; - int n; - - switch (proto->rcv.state) - { - case HCL_SERVER_PROTO_RCV_HEADER: - if (proto->rcv.len < HCL_SIZEOF(proto->rcv.hdr)) goto carry_on; /* need more data */ - - HCL_MEMCPY (&proto->rcv.hdr, proto->rcv.buf, HCL_SIZEOF(proto->rcv.hdr)); - /*proto->rcv.hdr.len = hcl_ntoh16(proto->rcv.hdr.len);*/ /* keep this in the host byte order */ - - /* consume the header */ - HCL_MEMMOVE (proto->rcv.buf, &proto->rcv.buf[HCL_SIZEOF(proto->rcv.hdr)], proto->rcv.len - HCL_SIZEOF(proto->rcv.hdr)); - proto->rcv.len -= HCL_SIZEOF(proto->rcv.hdr); - - /* switch to the payload mode */ - if (proto->rcv.hdr.len > 0) - { - proto->rcv.state = HCL_SERVER_PROTO_RCV_PAYLOAD; - proto->rcv.len_needed = proto->rcv.hdr.len; - } - else - { - /* take shortcut */ - n = handle_packet(proto, proto->rcv.hdr.type, proto->rcv.buf, proto->rcv.hdr.len); - if (n <= -1) goto fail_with_errmsg; - if (n == 0) return 0; - } - - break; - - case HCL_SERVER_PROTO_RCV_PAYLOAD: - if (proto->rcv.len < proto->rcv.hdr.len) goto carry_on; /* need more payload data */ - - n = handle_packet(proto, proto->rcv.hdr.type, proto->rcv.buf, proto->rcv.hdr.len); - -/* TODO: minimize the use of HCL_MEMOVE... use the buffer */ - /* switch to the header mode */ - if (proto->rcv.hdr.len > 0) - { - HCL_MEMMOVE (proto->rcv.buf, &proto->rcv.buf[proto->rcv.hdr.len], proto->rcv.len - proto->rcv.hdr.len); - proto->rcv.len -= proto->rcv.hdr.len; - } - proto->rcv.state = HCL_SERVER_PROTO_RCV_HEADER; - proto->rcv.len_needed = HCL_SIZEOF(proto->rcv.hdr); - - if (n <= -1) goto fail_with_errmsg; - if (n == 0) return 0; - - break; - - default: - hcl_seterrbfmt (hcl, HCL_EINTERN, "invalid request state %d", (int)proto->rcv.state); - goto fail_with_errmsg; - } - -carry_on: - return 1; - -fail_with_errmsg: -// TODO: proper error handling - //send_proto_hcl_error (proto); - //HCL_LOG1 (hcl, SERVER_LOGMASK_ERROR, "Unable to compile .SCRIPT contents - %js\n", hcl_geterrmsg(proto->hcl)); - return -1; - -} - static int send_iov (int sck, struct iovec* iov, int count) { int index = 0; @@ -1165,7 +1112,7 @@ static int send_iov (int sck, struct iovec* iov, int count) return 0; } -static int send_stdout_bytes (hcl_server_proto_t* proto, const hcl_bch_t* data, hcl_oow_t len) +static int send_stdout_bytes (hcl_xproto_t* proto, const hcl_bch_t* data, hcl_oow_t len) { hcl_xpkt_hdr_t hdr; struct iovec iov[2]; @@ -1201,8 +1148,9 @@ printf ("SENDING BYTES [%.*s]\n", (int)len, data); } #if defined(HCL_OOCH_IS_UCH) -static int send_stdout_chars (hcl_server_proto_t* proto, const hcl_ooch_t* data, hcl_oow_t len) +static int send_stdout_chars (hcl_xproto_t* proto, const hcl_ooch_t* data, hcl_oow_t len) { + hcl_server_worker_t* worker = proto->worker; const hcl_ooch_t* ptr, * end; hcl_bch_t tmp[256]; hcl_oow_t tln, pln; @@ -1215,7 +1163,7 @@ static int send_stdout_chars (hcl_server_proto_t* proto, const hcl_ooch_t* data, { pln = end - ptr; tln = HCL_COUNTOF(tmp); - n = hcl_convutobchars(proto->hcl, ptr, &pln, tmp, &tln); + n = hcl_convutobchars(worker->hcl, ptr, &pln, tmp, &tln); if (n <= -1 && n != -2) return -1; if (send_stdout_bytes(proto, tmp, tln) <= -1) return -1; @@ -1227,6 +1175,114 @@ static int send_stdout_chars (hcl_server_proto_t* proto, const hcl_ooch_t* data, } #endif +/* ========================================================================= */ + +hcl_xproto_t* hcl_xproto_open (hcl_oow_t xtnsize, hcl_server_worker_t* worker) +{ + hcl_xproto_t* proto; + + proto = (hcl_xproto_t*)hcl_server_allocmem(worker->server, HCL_SIZEOF(*proto)); + if (HCL_UNLIKELY(!proto)) return HCL_NULL; + + HCL_MEMSET (proto, 0, HCL_SIZEOF(*proto)); + proto->worker = worker; + proto->exec_runtime_event_index = HCL_TMR_INVALID_INDEX; + proto->rcv.state = HCL_XPROTO_RCV_HEADER; + proto->rcv.len_needed = HCL_SIZEOF(proto->rcv.hdr); + proto->rcv.eof = 0; + + return proto; +} + +void hcl_xproto_close (hcl_xproto_t* proto) +{ + hcl_server_freemem (proto->worker->server, proto); +} + +int hcl_xproto_ready (hcl_xproto_t* proto) +{ + /* has it received suffient data for processing? */ + return proto->rcv.len >= proto->rcv.len_needed; +} + +static int hcl_xproto_process (hcl_xproto_t* proto) +{ +/* + hcl_xproto_t* proto = worker->proto; + hcl_server_ + hcl_t* hcl = worker->hcl; +*/ + int n; + + switch (proto->rcv.state) + { + case HCL_XPROTO_RCV_HEADER: + if (proto->rcv.len < HCL_SIZEOF(proto->rcv.hdr)) goto carry_on; /* need more data */ + + HCL_MEMCPY (&proto->rcv.hdr, proto->rcv.buf, HCL_SIZEOF(proto->rcv.hdr)); + /*proto->rcv.hdr.len = hcl_ntoh16(proto->rcv.hdr.len);*/ /* keep this in the host byte order */ + + /* consume the header */ + HCL_MEMMOVE (proto->rcv.buf, &proto->rcv.buf[HCL_SIZEOF(proto->rcv.hdr)], proto->rcv.len - HCL_SIZEOF(proto->rcv.hdr)); + proto->rcv.len -= HCL_SIZEOF(proto->rcv.hdr); + + /* switch to the payload mode */ + if (proto->rcv.hdr.len > 0) + { + proto->rcv.state = HCL_XPROTO_RCV_PAYLOAD; + proto->rcv.len_needed = proto->rcv.hdr.len; + } + else + { + /* take shortcut */ +/* TODO: convert handle_packet as call back */ + n = handle_packet(proto, proto->rcv.hdr.type, proto->rcv.buf, proto->rcv.hdr.len); + if (n <= -1) goto fail_with_errmsg; + if (n == 0) return 0; + } + + break; + + case HCL_XPROTO_RCV_PAYLOAD: + if (proto->rcv.len < proto->rcv.hdr.len) goto carry_on; /* need more payload data */ + +/* TODO: convert handle_packet as call back */ + n = handle_packet(proto, proto->rcv.hdr.type, proto->rcv.buf, proto->rcv.hdr.len); + +/* TODO: minimize the use of HCL_MEMOVE... use the buffer */ + /* switch to the header mode */ + if (proto->rcv.hdr.len > 0) + { + HCL_MEMMOVE (proto->rcv.buf, &proto->rcv.buf[proto->rcv.hdr.len], proto->rcv.len - proto->rcv.hdr.len); + proto->rcv.len -= proto->rcv.hdr.len; + } + proto->rcv.state = HCL_XPROTO_RCV_HEADER; + proto->rcv.len_needed = HCL_SIZEOF(proto->rcv.hdr); + + if (n <= -1) goto fail_with_errmsg; + if (n == 0) return 0; + + break; + + default: +/* + hcl_seterrbfmt (hcl, HCL_EINTERN, "invalid request state %d", (int)proto->rcv.state); +*/ +/* TODO: call back */ + goto fail_with_errmsg; + } + +carry_on: + return 1; + +fail_with_errmsg: +// TODO: proper error handling + //send_proto_hcl_error (proto); + //HCL_LOG1 (hcl, SERVER_LOGMASK_ERROR, "Unable to compile .SCRIPT contents - %js\n", hcl_geterrmsg(worker->hcl)); + return -1; + +} + /* ========================================================================= */ @@ -1473,13 +1529,13 @@ static hcl_server_worker_t* alloc_worker (hcl_server_t* server, int cli_sck, con return worker; } -static void close_worker_socket (hcl_server_worker_t* worker) +static void fini_worker_socket (hcl_server_worker_t* worker) { if (worker->sck >= 0) { - if (worker->proto) + if (worker->hcl) { - HCL_LOG2 (worker->proto->hcl, SERVER_LOGMASK_INFO, "Closing worker socket %d [%zu]\n", worker->sck, worker->wid); + HCL_LOG2 (worker->hcl, SERVER_LOGMASK_INFO, "Closing worker socket %d [%zu]\n", worker->sck, worker->wid); } else { @@ -1493,11 +1549,11 @@ static void close_worker_socket (hcl_server_worker_t* worker) static void free_worker (hcl_server_worker_t* worker) { - close_worker_socket (worker); + fini_worker_socket (worker); - if (worker->proto) + if (worker->hcl) { - HCL_LOG1 (worker->proto->hcl, SERVER_LOGMASK_INFO, "Killing worker [%zu]\n", worker->wid); + HCL_LOG1 (worker->hcl, SERVER_LOGMASK_INFO, "Killing worker [%zu]\n", worker->wid); } else { @@ -1553,9 +1609,9 @@ static void zap_worker_in_server (hcl_server_t* server, hcl_server_worker_t* wor static int worker_step (hcl_server_worker_t* worker) { - hcl_server_proto_t* proto = worker->proto; + hcl_xproto_t* proto = worker->proto; hcl_server_t* server = worker->server; - hcl_t* hcl = proto->hcl; + hcl_t* hcl = worker->hcl; struct pollfd pfd; int tmout, actual_tmout; ssize_t x; @@ -1628,9 +1684,9 @@ static int worker_step (hcl_server_worker_t* worker) } /* the receiver buffer has enough data */ - while (/*proto->rcv.len > 0 && */proto->rcv.len >= proto->rcv.len_needed) + while (hcl_xproto_ready(worker->proto)) { - if ((n = handle_received_data(proto)) <= -1) + if ((n = hcl_xproto_process(worker->proto)) <= -1) { /* TODO: proper error message */ return -1; @@ -1646,20 +1702,115 @@ carry_on: return 1; /* carry on */ } +static int init_worker_hcl (hcl_server_worker_t* worker) +{ + hcl_server_t* server = worker->server; + hcl_t* hcl; + worker_hcl_xtn_t* xtn; + hcl_bitmask_t trait; + hcl_cb_t hclcb; + + hcl = hcl_openstdwithmmgr(hcl_server_getmmgr(server), HCL_SIZEOF(*xtn), HCL_NULL); + if (HCL_UNLIKELY(!hcl)) goto oops; + + /* replace the vmprim.log_write function */ + hcl->vmprim.log_write = server_log_write; + + xtn = (worker_hcl_xtn_t*)hcl_getxtn(hcl); + xtn->worker = worker; + + hcl_setoption (hcl, HCL_MOD_INCTX, &server->cfg.module_inctx); + hcl_setoption (hcl, HCL_LOG_MASK, &server->cfg.logmask); + hcl_setcmgr (hcl, hcl_server_getcmgr(server)); + + hcl_getoption (hcl, HCL_TRAIT, &trait); +#if defined(HCL_BUILD_DEBUG) + if (server->cfg.trait & HCL_SERVER_TRAIT_DEBUG_GC) trait |= HCL_TRAIT_DEBUG_GC; + if (server->cfg.trait & HCL_SERVER_TRAIT_DEBUG_BIGINT) trait |= HCL_TRAIT_DEBUG_BIGINT; +#endif + trait |= HCL_TRAIT_LANG_ENABLE_BLOCK; + trait |= HCL_TRAIT_LANG_ENABLE_EOL; + hcl_setoption (hcl, HCL_TRAIT, &trait); + + HCL_MEMSET (&hclcb, 0, HCL_SIZEOF(hclcb)); + /*hclcb.fini = fini_hcl; + hclcb.gc = gc_hcl;*/ + hclcb.vm_startup = vm_startup; + hclcb.vm_cleanup = vm_cleanup; + hclcb.vm_checkbc = vm_checkbc; + hcl_regcb (hcl, &hclcb); + + if (hcl_ignite(hcl, server->cfg.actor_heap_size) <= -1) goto oops; + if (hcl_addbuiltinprims(hcl) <= -1) goto oops; + + if (hcl_attachccio(hcl, read_handler) <= -1) goto oops; + if (hcl_attachudio(hcl, scan_handler, print_handler) <= -1) goto oops; + + if (hcl_beginfeed(hcl, on_fed_cnode) <= -1) goto oops; + + worker->hcl = hcl; + return 0; + +oops: + if (hcl) hcl_close (hcl); + return -1; +} + + +static void fini_worker_hcl (hcl_server_worker_t* worker) +{ + if (HCL_LIKELY(worker->hcl)) + { + hcl_endfeed (worker->hcl); + hcl_close (worker->hcl); + worker->hcl = HCL_NULL; + } +} + + +static int init_worker_proto (hcl_server_worker_t* worker) +{ + hcl_xproto_t* proto; + + proto = hcl_xproto_open(0, worker); + if (HCL_UNLIKELY(!proto)) return -1; + + worker->proto = proto; + return 0; +} + +static void fini_worker_proto (hcl_server_worker_t* worker) +{ + if (HCL_LIKELY(worker->proto)) + { + hcl_xproto_close (worker->proto); + worker->proto = HCL_NULL; + } +} + static void* worker_main (void* ctx) { hcl_server_worker_t* worker = (hcl_server_worker_t*)ctx; hcl_server_t* server = worker->server; sigset_t set; + int n; sigfillset (&set); pthread_sigmask (SIG_BLOCK, &set, HCL_NULL); worker->thr = pthread_self(); - worker->proto = hcl_server_proto_open(0, worker); - if (!worker->proto) + + n = init_worker_hcl(worker); + if (HCL_UNLIKELY(n <= -1)) { - free_worker (worker); + /* TODO: capture error ... */ + return HCL_NULL; + } + + n = init_worker_proto(worker); + if (HCL_UNLIKELY(n <= -1)) + { + fini_worker_hcl (worker); return HCL_NULL; } @@ -1680,11 +1831,13 @@ static void* worker_main (void* ctx) } } - hcl_server_proto_close (worker->proto); + hcl_xproto_close (worker->proto); worker->proto = HCL_NULL; + fini_worker_hcl (worker); + pthread_mutex_lock (&server->worker_mutex); - close_worker_socket (worker); + fini_worker_socket (worker); if (!worker->claimed) { zap_worker_in_server (server, worker); @@ -2590,3 +2743,6 @@ void hcl_client_freemem (hcl_client_t* client, void* ptr) { HCL_MMGR_FREE (client->_mmgr, ptr); } + + + diff --git a/lib/hcl-x.h b/lib/hcl-x.h index ff66840..f21f828 100644 --- a/lib/hcl-x.h +++ b/lib/hcl-x.h @@ -52,6 +52,9 @@ struct hcl_xpkt_hdr_t }; typedef struct hcl_xpkt_hdr_t hcl_xpkt_hdr_t; +/* ---------------------------------------------------------------------- */ + +typedef struct hcl_xproto_t hcl_xproto_t; /* ---------------------------------------------------------------------- */ @@ -380,6 +383,17 @@ HCL_EXPORT void hcl_client_freemem ( void* ptr ); +/* ---------------------------------------------------------------------- */ + +HCL_EXPORT hcl_xproto_t* hcl_xproto_open ( + hcl_oow_t xtnsize, + hcl_server_worker_t* worker +); + +HCL_EXPORT void hcl_xproto_close ( + hcl_xproto_t* proto +); + #if defined(__cplusplus) } #endif