undergoing code refactoring of xproto, server, client code
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
2024-04-21 22:15:04 +09:00
parent 6eee6bc9eb
commit 7e782809f9
9 changed files with 2979 additions and 426 deletions

View File

@ -110,45 +110,6 @@ typedef struct server_hcl_xtn_t server_hcl_xtn_t;
/* ---------------------------------- */
enum hcl_xproto_rcv_state_t
{
HCL_XPROTO_RCV_HDR,
HCL_XPROTO_RCV_PLD
};
typedef enum hcl_xproto_rcv_state_t hcl_xproto_rcv_state_t;
struct hcl_xproto_t
{
hcl_oow_t _instsize;
hcl_mmgr_t* _mmgr;
hcl_tmr_index_t exec_runtime_event_index;
struct
{
hcl_xproto_rcv_state_t state;
hcl_oow_t len_needed;
unsigned int eof: 1;
hcl_oow_t len;
hcl_uint8_t buf[HCL_XPKT_MAX_PLD_LEN];
/* normalize header of hcl_xpkt_hdr_t with combined bits into separate placeholders */
struct
{
hcl_uint8_t id;
hcl_uint8_t type;
hcl_uint16_t len; /* this is wider than the len field of hcl_xpkt_hdr_t */
} hdr;
} rcv;
struct
{
} snd;
};
/* ---------------------------------- */
enum hcl_server_worker_state_t
{
HCL_SERVER_WORKER_STATE_DEAD = 0,
@ -181,6 +142,7 @@ struct hcl_server_worker_t
hcl_ntime_t alloc_time;
hcl_server_worker_state_t state;
hcl_server_worker_opstate_t opstate;
hcl_tmr_index_t exec_runtime_event_index;
hcl_xproto_t* proto;
hcl_t* hcl;
@ -694,6 +656,16 @@ static void fini_hcl (hcl_t* hcl)
*/
/* ========================================================================= */
static hcl_server_worker_t* proto_to_worker (hcl_xproto_t* proto)
{
proto_xtn_t* prtxtn;
prtxtn = (proto_xtn_t*)hcl_xproto_getxtn(proto);
return prtxtn->worker;
}
/* ========================================================================= */
#define SERVER_LOGMASK_INFO (HCL_LOG_INFO | HCL_LOG_APP)
#define SERVER_LOGMASK_ERROR (HCL_LOG_ERROR | HCL_LOG_APP)
@ -728,15 +700,9 @@ static void exec_runtime_handler (hcl_tmr_t* tmr, const hcl_ntime_t* now, hcl_tm
{
/* [NOTE] this handler is executed in the main server thread
* when it calls hcl_tmr_fire(). */
hcl_xproto_t* proto;
hcl_server_worker_t* worker;
proto_xtn_t* prtxtn;
proto = (hcl_xproto_t*)evt->ctx;
prtxtn = (proto_xtn_t*)hcl_xproto_getxtn(proto);
worker = prtxtn->worker;
worker = proto_to_worker((hcl_xproto_t*)evt->ctx);
/* TODO: can we use worker->hcl for logging before abort?? */
HCL_LOG1 (worker->server->dummy_hcl, SERVER_LOGMASK_INFO, "Aborting script execution for max_actor_runtime exceeded [%zu]\n", worker->wid);
hcl_abort (worker->hcl);
@ -746,19 +712,16 @@ static void exec_runtime_updater (hcl_tmr_t* tmr, hcl_tmr_index_t old_index, hcl
{
/* [NOTE] this handler is executed in the main server thread
* when it calls hcl_tmr_fire() */
hcl_xproto_t* proto;
hcl_server_worker_t* worker;
proto_xtn_t* prtxtn;
proto = (hcl_xproto_t*)evt->ctx;
prtxtn = (proto_xtn_t*)hcl_xproto_getxtn(proto);
worker = prtxtn->worker;
HCL_ASSERT (worker->hcl, proto->exec_runtime_event_index == old_index);
worker = proto_to_worker(proto);
HCL_ASSERT (worker->hcl, worker->exec_runtime_event_index == old_index);
/* the event is being removed by hcl_tmr_fire() or by hcl_tmr_delete()
* if new_index is HCL_TMR_INVALID_INDEX. it's being updated if not. */
proto->exec_runtime_event_index = new_index;
worker->exec_runtime_event_index = new_index;
}
static int insert_exec_timer (hcl_xproto_t* proto, const hcl_ntime_t* tmout)
@ -767,15 +730,13 @@ static int insert_exec_timer (hcl_xproto_t* proto, const hcl_ntime_t* tmout)
hcl_tmr_event_t event;
hcl_tmr_index_t index;
proto_xtn_t* prtxtn;
hcl_server_worker_t* worker;
hcl_server_t* server;
prtxtn = (proto_xtn_t*)hcl_xproto_getxtn(proto);
worker = prtxtn->worker;
worker = proto_to_worker(proto);
server = worker->server;
HCL_ASSERT (worker->hcl, proto->exec_runtime_event_index == HCL_TMR_INVALID_INDEX);
HCL_ASSERT (worker->hcl, worker->exec_runtime_event_index == HCL_TMR_INVALID_INDEX);
HCL_MEMSET (&event, 0, HCL_SIZEOF(event));
event.ctx = proto;
@ -786,7 +747,7 @@ static int insert_exec_timer (hcl_xproto_t* proto, const hcl_ntime_t* tmout)
pthread_mutex_lock (&server->tmr_mutex);
index = hcl_tmr_insert(server->tmr, &event);
proto->exec_runtime_event_index = index;
worker->exec_runtime_event_index = index;
if (index != HCL_TMR_INVALID_INDEX)
{
/* inform the server of timer event change */
@ -800,25 +761,23 @@ static int insert_exec_timer (hcl_xproto_t* proto, const hcl_ntime_t* tmout)
static void delete_exec_timer (hcl_xproto_t* proto)
{
/* [NOTE] this is executed in the worker thread. if the event has been fired
* in the server thread, proto->exec_runtime_event_index should be
* in the server thread, worker->exec_runtime_event_index should be
* HCL_TMR_INVALID_INDEX as set by exec_runtime_handler */
proto_xtn_t* prtxtn;
hcl_server_worker_t* worker;
hcl_server_t* server;
prtxtn = (proto_xtn_t*)hcl_xproto_getxtn(proto);
worker = prtxtn->worker;
worker = proto_to_worker(proto);
server = worker->server;
pthread_mutex_lock (&server->tmr_mutex);
if (proto->exec_runtime_event_index != HCL_TMR_INVALID_INDEX)
if (worker->exec_runtime_event_index != HCL_TMR_INVALID_INDEX)
{
/* the event has not been fired yet. let's delete it
* if it has been fired, the index it shall be HCL_TMR_INVALID_INDEX already */
hcl_tmr_delete (server->tmr, proto->exec_runtime_event_index);
HCL_ASSERT (worker->hcl, proto->exec_runtime_event_index == HCL_TMR_INVALID_INDEX);
/*proto->exec_runtime_event_index = HCL_TMR_INVALID_INDEX; */
hcl_tmr_delete (server->tmr, worker->exec_runtime_event_index);
HCL_ASSERT (worker->hcl, worker->exec_runtime_event_index == HCL_TMR_INVALID_INDEX);
/*worker->exec_runtime_event_index = HCL_TMR_INVALID_INDEX; */
}
pthread_mutex_unlock (&server->tmr_mutex);
}
@ -827,12 +786,10 @@ static int execute_script (hcl_xproto_t* proto, const hcl_bch_t* trigger)
{
hcl_oop_t obj;
const hcl_ooch_t* failmsg = HCL_NULL;
proto_xtn_t* prtxtn;
hcl_server_worker_t* worker;
hcl_server_t* server;
prtxtn = (proto_xtn_t*)hcl_xproto_getxtn(proto);
worker = prtxtn->worker;
worker = proto_to_worker(proto);
server = worker->server;
#if 0
@ -905,24 +862,18 @@ static void reformat_synerr (hcl_t* hcl)
static void send_proto_hcl_error (hcl_xproto_t* proto)
{
proto_xtn_t* prtxtn;
hcl_server_worker_t* worker;
prtxtn = (proto_xtn_t*)hcl_xproto_getxtn(proto);
worker = prtxtn->worker;
worker = proto_to_worker(proto);
if (HCL_ERRNUM(worker->hcl) == HCL_ESYNERR) reformat_synerr (worker->hcl);
send_error_message (proto, hcl_geterrmsg(worker->hcl));
}
static void show_server_workers (hcl_xproto_t* proto)
{
proto_xtn_t* prtxtn;
hcl_server_worker_t* worker, * w;
hcl_server_t* server;
prtxtn = (proto_xtn_t*)hcl_xproto_getxtn(proto);
worker = prtxtn->worker;
worker = proto_to_worker(proto);
server = worker->server;
pthread_mutex_lock (&server->worker_mutex);
@ -936,13 +887,11 @@ static void show_server_workers (hcl_xproto_t* proto)
static int kill_server_worker (hcl_xproto_t* proto, hcl_oow_t wid)
{
proto_xtn_t* prtxtn;
hcl_server_worker_t* worker;
hcl_server_t* server;
int xret = 0;
prtxtn = (proto_xtn_t*)hcl_xproto_getxtn(proto);
worker = prtxtn->worker;
worker = proto_to_worker(proto);
server = worker->server;
pthread_mutex_lock (&server->worker_mutex);
@ -979,14 +928,12 @@ static int kill_server_worker (hcl_xproto_t* proto, hcl_oow_t wid)
return xret;
}
static int handle_packet (hcl_xproto_t* proto, hcl_xpkt_type_t type, void* data, hcl_oow_t len)
static int handle_packet (hcl_xproto_t* proto, hcl_xpkt_type_t type, const void* data, hcl_oow_t len)
{
proto_xtn_t* prtxtn;
hcl_server_worker_t* worker;
hcl_t* hcl;
prtxtn = (proto_xtn_t*)hcl_xproto_getxtn(proto);
worker = prtxtn->worker;
worker = proto_to_worker(proto);
hcl = worker->hcl;
printf ("HANDLE PACKET TYPE => %d\n", type);
@ -1082,15 +1029,13 @@ static int send_iov (int sck, struct iovec* iov, int count)
static int send_stdout_bytes (hcl_xproto_t* proto, const hcl_bch_t* data, hcl_oow_t len)
{
proto_xtn_t* prtxtn;
hcl_server_worker_t* worker;
hcl_xpkt_hdr_t hdr;
struct iovec iov[2];
const hcl_bch_t* ptr, * cur, * end;
hcl_uint16_t seglen;
prtxtn = (proto_xtn_t*)hcl_xproto_getxtn(proto);
worker = prtxtn->worker;
worker = proto_to_worker(proto);
ptr = cur = data;
end = data + len;
@ -1126,15 +1071,13 @@ printf ("SENDING BYTES [%.*s]\n", (int)len, data);
#if defined(HCL_OOCH_IS_UCH)
static int send_stdout_chars (hcl_xproto_t* proto, const hcl_ooch_t* data, hcl_oow_t len)
{
proto_xtn_t* prtxtn;
hcl_server_worker_t* worker;
const hcl_ooch_t* ptr, * end;
hcl_bch_t tmp[256];
hcl_oow_t tln, pln;
int n;
prtxtn = (proto_xtn_t*)hcl_xproto_getxtn(proto);
worker = prtxtn->worker;
worker = proto_to_worker(proto);
ptr = data;
end = data + len;
@ -1155,139 +1098,6 @@ static int send_stdout_chars (hcl_xproto_t* proto, const hcl_ooch_t* data, hcl_o
}
#endif
/* ========================================================================= */
hcl_xproto_t* hcl_xproto_open (hcl_mmgr_t* mmgr, hcl_oow_t xtnsize)
{
hcl_xproto_t* proto;
proto = (hcl_xproto_t*)HCL_MMGR_ALLOC(mmgr, HCL_SIZEOF(*proto) + xtnsize);
if (HCL_UNLIKELY(!proto)) return HCL_NULL;
HCL_MEMSET (proto, 0, HCL_SIZEOF(*proto));
proto->_instsize = HCL_SIZEOF(*proto);
proto->_mmgr = mmgr;
proto->exec_runtime_event_index = HCL_TMR_INVALID_INDEX;
proto->rcv.state = HCL_XPROTO_RCV_HDR;
proto->rcv.len_needed = HCL_XPKT_HDR_LEN;
proto->rcv.eof = 0;
return proto;
}
void hcl_xproto_close (hcl_xproto_t* proto)
{
HCL_MMGR_FREE (proto->_mmgr, proto);
}
void* hcl_xproto_getxtn (hcl_xproto_t* proto)
{
return (proto + 1);
}
int hcl_xprpto_feed (hcl_xproto_t* proto, const void* data, hcl_oow_t len)
{
}
hcl_uint8_t* hcl_xproto_getbuf (hcl_xproto_t* proto, hcl_oow_t* capa)
{
*capa = HCL_COUNTOF(proto->rcv.buf) - proto->rcv.len;
return &proto->rcv.buf[proto->rcv.len];
}
void hcl_xproto_seteof (hcl_xproto_t* proto, int v)
{
proto->rcv.eof = v;
}
void hcl_xproto_advbuf (hcl_xproto_t* proto, hcl_oow_t inc)
{
proto->rcv.len += inc;
}
int hcl_xproto_ready (hcl_xproto_t* proto)
{
/* has it received suffient data for processing? */
return proto->rcv.len >= proto->rcv.len_needed;
}
int hcl_xproto_process (hcl_xproto_t* proto)
{
int n;
hcl_xpkt_hdr_t* hdr;
switch (proto->rcv.state)
{
case HCL_XPROTO_RCV_HDR:
if (proto->rcv.len < HCL_XPKT_HDR_LEN) goto carry_on; /* need more data */
hdr = (hcl_xpkt_hdr_t*)proto->rcv.buf;
proto->rcv.hdr.id = hdr->id;
proto->rcv.hdr.type = hdr->type & 0x0F;
proto->rcv.hdr.len = (hcl_uint16_t)hdr->len | ((hcl_uint16_t)(hdr->type >> 4) << 8);
/* consume the header */
HCL_MEMMOVE (proto->rcv.buf, &proto->rcv.buf[HCL_XPKT_HDR_LEN], proto->rcv.len - HCL_XPKT_HDR_LEN);
proto->rcv.len -= HCL_XPKT_HDR_LEN;
/* switch to the payload mode */
if (proto->rcv.hdr.len > 0)
{
proto->rcv.state = HCL_XPROTO_RCV_PLD;
proto->rcv.len_needed = proto->rcv.hdr.len;
}
else
{
/* take shortcut */
/* TODO: convert handle_packet as call back */
n = handle_packet(proto, proto->rcv.hdr.type, proto->rcv.buf, proto->rcv.hdr.len);
if (n <= -1) goto fail_with_errmsg;
if (n == 0) return 0;
}
break;
case HCL_XPROTO_RCV_PLD:
if (proto->rcv.len < proto->rcv.hdr.len) goto carry_on; /* need more payload data */
/* TODO: convert handle_packet as call back */
n = handle_packet(proto, proto->rcv.hdr.type, proto->rcv.buf, proto->rcv.hdr.len);
/* TODO: minimize the use of HCL_MEMOVE... use the buffer */
/* switch to the header mode */
if (proto->rcv.hdr.len > 0)
{
HCL_MEMMOVE (proto->rcv.buf, &proto->rcv.buf[proto->rcv.hdr.len], proto->rcv.len - proto->rcv.hdr.len);
proto->rcv.len -= proto->rcv.hdr.len;
}
proto->rcv.state = HCL_XPROTO_RCV_HDR;
proto->rcv.len_needed = HCL_XPKT_HDR_LEN;
if (n <= -1) goto fail_with_errmsg;
if (n == 0) return 0;
break;
default:
/*
hcl_seterrbfmt (hcl, HCL_EINTERN, "invalid request state %d", (int)proto->rcv.state);
*/
/* TODO: call back */
goto fail_with_errmsg;
}
carry_on:
return 1;
fail_with_errmsg:
// TODO: proper error handling
//send_proto_hcl_error (proto);
//HCL_LOG1 (hcl, SERVER_LOGMASK_ERROR, "Unable to compile .SCRIPT contents - %js\n", hcl_geterrmsg(worker->hcl));
return -1;
}
/* ========================================================================= */
hcl_server_t* hcl_server_open (hcl_mmgr_t* mmgr, hcl_oow_t xtnsize, hcl_server_prim_t* prim, hcl_errnum_t* errnum)
@ -1520,6 +1330,7 @@ static hcl_server_worker_t* alloc_worker (hcl_server_t* server, int cli_sck, con
worker->sck = cli_sck;
worker->peeraddr = *peeraddr;
worker->server = server;
worker->exec_runtime_event_index = HCL_TMR_INVALID_INDEX;
server->dummy_hcl->vmprim.vm_gettime (server->dummy_hcl, &worker->alloc_time); /* TODO: the callback may return monotonic time. find a way to guarantee it is realtime??? */
@ -1680,7 +1491,6 @@ static int worker_step (hcl_server_worker_t* worker)
if (x <= -1)
{
if (errno == EINTR) goto carry_on; /* didn't read read */
hcl_seterrwithsyserr (hcl, 0, errno);
return -1;
}
@ -1779,8 +1589,12 @@ static int init_worker_proto (hcl_server_worker_t* worker)
{
hcl_xproto_t* proto;
proto_xtn_t* xtn;
hcl_xproto_cb_t cb;
proto = hcl_xproto_open(hcl_server_getmmgr(worker->server), HCL_SIZEOF(*xtn));
HCL_MEMSET (&cb, 0, HCL_SIZEOF(cb));
cb.on_packet = handle_packet;
proto = hcl_xproto_open(hcl_server_getmmgr(worker->server), &cb, HCL_SIZEOF(*xtn));
if (HCL_UNLIKELY(!proto)) return -1;
xtn = hcl_xproto_getxtn(proto);
@ -2754,6 +2568,3 @@ void hcl_client_freemem (hcl_client_t* client, void* ptr)
{
HCL_MMGR_FREE (client->_mmgr, ptr);
}