From 5ae3cb1eba29343d7bd89d592b1294771ea20d94 Mon Sep 17 00:00:00 2001 From: hyung-hwan Date: Sat, 11 May 2024 14:16:22 +0900 Subject: [PATCH] wip - x-client --- bin/hclx.c | 11 +- lib/hcl-utl.h | 2 +- lib/hcl-x.h | 8 +- lib/sys.c | 59 ++++-- lib/x-client.c | 504 ++++++++++++++++++++++++------------------------- lib/x-proto.c | 3 +- lib/x-server.c | 8 +- 7 files changed, 299 insertions(+), 296 deletions(-) diff --git a/bin/hclx.c b/bin/hclx.c index b295989..ab2a355 100644 --- a/bin/hclx.c +++ b/bin/hclx.c @@ -730,7 +730,6 @@ static int client_main (const char* outer, int argc, char* argv[]) static hcl_bopt_lng_t lopt[] = { { ":log", 'l' }, - { "reuseaddr", '\0' }, { "shutwr", '\0' }, { HCL_NULL, '\0' } }; @@ -745,7 +744,6 @@ static int client_main (const char* outer, int argc, char* argv[]) hcl_client_prim_t client_prim; int n; const char* logopt = HCL_NULL; - int reuse_addr = 0; int shut_wr_after_req = 0; setlocale (LC_ALL, ""); @@ -756,7 +754,6 @@ static int client_main (const char* outer, int argc, char* argv[]) fprintf (stderr, "Usage: %s %s [options] bind-address:port script-to-run\n", outer, argv[0]); fprintf (stderr, "Options are:\n"); fprintf (stderr, " -l/--log log-options\n"); - fprintf (stderr, " --reuseaddr\n"); fprintf (stderr, " --shutwr\n"); return -1; } @@ -770,11 +767,7 @@ static int client_main (const char* outer, int argc, char* argv[]) break; case '\0': - if (hcl_comp_bcstr(opt.lngopt, "reuseaddr") == 0) - { - reuse_addr = 1; - } - else if (hcl_comp_bcstr(opt.lngopt, "shutwr") == 0) + if (hcl_comp_bcstr(opt.lngopt, "shutwr") == 0) { shut_wr_after_req = 1; } @@ -829,7 +822,7 @@ static int client_main (const char* outer, int argc, char* argv[]) set_signal (SIGINT, handle_sigint); set_signal_to_ignore (SIGPIPE); - n = hcl_client_start(client, argv[opt.ind], argv[opt.ind + 1], reuse_addr, shut_wr_after_req); + n = hcl_client_start(client, argv[opt.ind], /*argv[opt.ind + 1],*/ shut_wr_after_req); if (n <= -1) { fprintf (stderr, "ERROR: %s\n", hcl_client_geterrbmsg(client)); diff --git a/lib/hcl-utl.h b/lib/hcl-utl.h index 5eed634..b7047dd 100644 --- a/lib/hcl-utl.h +++ b/lib/hcl-utl.h @@ -944,7 +944,7 @@ HCL_EXPORT void hcl_sub_ntime ( ); /* ========================================================================= - * PATH NAME + * PATH NAME * ========================================================================= */ const hcl_bch_t* hcl_get_base_name_from_bcstr_path ( diff --git a/lib/hcl-x.h b/lib/hcl-x.h index ff7e7bf..bcec8d3 100644 --- a/lib/hcl-x.h +++ b/lib/hcl-x.h @@ -318,8 +318,6 @@ HCL_EXPORT void hcl_client_close ( HCL_EXPORT int hcl_client_start ( hcl_client_t* client, const char* ipaddr, - const char* script, - int reuse_addr, int shut_wr_after_req ); @@ -480,6 +478,12 @@ HCL_EXPORT int hcl_xproto_process ( /* ---------------------------------------------------------------------- */ +HCL_EXPORT int hcl_sys_send ( + int sck, + const void* data, + hcl_oow_t* size /* [IN] number of bytes to write, [OUT] number of bytes written */ +); + HCL_EXPORT int hcl_sys_send_iov ( int sck, hcl_iovec_t* iov, /* note this is not read-only and can change */ diff --git a/lib/sys.c b/lib/sys.c index 35f6a76..d8378b0 100644 --- a/lib/sys.c +++ b/lib/sys.c @@ -24,7 +24,8 @@ */ #include -# +#include "hcl-prv.h" + #include #include #include @@ -51,28 +52,41 @@ # if defined(HAVE_SYS_TIME_H) # include # endif -# if defined(HAVE_SIGNAL_H) -# include -# endif -# if defined(HAVE_SYS_MMAN_H) -# include -# endif # if defined(HAVE_SYS_UIO_H) # include # endif -# if defined(HAVE_SYS_EPOLL_H) -# include -# define USE_EPOLL -# endif # include # include # include # include -# include -# include #endif +int hcl_sys_send (int sck, const void* data, hcl_oow_t* size) +{ + ssize_t n, seglen; + hcl_oow_t rem; + + rem = *size; + while (rem > 0) + { + seglen = (rem > HCL_TYPE_MAX(ssize_t))? HCL_TYPE_MAX(ssize_t): rem; + n = send(sck, data, seglen, 0); + if (n <= -1) + { + if (hcl_sys_is_errno_wb(errno)) break; + *size -= rem; /* update the size to the bytes sent so far upon failure*/ + return -1; + } + + /* i'm always paranoid about 0 returned by send. is it ever possible? */ + rem -= n; + } + + *size -= rem; /* update the size to the bytes sent */ + return 0; +} + int hcl_sys_send_iov (int sck, hcl_iovec_t* iov, int count) { int index = 0; @@ -82,14 +96,26 @@ int hcl_sys_send_iov (int sck, hcl_iovec_t* iov, int count) ssize_t nwritten; struct msghdr msg; - memset (&msg, 0, HCL_SIZEOF(msg)); + HCL_MEMSET (&msg, 0, HCL_SIZEOF(msg)); msg.msg_iov = (struct iovec*)&iov[index]; msg.msg_iovlen = count - index; nwritten = sendmsg(sck, &msg, 0); - if (nwritten <= -1) return -1; + if (nwritten <= -1) + { + if (hcl_sys_is_errno_wb(errno)) + { + /* the incompelete write. the caller shall check the return code + * and iov_len at the last written iov slot. */ + break; + } + return -1; + } while (index < count && (size_t)nwritten >= iov[index].iov_len) + { + iov[index].iov_len = 0; /* this slot has been fully written */ nwritten -= iov[index++].iov_len; + } if (index == count) break; @@ -97,9 +123,10 @@ int hcl_sys_send_iov (int sck, hcl_iovec_t* iov, int count) iov[index].iov_len -= nwritten; } - return 0; + return index; } + int hcl_sys_open_pipes (int pfd[2], int nonblock) { /* TODO: mimic open_pipes() in std.c */ diff --git a/lib/x-client.c b/lib/x-client.c index 3c64534..f0b563a 100644 --- a/lib/x-client.c +++ b/lib/x-client.c @@ -1,4 +1,5 @@ /* + } Copyright (c) 2016-2018 Chung, Hyung-Hwan. All rights reserved. Redistribution and use in source and binary forms, with or without @@ -47,7 +48,6 @@ #elif defined(__DOS__) # include # include -# include #elif defined(macintosh) # include #else @@ -58,26 +58,15 @@ # if defined(HAVE_SYS_TIME_H) # include # endif -# if defined(HAVE_SIGNAL_H) -# include -# endif -# if defined(HAVE_SYS_MMAN_H) -# include -# endif # if defined(HAVE_SYS_UIO_H) # include # endif -# if defined(HAVE_SYS_EPOLL_H) -# include -# endif -# include -# include # include # include -# include # include # include +# include #endif struct client_hcl_xtn_t @@ -114,11 +103,29 @@ struct hcl_client_t hcl_bitmask_t logmask; } cfg; - int sck; - hcl_xproto_t* proto; - int mux_pipe[2]; /* pipe to break the blocking multiplexer in the main server loop */ + struct + { + int sck; + hcl_xproto_t* proto; + } remote; + + struct + { + int in; + int out; + int err; + + struct + { + hcl_uint8_t* ptr; + hcl_oow_t capa; + hcl_oow_t pos; + hcl_oow_t len; + } pw2r; /* pending write to the remote side */ + } local; + struct { hcl_bch_t buf[4096]; @@ -178,9 +185,12 @@ hcl_client_t* hcl_client_open (hcl_mmgr_t* mmgr, hcl_oow_t xtnsize, hcl_client_p client->_cmgr = hcl_get_utf8_cmgr(); client->prim = *prim; client->dummy_hcl = hcl; - client->sck = -1; client->mux_pipe[0] = pfd[0]; client->mux_pipe[1] = pfd[1]; + client->remote.sck = -1; + client->local.in = -1; + client->local.out = -1; + client->local.err = -1; client->cfg.logmask = ~(hcl_bitmask_t)0; @@ -199,10 +209,18 @@ oops: return HCL_NULL; } +static int is_stdio_fd (int fd) +{ + return fd == STDIN_FILENO || fd == STDOUT_FILENO || fd == STDERR_FILENO; +} + void hcl_client_close (hcl_client_t* client) { - if (client->proto) hcl_xproto_close (client->proto); - if (client->sck >= 0) close (client->sck); + if (client->remote.proto) hcl_xproto_close (client->remote.proto); + if (client->remote.sck >= 0) close (client->remote.sck); + if (client->local.in >= 0 && is_stdio_fd(client->local.in)) close (client->local.in); + if (client->local.out >= 0 && is_stdio_fd(client->local.out)) close (client->local.out); + if (client->local.err >= 0 && is_stdio_fd(client->local.err)) close (client->local.err); hcl_sys_close_pipes(client->mux_pipe); hcl_close (client->dummy_hcl); @@ -416,7 +434,7 @@ struct proto_xtn_t }; typedef struct proto_xtn_t proto_xtn_t; -static int client_connect (hcl_client_t* client, const char* ipaddr, int reuse_addr) +static int client_connect_to_server (hcl_client_t* client, const char* ipaddr) { hcl_sckaddr_t sckaddr; hcl_scklen_t scklen; @@ -443,37 +461,36 @@ static int client_connect (hcl_client_t* client, const char* ipaddr, int reuse_a hcl_sys_set_cloexec(sck, 1); - if (reuse_addr) +#if 0 + if (sckfam == AF_INET) { - if (sckfam == AF_INET) + struct sockaddr_in anyaddr; + int opt = 1; + setsockopt(sck, SOL_SOCKET, SO_REUSEADDR, (char *)&opt, sizeof(opt)); + HCL_MEMSET (&anyaddr, 0, HCL_SIZEOF(anyaddr)); + anyaddr.sin_family = sckfam; + if (bind(sck, (struct sockaddr *)&anyaddr, scklen) <= -1) { - struct sockaddr_in anyaddr; - int opt = 1; - setsockopt(sck, SOL_SOCKET, SO_REUSEADDR, (char *)&opt, sizeof(opt)); - memset (&anyaddr, 0, HCL_SIZEOF(anyaddr)); - anyaddr.sin_family = sckfam; - if (bind(sck, (struct sockaddr *)&anyaddr, scklen) <= -1) - { - hcl_client_seterrbfmt (client, HCL_ESYSERR, - "cannot bind socket %d - %hs", sck, strerror(errno)); - goto oops; - } - } - else if (sckfam == AF_INET6) - { - struct sockaddr_in6 anyaddr; - int opt = 1; - setsockopt(sck, SOL_SOCKET, SO_REUSEADDR, (char *)&opt, sizeof(opt)); - memset (&anyaddr, 0, HCL_SIZEOF(anyaddr)); - anyaddr.sin6_family = sckfam; - if (bind(sck, (struct sockaddr *)&anyaddr, scklen) <= -1) - { - hcl_client_seterrbfmt (client, HCL_ESYSERR, - "cannot bind socket %d - %hs", sck, strerror(errno)); - goto oops; - } + hcl_client_seterrbfmt (client, HCL_ESYSERR, + "cannot bind socket %d - %hs", sck, strerror(errno)); + goto oops; } } + else if (sckfam == AF_INET6) + { + struct sockaddr_in6 anyaddr; + int opt = 1; + setsockopt(sck, SOL_SOCKET, SO_REUSEADDR, (char *)&opt, sizeof(opt)); + HCL_MEMSET (&anyaddr, 0, HCL_SIZEOF(anyaddr)); + anyaddr.sin6_family = sckfam; + if (bind(sck, (struct sockaddr *)&anyaddr, scklen) <= -1) + { + hcl_client_seterrbfmt (client, HCL_ESYSERR, + "cannot bind socket %d - %hs", sck, strerror(errno)); + goto oops; + } + } +#endif /* TODO: async connect? */ /* TODO: connect timeout */ @@ -486,7 +503,7 @@ static int client_connect (hcl_client_t* client, const char* ipaddr, int reuse_a hcl_sys_set_nonblock(sck, 1); /* make it nonblocking after connection has been established */ - memset (&proto, 0, HCL_SIZEOF(proto_cb)); + HCL_MEMSET (&proto, 0, HCL_SIZEOF(proto_cb)); proto_cb.on_packet = client->prim.on_packet; proto = hcl_xproto_open(hcl_client_getmmgr(client), &proto_cb, HCL_SIZEOF(*proto_xtn)); @@ -498,8 +515,8 @@ static int client_connect (hcl_client_t* client, const char* ipaddr, int reuse_a proto_xtn = hcl_xproto_getxtn(proto); proto_xtn->client = client; - client->sck = sck; - client->proto = proto; + client->remote.sck = sck; + client->remote.proto = proto; return 0; oops: @@ -511,158 +528,49 @@ oops: static void client_close (hcl_client_t* client) { - if (client->proto) + if (client->remote.proto) { - hcl_xproto_close (client->proto); - client->proto = HCL_NULL; + hcl_xproto_close (client->remote.proto); + client->remote.proto = HCL_NULL; } - if (client->sck >= 0) + if (client->remote.sck >= 0) { - close (client->sck); - client->sck = -1; + close (client->remote.sck); + client->remote.sck = -1; } } +static int client_add_to_local_pw2r (hcl_client_t* client, const hcl_uint8_t* ptr, hcl_oow_t len) +{ + if (client->local.pw2r.len >= client->local.pw2r.capa) + { + hcl_uint8_t* tmp; + hcl_oow_t newcapa; -#if 0 -static int client_send_code_to_server (hcl_client_t* client, const hcl_bch_t* ptr, hcl_oow_t len) + newcapa = HCL_ALIGN_POW2(client->local.pw2r.capa + len, 128); + tmp = hcl_client_reallocmem(client, client->local.pw2r.ptr, newcapa * HCL_SIZEOF(*client->local.pw2r.ptr)); + if (HCL_UNLIKELY(!tmp)) return -1; + + client->local.pw2r.capa = newcapa; + client->local.pw2r.ptr = tmp; + } + + HCL_MEMCPY (&client->local.pw2r.ptr[client->local.pw2r.len], ptr, len); + client->local.pw2r.len += len; + return 0; +} + +static int client_send_to_remote (hcl_client_t* client, int code, const hcl_uint8_t* ptr, hcl_oow_t len) { hcl_xpkt_hdr_t hdr; struct iovec iov[2]; hcl_uint16_t seglen; + int n, i; - - while (*sccur != '\0' && sccur - scptr < HCL_XPKT_MAX_PLD_LEN) sccur++; - - seglen = sccur - scptr; - - hdr.id = 1; /* TODO: */ - hdr.type = HCL_XPKT_CODE | (((seglen >> 8) & 0x0F) << 4); - hdr.len = seglen & 0xFF; - - iov[0].iov_base = &hdr; - iov[0].iov_len = HCL_SIZEOF(hdr); - iov[1].iov_base = scptr; - iov[1].iov_len = seglen; - - hcl_sys_send_iov (client->sck, iov, 2); /* TODO: error check */ - - scptr = sccur; -} - - -static int client_send_code_to_server (hcl_client_t* client, hcl_iovec_t* iov, hcl_oow_t iovcnt) -{ - hcl_oow_t urem, len; - hcl_oow_t index = 0, i; - int backup_index = -1, dcnt; - hcl_iovec_t backup; - - - len = 0; - for (i = 0; i < iovcnt; i++) len += iov[i].iov_len; - urem = len; - - - if (client->cwq) + while (len > 0) { - goto enqueue; - } - - do - { - dcnt = iovcnt - index; - - memset (&msg, 0, HCL_SIZEOF(msg)); - msg.msg_iov = (struct iovec*)&iov[index]; - msg.msg_iovlen = &dcnt; - - sendmsg(xxx, &msg, 0); - - if (x <= -1) return -1; - else if (x == 0) goto enqueue; - - urem -= dcnt; - while (index < iovcnt && (hio_oow_t)dcnt >= iov[index].iov_len) - dcnt -= iov[index++].iov_len; - - if (index == iovcnt) break; - - if (backup_index != index) - { - if (backup_index >= 0) iov[backup_index] = backup; - backup = iov[index]; - backup_index = index; - } - - iov[index].iov_ptr = (void*)((hio_uint8_t*)iov[index].iov_ptr + dcnt); - iov[index].iov_len -= dcnt; - } - while (1); - - if (backup_index >= 0) iov[backup_index] = backup; - if (iovcnt <= 0) - { - } - - - -/* - while (1) - { - ssize_t nwritten; - struct msghdr msg; - - memset (&msg, 0, HCL_SIZEOF(msg)); - msg.msg_iov = (struct iovec*)&iov[index]; - msg.msg_iovlen = count - index; - nwritten = sendmsg(sck, &msg, 0); - if (nwritten <= -1) return -1; - - while (index < count && (size_t)nwritten >= iov[index].iov_len) - nwritten -= iov[index++].iov_len; - - if (index == count) break; - - iov[index].iov_base = (void*)((hcl_uint8_t*)iov[index].iov_base + nwritten); - iov[index].iov_len -= nwritten; - } -*/ - - return 0; -} -#endif -/* ========================================================================= */ - -static int on_control_event (hcl_client_t* client, struct pollfd* pfd) -{ - char tmp[128]; - while (read(client->mux_pipe[0], tmp, HCL_SIZEOF(tmp)) > 0) /* nothing */; -/* TODO: handle different command? */ -} - - - -static int on_server_event (hcl_client_t* client, struct pollfd* pfd, int shut_wr_after_req) -{ - -#if 0 - if (pfd->revents & POLLERR) - { - hcl_client_seterrbfmt (client, HCL_ESYSERR, "error condition detected on %d", client->sck); - goto oops; - } - - if (pfd->revents & POLLOUT) - { - hcl_xpkt_hdr_t hdr; - struct iovec iov[2]; - hcl_uint16_t seglen; - - while (*sccur != '\0' && sccur - scptr < HCL_XPKT_MAX_PLD_LEN) sccur++; - - seglen = sccur - scptr; + seglen = (len > HCL_XPKT_MAX_PLD_LEN)? HCL_XPKT_MAX_PLD_LEN: len; hdr.id = 1; /* TODO: */ hdr.type = HCL_XPKT_CODE | (((seglen >> 8) & 0x0F) << 4); @@ -670,37 +578,68 @@ static int on_server_event (hcl_client_t* client, struct pollfd* pfd, int shut_w iov[0].iov_base = &hdr; iov[0].iov_len = HCL_SIZEOF(hdr); - iov[1].iov_base = scptr; + iov[1].iov_base = ptr; iov[1].iov_len = seglen; - hcl_sys_send_iov (client->sck, iov, 2); /* TODO: error check */ + n = hcl_sys_send_iov(client->remote.sck, iov, 2); + if (n <= -1) return -1; - scptr = sccur; - - if (*sccur == '\0') + if (n < 2 || iov[n - 1].iov_len > 0) { - hdr.id = 1; /* TODO: */ - hdr.type = HCL_XPKT_EXECUTE; - hdr.len = 0; - - iov[0].iov_base = &hdr; - iov[0].iov_len = HCL_SIZEOF(hdr); - hcl_sys_send_iov (client->sck, iov, 1); - - if (shut_wr_after_req) + /* the write isn't completed. */ + for (i = n; i < 2 ; i++) { - shutdown (client->sck, SHUT_WR); + if (iov[i].iov_len > 0) + { + /* pending write... */ + if (client_add_to_local_pw2r(client, iov[i].iov_base, iov[i].iov_len) <= -1) return -1; + } } - else - { - hdr.type = HCL_XPKT_DISCONNECT; - hdr.id = 1; /* TODO: */ - hdr.len = 0; - iov[0].iov_base = &hdr; - iov[0].iov_len = HCL_SIZEOF(hdr); - hcl_sys_send_iov (client->sck, iov, 1); - } + break; + } + + ptr += seglen; + len -= seglen; + } + + return 0; +} + +/* ========================================================================= */ + +static void on_control_event (hcl_client_t* client, struct pollfd* pfd) +{ + char tmp[128]; +hcl_client_logbfmt(client, HCL_LOG_STDERR, "ON CONTROL EVENT \n"); + while (read(client->mux_pipe[0], tmp, HCL_SIZEOF(tmp)) > 0) /* nothing */; +/* TODO: handle different command? */ +} + +static void on_remote_event (hcl_client_t* client, struct pollfd* pfd, int shut_wr_after_req) +{ +hcl_client_logbfmt(client, HCL_LOG_STDERR, "ON REMOTE EVENT \n"); + + if (pfd->revents & POLLOUT) + { + ssize_t n; + hcl_oow_t len; + + len = client->local.pw2r.len - client->local.pw2r.pos; + n = hcl_sys_send(client->remote.sck, &client->local.pw2r.ptr[client->local.pw2r.pos], &len); + client->local.pw2r.pos += len; + if (client->local.pw2r.pos >= client->local.pw2r.len) + { + /* empty the buffer */ + client->local.pw2r.pos = 0; + client->local.pw2r.len = 0; + } + + if (n <= -1) + { + /* TODO: logging */ +hcl_client_logbfmt(client, HCL_LOG_STDERR, "send error - %hs\n", strerror(errno)); + goto reqstop; } } @@ -710,126 +649,170 @@ static int on_server_event (hcl_client_t* client, struct pollfd* pfd, int shut_w hcl_uint8_t* bptr; ssize_t x; - bptr = hcl_xproto_getbuf(client->proto, &bcap);; - x = recv(client->sck, bptr, bcap, 0); + bptr = hcl_xproto_getbuf(client->remote.proto, &bcap);; + x = recv(client->remote.sck, bptr, bcap, 0); if (x <= -1) { if (errno == EINTR) goto carry_on; /* didn't read read */ +hcl_client_logbfmt(client, HCL_LOG_STDERR, "recv error from remote - %hs", strerror(errno)); /*hcl_seterrwithsyserr (hcl, 0, errno); */ /* TODO: error info set... */ - return -1; + goto reqstop; } - if (x == 0) hcl_xproto_seteof(client->proto, 1); - hcl_xproto_advbuf (client->proto, x); + if (x == 0) hcl_xproto_seteof(client->remote.proto, 1); + hcl_xproto_advbuf (client->remote.proto, x); } carry_on: - while (hcl_xproto_ready(client->proto)) + /* handle the data received from the remote side */ + while (hcl_xproto_ready(client->remote.proto)) { int n; - if ((n = hcl_xproto_process(client->proto)) <= -1) + if ((n = hcl_xproto_process(client->remote.proto)) <= -1) { /* TODO: proper error message */ - return -1; + printf ("PROTOCOL PROCESSING ERROR...\n"); + goto reqstop; } if (n == 0) { /* TODO: chceck if there is remaining data in the buffer...?? */ - printf ("NO MORE DATA. EXITING...\n"); - goto done; + printf ("CALLBACK REQUESTED TO EXIT...\n"); + goto reqstop; } } - if (hcl_xproto_geteof(client->proto)) break; -#endif + if (hcl_xproto_geteof(client->remote.proto)) goto reqstop; + return; + +reqstop: + client->stopreq = 1; + return; } -static int on_script_event (hcl_client_t* client, struct pollfd* pfd) +static void on_local_in_event (hcl_client_t* client, struct pollfd* pfd) { ssize_t n; hcl_uint8_t buf[128]; +hcl_client_logbfmt(client, HCL_LOG_STDERR, "local in on %d\n", pfd->fd); n = read(pfd->fd, buf, HCL_SIZEOF(buf)); if (n <= -1) { + //if (hcl_sys_is_errno_wb(errno)) ... +hcl_client_logbfmt(client, HCL_LOG_STDERR, "local in read error - %hs\n", strerror(errno)); + client->stopreq = 1; } else if (n == 0) { +hcl_client_logbfmt(client, HCL_LOG_STDERR, "local in eof\n"); +/* TODO ARRANGE TO FINISH.. AFTER EXUCTION OF REMAINING STUFF... */ + //client->stopreq = 1; + n = client_send_to_remote(client, HCL_XPKT_EXECUTE, HCL_NULL, 0); + if (n <= -1) + { +hcl_client_logbfmt(client, HCL_LOG_STDERR, "local to remote (execute)- %hs\n", strerror(errno)); + } } else { +hcl_client_logbfmt(client, HCL_LOG_STDERR, "local read - %ld\n", (long)n); + n = client_send_to_remote(client, HCL_XPKT_CODE, buf, n); + if (n <= -1) + { +hcl_client_logbfmt(client, HCL_LOG_STDERR, "local to remote (code)- %hs\n", strerror(errno)); + } } } -int hcl_client_start (hcl_client_t* client, const char* ipaddr, const char* script, int reuse_addr, int shut_wr_after_req) +static int client_setup_local(hcl_client_t* client) { - const char* scptr, * sccur; - int cin = STDIN_FILENO; - int cout = STDOUT_FILENO; - int cerr = STDERR_FILENO; + client->local.in = STDIN_FILENO; + client->local.out = STDOUT_FILENO; + client->local.err = STDERR_FILENO; + return 0; +} +int hcl_client_start (hcl_client_t* client, const char* ipaddr, int shut_wr_after_req) +{ /* TODO: cin, cout, cerr could be actual files or something other than the console. the actual loop won't begin until all these file descriptors are ready */ - if (client_connect(client, ipaddr, reuse_addr) <= -1) return -1; /* TODO: support time out or abort while connecting... */ + client->stopreq = 0; + if (client_setup_local(client) <= -1) return -1; +hcl_client_logbfmt(client, HCL_LOG_STDERR, "staritg XXXXXXXXXXX loop... ...\n"); + if (client_connect_to_server(client, ipaddr) <= -1) return -1; /* TODO: support time out or abort while connecting... */ - scptr = sccur = script; - while (1) +hcl_client_logbfmt(client, HCL_LOG_STDERR, "staritg client loop... ...\n"); + while (!client->stopreq) { - ssize_t n, i; - struct pollfd pfd[5]; + int nfds, i; + struct pollfd pfd[10]; - n = 0; + HCL_MEMSET (pfd, 0, HCL_SIZEOF(pfd)); + nfds = 0; - pfd[n].fd = client->mux_pipe[0]; - pfd[n].events = POLLIN; - pfd[n++].revents = 0; + /* always monitor the control channel */ + pfd[nfds].fd = client->mux_pipe[0]; + pfd[nfds].events = POLLIN; + pfd[nfds++].revents = 0; - pfd[n].fd = client->sck; - pfd[n].events = POLLIN; - if (*sccur != '\0') pfd[n].events |= POLLOUT; - pfd[n++].revents = 0; + pfd[nfds].fd = client->remote.sck; + /* TODO: if there is data received from the server, not flushed to the client side + * don't monitor input */ + pfd[nfds].events = POLLIN; + if (client->local.pw2r.len > client->local.pw2r.pos) pfd[nfds].events |= POLLOUT; + pfd[nfds++].revents = 0; - if (cin >= 0) + /* TODO: client->local.in and client->local.out can be equal. + * handle this? */ + if (client->local.in >= 0) { - pfd[n].fd = cin; - pfd[n].events = POLLIN; - pfd[n++].revents = 0; + if (client->local.pw2r.pos >= client->local.pw2r.len) + { +//hcl_client_logbfmt(client, HCL_LOG_STDERR, "ADDING LOCAL IN TO MULTIPLEX...\n"); + pfd[nfds].fd = client->local.in; + pfd[nfds].events = POLLIN; + pfd[nfds++].revents = 0; + } } - n = poll(pfd, n, 1000); - if (n <= -1) + i = poll(pfd, nfds, 1000); +//hcl_client_logbfmt(client, HCL_LOG_STDERR, "poll returned %d\n", i); + if (i <= -1) { - hcl_client_seterrbfmt (client, HCL_ESYSERR, "poll error on %d - %hs", client->sck, strerror(n)); + hcl_client_seterrbfmt (client, HCL_ESYSERR, "poll error - %hs", strerror(errno)); goto oops; } - if (n == 0) + if (i == 0) { /* TODO: proper timeout handling */ continue; } - - for (i = 0; i < n; i++) + for (i = 0; i < nfds; i++) { + if (!pfd[i].revents) continue; + +//hcl_client_logbfmt(client, HCL_LOG_STDERR, "EVENT ON %d mux[%d], remote[%d], local[%d]\n", pfd[i].fd, client->mux_pipe[0], client->remote.sck, client->local.in); if (pfd[i].fd == client->mux_pipe[0]) { on_control_event (client, &pfd[i]); } - else if (pfd[i].fd == client->sck) + else if (pfd[i].fd == client->remote.sck) { /* event from the server */ - on_server_event (client, &pfd[i], shut_wr_after_req); + on_remote_event (client, &pfd[i], shut_wr_after_req); } - else if (pfd[i].fd == cin) + else if (pfd[i].fd == client->local.in) { - on_script_event (client, &pfd[i]); + /*if (pfd[i].revents & POLLIN)*/ + on_local_in_event (client, &pfd[i]); } } - } done: @@ -838,10 +821,9 @@ done: struct linger linger; linger.l_onoff = 1; linger.l_linger = 0; - setsockopt (client->sck, SOL_SOCKET, SO_LINGER, (char *) &linger, sizeof(linger)); + setsockopt (client->remote.sck, SOL_SOCKET, SO_LINGER, (char *) &linger, sizeof(linger)); }*/ - client_close (client); return 0; @@ -852,8 +834,6 @@ oops: void hcl_client_stop (hcl_client_t* client) { - /* TODO: */ - /* TODO: break the cleint loop */ client->stopreq = 1; write (client->mux_pipe[1], "Q", 1); /* don't care about failure */ } diff --git a/lib/x-proto.c b/lib/x-proto.c index 04796cd..feb297d 100644 --- a/lib/x-proto.c +++ b/lib/x-proto.c @@ -156,13 +156,12 @@ int hcl_xproto_process (hcl_xproto_t* proto) case HCL_XPROTO_RCV_PLD: if (proto->rcv.len < proto->rcv.hdr.len) goto carry_on; /* need more payload data */ -/* TODO: convert handle_packet as call back */ n = proto->_cb.on_packet(proto, proto->rcv.hdr.type, proto->rcv.buf, proto->rcv.hdr.len); -/* TODO: minimize the use of HCL_MEMOVE... use the buffer */ /* switch to the header mode */ if (proto->rcv.hdr.len > 0) { +/* TODO: minimize the use of HCL_MEMOVE... use the buffer */ HCL_MEMMOVE (proto->rcv.buf, &proto->rcv.buf[proto->rcv.hdr.len], proto->rcv.len - proto->rcv.hdr.len); proto->rcv.len -= proto->rcv.hdr.len; } diff --git a/lib/x-server.c b/lib/x-server.c index 4a288b7..2c9ba63 100644 --- a/lib/x-server.c +++ b/lib/x-server.c @@ -296,7 +296,7 @@ static HCL_INLINE int open_read_stream (hcl_t* hcl, hcl_io_cciarg_t* arg) } bb = (bb_t*)hcl_callocmem(hcl, HCL_SIZEOF(*bb) + (HCL_SIZEOF(hcl_bch_t) * (parlen + bcslen + 2))); - if (!bb) goto oops; + if (HCL_UNLIKELY(!bb)) goto oops; bb->fn = (hcl_bch_t*)(bb + 1); if (fn[0] == '\0' && server->cfg.script_include_path[0] != '\0') @@ -331,7 +331,7 @@ static HCL_INLINE int open_read_stream (hcl_t* hcl, hcl_io_cciarg_t* arg) /* main stream */ hcl_oow_t pathlen = 0; bb = (bb_t*)hcl_callocmem(hcl, HCL_SIZEOF(*bb) + (HCL_SIZEOF(hcl_bch_t) * (pathlen + 1))); - if (!bb) goto oops; + if (HCL_UNLIKELY(!bb)) goto oops; /* copy ane empty string as a main stream's name */ bb->fn = (hcl_bch_t*)(bb + 1); @@ -1019,7 +1019,7 @@ printf ("SENDING BYTES [%.*s]\n", (int)len, data); if (hcl_sys_send_iov(worker->sck, iov, 2) <= -1) { /* TODO: error message */ - fprintf (stderr, "Unable to sendmsg on %d - %s\n", worker->sck, strerror(errno)); +fprintf (stderr, "Unable to sendmsg on %d - %s\n", worker->sck, strerror(errno)); return -1; } @@ -1933,7 +1933,7 @@ int hcl_server_start (hcl_server_t* server, const hcl_bch_t* addrs) { if (server->stopreq) break; /* normal termination requested */ if (errno == EINTR) continue; /* interrupted but no termination requested */ - if (hcl_sys_is_errnor_wb(errno)) continue; + if (hcl_sys_is_errno_wb(errno)) continue; set_err_with_syserr (server, 0, errno, "unable to accept worker on server socket %d", evp->data.fd); xret = -1; break;