fixed a bug in keepalive handling in fcgi task by having a write callback to fcgi server

This commit is contained in:
hyung-hwan 2023-02-20 21:01:10 +09:00
parent a2d047a676
commit 2ecfc9be39
5 changed files with 123 additions and 66 deletions

View File

@ -131,7 +131,27 @@ printf ("FCGIC SOCKET CONNECTED >>>>>>>>>>>>>>>>>>>>>>>>>>\n");
static int sck_on_write (hio_dev_sck_t* sck, hio_iolen_t wrlen, void* wrctx, const hio_skad_t* dstaddr) static int sck_on_write (hio_dev_sck_t* sck, hio_iolen_t wrlen, void* wrctx, const hio_skad_t* dstaddr)
{ {
/*printf ("WROTE DATA TO CGI SERVER %d\n", (int)wrlen);*/ /* the lower 2 bits of the context is set to be the packet type in
* hio_svc_fcgic_beginrequest()
* hio_svc_fcgic_writeparam()
* hio_svc_fcgic_writestdin()
*/
hio_svc_fcgic_sess_t* sess = (hio_svc_fcgic_sess_t*)((hio_oow_t)wrctx & ~(hio_oow_t)0x3);
hio_oow_t type = ((hio_oow_t)wrctx & 0x3);
static hio_fcgi_req_type_t rqtype[] =
{
HIO_FCGI_BEGIN_REQUEST,
HIO_FCGI_PARAMS,
HIO_FCGI_STDIN
};
if (wrlen > 0)
{
HIO_ASSERT (sess->conn->hio, wrlen >= HIO_SIZEOF(hio_fcgi_record_header_t));
wrlen -= HIO_SIZEOF(hio_fcgi_record_header_t);
}
sess->on_write (sess, rqtype[type], wrlen, sess->ctx);
return 0; return 0;
} }
@ -151,8 +171,8 @@ static int sck_on_read (hio_dev_sck_t* sck, const void* data, hio_iolen_t dlen,
else if (dlen == 0) else if (dlen == 0)
{ {
/* EOF */ /* EOF */
hio_dev_sck_halt (sck);
/* fire all related fcgi sessions?? -> handled on disconnect?? */ /* fire all related fcgi sessions?? -> handled on disconnect?? */
hio_dev_sck_halt (sck);
} }
else else
{ {
@ -230,12 +250,9 @@ static int sck_on_read (hio_dev_sck_t* sck, const void* data, hio_iolen_t dlen,
if (!sess || !sess->active) if (!sess || !sess->active)
{ {
/* discard the record. no associated sessoin or inactive session */ /* discard the record. no associated sessoin or inactive session */
HIO_DEBUG2 (hio, "UNKNOWN SESSION ..................... %p %d\n", sess, conn->r.id);
if (sess) HIO_DEBUG1 (hio, "UNKNOWN SESSION active? %d\n", sess->active);
goto back_to_header; goto back_to_header;
} }
HIO_DEBUG2 (hio, "OK SESSION ..................... %p %d\n", sess, conn->r.id);
/* the complete body is in conn->r.buf */ /* the complete body is in conn->r.buf */
if (conn->r.type == HIO_FCGI_END_REQUEST) if (conn->r.type == HIO_FCGI_END_REQUEST)
{ {
@ -412,7 +429,7 @@ static void free_connections (hio_svc_fcgic_t* fcgic)
} }
} }
static hio_svc_fcgic_sess_t* new_session (hio_svc_fcgic_t* fcgic, const hio_skad_t* fcgis_addr, hio_svc_fcgic_on_read_t on_read, hio_svc_fcgic_on_untie_t on_untie, void* ctx) static hio_svc_fcgic_sess_t* new_session (hio_svc_fcgic_t* fcgic, const hio_skad_t* fcgis_addr, hio_svc_fcgic_on_read_t on_read, hio_svc_fcgic_on_write_t on_write, hio_svc_fcgic_on_untie_t on_untie, void* ctx)
{ {
hio_t* hio = fcgic->hio; hio_t* hio = fcgic->hio;
hio_svc_fcgic_conn_t* conn; hio_svc_fcgic_conn_t* conn;
@ -460,6 +477,7 @@ static hio_svc_fcgic_sess_t* new_session (hio_svc_fcgic_t* fcgic, const hio_skad
conn->sess.free = sess->next; conn->sess.free = sess->next;
sess->on_read = on_read; sess->on_read = on_read;
sess->on_write = on_write;
sess->on_untie = on_untie; sess->on_untie = on_untie;
sess->active = 1; sess->active = 1;
sess->ctx = ctx; sess->ctx = ctx;
@ -517,10 +535,10 @@ void hio_svc_fcgic_stop (hio_svc_fcgic_t* fcgic)
HIO_DEBUG1 (hio, "FCGIC - STOPPED SERVICE %p\n", fcgic); HIO_DEBUG1 (hio, "FCGIC - STOPPED SERVICE %p\n", fcgic);
} }
hio_svc_fcgic_sess_t* hio_svc_fcgic_tie (hio_svc_fcgic_t* fcgic, const hio_skad_t* addr, hio_svc_fcgic_on_read_t on_read, hio_svc_fcgic_on_untie_t on_untie, void* ctx) hio_svc_fcgic_sess_t* hio_svc_fcgic_tie (hio_svc_fcgic_t* fcgic, const hio_skad_t* addr, hio_svc_fcgic_on_read_t on_read, hio_svc_fcgic_on_write_t on_write, hio_svc_fcgic_on_untie_t on_untie, void* ctx)
{ {
/* TODO: reference counting for safety?? */ /* TODO: reference counting for safety?? */
return new_session(fcgic, addr, on_read, on_untie, ctx); return new_session(fcgic, addr, on_read, on_write, on_untie, ctx);
} }
void hio_svc_fcgic_untie (hio_svc_fcgic_sess_t* sess) void hio_svc_fcgic_untie (hio_svc_fcgic_sess_t* sess)
@ -534,6 +552,7 @@ int hio_svc_fcgic_beginrequest (hio_svc_fcgic_sess_t* sess)
hio_iovec_t iov[2]; hio_iovec_t iov[2];
hio_fcgi_record_header_t h; hio_fcgi_record_header_t h;
hio_fcgi_begin_request_body_t b; hio_fcgi_begin_request_body_t b;
void* wrctx;
if (!sess->conn->dev) if (!sess->conn->dev)
{ {
@ -559,8 +578,10 @@ int hio_svc_fcgic_beginrequest (hio_svc_fcgic_sess_t* sess)
iov[1].iov_ptr = &b; iov[1].iov_ptr = &b;
iov[1].iov_len = HIO_SIZEOF(b); iov[1].iov_len = HIO_SIZEOF(b);
HIO_ASSERT (sess->conn->hio, ((hio_oow_t)sess & 3) == 0);
wrctx = ((hio_oow_t)sess | 0); /* see the sck_on_write() */
/* TODO: check if sess->conn->dev is still valid */ /* TODO: check if sess->conn->dev is still valid */
return hio_dev_sck_writev(sess->conn->dev, iov, 2, HIO_NULL, HIO_NULL); return hio_dev_sck_writev(sess->conn->dev, iov, 2, wrctx, HIO_NULL);
} }
int hio_svc_fcgic_writeparam (hio_svc_fcgic_sess_t* sess, const void* key, hio_iolen_t ksz, const void* val, hio_iolen_t vsz) int hio_svc_fcgic_writeparam (hio_svc_fcgic_sess_t* sess, const void* key, hio_iolen_t ksz, const void* val, hio_iolen_t vsz)
@ -569,6 +590,7 @@ int hio_svc_fcgic_writeparam (hio_svc_fcgic_sess_t* sess, const void* key, hio_i
hio_fcgi_record_header_t h; hio_fcgi_record_header_t h;
hio_uint8_t sz[8]; hio_uint8_t sz[8];
hio_oow_t szc = 0; hio_oow_t szc = 0;
void* wrctx;
if (!sess->conn->dev) if (!sess->conn->dev)
{ {
@ -637,13 +659,16 @@ int hio_svc_fcgic_writeparam (hio_svc_fcgic_sess_t* sess, const void* key, hio_i
iov[3].iov_len = vsz; iov[3].iov_len = vsz;
} }
return hio_dev_sck_writev(sess->conn->dev, iov, (ksz > 0? 4: 1), HIO_NULL, HIO_NULL); HIO_ASSERT (sess->conn->hio, ((hio_oow_t)sess & 3) == 0);
wrctx = ((hio_oow_t)sess | 1); /* see the sck_on_write() */
return hio_dev_sck_writev(sess->conn->dev, iov, (ksz > 0? 4: 1), wrctx, HIO_NULL);
} }
int hio_svc_fcgic_writestdin (hio_svc_fcgic_sess_t* sess, const void* data, hio_iolen_t size) int hio_svc_fcgic_writestdin (hio_svc_fcgic_sess_t* sess, const void* data, hio_iolen_t size)
{ {
hio_iovec_t iov[2]; hio_iovec_t iov[2];
hio_fcgi_record_header_t h; hio_fcgi_record_header_t h;
void* wrctx;
if (!sess->conn->dev) if (!sess->conn->dev)
{ {
@ -665,6 +690,8 @@ int hio_svc_fcgic_writestdin (hio_svc_fcgic_sess_t* sess, const void* data, hio_
iov[1].iov_len = size; iov[1].iov_len = size;
} }
HIO_ASSERT (sess->conn->hio, ((hio_oow_t)sess & 3) == 0);
wrctx = ((hio_oow_t)sess | 2); /* see the sck_on_write() */
/* TODO: check if sess->conn->dev is still valid */ /* TODO: check if sess->conn->dev is still valid */
return hio_dev_sck_writev(sess->conn->dev, iov, (size > 0? 2: 1), HIO_NULL, HIO_NULL); return hio_dev_sck_writev(sess->conn->dev, iov, (size > 0? 2: 1), wrctx, HIO_NULL);
} }

View File

@ -127,6 +127,13 @@ typedef int (*hio_svc_fcgic_on_read_t) (
void* ctx void* ctx
); );
typedef int (*hio_svc_fcgic_on_write_t) (
hio_svc_fcgic_sess_t* sess,
hio_fcgi_req_type_t rqtype,
hio_iolen_t wrlen,
void* wrctx
);
typedef void (*hio_svc_fcgic_on_untie_t) ( typedef void (*hio_svc_fcgic_on_untie_t) (
hio_svc_fcgic_sess_t* sess, hio_svc_fcgic_sess_t* sess,
void* ctx; void* ctx;
@ -138,6 +145,7 @@ struct hio_svc_fcgic_sess_t
hio_oow_t sid; hio_oow_t sid;
hio_svc_fcgic_conn_t* conn; hio_svc_fcgic_conn_t* conn;
hio_svc_fcgic_on_read_t on_read; hio_svc_fcgic_on_read_t on_read;
hio_svc_fcgic_on_write_t on_write;
hio_svc_fcgic_on_untie_t on_untie; hio_svc_fcgic_on_untie_t on_untie;
void* ctx; void* ctx;
@ -171,6 +179,7 @@ HIO_EXPORT hio_svc_fcgic_sess_t* hio_svc_fcgic_tie (
hio_svc_fcgic_t* fcgic, hio_svc_fcgic_t* fcgic,
const hio_skad_t* fcgis_addr, const hio_skad_t* fcgis_addr,
hio_svc_fcgic_on_read_t on_read, hio_svc_fcgic_on_read_t on_read,
hio_svc_fcgic_on_write_t on_write,
hio_svc_fcgic_on_untie_t on_untie, hio_svc_fcgic_on_untie_t on_untie,
void* ctx void* ctx
); );

View File

@ -746,7 +746,7 @@ static HIO_INLINE void handle_event (hio_t* hio, hio_dev_t* dev, int events, int
} }
else /*if (x >= 1) */ else /*if (x >= 1) */
{ {
/* call on_write() callbacks enqueued fro the device before calling on_read(). /* call on_write() callbacks enqueued from the device before calling on_read().
* if on_write() callback is delayed, there can be out-of-order execution * if on_write() callback is delayed, there can be out-of-order execution
* between on_read() and on_write() callbacks. for instance, if a write request * between on_read() and on_write() callbacks. for instance, if a write request
* is started from within on_read() callback, and the input data is available * is started from within on_read() callback, and the input data is available

View File

@ -443,6 +443,58 @@ oops:
return 0; return 0;
} }
static int cgi_peer_on_write (hio_dev_pro_t* pro, hio_iolen_t wrlen, void* wrctx)
{
hio_t* hio = pro->hio;
cgi_peer_xtn_t* peer = hio_dev_pro_getxtn(pro);
cgi_t* cgi = peer->cgi;
if (cgi == HIO_NULL) return 0; /* there is nothing i can do. the cgi is being cleared or has been cleared already. */
HIO_ASSERT (hio, cgi->peer == pro);
if (wrlen <= -1)
{
HIO_DEBUG3 (hio, "HTTS(%p) - unable to write to peer %p(pid=%u)\n", cgi->htts, pro, (int)pro->child_pid);
goto oops;
}
else if (wrlen == 0)
{
/* indicated EOF */
/* do nothing here as i didn't increment num_pending_writes_to_peer when making the write request */
cgi->num_pending_writes_to_peer--;
HIO_ASSERT (hio, cgi->num_pending_writes_to_peer == 0);
HIO_DEBUG3 (hio, "HTTS(%p) - indicated EOF to peer %p(pid=%u)\n", cgi->htts, pro, (int)pro->child_pid);
/* indicated EOF to the peer side. i need no more data from the client side.
* i don't need to enable input watching in the client side either */
cgi_mark_over (cgi, CGI_OVER_WRITE_TO_PEER);
}
else
{
HIO_ASSERT (hio, cgi->num_pending_writes_to_peer > 0);
cgi->num_pending_writes_to_peer--;
if (cgi->num_pending_writes_to_peer == CGI_PENDING_IO_THRESHOLD)
{
if (!(cgi->over & CGI_OVER_READ_FROM_CLIENT) &&
hio_dev_sck_read(cgi->csck, 1) <= -1) goto oops;
}
if ((cgi->over & CGI_OVER_READ_FROM_CLIENT) && cgi->num_pending_writes_to_peer <= 0)
{
cgi_mark_over (cgi, CGI_OVER_WRITE_TO_PEER);
}
}
return 0;
oops:
cgi_halt_participating_devices (cgi);
return 0;
}
static int peer_capture_response_header (hio_htre_t* req, const hio_bch_t* key, const hio_htre_hdrval_t* val, void* ctx) static int peer_capture_response_header (hio_htre_t* req, const hio_bch_t* key, const hio_htre_hdrval_t* val, void* ctx)
{ {
hio_svc_htts_cli_t* cli = (hio_svc_htts_cli_t*)ctx; hio_svc_htts_cli_t* cli = (hio_svc_htts_cli_t*)ctx;
@ -657,57 +709,6 @@ static hio_htrd_recbs_t cgi_client_htrd_recbs =
cgi_client_htrd_push_content cgi_client_htrd_push_content
}; };
static int cgi_peer_on_write (hio_dev_pro_t* pro, hio_iolen_t wrlen, void* wrctx)
{
hio_t* hio = pro->hio;
cgi_peer_xtn_t* peer = hio_dev_pro_getxtn(pro);
cgi_t* cgi = peer->cgi;
if (cgi == HIO_NULL) return 0; /* there is nothing i can do. the cgi is being cleared or has been cleared already. */
HIO_ASSERT (hio, cgi->peer == pro);
if (wrlen <= -1)
{
HIO_DEBUG3 (hio, "HTTS(%p) - unable to write to peer %p(pid=%u)\n", cgi->htts, pro, (int)pro->child_pid);
goto oops;
}
else if (wrlen == 0)
{
/* indicated EOF */
/* do nothing here as i didn't increment num_pending_writes_to_peer when making the write request */
cgi->num_pending_writes_to_peer--;
HIO_ASSERT (hio, cgi->num_pending_writes_to_peer == 0);
HIO_DEBUG3 (hio, "HTTS(%p) - indicated EOF to peer %p(pid=%u)\n", cgi->htts, pro, (int)pro->child_pid);
/* indicated EOF to the peer side. i need no more data from the client side.
* i don't need to enable input watching in the client side either */
cgi_mark_over (cgi, CGI_OVER_WRITE_TO_PEER);
}
else
{
HIO_ASSERT (hio, cgi->num_pending_writes_to_peer > 0);
cgi->num_pending_writes_to_peer--;
if (cgi->num_pending_writes_to_peer == CGI_PENDING_IO_THRESHOLD)
{
if (!(cgi->over & CGI_OVER_READ_FROM_CLIENT) &&
hio_dev_sck_read(cgi->csck, 1) <= -1) goto oops;
}
if ((cgi->over & CGI_OVER_READ_FROM_CLIENT) && cgi->num_pending_writes_to_peer <= 0)
{
cgi_mark_over (cgi, CGI_OVER_WRITE_TO_PEER);
}
}
return 0;
oops:
cgi_halt_participating_devices (cgi);
return 0;
}
static void cgi_client_on_disconnect (hio_dev_sck_t* sck) static void cgi_client_on_disconnect (hio_dev_sck_t* sck)
{ {
hio_svc_htts_cli_t* cli = hio_dev_sck_getxtn(sck); hio_svc_htts_cli_t* cli = hio_dev_sck_getxtn(sck);

View File

@ -219,6 +219,7 @@ static HIO_INLINE void fcgi_mark_over (fcgi_t* fcgi, int over_bits)
if (!(old_over & FCGI_OVER_READ_FROM_CLIENT) && (fcgi->over & FCGI_OVER_READ_FROM_CLIENT)) if (!(old_over & FCGI_OVER_READ_FROM_CLIENT) && (fcgi->over & FCGI_OVER_READ_FROM_CLIENT))
{ {
/* finished reading from the client. stop watching read */
if (fcgi->csck && hio_dev_sck_read(fcgi->csck, 0) <= -1) if (fcgi->csck && hio_dev_sck_read(fcgi->csck, 0) <= -1)
{ {
HIO_DEBUG2 (fcgi->htts->hio, "HTTS(%p) - halting client(%p) for failure to disable input watching\n", fcgi->htts, fcgi->csck); HIO_DEBUG2 (fcgi->htts->hio, "HTTS(%p) - halting client(%p) for failure to disable input watching\n", fcgi->htts, fcgi->csck);
@ -246,6 +247,7 @@ static HIO_INLINE void fcgi_mark_over (fcgi_t* fcgi, int over_bits)
if (fcgi->csck) if (fcgi->csck)
{ {
HIO_DEBUG2 (hio, "HTTS(%p) - ALL OVER keeping client(%p) alive\n", fcgi->htts, fcgi->csck);
if (fcgi->keep_alive && !fcgi->client_eof_detected) if (fcgi->keep_alive && !fcgi->client_eof_detected)
{ {
HIO_DEBUG2 (hio, "HTTS(%p) - keeping client(%p) alive\n", fcgi->htts, fcgi->csck); HIO_DEBUG2 (hio, "HTTS(%p) - keeping client(%p) alive\n", fcgi->htts, fcgi->csck);
@ -377,6 +379,24 @@ oops:
return 0; return 0;
} }
static int fcgi_peer_on_write (hio_svc_fcgic_sess_t* peer, hio_fcgi_req_type_t rqtype, hio_iolen_t wrlen, void* wrctx)
{
fcgi_t* fcgi = (fcgi_t*)wrctx;
if (wrlen <= -1) goto oops;
if (rqtype == HIO_FCGI_STDIN && wrlen == 0)
{
/* completely wrote end of stdin to the cgi server */
fcgi_mark_over (fcgi, FCGI_OVER_WRITE_TO_PEER);
}
return 0;
oops:
fcgi_halt_participating_devices (fcgi);
return 0;
}
static int peer_capture_response_header (hio_htre_t* req, const hio_bch_t* key, const hio_htre_hdrval_t* val, void* ctx) static int peer_capture_response_header (hio_htre_t* req, const hio_bch_t* key, const hio_htre_hdrval_t* val, void* ctx)
{ {
hio_svc_htts_cli_t* cli = (hio_svc_htts_cli_t*)ctx; hio_svc_htts_cli_t* cli = (hio_svc_htts_cli_t*)ctx;
@ -905,6 +925,8 @@ static void unbind_task_from_client (fcgi_t* fcgi, int rcdown)
fcgi->client_org_on_disconnect = HIO_NULL; fcgi->client_org_on_disconnect = HIO_NULL;
} }
HIO_DEBUG2 (csck->hio, "UNBINDING CLEINT FROM TASK... client=%p csck=%p\n", fcgi->client, csck);
/* there is some ordering issue in using HIO_SVC_HTTS_TASK_UNREF() /* there is some ordering issue in using HIO_SVC_HTTS_TASK_UNREF()
* because it can destroy the fcgi itself. so reset fcgi->client->task * because it can destroy the fcgi itself. so reset fcgi->client->task
* to null and call RCDOWN() later */ * to null and call RCDOWN() later */
@ -932,7 +954,7 @@ static int bind_task_to_peer (fcgi_t* fcgi, const hio_skad_t* fcgis_addr)
hio_htrd_setoption (htrd, HIO_HTRD_SKIP_INITIAL_LINE | HIO_HTRD_RESPONSE); hio_htrd_setoption (htrd, HIO_HTRD_SKIP_INITIAL_LINE | HIO_HTRD_RESPONSE);
hio_htrd_setrecbs (htrd, &peer_htrd_recbs); hio_htrd_setrecbs (htrd, &peer_htrd_recbs);
fcgi->peer = hio_svc_fcgic_tie(fcgi->htts->fcgic, fcgis_addr, fcgi_peer_on_read, fcgi_peer_on_untie, fcgi); fcgi->peer = hio_svc_fcgic_tie(fcgi->htts->fcgic, fcgis_addr, fcgi_peer_on_read, fcgi_peer_on_write, fcgi_peer_on_untie, fcgi);
if (HIO_UNLIKELY(!fcgi->peer)) if (HIO_UNLIKELY(!fcgi->peer))
{ {
hio_htrd_close (htrd); hio_htrd_close (htrd);
@ -1064,7 +1086,6 @@ static int setup_for_content_length(fcgi_t* fcgi, hio_htre_t* req)
} }
#endif #endif
#if 0
/* this may change later if Content-Length is included in the fcgi output */ /* this may change later if Content-Length is included in the fcgi output */
if (req->flags & HIO_HTRE_ATTR_KEEPALIVE) if (req->flags & HIO_HTRE_ATTR_KEEPALIVE)
{ {
@ -1073,7 +1094,6 @@ static int setup_for_content_length(fcgi_t* fcgi, hio_htre_t* req)
/* the mode still can get switched to FCGI_RES_MODE_LENGTH if the fcgi script emits Content-Length */ /* the mode still can get switched to FCGI_RES_MODE_LENGTH if the fcgi script emits Content-Length */
} }
else else
#endif
{ {
fcgi->keep_alive = 0; fcgi->keep_alive = 0;
fcgi->res_mode_to_cli = FCGI_RES_MODE_CLOSE; fcgi->res_mode_to_cli = FCGI_RES_MODE_CLOSE;