From 31cd79beb407ff435061e95bef4c80c622794a2b Mon Sep 17 00:00:00 2001 From: hyung-hwan Date: Mon, 29 Apr 2024 19:01:00 +0900 Subject: [PATCH] refactoring x-client.c - code broken as of this commit --- bin/hclx.c | 1 + lib/hcl-x.h | 8 + lib/x-client.c | 400 +++++++++++++++++++++++++++++++++++++------------ lib/x-server.c | 81 ++++++---- 4 files changed, 362 insertions(+), 128 deletions(-) diff --git a/bin/hclx.c b/bin/hclx.c index 3b569a0..b295989 100644 --- a/bin/hclx.c +++ b/bin/hclx.c @@ -896,6 +896,7 @@ static int json_inst_cb (hcl_json_t* json, hcl_json_inst_t it, const hcl_oocs_t* return 0; } + int json_main (const char* outer, int argc, char* argv[]) { hcl_json_t* json; diff --git a/lib/hcl-x.h b/lib/hcl-x.h index f550dff..d3c241e 100644 --- a/lib/hcl-x.h +++ b/lib/hcl-x.h @@ -486,6 +486,14 @@ HCL_EXPORT int hcl_sys_send_iov ( int count ); +HCL_EXPORT int hcl_sys_open_pipes ( + int pfd[2] +); + +HCL_EXPORT void hcl_sys_close_pipes ( + int pfd[2] +); + #if defined(__cplusplus) } #endif diff --git a/lib/x-client.c b/lib/x-client.c index 98f0ac0..25c9556 100644 --- a/lib/x-client.c +++ b/lib/x-client.c @@ -105,6 +105,7 @@ struct hcl_client_t hcl_ooch_t buf[HCL_ERRMSG_CAPA]; hcl_oow_t len; } errmsg; + int stopreq; struct { @@ -114,6 +115,15 @@ struct hcl_client_t int sck; hcl_xproto_t* proto; + + int mux_pipe[2]; /* pipe to break the blocking multiplexer in the main server loop */ + + struct + { + hcl_bch_t buf[4096]; + hcl_oow_t pos; + hcl_oow_t len; + } script; }; @@ -130,9 +140,10 @@ static void client_log_write_for_dummy (hcl_t* hcl, hcl_bitmask_t mask, const hc hcl_client_t* hcl_client_open (hcl_mmgr_t* mmgr, hcl_oow_t xtnsize, hcl_client_prim_t* prim, hcl_errnum_t* errnum) { - hcl_client_t* client; - hcl_t* hcl; + hcl_client_t* client = HCL_NULL; + hcl_t* hcl = HCL_NULL; client_hcl_xtn_t* xtn; + int pfd[2]; client = (hcl_client_t*)HCL_MMGR_ALLOC(mmgr, HCL_SIZEOF(*client) + xtnsize); if (HCL_UNLIKELY(!client)) @@ -148,6 +159,12 @@ hcl_client_t* hcl_client_open (hcl_mmgr_t* mmgr, hcl_oow_t xtnsize, hcl_client_p return HCL_NULL; } + if (hcl_sys_open_pipes(pfd) <= -1) + { + if (errnum) *errnum = hcl->vmprim.syserrstrb(hcl, 0, errno, HCL_NULL, 0); + goto oops; + } + /* replace the vmprim.log_write function */ hcl->vmprim.log_write = client_log_write_for_dummy; @@ -161,6 +178,8 @@ hcl_client_t* hcl_client_open (hcl_mmgr_t* mmgr, hcl_oow_t xtnsize, hcl_client_p client->prim = *prim; client->dummy_hcl = hcl; client->sck = -1; + client->mux_pipe[0] = pfd[0]; + client->mux_pipe[1] = pfd[1]; client->cfg.logmask = ~(hcl_bitmask_t)0; @@ -171,12 +190,20 @@ hcl_client_t* hcl_client_open (hcl_mmgr_t* mmgr, hcl_oow_t xtnsize, hcl_client_p hcl_setcmgr (client->dummy_hcl, client->_cmgr); return client; + +oops: + /* NOTE: pipe should be closed if jump to here is made after pipe() above */ + if (hcl) hcl_close (hcl); + if (client) HCL_MMGR_FREE (mmgr, client); + return HCL_NULL; } void hcl_client_close (hcl_client_t* client) { if (client->proto) hcl_xproto_close (client->proto); if (client->sck >= 0) close (client->sck); + + hcl_sys_close_pipes(client->mux_pipe); hcl_close (client->dummy_hcl); HCL_MMGR_FREE (client->_mmgr, client); } @@ -493,25 +520,274 @@ static void client_close (hcl_client_t* client) } } + +#if 0 +static int client_send_code_to_server (hcl_client_t* client, const hcl_bch_t* ptr, hcl_oow_t len) +{ + hcl_xpkt_hdr_t hdr; + struct iovec iov[2]; + hcl_uint16_t seglen; + + + while (*sccur != '\0' && sccur - scptr < HCL_XPKT_MAX_PLD_LEN) sccur++; + + seglen = sccur - scptr; + + hdr.id = 1; /* TODO: */ + hdr.type = HCL_XPKT_CODE | (((seglen >> 8) & 0x0F) << 4); + hdr.len = seglen & 0xFF; + + iov[0].iov_base = &hdr; + iov[0].iov_len = HCL_SIZEOF(hdr); + iov[1].iov_base = scptr; + iov[1].iov_len = seglen; + + hcl_sys_send_iov (client->sck, iov, 2); /* TODO: error check */ + + scptr = sccur; +} + + +static int client_send_code_to_server (hcl_client_t* client, hcl_iovec_t* iov, hcl_oow_t iovcnt) +{ + hcl_oow_t urem, len; + hcl_oow_t index = 0, i; + int backup_index = -1, dcnt; + hcl_iovec_t backup; + + + len = 0; + for (i = 0; i < iovcnt; i++) len += iov[i].iov_len; + urem = len; + + + if (client->cwq) + { + goto enqueue; + } + + do + { + dcnt = iovcnt - index; + + memset (&msg, 0, HCL_SIZEOF(msg)); + msg.msg_iov = (struct iovec*)&iov[index]; + msg.msg_iovlen = &dcnt; + + sendmsg(xxx, &msg, 0); + + if (x <= -1) return -1; + else if (x == 0) goto enqueue; + + urem -= dcnt; + while (index < iovcnt && (hio_oow_t)dcnt >= iov[index].iov_len) + dcnt -= iov[index++].iov_len; + + if (index == iovcnt) break; + + if (backup_index != index) + { + if (backup_index >= 0) iov[backup_index] = backup; + backup = iov[index]; + backup_index = index; + } + + iov[index].iov_ptr = (void*)((hio_uint8_t*)iov[index].iov_ptr + dcnt); + iov[index].iov_len -= dcnt; + } + while (1); + + if (backup_index >= 0) iov[backup_index] = backup; + if (iovcnt <= 0) + { + } + + + +/* + while (1) + { + ssize_t nwritten; + struct msghdr msg; + + memset (&msg, 0, HCL_SIZEOF(msg)); + msg.msg_iov = (struct iovec*)&iov[index]; + msg.msg_iovlen = count - index; + nwritten = sendmsg(sck, &msg, 0); + if (nwritten <= -1) return -1; + + while (index < count && (size_t)nwritten >= iov[index].iov_len) + nwritten -= iov[index++].iov_len; + + if (index == count) break; + + iov[index].iov_base = (void*)((hcl_uint8_t*)iov[index].iov_base + nwritten); + iov[index].iov_len -= nwritten; + } +*/ + + return 0; +} +#endif +/* ========================================================================= */ + +static int on_control_event (hcl_client_t* client, struct pollfd* pfd) +{ + char tmp[128]; + while (read(client->mux_pipe[0], tmp, HCL_SIZEOF(tmp)) > 0) /* nothing */; +/* TODO: handle different command? */ +} + + +static int on_server_event (hcl_client_t* client, struct pollfd* pfd, int shut_wr_after_req) +{ + + if (pfd->revents & POLLERR) + { + hcl_client_seterrbfmt (client, HCL_ESYSERR, "error condition detected on %d", client->sck); + goto oops; + } + + if (pfd->revents & POLLOUT) + { + hcl_xpkt_hdr_t hdr; + struct iovec iov[2]; + hcl_uint16_t seglen; + + while (*sccur != '\0' && sccur - scptr < HCL_XPKT_MAX_PLD_LEN) sccur++; + + seglen = sccur - scptr; + + hdr.id = 1; /* TODO: */ + hdr.type = HCL_XPKT_CODE | (((seglen >> 8) & 0x0F) << 4); + hdr.len = seglen & 0xFF; + + iov[0].iov_base = &hdr; + iov[0].iov_len = HCL_SIZEOF(hdr); + iov[1].iov_base = scptr; + iov[1].iov_len = seglen; + + hcl_sys_send_iov (client->sck, iov, 2); /* TODO: error check */ + + scptr = sccur; + + if (*sccur == '\0') + { + hdr.id = 1; /* TODO: */ + hdr.type = HCL_XPKT_EXECUTE; + hdr.len = 0; + + iov[0].iov_base = &hdr; + iov[0].iov_len = HCL_SIZEOF(hdr); + hcl_sys_send_iov (client->sck, iov, 1); + + if (shut_wr_after_req) + { + shutdown (client->sck, SHUT_WR); + } + else + { + hdr.type = HCL_XPKT_DISCONNECT; + hdr.id = 1; /* TODO: */ + hdr.len = 0; + + iov[0].iov_base = &hdr; + iov[0].iov_len = HCL_SIZEOF(hdr); + hcl_sys_send_iov (client->sck, iov, 1); + } + } + } + + if (pfd->revents & POLLIN) + { + hcl_oow_t bcap; + hcl_uint8_t* bptr; + ssize_t x; + + bptr = hcl_xproto_getbuf(client->proto, &bcap);; + x = recv(client->sck, bptr, bcap, 0); + if (x <= -1) + { + if (errno == EINTR) goto carry_on; /* didn't read read */ + /*hcl_seterrwithsyserr (hcl, 0, errno); */ + /* TODO: error info set... */ + return -1; + } + if (x == 0) hcl_xproto_seteof(client->proto, 1); + hcl_xproto_advbuf (client->proto, x); + } + + +carry_on: + while (hcl_xproto_ready(client->proto)) + { + int n; + + if ((n = hcl_xproto_process(client->proto)) <= -1) + { + /* TODO: proper error message */ + return -1; + } + if (n == 0) + { + /* TODO: chceck if there is remaining data in the buffer...?? */ + printf ("NO MORE DATA. EXITING...\n"); + goto done; + } + } + + if (hcl_xproto_geteof(client->proto)) break; +} + +static int on_script_event (hcl_client_t* client, struct pollfd* pfd) +{ + ssize_t n; + hcl_uint8_t buf[128]; + + n = read(pfd.fd, buf, HCL_SIZEOF(buf)); + if (n <= -1) + { + } + else if (n == 0) + { + } + else + { + } +} + int hcl_client_start (hcl_client_t* client, const char* ipaddr, const char* script, int reuse_addr, int shut_wr_after_req) { - int x; - ssize_t n; const char* scptr, * sccur; + int script_fd = STDIN_FILENO; if (client_connect(client, ipaddr, reuse_addr) <= -1) return -1; scptr = sccur = script; while (1) { - struct pollfd pfd; + ssize_t n, i; + struct pollfd pfd[3]; - pfd.fd = client->sck; - pfd.events = POLLIN; - if (*sccur != '\0') pfd.events |= POLLOUT; - pfd.revents = 0; + n = 0; - n = poll(&pfd, 1, 1000); + pfd[n].fd = client->mux_pipe[0]; + pfd[n].events = POLLIN; + pfd[n++].revents = 0; + + pfd[n].fd = client->sck; + pfd[n].events = POLLIN; + if (*sccur != '\0') pfd[n].events |= POLLOUT; + pfd[n++].revents = 0; + + if (script_fd >= 0) + { + pfd[n].fd = script_fd; + pfd[n].events = POLLIN; + pfd[n++].revents = 0; + } + + n = poll(pfd, n, 1000); if (n <= -1) { hcl_client_seterrbfmt (client, HCL_ESYSERR, "poll error on %d - %hs", client->sck, strerror(n)); @@ -524,101 +800,27 @@ int hcl_client_start (hcl_client_t* client, const char* ipaddr, const char* scri continue; } - if (pfd.revents & POLLERR) + + for (i = 0; i < n; i++) { - hcl_client_seterrbfmt (client, HCL_ESYSERR, "error condition detected on %d", client->sck); - goto oops; - } - - if (pfd.revents & POLLOUT) - { - hcl_xpkt_hdr_t hdr; - struct iovec iov[2]; - hcl_uint16_t seglen; - - while (*sccur != '\0' && sccur - scptr < HCL_XPKT_MAX_PLD_LEN) sccur++; - - seglen = sccur - scptr; - - hdr.id = 1; /* TODO: */ - hdr.type = HCL_XPKT_CODE | (((seglen >> 8) & 0x0F) << 4); - hdr.len = seglen & 0xFF; - - iov[0].iov_base = &hdr; - iov[0].iov_len = HCL_SIZEOF(hdr); - iov[1].iov_base = scptr; - iov[1].iov_len = seglen; - - hcl_sys_send_iov (client->sck, iov, 2); /* TODO: error check */ - - scptr = sccur; - - if (*sccur == '\0') + if (pfd[i].fd == client->mux_pipe[0]) { - hdr.id = 1; /* TODO: */ - hdr.type = HCL_XPKT_EXECUTE; - hdr.len = 0; - - iov[0].iov_base = &hdr; - iov[0].iov_len = HCL_SIZEOF(hdr); - hcl_sys_send_iov (client->sck, iov, 1); - - if (shut_wr_after_req) - { - shutdown (client->sck, SHUT_WR); - } - else - { - hdr.type = HCL_XPKT_DISCONNECT; - hdr.id = 1; /* TODO: */ - hdr.len = 0; - - iov[0].iov_base = &hdr; - iov[0].iov_len = HCL_SIZEOF(hdr); - hcl_sys_send_iov (client->sck, iov, 1); - } + on_control_event (client, &pfd[i]); + } + else if (pfd[i].fd == client->sck) + { + /* event from the server */ + on_server_event (client, &pfd[i], shut_wr_after_req); + } + else if (pfd[i].fd == script_fd) + { + on_script_event (client, &pfd[i]); } } - if (pfd.revents & POLLIN) - { - hcl_oow_t bcap; - hcl_uint8_t* bptr; - - bptr = hcl_xproto_getbuf(client->proto, &bcap);; - x = recv(client->sck, bptr, bcap, 0); - if (x <= -1) - { - if (errno == EINTR) goto carry_on; /* didn't read read */ - /*hcl_seterrwithsyserr (hcl, 0, errno); */ - /* TODO: error info set... */ - return -1; - } - if (x == 0) hcl_xproto_seteof(client->proto, 1); - hcl_xproto_advbuf (client->proto, x); - } - - - carry_on: - while (hcl_xproto_ready(client->proto)) - { - if ((n = hcl_xproto_process(client->proto)) <= -1) - { - /* TODO: proper error message */ - return -1; - } - if (n == 0) - { - /* TODO: chceck if there is remaining data in the buffer...?? */ - printf ("NO MORE DATA. EXITING...\n"); - goto done; - } - } - - if (hcl_xproto_geteof(client->proto)) break; } -done: +done: /* TODO: we can check if the buffer has all been consumed. if not, there is trailing garbage.. */ /*{ struct linger linger; @@ -640,4 +842,6 @@ void hcl_client_stop (hcl_client_t* client) { /* TODO: */ /* TODO: break the cleint loop */ + client->stopreq = 1; + write (client->mux_pipe[1], "Q", 1); /* don't care about failure */ } diff --git a/lib/x-server.c b/lib/x-server.c index 5be9404..0b522c4 100644 --- a/lib/x-server.c +++ b/lib/x-server.c @@ -1086,38 +1086,12 @@ hcl_server_t* hcl_server_open (hcl_mmgr_t* mmgr, hcl_oow_t xtnsize, hcl_server_p goto oops; } - if (pipe(pfd) <= -1) + if (hcl_sys_open_pipes(pfd) <= -1) { if (errnum) *errnum = hcl->vmprim.syserrstrb(hcl, 0, errno, HCL_NULL, 0); goto oops; } -#if defined(O_NONBLOCK) || defined(O_CLOEXEC) - fcv = fcntl(pfd[0], F_GETFD, 0); - if (fcv >= 0) - { - #if defined(O_NONBLOCK) - fcv |= O_NONBLOCK; - #endif - #if defined(O_CLOEXEC) - fcv |= O_CLOEXEC; - #endif - fcntl(pfd[0], F_SETFD, fcv); - } - - fcv = fcntl(pfd[1], F_GETFD, 0); - if (fcv >= 0) - { - #if defined(O_NONBLOCK) - fcv |= O_NONBLOCK; - #endif - #if defined(O_CLOEXEC) - fcv |= O_CLOEXEC; - #endif - fcntl(pfd[1], F_SETFD, fcv); - } -#endif - xtn = (server_hcl_xtn_t*)hcl_getxtn(hcl); xtn->server = server; @@ -1167,7 +1141,6 @@ oops: if (tmr) hcl_tmr_close (tmr); if (hcl) hcl_close (hcl); if (server) HCL_MMGR_FREE (mmgr, server); - return HCL_NULL; } @@ -1189,8 +1162,7 @@ void hcl_server_close (hcl_server_t* server) pthread_mutex_destroy (&server->tmr_mutex); pthread_mutex_destroy (&server->worker_mutex); - close (server->mux_pipe[0]); - close (server->mux_pipe[1]); + hcl_sys_close_pipes (server->mux_pipe); hcl_tmr_close (server->tmr); hcl_close (server->dummy_hcl); @@ -2269,3 +2241,52 @@ int hcl_sys_send_iov (int sck, hcl_iovec_t* iov, int count) return 0; } + +int hcl_sys_open_pipes (int pfd[2]) +{ + int fcv; + + if (pipe(pfd) <= -1) return -1; + +#if defined(O_NONBLOCK) || defined(O_CLOEXEC) + fcv = fcntl(pfd[0], F_GETFD, 0); + if (fcv >= 0) + { + #if defined(O_NONBLOCK) + fcv |= O_NONBLOCK; + #endif + #if defined(O_CLOEXEC) + fcv |= O_CLOEXEC; + #endif + fcntl(pfd[0], F_SETFD, fcv); + } + + fcv = fcntl(pfd[1], F_GETFD, 0); + if (fcv >= 0) + { + #if defined(O_NONBLOCK) + fcv |= O_NONBLOCK; + #endif + #if defined(O_CLOEXEC) + fcv |= O_CLOEXEC; + #endif + fcntl(pfd[1], F_SETFD, fcv); + } +#endif + + return 0; +} + +void hcl_sys_close_pipes (int pfd[2]) +{ + if (pfd[0] >= 0) + { + close (pfd[0]); + pfd[0] = -1; + } + if (pfd[1] >= 0) + { + close (pfd[1]); + pfd[1] = -1; + } +}