From 2dad89e2e968212683f3b22a0404832654c75c0c Mon Sep 17 00:00:00 2001 From: hyung-hwan Date: Thu, 18 Apr 2024 13:11:44 +0900 Subject: [PATCH] code update for hcl-x --- bin/main-c.c | 143 +++++++++++++++++++++++++++++++++------------------ lib/hcl-x.c | 55 ++------------------ 2 files changed, 97 insertions(+), 101 deletions(-) diff --git a/bin/main-c.c b/bin/main-c.c index 5ff158c..3d441f0 100644 --- a/bin/main-c.c +++ b/bin/main-c.c @@ -508,7 +508,6 @@ struct hcl_xproto_t hcl_xproto_rcv_state_t state; hcl_oow_t len_needed; unsigned int eof: 1; - unsigned int polled: 1; hcl_oow_t len; hcl_uint8_t buf[4096]; @@ -532,42 +531,36 @@ static int receive_raw_bytes (hcl_xproto_t* proto, int sck, hcl_ntime_t* idle_tm return -1; } - if (HCL_LIKELY(!proto->rcv.polled)) + tmout = idle_tmout? HCL_SECNSEC_TO_MSEC(idle_tmout->sec, idle_tmout->nsec): -1; + actual_tmout = (tmout <= 0)? 10000: tmout; + + pfd.fd = sck; + pfd.events = POLLIN | POLLERR; + pfd.revents = 0; + n = poll(&pfd, 1, actual_tmout); + if (n <= -1) { - tmout = idle_tmout? HCL_SECNSEC_TO_MSEC(idle_tmout->sec, idle_tmout->nsec): -1; - actual_tmout = (tmout <= 0)? 10000: tmout; - - pfd.fd = sck; - pfd.events = POLLIN | POLLERR; - pfd.revents = 0; - n = poll(&pfd, 1, actual_tmout); - if (n <= -1) + if (errno == EINTR) return 0; + hcl_seterrwithsyserr (hcl, 0, errno); + return -1; + } + else if (n == 0) + { + /* timed out - no activity on the pfd */ + if (tmout > 0) { - if (errno == EINTR) return 0; - hcl_seterrwithsyserr (hcl, 0, errno); - return -1; - } - else if (n == 0) - { - /* timed out - no activity on the pfd */ - if (tmout > 0) - { - /* timeout explicity set. no activity for that duration. considered idle */ - hcl_seterrbfmt (hcl, HCL_EGENERIC, "no activity on the socket %d", sck); - return -1; - } - - return 0; /* didn't read yet */ + /* timeout explicity set. no activity for that duration. considered idle */ + hcl_seterrbfmt (hcl, HCL_EGENERIC, "no activity on the socket %d", sck); + return -1; } - if (pfd.revents & POLLERR) - { - hcl_seterrbfmt (hcl, HCL_EGENERIC, "error condition detected on socket %d", sck); - return -1; - } - - proto->rcv.polled = 1; + return 0; /* didn't read yet */ + } + if (pfd.revents & POLLERR) + { + hcl_seterrbfmt (hcl, HCL_EGENERIC, "error condition detected on socket %d", sck); + return -1; } x = recv(sck, &proto->rcv.buf[proto->rcv.len], HCL_COUNTOF(proto->rcv.buf) - proto->rcv.len, 0); @@ -575,18 +568,59 @@ static int receive_raw_bytes (hcl_xproto_t* proto, int sck, hcl_ntime_t* idle_tm { if (errno == EINTR) return 0; /* didn't read read */ - proto->rcv.polled = 0; hcl_seterrwithsyserr (hcl, 0, errno); return -1; } - proto->rcv.polled = 0; if (x == 0) proto->rcv.eof = 1; proto->rcv.len += x; +//printf ("RECEIVED %d rcv.len %d rcv.len_needed %d [%.*s]\n", (int)x, (int)proto->rcv.len, (int)proto->rcv.len_needed, (int)proto->rcv.len, proto->rcv.buf); return 1; /* read some data */ } +static int handle_received_data (hcl_xproto_t* proto) +{ +//printf ("HANDLE RECIVED rcv.len %d rcv.len_needed %d [%.*s]\n", (int)proto->rcv.len, (int)proto->rcv.len_needed, (int)proto->rcv.len, proto->rcv.buf); + switch (proto->rcv.state) + { + case HCL_XPROTO_RCV_HEADER: + if (proto->rcv.len < HCL_SIZEOF(proto->rcv.hdr)) return 0; /* need more data */ + + memcpy (&proto->rcv.hdr, proto->rcv.buf, HCL_SIZEOF(proto->rcv.hdr)); + //proto->rcv.hdr.len = hcl_ntoh16(proto->rcv.hdr.len); /* keep this in the host byte order */ + + /* consume the header */ + memmove (proto->rcv.buf, &proto->rcv.buf[HCL_SIZEOF(proto->rcv.hdr)], proto->rcv.len - HCL_SIZEOF(proto->rcv.hdr)); + proto->rcv.len -= HCL_SIZEOF(proto->rcv.hdr); + + /* switch to the payload mode */ + proto->rcv.state = HCL_XPROTO_RCV_PAYLOAD; + proto->rcv.len_needed = proto->rcv.hdr.len; + return 0; + + case HCL_XPROTO_RCV_PAYLOAD: + if (proto->rcv.len < proto->rcv.hdr.len) return 0; /* need more payload data */ + + if (proto->rcv.hdr.type == HCL_XPKT_STDOUT) + { + if (proto->rcv.hdr.len > 0) + fprintf (stdout, "%.*s", (int)proto->rcv.hdr.len, proto->rcv.buf); + } + + if (proto->rcv.hdr.len > 0) + { + memmove (proto->rcv.buf, &proto->rcv.buf[proto->rcv.hdr.len], proto->rcv.len - proto->rcv.hdr.len); + proto->rcv.len -= proto->rcv.hdr.len; + } + proto->rcv.state = HCL_XPROTO_RCV_HEADER; + proto->rcv.len_needed = HCL_SIZEOF(proto->rcv.hdr); + break; + } + + return 1; +} + static int handle_request (hcl_client_t* client, const char* ipaddr, const char* script, int reuse_addr, int shut_wr_after_req) { hcl_sckaddr_t sckaddr; @@ -601,7 +635,8 @@ static int handle_request (hcl_client_t* client, const char* ipaddr, const char* const char* scptr; const char* sccur; - hcl_xproto_t proto; + hcl_xproto_t proto_buf; + hcl_xproto_t* proto = &proto_buf; client_xtn_t* client_xtn; @@ -614,7 +649,7 @@ static int handle_request (hcl_client_t* client, const char* ipaddr, const char* goto oops; } - sck = socket (sckfam, SOCK_STREAM, 0); + sck = socket(sckfam, SOCK_STREAM, 0); if (sck <= -1) { fprintf (stderr, "cannot create a socket for %s - %s\n", ipaddr, strerror(errno)); @@ -652,12 +687,13 @@ static int handle_request (hcl_client_t* client, const char* ipaddr, const char* /* TODO: create hcl_xproto_open... */ - memset (&proto, 0, HCL_SIZEOF(proto)); - proto.hcl = hcl_openstdwithmmgr(hcl_client_getmmgr(client), 0, HCL_NULL); // TODO: - proto.rcv.state = HCL_XPROTO_RCV_HEADER; - proto.rcv.len_needed = HCL_SIZEOF(proto.rcv.hdr); - proto.rcv.eof = 0; - proto.rcv.polled = 0; + memset (proto, 0, HCL_SIZEOF(*proto)); + proto->hcl = hcl_openstdwithmmgr(hcl_client_getmmgr(client), 0, HCL_NULL); // TODO: + proto->rcv.state = HCL_XPROTO_RCV_HEADER; + proto->rcv.len_needed = HCL_SIZEOF(proto->rcv.hdr); + proto->rcv.eof = 0; +// TODO: destroy xproto and data upon termination. + scptr = sccur = script; while (1) @@ -665,7 +701,7 @@ static int handle_request (hcl_client_t* client, const char* ipaddr, const char* struct pollfd pfd; pfd.fd = sck; - pfd.events = POLLIN | POLLERR; + pfd.events = POLLIN; if (*sccur != '\0') pfd.events |= POLLOUT; pfd.revents = 0; @@ -708,23 +744,32 @@ static int handle_request (hcl_client_t* client, const char* ipaddr, const char* scptr = sccur; - if (!sccur == '\0') + if (*sccur == '\0') { hdr.type = HCL_XPKT_EXECUTE; hdr.id = 1; /* TODO: */ - hdr.len = sccur - scptr; + hdr.len = 0; iov[0].iov_base = &hdr; iov[0].iov_len = HCL_SIZEOF(hdr); send_iov (sck, iov, 1); + + if (shut_wr_after_req) shutdown (sck, SHUT_WR); } - if (*sccur == '\0' && shut_wr_after_req) shutdown (sck, SHUT_WR); } if (pfd.revents & POLLIN) { -printf ("receiving...\n"); - if (receive_raw_bytes(&proto, sck, HCL_NULL) <= -1) break; +//printf ("receiving...\n"); + if (receive_raw_bytes(proto, sck, HCL_NULL) <= -1) break; + } + + while (/*proto->rcv.len > 0 &&*/ proto->rcv.len >= proto->rcv.len_needed) + { + if (handle_received_data(proto) <= -1) + { + goto oops; + } } } @@ -851,7 +896,7 @@ int main (int argc, char* argv[]) set_signal (SIGINT, handle_sigint); set_signal_to_ignore (SIGPIPE); - n = handle_request (client, argv[opt.ind], argv[opt.ind + 1], reuse_addr, shut_wr_after_req); + n = handle_request(client, argv[opt.ind], argv[opt.ind + 1], reuse_addr, shut_wr_after_req); set_signal_to_default (SIGINT); set_signal_to_default (SIGPIPE); diff --git a/lib/hcl-x.c b/lib/hcl-x.c index d8a75f8..f7b4e80 100644 --- a/lib/hcl-x.c +++ b/lib/hcl-x.c @@ -609,7 +609,6 @@ printf ("IO WRITE SOMETHING...........\n"); hcl_io_udoarg_t* outarg = (hcl_io_udoarg_t*)arg; printf ("IO WRITE SOMETHING BYTES...........\n"); -#if 0 if (send_stdout_bytes(xtn->proto, outarg->ptr, outarg->len) <= -1) { /* TODO: change error code and message. propagage the errormessage from proto */ @@ -620,7 +619,6 @@ printf ("IO WRITE SOMETHING BYTES...........\n"); hcl_abort (hcl); return -1; } -#endif outarg->xlen = outarg->len; return 0; } @@ -801,53 +799,6 @@ void hcl_server_proto_close (hcl_server_proto_t* proto) hcl_server_freemem (proto->worker->server, proto); } -static int write_stdout (hcl_server_proto_t* proto, const hcl_bch_t* ptr, hcl_oow_t len) -{ - struct msghdr msg; - struct iovec iov[3]; - hcl_bch_t cl[16]; /* ensure that this is large enough for the chunk length string */ - int index = 0, count = 0; - hcl_xpkt_hdr_t hdr; - - - hdr.type = HCL_XPKT_STDOUT; - hdr.id = 1; // TODO: - hdr.len = hcl_hton16(len); - - iov[0].iov_base = &hdr; - iov[0].iov_len = HCL_SIZEOF(hdr); - iov[1].iov_base = (hcl_bch_t*)ptr; - iov[1].iov_len = len; - - while (1) - { - ssize_t nwritten; - - HCL_MEMSET (&msg, 0, HCL_SIZEOF(msg)); - msg.msg_iov = (struct iovec*)&iov[index]; - msg.msg_iovlen = count - index; - nwritten = sendmsg(proto->worker->sck, &msg, 0); - /*nwritten = writev(proto->worker->sck, (const struct iovec*)&iov[index], count - index);*/ - if (nwritten <= -1) - { - /* error occurred inside the worker thread shouldn't affect the error information - * in the server object. so here, i just log a message */ - HCL_LOG2 (proto->hcl, SERVER_LOGMASK_ERROR, "Unable to sendmsg on %d - %hs\n", proto->worker->sck, strerror(errno)); - return -1; - } - - while (index < count && (size_t)nwritten >= iov[index].iov_len) - nwritten -= iov[index++].iov_len; - - if (index == count) break; - - iov[index].iov_base = (void*)((hcl_uint8_t*)iov[index].iov_base + nwritten); - iov[index].iov_len -= nwritten; - } - - return 0; -} - static void exec_runtime_handler (hcl_tmr_t* tmr, const hcl_ntime_t* now, hcl_tmr_event_t* evt) { /* [NOTE] this handler is executed in the main server thread @@ -1151,7 +1102,7 @@ static int handle_received_data (hcl_server_proto_t* proto) if (proto->rcv.len < HCL_SIZEOF(proto->rcv.hdr)) return 0; /* need more data */ HCL_MEMCPY (&proto->rcv.hdr, proto->rcv.buf, HCL_SIZEOF(proto->rcv.hdr)); - //proto->rcv.hdr.len = hcl_ntoh16(proto->rcv.hdr.len); /* keep this in the host byte order */ + /*proto->rcv.hdr.len = hcl_ntoh16(proto->rcv.hdr.len);*/ /* keep this in the host byte order */ /* consume the header */ HCL_MEMMOVE (proto->rcv.buf, &proto->rcv.buf[HCL_SIZEOF(proto->rcv.hdr)], proto->rcv.len - HCL_SIZEOF(proto->rcv.hdr)); @@ -1276,6 +1227,7 @@ static int send_stdout_bytes (hcl_server_proto_t* proto, const hcl_bch_t* data, ptr = cur = data; end = data + len; +printf ("SENDING BYTES [%.*s]\n", (int)len, data); while (ptr < end) { while (cur != end && cur - ptr < 255) cur++; @@ -1729,9 +1681,8 @@ static int worker_step (hcl_server_worker_t* worker) } } - /* the receiver buffer has enough data */ - while (proto->rcv.len >= proto->rcv.len_needed) + while (/*proto->rcv.len > 0 && */proto->rcv.len >= proto->rcv.len_needed) { if (handle_received_data(proto) <= -1) {