refactored code in http-thr.c

This commit is contained in:
hyung-hwan 2023-03-29 00:36:30 +09:00
parent bb99ebb83d
commit 27feacf4d4
2 changed files with 261 additions and 343 deletions

View File

@ -200,7 +200,7 @@ static void cgi_peer_on_close (hio_dev_pro_t* pro, hio_dev_pro_sid_t sid)
cgi_peer_xtn_t* peer_xtn = hio_dev_pro_getxtn(pro); cgi_peer_xtn_t* peer_xtn = hio_dev_pro_getxtn(pro);
cgi_t* cgi = peer_xtn->cgi; cgi_t* cgi = peer_xtn->cgi;
if (!cgi) return; /* cgi state already gone */ if (!cgi) return; /* cgi task already gone */
switch (sid) switch (sid)
{ {
@ -403,7 +403,7 @@ static int peer_htrd_push_content (hio_htrd_t* htrd, hio_htre_t* req, const hio_
n = hio_svc_htts_task_addresbody(cgi, data, dlen); n = hio_svc_htts_task_addresbody(cgi, data, dlen);
if (cgi->task_res_pending_writes > CGI_PENDING_IO_THRESHOLD) if (cgi->task_res_pending_writes > CGI_PENDING_IO_THRESHOLD)
{ {
if (hio_dev_pro_read(cgi->peer, HIO_DEV_PRO_OUT, 0) <= -1) return -1; if (hio_dev_pro_read(cgi->peer, HIO_DEV_PRO_OUT, 0) <= -1) n = -1;
} }
return n; return n;
@ -468,7 +468,7 @@ static void cgi_client_on_disconnect (hio_dev_sck_t* sck)
/* call the parent handler*/ /* call the parent handler*/
/*if (fcgi->client_org_on_disconnect) fcgi->client_org_on_disconnect (sck);*/ /*if (fcgi->client_org_on_disconnect) fcgi->client_org_on_disconnect (sck);*/
if (sck->on_disconnect) sck->on_disconnect (sck); /* restored to the orginal parent handelr in unbind_task_from_client() */ if (sck->on_disconnect) sck->on_disconnect (sck); /* restored to the orginal parent handler in unbind_task_from_client() */
HIO_SVC_HTTS_TASK_RCDOWN (cgi); HIO_SVC_HTTS_TASK_RCDOWN (cgi);
} }
@ -508,22 +508,6 @@ static int cgi_client_on_read (hio_dev_sck_t* sck, const void* buf, hio_iolen_t
if (x <= -1) goto oops; if (x <= -1) goto oops;
} }
} }
#if 0
else
{
hio_oow_t rem;
HIO_ASSERT (hio, !(cgi->over & CGI_OVER_READ_FROM_CLIENT));
if (hio_htrd_feed(cli->htrd, buf, len, &rem) <= -1) goto oops;
if (rem > 0)
{
/* TODO store this to client buffer. once the current resource is completed, arrange to call on_read() with it */
HIO_DEBUG3 (hio, "HTTS(%p) - excessive data after contents by cgi client %p(%d)\n", sck->hio, sck, (int)sck->hnd);
}
}
#endif
if (n <= -1) goto oops; if (n <= -1) goto oops;
return 0; return 0;

View File

@ -31,14 +31,6 @@
#define THR_ALLOW_UNLIMITED_REQ_CONTENT_LENGTH #define THR_ALLOW_UNLIMITED_REQ_CONTENT_LENGTH
enum thr_res_mode_t
{
THR_RES_MODE_CHUNKED,
THR_RES_MODE_CLOSE,
THR_RES_MODE_LENGTH
};
typedef enum thr_res_mode_t thr_res_mode_t;
#define THR_PENDING_IO_THRESHOLD 5 #define THR_PENDING_IO_THRESHOLD 5
#define THR_OVER_READ_FROM_CLIENT (1 << 0) #define THR_OVER_READ_FROM_CLIENT (1 << 0)
@ -64,17 +56,12 @@ struct thr_t
hio_svc_htts_task_on_kill_t on_kill; /* user-provided on_kill callback */ hio_svc_htts_task_on_kill_t on_kill; /* user-provided on_kill callback */
int options; int options;
hio_oow_t num_pending_writes_to_client;
hio_oow_t num_pending_writes_to_peer; hio_oow_t num_pending_writes_to_peer;
hio_dev_thr_t* peer; hio_dev_thr_t* peer;
hio_htrd_t* peer_htrd; hio_htrd_t* peer_htrd;
unsigned int over: 4; /* must be large enough to accomodate THR_OVER_ALL */ unsigned int over: 4; /* must be large enough to accomodate THR_OVER_ALL */
unsigned int ever_attempted_to_write_to_client: 1;
unsigned int client_eof_detected: 1;
unsigned int client_disconnected: 1;
unsigned int client_htrd_recbs_changed: 1; unsigned int client_htrd_recbs_changed: 1;
thr_res_mode_t res_mode_to_cli;
hio_dev_sck_on_read_t client_org_on_read; hio_dev_sck_on_read_t client_org_on_read;
hio_dev_sck_on_write_t client_org_on_write; hio_dev_sck_on_write_t client_org_on_write;
@ -90,6 +77,9 @@ struct thr_peer_xtn_t
}; };
typedef struct thr_peer_xtn_t thr_peer_xtn_t; typedef struct thr_peer_xtn_t thr_peer_xtn_t;
static void unbind_task_from_client (thr_t* thr, int rcdown);
static void unbind_task_from_peer (thr_t* thr, int rcdown);
static void thr_halt_participating_devices (thr_t* thr) static void thr_halt_participating_devices (thr_t* thr)
{ {
HIO_DEBUG4 (thr->htts->hio, "HTTS(%p) - Halting participating devices in thr task %p(csck=%p,peer=%p)\n", thr->htts, thr, thr->task_csck, thr->peer); HIO_DEBUG4 (thr->htts->hio, "HTTS(%p) - Halting participating devices in thr task %p(csck=%p,peer=%p)\n", thr->htts, thr, thr->task_csck, thr->peer);
@ -99,69 +89,6 @@ static void thr_halt_participating_devices (thr_t* thr)
if (thr->peer) hio_dev_thr_halt (thr->peer); if (thr->peer) hio_dev_thr_halt (thr->peer);
} }
static int thr_write_to_client (thr_t* thr, const void* data, hio_iolen_t dlen)
{
if (thr->task_csck)
{
thr->ever_attempted_to_write_to_client = 1;
thr->num_pending_writes_to_client++;
if (hio_dev_sck_write(thr->task_csck, data, dlen, HIO_NULL, HIO_NULL) <= -1)
{
thr->num_pending_writes_to_client--;
return -1;
}
if (thr->num_pending_writes_to_client > THR_PENDING_IO_THRESHOLD)
{
if (hio_dev_thr_read(thr->peer, 0) <= -1) return -1;
}
}
return 0;
}
static int thr_writev_to_client (thr_t* thr, hio_iovec_t* iov, hio_iolen_t iovcnt)
{
if (thr->task_csck)
{
thr->ever_attempted_to_write_to_client = 1;
thr->num_pending_writes_to_client++;
if (hio_dev_sck_writev(thr->task_csck, iov, iovcnt, HIO_NULL, HIO_NULL) <= -1)
{
thr->num_pending_writes_to_client--;
return -1;
}
if (thr->num_pending_writes_to_client > THR_PENDING_IO_THRESHOLD)
{
if (hio_dev_thr_read(thr->peer, 0) <= -1) return -1;
}
}
return 0;
}
static HIO_INLINE int thr_send_final_status_to_client (thr_t* thr, int status_code, int force_close)
{
return hio_svc_htts_task_sendfinalres(thr, status_code, HIO_NULL, HIO_NULL, force_close);
}
static int thr_write_last_chunk_to_client (thr_t* thr)
{
if (!thr->ever_attempted_to_write_to_client)
{
if (thr_send_final_status_to_client(thr, HIO_HTTP_STATUS_INTERNAL_SERVER_ERROR, 0) <= -1) return -1;
}
else
{
if (thr->res_mode_to_cli == THR_RES_MODE_CHUNKED &&
thr_write_to_client(thr, "0\r\n\r\n", 5) <= -1) return -1;
}
if (!thr->task_keep_client_alive && thr_write_to_client(thr, HIO_NULL, 0) <= -1) return -1;
return 0;
}
static int thr_write_to_peer (thr_t* thr, const void* data, hio_iolen_t dlen) static int thr_write_to_peer (thr_t* thr, const void* data, hio_iolen_t dlen)
{ {
if (thr->peer) if (thr->peer)
@ -224,11 +151,11 @@ static HIO_INLINE void thr_mark_over (thr_t* thr, int over_bits)
{ {
HIO_ASSERT (hio, thr->task_client != HIO_NULL); HIO_ASSERT (hio, thr->task_client != HIO_NULL);
if (thr->task_keep_client_alive && !thr->client_eof_detected) if (thr->task_keep_client_alive)
{ {
/* how to arrange to delete this thr object and put the socket back to the normal waiting state??? */ /* how to arrange to delete this thr object and put the socket back to the normal waiting state??? */
HIO_ASSERT (thr->htts->hio, thr->task_client->task == (hio_svc_htts_task_t*)thr); HIO_ASSERT (thr->htts->hio, thr->task_client->task == (hio_svc_htts_task_t*)thr);
HIO_SVC_HTTS_TASK_UNREF (thr->task_client->task); unbind_task_from_client (thr, 1);
/* IMPORTANT: thr must not be accessed from here down as it could have been destroyed */ /* IMPORTANT: thr must not be accessed from here down as it could have been destroyed */
} }
else else
@ -250,51 +177,20 @@ static void thr_on_kill (hio_svc_htts_task_t* task)
if (thr->on_kill) thr->on_kill (task); if (thr->on_kill) thr->on_kill (task);
if (thr->peer) /* [NOTE]
{ * 1. if hio_svc_htts_task_kill() is called, thr->peer, thr->peer_htrd, thr->task_csck,
thr_peer_xtn_t* peer_xtn = hio_dev_thr_getxtn(thr->peer); * thr->task_client may not not null.
if (peer_xtn->task) * 2. this callback function doesn't decrement the reference count on thr because
{ * it is the task destruction callback. (passing 0 to unbind_task_from_peer/client)
/* peer_xtn->task may not be NULL if the resource is killed regardless of the reference count. */
* anyway, don't use HIO_SVC_HTTS_TASK_UNREF (peer_xtn->task) because the resource itself
* is already being killed. */
peer_xtn->task = HIO_NULL;
}
hio_dev_thr_kill (thr->peer); unbind_task_from_peer (task, 0);
thr->peer = HIO_NULL;
}
if (thr->peer_htrd)
{
thr_peer_xtn_t* peer_xtn = hio_htrd_getxtn(thr->peer_htrd);
if (peer_xtn->task) peer_xtn->task = HIO_NULL; // no HIO_SVC_HTTS_TASK_UNREF() for the same reason above
hio_htrd_close (thr->peer_htrd);
thr->peer_htrd = HIO_NULL;
}
if (thr->task_csck) if (thr->task_csck)
{ {
HIO_ASSERT (hio, thr->task_client != HIO_NULL); HIO_ASSERT (hio, thr->task_client != HIO_NULL);
unbind_task_from_client (thr, 0);
/* restore callbacks */
if (thr->client_org_on_read) thr->task_csck->on_read = thr->client_org_on_read;
if (thr->client_org_on_write) thr->task_csck->on_write = thr->client_org_on_write;
if (thr->client_org_on_disconnect) thr->task_csck->on_disconnect = thr->client_org_on_disconnect;
if (thr->client_htrd_recbs_changed) hio_htrd_setrecbs (thr->task_client->htrd, &thr->client_htrd_org_recbs);
if (!thr->task_keep_client_alive || hio_dev_sck_read(thr->task_csck, 1) <= -1)
{
HIO_DEBUG5 (hio, "HTTS(%p) - thr(t=%p,c=%p[%d],p=%p) - halting client for failure to enable input watching\n", thr->htts, thr, thr->task_client, (thr->task_csck? thr->task_csck->hnd: -1), thr->peer);
hio_dev_sck_halt (thr->task_csck);
} }
}
thr->client_org_on_read = HIO_NULL;
thr->client_org_on_write = HIO_NULL;
thr->client_org_on_disconnect = HIO_NULL;
thr->client_htrd_recbs_changed = 0;
if (thr->task_next) HIO_SVC_HTTS_TASKL_UNLINK_TASK (thr); /* detach from the htts service only if it's attached */ if (thr->task_next) HIO_SVC_HTTS_TASKL_UNLINK_TASK (thr); /* detach from the htts service only if it's attached */
HIO_DEBUG5 (hio, "HTTS(%p) - thr(t=%p,c=%p[%d],p=%p) - killed the task\n", thr->htts, thr, thr->task_client, (thr->task_csck? thr->task_csck->hnd: -1), thr->peer); HIO_DEBUG5 (hio, "HTTS(%p) - thr(t=%p,c=%p[%d],p=%p) - killed the task\n", thr->htts, thr, thr->task_client, (thr->task_csck? thr->task_csck->hnd: -1), thr->peer);
@ -303,8 +199,8 @@ static void thr_on_kill (hio_svc_htts_task_t* task)
static void thr_peer_on_close (hio_dev_thr_t* peer, hio_dev_thr_sid_t sid) static void thr_peer_on_close (hio_dev_thr_t* peer, hio_dev_thr_sid_t sid)
{ {
hio_t* hio = peer->hio; hio_t* hio = peer->hio;
thr_peer_xtn_t* peer_xtn = (thr_peer_xtn_t*)hio_dev_thr_getxtn(peer); thr_peer_xtn_t* pxtn = (thr_peer_xtn_t*)hio_dev_thr_getxtn(peer);
thr_t* thr = peer_xtn->task; thr_t* thr = pxtn->task;
if (!thr) return; /* thr task already gone */ if (!thr) return; /* thr task already gone */
@ -312,20 +208,9 @@ static void thr_peer_on_close (hio_dev_thr_t* peer, hio_dev_thr_sid_t sid)
{ {
case HIO_DEV_THR_MASTER: case HIO_DEV_THR_MASTER:
HIO_DEBUG2 (hio, "HTTS(%p) - peer %p closing master\n", thr->htts, peer); HIO_DEBUG2 (hio, "HTTS(%p) - peer %p closing master\n", thr->htts, peer);
thr->peer = HIO_NULL; /* clear this peer from the state */ /* reset thr->peer before calling unbind_task_from_peer() because this is the peer close callback */
thr->peer = HIO_NULL;
HIO_ASSERT (hio, peer_xtn->task != HIO_NULL); unbind_task_from_peer (thr, 1);
HIO_SVC_HTTS_TASK_UNREF (peer_xtn->task);
if (thr->peer_htrd)
{
/* once this peer device is closed, peer's htrd is also never used.
* it's safe to detach the extra information attached on the htrd object. */
peer_xtn = hio_htrd_getxtn(thr->peer_htrd);
HIO_ASSERT (hio, peer_xtn->task != HIO_NULL);
HIO_SVC_HTTS_TASK_UNREF (peer_xtn->task);
}
break; break;
case HIO_DEV_THR_OUT: case HIO_DEV_THR_OUT:
@ -334,7 +219,7 @@ static void thr_peer_on_close (hio_dev_thr_t* peer, hio_dev_thr_sid_t sid)
if (!(thr->over & THR_OVER_READ_FROM_PEER)) if (!(thr->over & THR_OVER_READ_FROM_PEER))
{ {
if (thr_write_last_chunk_to_client(thr) <= -1) if (hio_svc_htts_task_endbody(thr) <= -1)
thr_halt_participating_devices (thr); thr_halt_participating_devices (thr);
else else
thr_mark_over (thr, THR_OVER_READ_FROM_PEER); thr_mark_over (thr, THR_OVER_READ_FROM_PEER);
@ -355,8 +240,8 @@ static void thr_peer_on_close (hio_dev_thr_t* peer, hio_dev_thr_sid_t sid)
static int thr_peer_on_read (hio_dev_thr_t* peer, const void* data, hio_iolen_t dlen) static int thr_peer_on_read (hio_dev_thr_t* peer, const void* data, hio_iolen_t dlen)
{ {
hio_t* hio = peer->hio; hio_t* hio = peer->hio;
thr_peer_xtn_t* peer_xtn = (thr_peer_xtn_t*)hio_dev_thr_getxtn(peer); thr_peer_xtn_t* pxtn = (thr_peer_xtn_t*)hio_dev_thr_getxtn(peer);
thr_t* thr = peer_xtn->task; thr_t* thr = pxtn->task;
HIO_ASSERT (hio, thr != HIO_NULL); HIO_ASSERT (hio, thr != HIO_NULL);
@ -376,7 +261,7 @@ static int thr_peer_on_read (hio_dev_thr_t* peer, const void* data, hio_iolen_t
/* the thr script could be misbehaviing. /* the thr script could be misbehaviing.
* it still has to read more but EOF is read. * it still has to read more but EOF is read.
* otherwise client_peer_htrd_poke() should have been called */ * otherwise client_peer_htrd_poke() should have been called */
n = thr_write_last_chunk_to_client(thr); n = hio_svc_htts_task_endbody(thr);
thr_mark_over (thr, THR_OVER_READ_FROM_PEER); thr_mark_over (thr, THR_OVER_READ_FROM_PEER);
if (n <= -1) goto oops; if (n <= -1) goto oops;
} }
@ -391,21 +276,20 @@ static int thr_peer_on_read (hio_dev_thr_t* peer, const void* data, hio_iolen_t
{ {
HIO_DEBUG2 (hio, "HTTPS(%p) - unable to feed peer htrd - peer %p\n", thr->htts, peer); HIO_DEBUG2 (hio, "HTTPS(%p) - unable to feed peer htrd - peer %p\n", thr->htts, peer);
if (!thr->ever_attempted_to_write_to_client && if (!thr->task_res_started && !(thr->over & THR_OVER_WRITE_TO_CLIENT))
!(thr->over & THR_OVER_WRITE_TO_CLIENT))
{ {
thr_send_final_status_to_client (thr, HIO_HTTP_STATUS_INTERNAL_SERVER_ERROR, 1); /* don't care about error because it jumps to oops below anyway */ hio_svc_htts_task_sendfinalres (thr, HIO_HTTP_STATUS_BAD_GATEWAY, HIO_NULL, HIO_NULL, 1); /* don't care about error because it jumps to oops below anyway */
} }
goto oops; goto oops;
} }
#if 0
if (rem > 0) if (rem > 0)
{ {
/* If the script specifies Content-Length and produces longer data, it will come here */ /* If the script specifies Content-Length and produces longer data, it will come here */
/*printf ("AAAAAAAAAAAAAAAAAa EEEEEXcessive DATA..................\n");*/
/* TODO: or drop this request?? */
} }
#endif
} }
return 0; return 0;
@ -425,18 +309,13 @@ static int thr_peer_htrd_peek (hio_htrd_t* htrd, hio_htre_t* req)
thr_peer_xtn_t* peer = hio_htrd_getxtn(htrd); thr_peer_xtn_t* peer = hio_htrd_getxtn(htrd);
thr_t* thr = peer->task; thr_t* thr = peer->task;
hio_svc_htts_cli_t* cli = thr->task_client; hio_svc_htts_cli_t* cli = thr->task_client;
if (HIO_LIKELY(cli))
{
int status_code = HIO_HTTP_STATUS_OK; int status_code = HIO_HTTP_STATUS_OK;
const hio_bch_t* status_desc = HIO_NULL; const hio_bch_t* status_desc = HIO_NULL;
int chunked; int chunked;
if (HIO_UNLIKELY(!cli))
{
/* client disconnected or not connectd */
return 0;
}
// TOOD: remove content_length if content_length is negative or not numeric.
if (req->attr.content_length) thr->res_mode_to_cli = THR_RES_MODE_LENGTH;
if (req->attr.status) hio_parse_http_status_header_value(req->attr.status, &status_code, &status_desc); if (req->attr.status) hio_parse_http_status_header_value(req->attr.status, &status_code, &status_desc);
chunked = thr->task_keep_client_alive && !req->attr.content_length; chunked = thr->task_keep_client_alive && !req->attr.content_length;
@ -444,6 +323,7 @@ static int thr_peer_htrd_peek (hio_htrd_t* htrd, hio_htre_t* req)
if (hio_svc_htts_task_startreshdr(thr, status_code, status_desc, chunked) <= -1 || if (hio_svc_htts_task_startreshdr(thr, status_code, status_desc, chunked) <= -1 ||
hio_htre_walkheaders(req, peer_capture_response_header, thr) <= -1 || hio_htre_walkheaders(req, peer_capture_response_header, thr) <= -1 ||
hio_svc_htts_task_endreshdr(thr) <= -1) return -1; hio_svc_htts_task_endreshdr(thr) <= -1) return -1;
}
return 0; return 0;
} }
@ -451,62 +331,30 @@ static int thr_peer_htrd_peek (hio_htrd_t* htrd, hio_htre_t* req)
static int thr_peer_htrd_poke (hio_htrd_t* htrd, hio_htre_t* req) static int thr_peer_htrd_poke (hio_htrd_t* htrd, hio_htre_t* req)
{ {
/* client request got completed */ /* client request got completed */
thr_peer_xtn_t* peer_xtn = hio_htrd_getxtn(htrd); thr_peer_xtn_t* pxtn = hio_htrd_getxtn(htrd);
thr_t* thr = peer_xtn->task; thr_t* thr = pxtn->task;
int n;
if (thr_write_last_chunk_to_client(thr) <= -1) return -1;
n = hio_svc_htts_task_endbody(thr);
thr_mark_over (thr, THR_OVER_READ_FROM_PEER); thr_mark_over (thr, THR_OVER_READ_FROM_PEER);
return 0; return n;
} }
static int thr_peer_htrd_push_content (hio_htrd_t* htrd, hio_htre_t* req, const hio_bch_t* data, hio_oow_t dlen) static int thr_peer_htrd_push_content (hio_htrd_t* htrd, hio_htre_t* req, const hio_bch_t* data, hio_oow_t dlen)
{ {
thr_peer_xtn_t* peer_xtn = hio_htrd_getxtn(htrd); thr_peer_xtn_t* pxtn = hio_htrd_getxtn(htrd);
thr_t* thr = peer_xtn->task; thr_t* thr = pxtn->task;
int n;
HIO_ASSERT (thr->htts->hio, htrd == thr->peer_htrd); HIO_ASSERT (thr->htts->hio, htrd == thr->peer_htrd);
switch (thr->res_mode_to_cli) n = hio_svc_htts_task_addresbody(thr, data, dlen);
if (thr->task_res_pending_writes > THR_PENDING_IO_THRESHOLD)
{ {
case THR_RES_MODE_CHUNKED: if (hio_dev_thr_read(thr->peer, 0) <= -1) n = -1;
{
hio_iovec_t iov[3];
hio_bch_t lbuf[16];
hio_oow_t llen;
/* hio_fmt_uintmax_to_bcstr() null-terminates the output. only HIO_COUNTOF(lbuf) - 1
* is enough to hold '\r' and '\n' at the back without '\0'. */
llen = hio_fmt_uintmax_to_bcstr(lbuf, HIO_COUNTOF(lbuf) - 1, dlen, 16 | HIO_FMT_UINTMAX_UPPERCASE, 0, '\0', HIO_NULL);
lbuf[llen++] = '\r';
lbuf[llen++] = '\n';
iov[0].iov_ptr = lbuf;
iov[0].iov_len = llen;
iov[1].iov_ptr = (void*)data;
iov[1].iov_len = dlen;
iov[2].iov_ptr = "\r\n";
iov[2].iov_len = 2;
if (thr_writev_to_client(thr, iov, HIO_COUNTOF(iov)) <= -1) goto oops;
break;
} }
case THR_RES_MODE_CLOSE: return n;
case THR_RES_MODE_LENGTH:
if (thr_write_to_client(thr, data, dlen) <= -1) goto oops;
break;
}
if (thr->num_pending_writes_to_client > THR_PENDING_IO_THRESHOLD)
{
if (hio_dev_thr_read(thr->peer, 0) <= -1) goto oops;
}
return 0;
oops:
return -1;
} }
static hio_htrd_recbs_t thr_peer_htrd_recbs = static hio_htrd_recbs_t thr_peer_htrd_recbs =
@ -552,8 +400,8 @@ static hio_htrd_recbs_t thr_client_htrd_recbs =
static int thr_peer_on_write (hio_dev_thr_t* peer, hio_iolen_t wrlen, void* wrctx) static int thr_peer_on_write (hio_dev_thr_t* peer, hio_iolen_t wrlen, void* wrctx)
{ {
hio_t* hio = peer->hio; hio_t* hio = peer->hio;
thr_peer_xtn_t* peer_xtn = (thr_peer_xtn_t*)hio_dev_thr_getxtn(peer); thr_peer_xtn_t* pxtn = (thr_peer_xtn_t*)hio_dev_thr_getxtn(peer);
thr_t* thr = peer_xtn->task; thr_t* thr = pxtn->task;
if (!thr) return 0; /* there is nothing i can do. the thr is being cleared or has been cleared already. */ if (!thr) return 0; /* there is nothing i can do. the thr is being cleared or has been cleared already. */
@ -610,14 +458,17 @@ static void thr_client_on_disconnect (hio_dev_sck_t* sck)
HIO_ASSERT (hio, sck = thr->task_csck); HIO_ASSERT (hio, sck = thr->task_csck);
HIO_DEBUG4 (hio, "HTTS(%p) - thr(t=%p,c=%p,csck=%p) - client socket disconnect notified\n", htts, thr, cli, sck); HIO_DEBUG4 (hio, "HTTS(%p) - thr(t=%p,c=%p,csck=%p) - client socket disconnect notified\n", htts, thr, cli, sck);
thr->client_disconnected = 1; if (thr)
thr->task_csck = HIO_NULL;
thr->task_client = HIO_NULL;
if (thr->client_org_on_disconnect)
{ {
thr->client_org_on_disconnect (sck); HIO_SVC_HTTS_TASK_RCUP (thr);
/* this original callback destroys the associated resource.
* thr must not be accessed from here down */ unbind_task_from_client (thr, 1);
/* call the parent handler*/
/*if (thr->client_org_on_disconnect) thr->client_org_on_disconnect (sck);*/
if (sck->on_disconnect) sck->on_disconnect (sck); /* restored to the orginal parent handler in unbind_task_from_client() */
HIO_SVC_HTTS_TASK_RCDOWN (thr);
} }
HIO_DEBUG4 (hio, "HTTS(%p) - thr(t=%p,c=%p,csck=%p) - client socket disconnect handled\n", htts, thr, cli, sck); HIO_DEBUG4 (hio, "HTTS(%p) - thr(t=%p,c=%p,csck=%p) - client socket disconnect handled\n", htts, thr, cli, sck);
@ -629,9 +480,12 @@ static int thr_client_on_read (hio_dev_sck_t* sck, const void* buf, hio_iolen_t
hio_t* hio = sck->hio; hio_t* hio = sck->hio;
hio_svc_htts_cli_t* cli = hio_dev_sck_getxtn(sck); hio_svc_htts_cli_t* cli = hio_dev_sck_getxtn(sck);
thr_t* thr = (thr_t*)cli->task; thr_t* thr = (thr_t*)cli->task;
int n;
HIO_ASSERT (hio, sck == cli->sck); HIO_ASSERT (hio, sck == cli->sck);
n = thr->client_org_on_read? thr->client_org_on_read(sck, buf, len, srcaddr): 0;
if (len <= -1) if (len <= -1)
{ {
/* read error */ /* read error */
@ -639,17 +493,10 @@ static int thr_client_on_read (hio_dev_sck_t* sck, const void* buf, hio_iolen_t
goto oops; goto oops;
} }
if (!thr->peer)
{
/* the peer is gone */
goto oops; /* do what? just return 0? */
}
if (len == 0) if (len == 0)
{ {
/* EOF on the client side. arrange to close */ /* EOF on the client side. arrange to close */
HIO_DEBUG3 (hio, "HTTPS(%p) - EOF from client %p(hnd=%d)\n", thr->htts, sck, (int)sck->hnd); HIO_DEBUG3 (hio, "HTTPS(%p) - EOF from client %p(hnd=%d)\n", thr->htts, sck, (int)sck->hnd);
thr->client_eof_detected = 1;
if (!(thr->over & THR_OVER_READ_FROM_CLIENT)) /* if this is true, EOF is received without thr_client_htrd_poke() */ if (!(thr->over & THR_OVER_READ_FROM_CLIENT)) /* if this is true, EOF is received without thr_client_htrd_poke() */
{ {
@ -659,21 +506,8 @@ static int thr_client_on_read (hio_dev_sck_t* sck, const void* buf, hio_iolen_t
if (n <= -1) goto oops; if (n <= -1) goto oops;
} }
} }
else
{
hio_oow_t rem;
HIO_ASSERT (hio, !(thr->over & THR_OVER_READ_FROM_CLIENT));
if (hio_htrd_feed(cli->htrd, buf, len, &rem) <= -1) goto oops;
if (rem > 0)
{
/* TODO store this to client buffer. once the current resource is completed, arrange to call on_read() with it */
HIO_DEBUG3 (hio, "HTTPS(%p) - excessive data after contents by thr client %p(%d)\n", sck->hio, sck, (int)sck->hnd);
}
}
if (n <= -1) goto oops;
return 0; return 0;
oops: oops:
@ -686,18 +520,12 @@ static int thr_client_on_write (hio_dev_sck_t* sck, hio_iolen_t wrlen, void* wrc
hio_t* hio = sck->hio; hio_t* hio = sck->hio;
hio_svc_htts_cli_t* cli = hio_dev_sck_getxtn(sck); hio_svc_htts_cli_t* cli = hio_dev_sck_getxtn(sck);
thr_t* thr = (thr_t*)cli->task; thr_t* thr = (thr_t*)cli->task;
int n;
if (wrlen <= -1) n = thr->client_org_on_write? thr->client_org_on_write(sck, wrlen, wrctx, dstaddr): 0;
{
HIO_DEBUG3 (hio, "HTTPS(%p) - unable to write to client %p(%d)\n", sck->hio, sck, (int)sck->hnd);
goto oops;
}
if (wrlen == 0) if (wrlen == 0)
{ {
/* if the connect is keep-alive, this part may not be called */
thr->num_pending_writes_to_client--;
HIO_ASSERT (hio, thr->num_pending_writes_to_client == 0);
HIO_DEBUG3 (hio, "HTTS(%p) - indicated EOF to client %p(%d)\n", thr->htts, sck, (int)sck->hnd); HIO_DEBUG3 (hio, "HTTS(%p) - indicated EOF to client %p(%d)\n", thr->htts, sck, (int)sck->hnd);
/* since EOF has been indicated to the client, it must not write to the client any further. /* since EOF has been indicated to the client, it must not write to the client any further.
* this also means that i don't need any data from the peer side either. * this also means that i don't need any data from the peer side either.
@ -706,25 +534,20 @@ static int thr_client_on_write (hio_dev_sck_t* sck, hio_iolen_t wrlen, void* wrc
} }
else else
{ {
HIO_ASSERT (hio, thr->num_pending_writes_to_client > 0); if (thr->peer && thr->task_res_pending_writes == THR_PENDING_IO_THRESHOLD)
thr->num_pending_writes_to_client--;
if (thr->peer && thr->num_pending_writes_to_client == THR_PENDING_IO_THRESHOLD)
{ {
/* enable input watching */
if (!(thr->over & THR_OVER_READ_FROM_PEER) && if (!(thr->over & THR_OVER_READ_FROM_PEER) &&
hio_dev_thr_read(thr->peer, 1) <= -1) goto oops; hio_dev_thr_read(thr->peer, 1) <= -1) n = -1;
} }
if ((thr->over & THR_OVER_READ_FROM_PEER) && thr->num_pending_writes_to_client <= 0) if ((thr->over & THR_OVER_READ_FROM_PEER) && thr->task_res_pending_writes <= 0)
{ {
thr_mark_over (thr, THR_OVER_WRITE_TO_CLIENT); thr_mark_over (thr, THR_OVER_WRITE_TO_CLIENT);
} }
} }
return 0; if (n <= -1 || wrlen <= -1) thr_halt_participating_devices (thr);
oops:
thr_halt_participating_devices (thr);
return 0; return 0;
} }
@ -798,6 +621,189 @@ static int thr_capture_request_header (hio_htre_t* req, const hio_bch_t* key, co
/* ----------------------------------------------------------------------- */ /* ----------------------------------------------------------------------- */
static void bind_task_to_client (thr_t* thr, hio_dev_sck_t* csck)
{
hio_svc_htts_cli_t* cli = hio_dev_sck_getxtn(csck);
HIO_ASSERT (thr->htts->hio, cli->sck == csck);
HIO_ASSERT (thr->htts->hio, cli->task == HIO_NULL);
thr->client_org_on_read = csck->on_read;
thr->client_org_on_write = csck->on_write;
thr->client_org_on_disconnect = csck->on_disconnect;
csck->on_read = thr_client_on_read;
csck->on_write = thr_client_on_write;
csck->on_disconnect = thr_client_on_disconnect;
cli->task = (hio_svc_htts_task_t*)thr;
HIO_SVC_HTTS_TASK_RCUP (thr);
}
static void unbind_task_from_client (thr_t* thr, int rcdown)
{
hio_dev_sck_t* csck = thr->task_csck;
HIO_ASSERT (thr->htts->hio, thr->task_client != HIO_NULL);
HIO_ASSERT (thr->htts->hio, thr->task_csck != HIO_NULL);
HIO_ASSERT (thr->htts->hio, thr->task_client->task == (hio_svc_htts_task_t*)thr);
HIO_ASSERT (thr->htts->hio, thr->task_client->htrd != HIO_NULL);
if (thr->client_htrd_recbs_changed)
{
hio_htrd_setrecbs (thr->task_client->htrd, &thr->client_htrd_org_recbs);
thr->client_htrd_recbs_changed = 0;
}
if (thr->client_org_on_read)
{
csck->on_read = thr->client_org_on_read;
thr->client_org_on_read = HIO_NULL;
}
if (thr->client_org_on_write)
{
csck->on_write = thr->client_org_on_write;
thr->client_org_on_write = HIO_NULL;
}
if (thr->client_org_on_disconnect)
{
csck->on_disconnect = thr->client_org_on_disconnect;
thr->client_org_on_disconnect = HIO_NULL;
}
/* there is some ordering issue in using HIO_SVC_HTTS_TASK_UNREF()
* because it can destroy the thr itself. so reset thr->task_client->task
* to null and call RCDOWN() later */
thr->task_client->task = HIO_NULL;
/* these two lines are also done in csck_on_disconnect() in http-svr.c because the socket is destroyed.
* the same lines here are because the task is unbound while the socket is still alive */
thr->task_client = HIO_NULL;
thr->task_csck = HIO_NULL;
/* enable input watching on the socket being unbound */
if (thr->task_keep_client_alive && hio_dev_sck_read(csck, 1) <= -1)
{
HIO_DEBUG2 (thr->htts->hio, "HTTS(%p) - halting client(%p) for failure to enable input watching\n", thr->htts, csck);
hio_dev_sck_halt (csck);
}
if (rcdown) HIO_SVC_HTTS_TASK_RCDOWN ((hio_svc_htts_task_t*)thr);
}
/* ----------------------------------------------------------------------- */
static int bind_task_to_peer (thr_t* thr, hio_dev_sck_t* csck, hio_htre_t* req, hio_svc_htts_thr_func_t func, void* ctx)
{
hio_svc_htts_t* htts = thr->htts;
hio_t* hio = htts->hio;
thr_peer_xtn_t* pxtn;
hio_dev_thr_make_t mi;
thr_func_start_t* tfs = HIO_NULL;
hio_htrd_t* htrd = HIO_NULL;
tfs = hio_callocmem(hio, HIO_SIZEOF(*tfs));
if (!tfs) goto oops;
tfs->hio = hio;
tfs->htts = htts;
tfs->thr_func = func;
tfs->thr_ctx = ctx;
tfs->tfi.req_method = hio_htre_getqmethodtype(req);
tfs->tfi.req_version = *hio_htre_getversion(req);
tfs->tfi.req_path = hio_dupbcstr(hio, hio_htre_getqpath(req), HIO_NULL);
if (!tfs->tfi.req_path) goto oops;
if (hio_htre_getqparam(req))
{
tfs->tfi.req_param = hio_dupbcstr(hio, hio_htre_getqparam(req), HIO_NULL);
if (!tfs->tfi.req_param) goto oops;
}
tfs->tfi.req_x_http_method_override = -1;
if (hio_htre_walkheaders(req, thr_capture_request_header, tfs) <= -1) goto oops;
tfs->tfi.server_addr = csck->localaddr;
tfs->tfi.client_addr = csck->remoteaddr;
HIO_MEMSET (&mi, 0, HIO_SIZEOF(mi));
mi.thr_func = thr_func;
mi.thr_ctx = tfs;
mi.on_read = thr_peer_on_read;
mi.on_write = thr_peer_on_write;
mi.on_close = thr_peer_on_close;
htrd = hio_htrd_open(hio, HIO_SIZEOF(*pxtn));
if (HIO_UNLIKELY(!htrd)) goto oops;
hio_htrd_setoption (htrd, HIO_HTRD_SKIP_INITIAL_LINE | HIO_HTRD_RESPONSE);
hio_htrd_setrecbs (htrd, &thr_peer_htrd_recbs);
thr->peer = hio_dev_thr_make(hio, HIO_SIZEOF(*pxtn), &mi);
if (HIO_UNLIKELY(!thr->peer))
{
/* no need to detach the attached task here because that is handled
* in the kill/disconnect callbacks of relevant devices */
HIO_DEBUG3 (hio, "HTTS(%p) - failed to create thread for %p(%d)\n", htts, csck, (int)csck->hnd);
goto oops;
}
tfs = HIO_NULL; /* mark that tfs is delegated to the thread */
thr->peer_htrd = htrd;
/* attach the thr task to the peer thread device */
pxtn = hio_dev_thr_getxtn(thr->peer);
pxtn->task = thr;
/* attach the thr task to the htrd parser set on the peer thread device */
pxtn = hio_htrd_getxtn(thr->peer_htrd);
pxtn->task = thr;
HIO_SVC_HTTS_TASK_RCUP (thr); /* for thr */
HIO_SVC_HTTS_TASK_RCUP (thr); /* for peer_htrd */
return 0;
oops:
if (htrd) hio_htrd_close (htrd);
if (tfs) free_thr_start_info (tfs);
return -1;
}
static void unbind_task_from_peer (thr_t* thr, int rcdown)
{
int n = 0;
if (thr->peer_htrd)
{
thr_peer_xtn_t* pxtn = hio_htrd_getxtn(thr->peer_htrd);
if (pxtn->task) pxtn->task = HIO_NULL;
hio_htrd_close (thr->peer_htrd);
thr->peer_htrd = HIO_NULL;
n++;
}
if (thr->peer)
{
thr_peer_xtn_t* pxtn = hio_dev_thr_getxtn(thr->peer);
if (pxtn->task) pxtn->task = HIO_NULL;
hio_dev_thr_kill (thr->peer);
thr->peer = HIO_NULL;
n++;
}
if (rcdown)
{
while (n > 0)
{
n--;
HIO_SVC_HTTS_TASK_RCDOWN((hio_svc_htts_task_t*)thr);
}
}
}
/* ----------------------------------------------------------------------- */
static int setup_for_content_length(thr_t* thr, hio_htre_t* req) static int setup_for_content_length(thr_t* thr, hio_htre_t* req)
{ {
int have_content; int have_content;
@ -834,93 +840,22 @@ int hio_svc_htts_dothr (hio_svc_htts_t* htts, hio_dev_sck_t* csck, hio_htre_t* r
hio_t* hio = htts->hio; hio_t* hio = htts->hio;
hio_svc_htts_cli_t* cli = hio_dev_sck_getxtn(csck); hio_svc_htts_cli_t* cli = hio_dev_sck_getxtn(csck);
thr_t* thr = HIO_NULL; thr_t* thr = HIO_NULL;
thr_peer_xtn_t* peer_xtn;
hio_dev_thr_make_t mi;
thr_func_start_t* tfs;
int have_content;
/* ensure that you call this function before any contents is received */ /* ensure that you call this function before any contents is received */
HIO_ASSERT (hio, hio_htre_getcontentlen(req) == 0); HIO_ASSERT (hio, hio_htre_getcontentlen(req) == 0);
tfs = hio_callocmem(hio, HIO_SIZEOF(*tfs));
if (!tfs) goto oops;
tfs->hio = hio;
tfs->htts = htts;
tfs->thr_func = func;
tfs->thr_ctx = ctx;
tfs->tfi.req_method = hio_htre_getqmethodtype(req);
tfs->tfi.req_version = *hio_htre_getversion(req);
tfs->tfi.req_path = hio_dupbcstr(hio, hio_htre_getqpath(req), HIO_NULL);
if (!tfs->tfi.req_path) goto oops;
if (hio_htre_getqparam(req))
{
tfs->tfi.req_param = hio_dupbcstr(hio, hio_htre_getqparam(req), HIO_NULL);
if (!tfs->tfi.req_param) goto oops;
}
tfs->tfi.req_x_http_method_override = -1;
if (hio_htre_walkheaders(req, thr_capture_request_header, tfs) <= -1) goto oops;
tfs->tfi.server_addr = cli->sck->localaddr;
tfs->tfi.client_addr = cli->sck->remoteaddr;
HIO_MEMSET (&mi, 0, HIO_SIZEOF(mi));
mi.thr_func = thr_func;
mi.thr_ctx = tfs;
mi.on_read = thr_peer_on_read;
mi.on_write = thr_peer_on_write;
mi.on_close = thr_peer_on_close;
thr = (thr_t*)hio_svc_htts_task_make(htts, HIO_SIZEOF(*thr), thr_on_kill, req, csck); thr = (thr_t*)hio_svc_htts_task_make(htts, HIO_SIZEOF(*thr), thr_on_kill, req, csck);
if (HIO_UNLIKELY(!thr)) goto oops; if (HIO_UNLIKELY(!thr)) goto oops;
thr->on_kill = on_kill; thr->on_kill = on_kill;
thr->options = options; thr->options = options;
thr->client_org_on_read = csck->on_read; bind_task_to_client (thr, csck);
thr->client_org_on_write = csck->on_write; if (bind_task_to_peer(thr, csck, req, func, ctx) <= -1) goto oops;
thr->client_org_on_disconnect = csck->on_disconnect;
csck->on_read = thr_client_on_read;
csck->on_write = thr_client_on_write;
csck->on_disconnect = thr_client_on_disconnect;
/* attach the thr task to the client socket via the task field in the extended space of the socket */
HIO_ASSERT (hio, cli->task == HIO_NULL);
HIO_SVC_HTTS_TASK_REF ((hio_svc_htts_task_t*)thr, cli->task);
thr->peer = hio_dev_thr_make(hio, HIO_SIZEOF(*peer_xtn), &mi);
if (HIO_UNLIKELY(!thr->peer))
{
/* no need to detach the attached task here because that is handled
* in the kill/disconnect callbacks of relevant devices */
HIO_DEBUG3 (hio, "HTTS(%p) - failed to create thread for %p(%d)\n", htts, csck, (int)csck->hnd);
goto oops;
}
tfs = HIO_NULL; /* mark that tfs is delegated to the thread */
/* attach the thr task to the peer thread device */
peer_xtn = hio_dev_thr_getxtn(thr->peer);
HIO_SVC_HTTS_TASK_REF (thr, peer_xtn->task);
thr->peer_htrd = hio_htrd_open(hio, HIO_SIZEOF(*peer_xtn));
if (HIO_UNLIKELY(!thr->peer_htrd)) goto oops;
hio_htrd_setoption (thr->peer_htrd, HIO_HTRD_SKIP_INITIAL_LINE | HIO_HTRD_RESPONSE);
hio_htrd_setrecbs (thr->peer_htrd, &thr_peer_htrd_recbs);
/* attach the thr task to the htrd parser set on the peer thread device */
peer_xtn = hio_htrd_getxtn(thr->peer_htrd);
HIO_SVC_HTTS_TASK_REF (thr, peer_xtn->task);
if (hio_svc_htts_task_handleexpect100(thr, options) <= -1) goto oops; if (hio_svc_htts_task_handleexpect100(thr, options) <= -1) goto oops;
if (setup_for_content_length(thr, req) <= -1) goto oops; if (setup_for_content_length(thr, req) <= -1) goto oops;
thr->res_mode_to_cli = thr->task_keep_client_alive? THR_RES_MODE_CHUNKED: THR_RES_MODE_CLOSE;
/* the mode still can get switched from THR_RES_MODE_CHUNKED to THR_RES_MODE_LENGTH
if the thread function emits Content-Length */
/* TODO: store current input watching state and use it when destroying the thr data */ /* TODO: store current input watching state and use it when destroying the thr data */
if (hio_dev_sck_read(csck, !(thr->over & THR_OVER_READ_FROM_CLIENT)) <= -1) goto oops; if (hio_dev_sck_read(csck, !(thr->over & THR_OVER_READ_FROM_CLIENT)) <= -1) goto oops;
@ -929,7 +864,6 @@ int hio_svc_htts_dothr (hio_svc_htts_t* htts, hio_dev_sck_t* csck, hio_htre_t* r
oops: oops:
HIO_DEBUG2 (hio, "HTTS(%p) - FAILURE in dothr - socket(%p)\n", htts, csck); HIO_DEBUG2 (hio, "HTTS(%p) - FAILURE in dothr - socket(%p)\n", htts, csck);
if (tfs) free_thr_start_info (tfs);
if (thr) thr_halt_participating_devices (thr); if (thr) thr_halt_participating_devices (thr);
return -1; return -1;
} }