From 003ebb1b943747ecf97b45b5b0cf5c6b4ae5707b Mon Sep 17 00:00:00 2001 From: hyung-hwan Date: Mon, 15 Apr 2024 02:23:55 +0900 Subject: [PATCH] wip - more reworking on hcl server/client code --- bin/main-c.c | 13 ++- bin/main-s.c | 2 +- bin/main.c | 2 +- lib/hcl-x.c | 304 +++++++++++++++++++++++++++++++++++++----------- lib/hcl-x.h | 20 +--- lib/utl.c | 2 +- t/call-5001.err | 11 ++ 7 files changed, 263 insertions(+), 91 deletions(-) diff --git a/bin/main-c.c b/bin/main-c.c index 988de7f..5ff158c 100644 --- a/bin/main-c.c +++ b/bin/main-c.c @@ -24,7 +24,6 @@ THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ -#include "hcl-c.h" #include "hcl-x.h" #include "hcl-opt.h" #include "hcl-utl.h" @@ -527,8 +526,6 @@ static int receive_raw_bytes (hcl_xproto_t* proto, int sck, hcl_ntime_t* idle_tm ssize_t x; int n; - HCL_ASSERT (hcl, proto->rcv.len < proto->rcv.len_needed); - if (HCL_UNLIKELY(proto->rcv.eof)) { hcl_seterrbfmt (hcl, HCL_EGENERIC, "connection closed"); @@ -711,6 +708,16 @@ static int handle_request (hcl_client_t* client, const char* ipaddr, const char* scptr = sccur; + if (!sccur == '\0') + { + hdr.type = HCL_XPKT_EXECUTE; + hdr.id = 1; /* TODO: */ + hdr.len = sccur - scptr; + + iov[0].iov_base = &hdr; + iov[0].iov_len = HCL_SIZEOF(hdr); + send_iov (sck, iov, 1); + } if (*sccur == '\0' && shut_wr_after_req) shutdown (sck, SHUT_WR); } diff --git a/bin/main-s.c b/bin/main-s.c index e059153..85b57f5 100644 --- a/bin/main-s.c +++ b/bin/main-s.c @@ -544,7 +544,7 @@ int main (int argc, char* argv[]) return -1; } - while ((c = hcl_getbopt (argc, argv, &opt)) != HCL_BCI_EOF) + while ((c = hcl_getbopt(argc, argv, &opt)) != HCL_BCI_EOF) { switch (c) { diff --git a/bin/main.c b/bin/main.c index b612f95..be48b72 100644 --- a/bin/main.c +++ b/bin/main.c @@ -837,7 +837,7 @@ int main (int argc, char* argv[]) /*trait |= HCL_TRAIT_NOGC;*/ trait |= HCL_TRAIT_AWAIT_PROCS; if (enable_block) trait |= HCL_TRAIT_LANG_ENABLE_BLOCK; - if (nl_terminator) trait |= HCL_TRAIT_LANG_ENABLE_EOL;; + if (nl_terminator) trait |= HCL_TRAIT_LANG_ENABLE_EOL; hcl_setoption (hcl, HCL_TRAIT, &trait); } diff --git a/lib/hcl-x.c b/lib/hcl-x.c index c74658e..d8a75f8 100644 --- a/lib/hcl-x.c +++ b/lib/hcl-x.c @@ -131,14 +131,10 @@ struct hcl_server_proto_t hcl_xpkt_hdr_t hdr; } rcv; -/* struct { - hcl_oow_t nchunks; - hcl_bch_t buf[HCL_SERVER_PROTO_REPLY_BUF_SIZE]; - hcl_oow_t len; - } reply; -*/ + + } snd; struct { @@ -277,6 +273,15 @@ struct hcl_server_t pthread_mutex_t log_mutex; }; +/* ========================================================================= */ +static int send_stdout_bytes (hcl_server_proto_t* proto, const hcl_bch_t* data, hcl_oow_t len); + +#if defined(HCL_OOCH_IS_UCH) +static int send_stdout_chars (hcl_server_proto_t* proto, const hcl_ooch_t* data, hcl_oow_t len); +#else +#define send_stdout_chars(proto,data,len) send_stdout_bytes(proto,data,len) +#endif + /* ========================================================================= */ static const hcl_bch_t* get_base_name (const hcl_bch_t* path) @@ -571,9 +576,11 @@ static int print_handler (hcl_t* hcl, hcl_io_cmd_t cmd, void* arg) switch (cmd) { case HCL_IO_OPEN: +printf ("IO OPEN SOMETHING...........\n"); return 0; case HCL_IO_CLOSE: +printf ("IO CLOSE SOMETHING...........\n"); return 0; case HCL_IO_WRITE: @@ -582,8 +589,7 @@ static int print_handler (hcl_t* hcl, hcl_io_cmd_t cmd, void* arg) hcl_io_udoarg_t* outarg = (hcl_io_udoarg_t*)arg; printf ("IO WRITE SOMETHING...........\n"); -#if 0 - if (hcl_server_proto_feed_reply(xtn->proto, outarg->ptr, outarg->len, 0) <= -1) + if (send_stdout_chars(xtn->proto, outarg->ptr, outarg->len) <= -1) { /* TODO: change error code and message. propagage the errormessage from proto */ hcl_seterrbfmt (hcl, HCL_EIOERR, "failed to write message via proto"); @@ -593,7 +599,7 @@ printf ("IO WRITE SOMETHING...........\n"); hcl_abort (hcl); return -1; } -#endif + outarg->xlen = outarg->len; return 0; } @@ -604,7 +610,7 @@ printf ("IO WRITE SOMETHING...........\n"); printf ("IO WRITE SOMETHING BYTES...........\n"); #if 0 - if (hcl_server_proto_feed_reply_bytes(xtn->proto, outarg->ptr, outarg->len) <= -1) + if (send_stdout_bytes(xtn->proto, outarg->ptr, outarg->len) <= -1) { /* TODO: change error code and message. propagage the errormessage from proto */ hcl_seterrbfmt (hcl, HCL_EIOERR, "failed to write message via proto"); @@ -705,48 +711,19 @@ printf ("on_fed_cnode......\n"); * arrange to clear byte-codes before compiling the expression. */ flags = HCL_COMPILE_CLEAR_CODE | HCL_COMPILE_CLEAR_FNBLK; proto->feed.ongoing = 1; +printf ("CLEARING...\n"); } -printf ("hcl_copingll......\n"); +printf ("COMPILING hcl_copingll......\n"); if (hcl_compile(hcl, obj, flags) <= -1) { +hcl_logbfmt (hcl, HCL_LOG_STDERR, "COMPILER ERROR - %js\n", hcl_geterrmsg(hcl)); #if 0 print_error(hcl, "failed to compile"); xtn->feed.pos = xtn->feed.len; /* arrange to discard the rest of the line */ show_prompt (hcl, 0); #endif } - else - { -#if 0 - hcl_oow_t i; - - for (i = xtn->feed.pos; i < xtn->feed.len; i++) - { - /* this loop is kind of weird. it is to check the current feed buffer is left with - * spaces only and to execute the compiled bytecodes so far if the check is true. - * the check is performed because a single line of the user input can have multiple - * expressions joined with a semicolon or contains trailing spaces. */ - if (!hcl_is_bch_space(xtn->feed.buf[i])) break; - } - - if (i >= xtn->feed.len || xtn->feed.pos >= xtn->feed.len) - { -#endif - hcl_oop_t retv; - - /* nothing more to feed */ - -printf ("hcl_executing.....\n"); - retv = hcl_execute(hcl); - hcl_flushudio (hcl); - - proto->feed.ongoing = 0; - /*show_prompt (hcl, 0);*/ -#if 0 - } -#endif - } return 0; } @@ -787,6 +764,8 @@ hcl_server_proto_t* hcl_server_proto_open (hcl_oow_t xtnsize, hcl_server_worker_ if (proto->worker->server->cfg.trait & HCL_SERVER_TRAIT_DEBUG_GC) trait |= HCL_TRAIT_DEBUG_GC; if (proto->worker->server->cfg.trait & HCL_SERVER_TRAIT_DEBUG_BIGINT) trait |= HCL_TRAIT_DEBUG_BIGINT; #endif + trait |= HCL_TRAIT_LANG_ENABLE_BLOCK; + trait |= HCL_TRAIT_LANG_ENABLE_EOL; hcl_setoption (proto->hcl, HCL_TRAIT, &trait); HCL_MEMSET (&hclcb, 0, HCL_SIZEOF(hclcb)); @@ -837,7 +816,7 @@ static int write_stdout (hcl_server_proto_t* proto, const hcl_bch_t* ptr, hcl_oo iov[0].iov_base = &hdr; iov[0].iov_len = HCL_SIZEOF(hdr); - iov[1].iov_base = ptr; + iov[1].iov_base = (hcl_bch_t*)ptr; iov[1].iov_len = len; while (1) @@ -1160,20 +1139,12 @@ static int receive_raw_request (hcl_server_proto_t* proto) } -int hcl_server_proto_handle_incoming (hcl_server_proto_t* proto) +static int handle_received_data (hcl_server_proto_t* proto) { hcl_server_worker_t* worker = proto->worker; hcl_t* hcl = proto->hcl; hcl_xpkt_hdr_t* hdr; - if (proto->rcv.len < proto->rcv.len_needed) - { - int n; - n = receive_raw_request(proto); - if (n <= -1) goto fail_with_errmsg; // TODO: backup error message...and create a new message - else if (n == 0) return 0; /* not enough data has been read */ - } - switch (proto->rcv.state) { case HCL_SERVER_PROTO_RCV_HEADER: @@ -1182,22 +1153,14 @@ int hcl_server_proto_handle_incoming (hcl_server_proto_t* proto) HCL_MEMCPY (&proto->rcv.hdr, proto->rcv.buf, HCL_SIZEOF(proto->rcv.hdr)); //proto->rcv.hdr.len = hcl_ntoh16(proto->rcv.hdr.len); /* keep this in the host byte order */ -printf ("req.hdr = %d %d %d\n", proto->rcv.hdr.type, proto->rcv.hdr.id, proto->rcv.hdr.len); /* consume the header */ HCL_MEMMOVE (proto->rcv.buf, &proto->rcv.buf[HCL_SIZEOF(proto->rcv.hdr)], proto->rcv.len - HCL_SIZEOF(proto->rcv.hdr)); proto->rcv.len -= HCL_SIZEOF(proto->rcv.hdr); - if (proto->rcv.hdr.len > 0) - { - /* switch to the payload mode */ - proto->rcv.state = HCL_SERVER_PROTO_RCV_PAYLOAD; - proto->rcv.len_needed = proto->rcv.hdr.len; - return 0; /* need data for payload */ - } - - /* payload length is zero in the header. */ - HCL_ASSERT (hcl, proto->rcv.len_needed == HCL_SIZEOF(proto->rcv.hdr)); - break; + /* switch to the payload mode */ + proto->rcv.state = HCL_SERVER_PROTO_RCV_PAYLOAD; + proto->rcv.len_needed = proto->rcv.hdr.len; + return 0; case HCL_SERVER_PROTO_RCV_PAYLOAD: if (proto->rcv.len < proto->rcv.hdr.len) return 0; /* need more payload data */ @@ -1211,6 +1174,22 @@ printf ("FEEDING [%.*s]\n", proto->rcv.hdr.len, proto->rcv.buf); goto fail_with_errmsg; } } + else if (proto->rcv.hdr.type == HCL_XPKT_EXECUTE) + { + hcl_oop_t retv; +printf ("EXECUTING hcl_executing......\n"); + proto->feed.ongoing = 0; + + hcl_decode (hcl, hcl_getcode(hcl), 0, hcl_getbclen(hcl)); + + retv = hcl_execute(hcl); + hcl_flushudio (hcl); + if (!retv) + { + /* TODO: backup error message...and create a new message */ + goto fail_with_errmsg; + } + } else if (proto->rcv.hdr.type == HCL_XPKT_STDIN) { /* store ... push stdin pipe... */ @@ -1229,8 +1208,11 @@ printf ("FEEDING [%.*s]\n", proto->rcv.hdr.len, proto->rcv.buf); /* TODO: minimize the use of HCL_MEMOVE... use the buffer */ /* switch to the header mode */ - HCL_MEMMOVE (proto->rcv.buf, &proto->rcv.buf[proto->rcv.hdr.len], proto->rcv.len - proto->rcv.hdr.len); - proto->rcv.len -= proto->rcv.hdr.len; + if (proto->rcv.hdr.len > 0) + { + HCL_MEMMOVE (proto->rcv.buf, &proto->rcv.buf[proto->rcv.hdr.len], proto->rcv.len - proto->rcv.hdr.len); + proto->rcv.len -= proto->rcv.hdr.len; + } proto->rcv.state = HCL_SERVER_PROTO_RCV_HEADER; proto->rcv.len_needed = HCL_SIZEOF(proto->rcv.hdr); @@ -1251,6 +1233,102 @@ fail_with_errmsg: } +static int send_iov (int sck, struct iovec* iov, int count) +{ + int index = 0; + + while (1) + { + ssize_t nwritten; + struct msghdr msg; + + memset (&msg, 0, HCL_SIZEOF(msg)); + msg.msg_iov = (struct iovec*)&iov[index]; + msg.msg_iovlen = count - index; + nwritten = sendmsg(sck, &msg, 0); + /*nwritten = writev(proto->worker->sck, (const struct iovec*)&iov[index], count - index);*/ + if (nwritten <= -1) + { + /* error occurred inside the worker thread shouldn't affect the error information + * in the server object. so here, i just log a message */ + fprintf (stderr, "Unable to sendmsg on %d - %s\n", sck, strerror(errno)); + return -1; + } + + while (index < count && (size_t)nwritten >= iov[index].iov_len) + nwritten -= iov[index++].iov_len; + + if (index == count) break; + + iov[index].iov_base = (void*)((hcl_uint8_t*)iov[index].iov_base + nwritten); + iov[index].iov_len -= nwritten; + } + + return 0; +} + +static int send_stdout_bytes (hcl_server_proto_t* proto, const hcl_bch_t* data, hcl_oow_t len) +{ + hcl_xpkt_hdr_t hdr; + struct iovec iov[2]; + const hcl_bch_t* ptr, * cur, * end; + + ptr = cur = data; + end = data + len; + + while (ptr < end) + { + while (cur != end && cur - ptr < 255) cur++; + + hdr.type = HCL_XPKT_STDOUT; + hdr.id = 1; /* TODO: */ + hdr.len = cur - ptr; + + iov[0].iov_base = &hdr; + iov[0].iov_len = HCL_SIZEOF(hdr); + iov[1].iov_base = ptr; + iov[1].iov_len = cur - ptr; + + if (send_iov(proto->worker->sck, iov, 2) <= -1) + { + /* TODO: error message */ + return -1; + } + + ptr = cur; + } + + return 0; +} + +#if defined(HCL_OOCH_IS_UCH) +static int send_stdout_chars (hcl_server_proto_t* proto, const hcl_ooch_t* data, hcl_oow_t len) +{ + const hcl_ooch_t* ptr, * end; + hcl_bch_t tmp[256]; + hcl_oow_t tln, pln; + int n; + + ptr = data; + end = data + len; + + while (ptr < end) + { + pln = end - ptr; + tln = HCL_COUNTOF(tmp); + n = hcl_convutobchars(proto->hcl, ptr, &pln, tmp, &tln); + if (n <= -1 && n != -2) return -1; + + if (send_stdout_bytes(proto, tmp, tln) <= -1) return -1; + ptr += pln; + } + + return 0; + +} +#endif + + /* ========================================================================= */ hcl_server_t* hcl_server_open (hcl_mmgr_t* mmgr, hcl_oow_t xtnsize, hcl_server_prim_t* prim, hcl_errnum_t* errnum) @@ -1574,6 +1652,97 @@ static void zap_worker_in_server (hcl_server_t* server, hcl_server_worker_t* wor worker->next_worker = HCL_NULL; } +static int worker_step (hcl_server_worker_t* worker) +{ + hcl_server_proto_t* proto = worker->proto; + hcl_server_t* server = worker->server; + hcl_t* hcl = proto->hcl; + struct pollfd pfd; + int tmout, actual_tmout; + ssize_t x; + int n; + + //HCL_ASSERT (hcl, proto->rcv.len < proto->rcv.len_needed); + + if (HCL_UNLIKELY(proto->rcv.eof)) + { +// TODO: may not be an error if writable needs to be checked... + hcl_seterrbfmt (hcl, HCL_EGENERIC, "connection closed"); + return -1; + } + + tmout = HCL_SECNSEC_TO_MSEC(server->cfg.worker_idle_timeout.sec, server->cfg.worker_idle_timeout.nsec); + actual_tmout = (tmout <= 0)? 10000: tmout; + + pfd.fd = worker->sck; + pfd.events = 0; + if (proto->rcv.len < proto->rcv.len_needed) pfd.events |= POLLIN; + //if (proto->snd.len > 0) pfd.events |= POLLOUT; + + if (pfd.events) + { + n = poll(&pfd, 1, actual_tmout); + if (n <= -1) + { + if (errno == EINTR) return 0; + hcl_seterrwithsyserr (hcl, 0, errno); + return -1; + } + else if (n == 0) + { + /* timed out - no activity on the pfd */ + if (tmout > 0) + { + /* timeout explicity set. no activity for that duration. considered idle */ + hcl_seterrbfmt (hcl, HCL_EGENERIC, "no activity on the worker socket %d", worker->sck); + return -1; + } + + return 0; /* didn't read yet */ + } + + if (pfd.revents & POLLERR) + { + hcl_seterrbfmt (hcl, HCL_EGENERIC, "error condition detected on workder socket %d", worker->sck); + return -1; + } + + if (pfd.revents & POLLOUT) + { + } + + if (pfd.revents & POLLIN) + { + x = recv(worker->sck, &proto->rcv.buf[proto->rcv.len], HCL_COUNTOF(proto->rcv.buf) - proto->rcv.len, 0); + if (x <= -1) + { + if (errno == EINTR) return 0; /* didn't read read */ + + proto->rcv.polled = 0; + hcl_seterrwithsyserr (hcl, 0, errno); + return -1; + } + + if (x == 0) proto->rcv.eof = 1; + proto->rcv.len += x; + + } + } + + + /* the receiver buffer has enough data */ + while (proto->rcv.len >= proto->rcv.len_needed) + { + if (handle_received_data(proto) <= -1) + { + /* TODO: proper error message */ + return -1; + } + } + + return 0; +} + static void* worker_main (void* ctx) { hcl_server_worker_t* worker = (hcl_server_worker_t*)ctx; @@ -1595,11 +1764,12 @@ static void* worker_main (void* ctx) add_worker_to_server (server, HCL_SERVER_WORKER_STATE_ALIVE, worker); pthread_mutex_unlock (&server->worker_mutex); + /* the worker loop */ while (!server->stopreq) { worker->opstate = HCL_SERVER_WORKER_OPSTATE_WAIT; - if (hcl_server_proto_handle_incoming(worker->proto) <= -1) + if (worker_step(worker) <= -1) { worker->opstate = HCL_SERVER_WORKER_OPSTATE_ERROR; break; diff --git a/lib/hcl-x.h b/lib/hcl-x.h index d0eb213..6c8a353 100644 --- a/lib/hcl-x.h +++ b/lib/hcl-x.h @@ -31,6 +31,8 @@ enum hcl_xpkt_type_t { HCL_XPKT_CODEIN, + HCL_XPKT_EXECUTE, + HCL_XPKT_CODEOUT, /* return value is passed over this? */ HCL_XPKT_STDIN, HCL_XPKT_STDOUT, @@ -96,7 +98,6 @@ struct hcl_server_prim_t typedef struct hcl_server_prim_t hcl_server_prim_t; - /* ---------------------------------------------------------------------- */ @@ -255,23 +256,6 @@ HCL_EXPORT void hcl_server_freemem ( ); -HCL_EXPORT int hcl_server_proto_feed_reply ( - hcl_server_proto_t* proto, - const hcl_ooch_t* ptr, - hcl_oow_t len, - int escape -); - -HCL_EXPORT int hcl_server_proto_feed_reply_bytes ( - hcl_server_proto_t* proto, - const hcl_bch_t* ptr, - hcl_oow_t len -); - -HCL_EXPORT int hcl_server_proto_handle_incoming ( - hcl_server_proto_t* proto -); - /* ---------------------------------------------------------------------- */ HCL_EXPORT hcl_client_t* hcl_client_open ( diff --git a/lib/utl.c b/lib/utl.c index c007773..7756d6c 100644 --- a/lib/utl.c +++ b/lib/utl.c @@ -686,7 +686,7 @@ int hcl_conv_bcstr_to_ucstr_with_cmgr (const hcl_bch_t* bcs, hcl_oow_t* bcslen, for (bp = bcs; *bp != '\0'; bp++) /* nothing */ ; mlen = bp - bcs; wlen = *ucslen; - n = hcl_conv_bchars_to_uchars_with_cmgr (bcs, &mlen, ucs, &wlen, cmgr, all); + n = hcl_conv_bchars_to_uchars_with_cmgr(bcs, &mlen, ucs, &wlen, cmgr, all); if (ucs) { /* null-terminate the target buffer if it has room for it. */ diff --git a/t/call-5001.err b/t/call-5001.err index b345ee0..308d0a0 100644 --- a/t/call-5001.err +++ b/t/call-5001.err @@ -4,6 +4,17 @@ --- +## you must enclose the binary expression with parentheses +## i := (i + 20) + +i:=0; +while(i < 20) { + printf "hello world 안녕하신가\n" + i := i + 20 ##ERROR: syntax error - prohibited binary operator - + +} + +--- + if ##ERROR: syntax error - no conditional expression after if ---