code update for hcl-x
All checks were successful
continuous-integration/drone/push Build is passing

This commit is contained in:
hyung-hwan 2024-04-18 13:11:44 +09:00
parent e0083b4453
commit 2dad89e2e9
2 changed files with 97 additions and 101 deletions

View File

@ -508,7 +508,6 @@ struct hcl_xproto_t
hcl_xproto_rcv_state_t state; hcl_xproto_rcv_state_t state;
hcl_oow_t len_needed; hcl_oow_t len_needed;
unsigned int eof: 1; unsigned int eof: 1;
unsigned int polled: 1;
hcl_oow_t len; hcl_oow_t len;
hcl_uint8_t buf[4096]; hcl_uint8_t buf[4096];
@ -532,42 +531,36 @@ static int receive_raw_bytes (hcl_xproto_t* proto, int sck, hcl_ntime_t* idle_tm
return -1; return -1;
} }
if (HCL_LIKELY(!proto->rcv.polled)) tmout = idle_tmout? HCL_SECNSEC_TO_MSEC(idle_tmout->sec, idle_tmout->nsec): -1;
actual_tmout = (tmout <= 0)? 10000: tmout;
pfd.fd = sck;
pfd.events = POLLIN | POLLERR;
pfd.revents = 0;
n = poll(&pfd, 1, actual_tmout);
if (n <= -1)
{ {
tmout = idle_tmout? HCL_SECNSEC_TO_MSEC(idle_tmout->sec, idle_tmout->nsec): -1; if (errno == EINTR) return 0;
actual_tmout = (tmout <= 0)? 10000: tmout; hcl_seterrwithsyserr (hcl, 0, errno);
return -1;
pfd.fd = sck; }
pfd.events = POLLIN | POLLERR; else if (n == 0)
pfd.revents = 0; {
n = poll(&pfd, 1, actual_tmout); /* timed out - no activity on the pfd */
if (n <= -1) if (tmout > 0)
{ {
if (errno == EINTR) return 0; /* timeout explicity set. no activity for that duration. considered idle */
hcl_seterrwithsyserr (hcl, 0, errno); hcl_seterrbfmt (hcl, HCL_EGENERIC, "no activity on the socket %d", sck);
return -1; return -1;
}
else if (n == 0)
{
/* timed out - no activity on the pfd */
if (tmout > 0)
{
/* timeout explicity set. no activity for that duration. considered idle */
hcl_seterrbfmt (hcl, HCL_EGENERIC, "no activity on the socket %d", sck);
return -1;
}
return 0; /* didn't read yet */
} }
if (pfd.revents & POLLERR) return 0; /* didn't read yet */
{ }
hcl_seterrbfmt (hcl, HCL_EGENERIC, "error condition detected on socket %d", sck);
return -1;
}
proto->rcv.polled = 1;
if (pfd.revents & POLLERR)
{
hcl_seterrbfmt (hcl, HCL_EGENERIC, "error condition detected on socket %d", sck);
return -1;
} }
x = recv(sck, &proto->rcv.buf[proto->rcv.len], HCL_COUNTOF(proto->rcv.buf) - proto->rcv.len, 0); x = recv(sck, &proto->rcv.buf[proto->rcv.len], HCL_COUNTOF(proto->rcv.buf) - proto->rcv.len, 0);
@ -575,18 +568,59 @@ static int receive_raw_bytes (hcl_xproto_t* proto, int sck, hcl_ntime_t* idle_tm
{ {
if (errno == EINTR) return 0; /* didn't read read */ if (errno == EINTR) return 0; /* didn't read read */
proto->rcv.polled = 0;
hcl_seterrwithsyserr (hcl, 0, errno); hcl_seterrwithsyserr (hcl, 0, errno);
return -1; return -1;
} }
proto->rcv.polled = 0;
if (x == 0) proto->rcv.eof = 1; if (x == 0) proto->rcv.eof = 1;
proto->rcv.len += x; proto->rcv.len += x;
//printf ("RECEIVED %d rcv.len %d rcv.len_needed %d [%.*s]\n", (int)x, (int)proto->rcv.len, (int)proto->rcv.len_needed, (int)proto->rcv.len, proto->rcv.buf);
return 1; /* read some data */ return 1; /* read some data */
} }
static int handle_received_data (hcl_xproto_t* proto)
{
//printf ("HANDLE RECIVED rcv.len %d rcv.len_needed %d [%.*s]\n", (int)proto->rcv.len, (int)proto->rcv.len_needed, (int)proto->rcv.len, proto->rcv.buf);
switch (proto->rcv.state)
{
case HCL_XPROTO_RCV_HEADER:
if (proto->rcv.len < HCL_SIZEOF(proto->rcv.hdr)) return 0; /* need more data */
memcpy (&proto->rcv.hdr, proto->rcv.buf, HCL_SIZEOF(proto->rcv.hdr));
//proto->rcv.hdr.len = hcl_ntoh16(proto->rcv.hdr.len); /* keep this in the host byte order */
/* consume the header */
memmove (proto->rcv.buf, &proto->rcv.buf[HCL_SIZEOF(proto->rcv.hdr)], proto->rcv.len - HCL_SIZEOF(proto->rcv.hdr));
proto->rcv.len -= HCL_SIZEOF(proto->rcv.hdr);
/* switch to the payload mode */
proto->rcv.state = HCL_XPROTO_RCV_PAYLOAD;
proto->rcv.len_needed = proto->rcv.hdr.len;
return 0;
case HCL_XPROTO_RCV_PAYLOAD:
if (proto->rcv.len < proto->rcv.hdr.len) return 0; /* need more payload data */
if (proto->rcv.hdr.type == HCL_XPKT_STDOUT)
{
if (proto->rcv.hdr.len > 0)
fprintf (stdout, "%.*s", (int)proto->rcv.hdr.len, proto->rcv.buf);
}
if (proto->rcv.hdr.len > 0)
{
memmove (proto->rcv.buf, &proto->rcv.buf[proto->rcv.hdr.len], proto->rcv.len - proto->rcv.hdr.len);
proto->rcv.len -= proto->rcv.hdr.len;
}
proto->rcv.state = HCL_XPROTO_RCV_HEADER;
proto->rcv.len_needed = HCL_SIZEOF(proto->rcv.hdr);
break;
}
return 1;
}
static int handle_request (hcl_client_t* client, const char* ipaddr, const char* script, int reuse_addr, int shut_wr_after_req) static int handle_request (hcl_client_t* client, const char* ipaddr, const char* script, int reuse_addr, int shut_wr_after_req)
{ {
hcl_sckaddr_t sckaddr; hcl_sckaddr_t sckaddr;
@ -601,7 +635,8 @@ static int handle_request (hcl_client_t* client, const char* ipaddr, const char*
const char* scptr; const char* scptr;
const char* sccur; const char* sccur;
hcl_xproto_t proto; hcl_xproto_t proto_buf;
hcl_xproto_t* proto = &proto_buf;
client_xtn_t* client_xtn; client_xtn_t* client_xtn;
@ -614,7 +649,7 @@ static int handle_request (hcl_client_t* client, const char* ipaddr, const char*
goto oops; goto oops;
} }
sck = socket (sckfam, SOCK_STREAM, 0); sck = socket(sckfam, SOCK_STREAM, 0);
if (sck <= -1) if (sck <= -1)
{ {
fprintf (stderr, "cannot create a socket for %s - %s\n", ipaddr, strerror(errno)); fprintf (stderr, "cannot create a socket for %s - %s\n", ipaddr, strerror(errno));
@ -652,12 +687,13 @@ static int handle_request (hcl_client_t* client, const char* ipaddr, const char*
/* TODO: create hcl_xproto_open... */ /* TODO: create hcl_xproto_open... */
memset (&proto, 0, HCL_SIZEOF(proto)); memset (proto, 0, HCL_SIZEOF(*proto));
proto.hcl = hcl_openstdwithmmgr(hcl_client_getmmgr(client), 0, HCL_NULL); // TODO: proto->hcl = hcl_openstdwithmmgr(hcl_client_getmmgr(client), 0, HCL_NULL); // TODO:
proto.rcv.state = HCL_XPROTO_RCV_HEADER; proto->rcv.state = HCL_XPROTO_RCV_HEADER;
proto.rcv.len_needed = HCL_SIZEOF(proto.rcv.hdr); proto->rcv.len_needed = HCL_SIZEOF(proto->rcv.hdr);
proto.rcv.eof = 0; proto->rcv.eof = 0;
proto.rcv.polled = 0; // TODO: destroy xproto and data upon termination.
scptr = sccur = script; scptr = sccur = script;
while (1) while (1)
@ -665,7 +701,7 @@ static int handle_request (hcl_client_t* client, const char* ipaddr, const char*
struct pollfd pfd; struct pollfd pfd;
pfd.fd = sck; pfd.fd = sck;
pfd.events = POLLIN | POLLERR; pfd.events = POLLIN;
if (*sccur != '\0') pfd.events |= POLLOUT; if (*sccur != '\0') pfd.events |= POLLOUT;
pfd.revents = 0; pfd.revents = 0;
@ -708,23 +744,32 @@ static int handle_request (hcl_client_t* client, const char* ipaddr, const char*
scptr = sccur; scptr = sccur;
if (!sccur == '\0') if (*sccur == '\0')
{ {
hdr.type = HCL_XPKT_EXECUTE; hdr.type = HCL_XPKT_EXECUTE;
hdr.id = 1; /* TODO: */ hdr.id = 1; /* TODO: */
hdr.len = sccur - scptr; hdr.len = 0;
iov[0].iov_base = &hdr; iov[0].iov_base = &hdr;
iov[0].iov_len = HCL_SIZEOF(hdr); iov[0].iov_len = HCL_SIZEOF(hdr);
send_iov (sck, iov, 1); send_iov (sck, iov, 1);
if (shut_wr_after_req) shutdown (sck, SHUT_WR);
} }
if (*sccur == '\0' && shut_wr_after_req) shutdown (sck, SHUT_WR);
} }
if (pfd.revents & POLLIN) if (pfd.revents & POLLIN)
{ {
printf ("receiving...\n"); //printf ("receiving...\n");
if (receive_raw_bytes(&proto, sck, HCL_NULL) <= -1) break; if (receive_raw_bytes(proto, sck, HCL_NULL) <= -1) break;
}
while (/*proto->rcv.len > 0 &&*/ proto->rcv.len >= proto->rcv.len_needed)
{
if (handle_received_data(proto) <= -1)
{
goto oops;
}
} }
} }
@ -851,7 +896,7 @@ int main (int argc, char* argv[])
set_signal (SIGINT, handle_sigint); set_signal (SIGINT, handle_sigint);
set_signal_to_ignore (SIGPIPE); set_signal_to_ignore (SIGPIPE);
n = handle_request (client, argv[opt.ind], argv[opt.ind + 1], reuse_addr, shut_wr_after_req); n = handle_request(client, argv[opt.ind], argv[opt.ind + 1], reuse_addr, shut_wr_after_req);
set_signal_to_default (SIGINT); set_signal_to_default (SIGINT);
set_signal_to_default (SIGPIPE); set_signal_to_default (SIGPIPE);

View File

@ -609,7 +609,6 @@ printf ("IO WRITE SOMETHING...........\n");
hcl_io_udoarg_t* outarg = (hcl_io_udoarg_t*)arg; hcl_io_udoarg_t* outarg = (hcl_io_udoarg_t*)arg;
printf ("IO WRITE SOMETHING BYTES...........\n"); printf ("IO WRITE SOMETHING BYTES...........\n");
#if 0
if (send_stdout_bytes(xtn->proto, outarg->ptr, outarg->len) <= -1) if (send_stdout_bytes(xtn->proto, outarg->ptr, outarg->len) <= -1)
{ {
/* TODO: change error code and message. propagage the errormessage from proto */ /* TODO: change error code and message. propagage the errormessage from proto */
@ -620,7 +619,6 @@ printf ("IO WRITE SOMETHING BYTES...........\n");
hcl_abort (hcl); hcl_abort (hcl);
return -1; return -1;
} }
#endif
outarg->xlen = outarg->len; outarg->xlen = outarg->len;
return 0; return 0;
} }
@ -801,53 +799,6 @@ void hcl_server_proto_close (hcl_server_proto_t* proto)
hcl_server_freemem (proto->worker->server, proto); hcl_server_freemem (proto->worker->server, proto);
} }
static int write_stdout (hcl_server_proto_t* proto, const hcl_bch_t* ptr, hcl_oow_t len)
{
struct msghdr msg;
struct iovec iov[3];
hcl_bch_t cl[16]; /* ensure that this is large enough for the chunk length string */
int index = 0, count = 0;
hcl_xpkt_hdr_t hdr;
hdr.type = HCL_XPKT_STDOUT;
hdr.id = 1; // TODO:
hdr.len = hcl_hton16(len);
iov[0].iov_base = &hdr;
iov[0].iov_len = HCL_SIZEOF(hdr);
iov[1].iov_base = (hcl_bch_t*)ptr;
iov[1].iov_len = len;
while (1)
{
ssize_t nwritten;
HCL_MEMSET (&msg, 0, HCL_SIZEOF(msg));
msg.msg_iov = (struct iovec*)&iov[index];
msg.msg_iovlen = count - index;
nwritten = sendmsg(proto->worker->sck, &msg, 0);
/*nwritten = writev(proto->worker->sck, (const struct iovec*)&iov[index], count - index);*/
if (nwritten <= -1)
{
/* error occurred inside the worker thread shouldn't affect the error information
* in the server object. so here, i just log a message */
HCL_LOG2 (proto->hcl, SERVER_LOGMASK_ERROR, "Unable to sendmsg on %d - %hs\n", proto->worker->sck, strerror(errno));
return -1;
}
while (index < count && (size_t)nwritten >= iov[index].iov_len)
nwritten -= iov[index++].iov_len;
if (index == count) break;
iov[index].iov_base = (void*)((hcl_uint8_t*)iov[index].iov_base + nwritten);
iov[index].iov_len -= nwritten;
}
return 0;
}
static void exec_runtime_handler (hcl_tmr_t* tmr, const hcl_ntime_t* now, hcl_tmr_event_t* evt) static void exec_runtime_handler (hcl_tmr_t* tmr, const hcl_ntime_t* now, hcl_tmr_event_t* evt)
{ {
/* [NOTE] this handler is executed in the main server thread /* [NOTE] this handler is executed in the main server thread
@ -1151,7 +1102,7 @@ static int handle_received_data (hcl_server_proto_t* proto)
if (proto->rcv.len < HCL_SIZEOF(proto->rcv.hdr)) return 0; /* need more data */ if (proto->rcv.len < HCL_SIZEOF(proto->rcv.hdr)) return 0; /* need more data */
HCL_MEMCPY (&proto->rcv.hdr, proto->rcv.buf, HCL_SIZEOF(proto->rcv.hdr)); HCL_MEMCPY (&proto->rcv.hdr, proto->rcv.buf, HCL_SIZEOF(proto->rcv.hdr));
//proto->rcv.hdr.len = hcl_ntoh16(proto->rcv.hdr.len); /* keep this in the host byte order */ /*proto->rcv.hdr.len = hcl_ntoh16(proto->rcv.hdr.len);*/ /* keep this in the host byte order */
/* consume the header */ /* consume the header */
HCL_MEMMOVE (proto->rcv.buf, &proto->rcv.buf[HCL_SIZEOF(proto->rcv.hdr)], proto->rcv.len - HCL_SIZEOF(proto->rcv.hdr)); HCL_MEMMOVE (proto->rcv.buf, &proto->rcv.buf[HCL_SIZEOF(proto->rcv.hdr)], proto->rcv.len - HCL_SIZEOF(proto->rcv.hdr));
@ -1276,6 +1227,7 @@ static int send_stdout_bytes (hcl_server_proto_t* proto, const hcl_bch_t* data,
ptr = cur = data; ptr = cur = data;
end = data + len; end = data + len;
printf ("SENDING BYTES [%.*s]\n", (int)len, data);
while (ptr < end) while (ptr < end)
{ {
while (cur != end && cur - ptr < 255) cur++; while (cur != end && cur - ptr < 255) cur++;
@ -1729,9 +1681,8 @@ static int worker_step (hcl_server_worker_t* worker)
} }
} }
/* the receiver buffer has enough data */ /* the receiver buffer has enough data */
while (proto->rcv.len >= proto->rcv.len_needed) while (/*proto->rcv.len > 0 && */proto->rcv.len >= proto->rcv.len_needed)
{ {
if (handle_received_data(proto) <= -1) if (handle_received_data(proto) <= -1)
{ {