changed mio_exec() to kill the device if the on_write callback() returns failure. before this change, mio_exec() also returned -1, causing mio_loop() to abort

This commit is contained in:
hyung-hwan 2020-05-10 16:20:39 +00:00
parent e2115286ec
commit 6cb06f8b57
6 changed files with 213 additions and 12 deletions

View File

@ -6,6 +6,7 @@
#include <unistd.h> /* TODO: move file operations to sys-file.XXX */ #include <unistd.h> /* TODO: move file operations to sys-file.XXX */
#include <fcntl.h> #include <fcntl.h>
#include <sys/stat.h> #include <sys/stat.h>
#include <pthread.h>
struct mio_svc_htts_t struct mio_svc_htts_t
{ {
@ -46,6 +47,21 @@ struct htrd_xtn_t
typedef struct htrd_xtn_t htrd_xtn_t; typedef struct htrd_xtn_t htrd_xtn_t;
/* ------------------------------------------------------------------------ */ /* ------------------------------------------------------------------------ */
static int test_func_handler (int rfd, int wfd)
{
int i;
/* you can read the post data from rfd;
* you can write result to wfd */
write (wfd, "Content-Type: text/plain\r\n\r\n", 28);
for (i = 0 ; i < 10; i++)
{
write (wfd, "hello\n", 6);
sleep (1);
}
return -1;
}
static int process_request (mio_svc_htts_t* htts, mio_dev_sck_t* csck, mio_htre_t* req, int peek) static int process_request (mio_svc_htts_t* htts, mio_dev_sck_t* csck, mio_htre_t* req, int peek)
{ {
//server_xtn_t* server_xtn = GET_SERVER_XTN(htts, client->server); //server_xtn_t* server_xtn = GET_SERVER_XTN(htts, client->server);
@ -141,12 +157,34 @@ if (mio_htre_getcontentlen(req) > 0)
else else
{ {
/* TODO: handle 100 continue??? */ /* TODO: handle 100 continue??? */
if ((req->flags & MIO_HTRE_ATTR_EXPECT) &&
mio_comp_http_version_numbers(&req->version, 1, 1) >= 0 &&
mio_htre_getcontentlen(req) <= 0)
{
if (req->flags & MIO_HTRE_ATTR_EXPECT100)
{
mio_dev_sck_write(csck, "HTTP/1.1 100 Continue\r\n", 23, MIO_NULL, MIO_NULL);
}
else
{
}
}
const mio_bch_t* qpath = mio_htre_getqpath(req); const mio_bch_t* qpath = mio_htre_getqpath(req);
if (mio_svc_htts_sendfile(htts, csck, qpath, 200, mth, mio_htre_getversion(req), (req->flags & MIO_HTRE_ATTR_KEEPALIVE)) <= -1) if (mio_comp_bcstr(qpath, "/testfunc", 0) == 0)
{
if (mio_svc_htts_sendcgi(htts, csck, test_func_handler, req) <= -1)
{
mio_htre_discardcontent (req);
mio_dev_sck_halt (csck);
}
}
else if (mio_svc_htts_sendfile(htts, csck, qpath, 200, mth, mio_htre_getversion(req), (req->flags & MIO_HTRE_ATTR_KEEPALIVE)) <= -1)
{ {
mio_htre_discardcontent (req); mio_htre_discardcontent (req);
mio_dev_halt (csck); mio_dev_sck_halt (csck);
} }
/* /*
if (mio_comp_bcstr(qpath, "/mio.c", 0) == 0) if (mio_comp_bcstr(qpath, "/mio.c", 0) == 0)
@ -357,11 +395,11 @@ static int client_on_write (mio_dev_sck_t* sck, mio_iolen_t wrlen, void* wrctx,
/* 0: end of resource /* 0: end of resource
* -1: error or incompelete transmission. * -1: error or incompelete transmission.
* arrange to close connection regardless of Connection: Keep-Alive or Connection: close */ * arrange to close connection regardless of Connection: Keep-Alive or Connection: close */
if (n <= -1 && mio_dev_sck_write(sck, MIO_NULL, 0, MIO_NULL, MIO_NULL) <= -1) mio_dev_sck_halt (sck); if (n <= -1 && mio_dev_sck_write(sck, MIO_NULL, 0, MIO_NULL, MIO_NULL) <= -1) mio_dev_sck_halt (sck);
} }
} }
return 0; /* if this returns failure, the listener socket gets terminated. it should never return failure. */ return 0;
} }
static void client_on_disconnect (mio_dev_sck_t* sck) static void client_on_disconnect (mio_dev_sck_t* sck)
@ -613,6 +651,21 @@ int mio_svc_htts_sendrsrc (mio_svc_htts_t* htts, mio_dev_sck_t* csck, mio_svc_ht
mio_bch_t dtbuf[64]; mio_bch_t dtbuf[64];
mio_oow_t x; mio_oow_t x;
#if 0
if ((req->flags & MIO_HTRE_ATTR_EXPECT) &&
mio_comp_http_version_numbers(&req->version, 1, 1) >= 0 &&
mio_htre_getcontentlen(req) <= 0)
{
if (req->flags & MIO_HTRE_ATTR_EXPECT100)
{
mio_dev_sck_write(csck, "HTTP/1.1 100 Continue\r\n", 23, MIO_NULL, MIO_NULL);
}
else
{
}
}
#endif
mio_svc_htts_fmtgmtime(htts, MIO_NULL, dtbuf, MIO_COUNTOF(dtbuf)); mio_svc_htts_fmtgmtime(htts, MIO_NULL, dtbuf, MIO_COUNTOF(dtbuf));
x = mio_becs_fmt(csckxtn->c.sbuf, "HTTP/%d.%d %d %hs\r\nServer: %hs\r\nDate: %hs\r\nConnection: %s\r\n", x = mio_becs_fmt(csckxtn->c.sbuf, "HTTP/%d.%d %d %hs\r\nServer: %hs\r\nDate: %hs\r\nConnection: %s\r\n",
@ -756,12 +809,126 @@ int mio_svc_htts_schedproxy (mio_svc_htts_t* htts, mio_dev_sck_t* csck, mio_htre
4. start proxying 4. start proxying
5. if one side is stalled, don't read from another side... let the kernel slow the connection... 5. if one side is stalled, donot read from another side... let the kernel slow the connection...
i need to know how may bytes are pending for this. i need to know how may bytes are pending for this.
if pending too high, disable read watching... mio_dev_watch (csck, MIO_DEV_WATCH_RENEW, 0); if pending too high, disable read watching... mio_dev_watch (csck, MIO_DEV_WATCH_RENEW, 0);
} }
#endif #endif
/* ----------------------------------------------------------------- */
typedef void (*mio_svc_htts_rsrc_cgi_t) (
int rfd,
int wfd
);
struct mio_svc_htts_rsrc_cgi_peer_t
{
int rfd;
int wfd;
};
typedef struct mio_svc_htts_rsrc_cgi_peer_t mio_svc_htts_rsrc_cgi_peer_t;
enum mio_svc_htts_rsrc_cgi_type_t
{
MIO_SVC_HTTS_RSRC_CGI_TYPE_FUNC,
MIO_SVC_HTTS_RSRC_CGI_TYPE_PROC
};
typedef enum mio_svc_htts_rsrc_cgi_type_t mio_svc_htts_rsrc_cgi_type_t;
struct rsrc_cgi_xtn_t
{
mio_svc_htts_rsrc_cgi_type_t type;
int rfd;
int wfd;
mio_svc_htts_rsrc_cgi_t handler;
pthread_t thr;
mio_svc_htts_rsrc_cgi_peer_t peer;
};
typedef struct rsrc_cgi_xtn_t rsrc_cgi_xtn_t;
static int rsrc_cgi_on_write (mio_svc_htts_rsrc_t* rsrc, mio_dev_sck_t* sck)
{
rsrc_cgi_xtn_t* file = (rsrc_cgi_xtn_t*)mio_svc_htts_rsrc_getxtn(rsrc);
return 0;
}
static void rsrc_cgi_on_kill (mio_svc_htts_rsrc_t* rsrc)
{
rsrc_cgi_xtn_t* cgi = (rsrc_cgi_xtn_t*)mio_svc_htts_rsrc_getxtn(rsrc);
close (cgi->rfd); cgi->rfd = -1;
close (cgi->wfd); cgi->wfd = -1;
switch (cgi->type)
{
case MIO_SVC_HTTS_RSRC_CGI_TYPE_FUNC:
/* TODO: check cgi->thr is valid.
* non-blocking way? if alive, kill gracefully?? */
pthread_join (cgi->thr, MIO_NULL);
break;
case MIO_SVC_HTTS_RSRC_CGI_TYPE_PROC:
/* TODO:
waitpid with no wait
still alive kill
waitpid with no wait.
*/
break;
}
}
static void* cgi_thr_func (void* ctx)
{
rsrc_cgi_xtn_t* func = (rsrc_cgi_xtn_t*)ctx;
func->handler (func->peer.rfd, func->peer.wfd);
close (func->peer.rfd); func->peer.rfd = -1;
close (func->peer.wfd); func->peer.wfd = -1;
return MIO_NULL;
}
int mio_svc_htts_sendcgi (mio_svc_htts_t* htts, mio_dev_sck_t* csck, mio_svc_htts_rsrc_cgi_t handler, mio_htre_t* req)
{
mio_svc_htts_rsrc_t* rsrc = MIO_NULL;
rsrc_cgi_xtn_t* cgi = MIO_NULL;
int pfd[2];
rsrc = mio_svc_htts_rsrc_make(htts, rsrc_cgi_on_write, rsrc_cgi_on_kill, MIO_SIZEOF(*cgi));
if (MIO_UNLIKELY(!rsrc)) goto oops;
cgi = mio_svc_htts_rsrc_getxtn(rsrc);
cgi->type = MIO_SVC_HTTS_RSRC_CGI_TYPE_FUNC;
cgi->handler = handler;
cgi->rfd = -1;
cgi->wfd = -1;
cgi->peer.rfd = -1;
cgi->peer.wfd = -1;
if (pipe(pfd) == -1) goto oops;
cgi->peer.rfd = pfd[0];
cgi->wfd = pfd[1];
if (pipe(pfd) == -1) goto oops;
cgi->rfd = pfd[0];
cgi->peer.wfd = pfd[1];
if (pthread_create(&cgi->thr, MIO_NULL, cgi_thr_func, cgi) != 0) goto oops;
return 0;
oops:
if (cgi)
{
if (cgi->peer.rfd >= 0) { close (cgi->peer.rfd); cgi->peer.rfd = -1; }
if (cgi->peer.wfd >= 0) { close (cgi->peer.wfd); cgi->peer.wfd = -1; }
if (cgi->rfd >= 0) { close (cgi->rfd); cgi->rfd = -1; }
if (cgi->wfd >= 0) { close (cgi->wfd); cgi->wfd = -1; }
}
if (rsrc) mio_svc_htts_rsrc_kill (rsrc);
return -1;
}
int mio_svc_htts_sendstatus (mio_svc_htts_t* htts, mio_dev_sck_t* csck, int status_code, mio_http_method_t method, const mio_http_version_t* version, int keepalive, void* extra) int mio_svc_htts_sendstatus (mio_svc_htts_t* htts, mio_dev_sck_t* csck, int status_code, mio_http_method_t method, const mio_http_version_t* version, int keepalive, void* extra)
{ {
/* TODO: change this to use send status */ /* TODO: change this to use send status */
@ -822,7 +989,6 @@ int mio_svc_htts_sendstatus (mio_svc_htts_t* htts, mio_dev_sck_t* csck, int stat
break; break;
} }
mio_svc_htts_fmtgmtime(htts, MIO_NULL, dtbuf, MIO_COUNTOF(dtbuf)); mio_svc_htts_fmtgmtime(htts, MIO_NULL, dtbuf, MIO_COUNTOF(dtbuf));
x = mio_becs_fmt(csckxtn->c.sbuf, "HTTP/%d.%d %d %s\r\nServer: %s\r\nDate: %s\r\nConnection: %s\r\nContent-Type: text/html\r\nContent-Length: %u\r\n%s%s%s\r\n%s", x = mio_becs_fmt(csckxtn->c.sbuf, "HTTP/%d.%d %d %s\r\nServer: %s\r\nDate: %s\r\nConnection: %s\r\nContent-Type: text/html\r\nContent-Length: %u\r\n%s%s%s\r\n%s",

View File

@ -34,6 +34,12 @@ int mio_comp_http_versions (const mio_http_version_t* v1, const mio_http_version
return v1->major - v2->major; return v1->major - v2->major;
} }
int mio_comp_http_version_numbers (const mio_http_version_t* v1, int v2_major, int v2_minor)
{
if (v1->major == v2_major) return v1->minor - v2_minor;
return v1->major - v2_major;
}
const mio_bch_t* mio_http_status_to_bcstr (int code) const mio_bch_t* mio_http_status_to_bcstr (int code)
{ {
const mio_bch_t* msg; const mio_bch_t* msg;

View File

@ -157,10 +157,9 @@ struct mio_htre_t
#define mio_htre_getsmesg(re) ((re)->u.s.mesg) #define mio_htre_getsmesg(re) ((re)->u.s.mesg)
#define mio_htre_getcontent(re) (&(re)->content) #define mio_htre_getcontent(re) (&(re)->content)
#define mio_htre_getcontentxstr(re) MIO_MBS_XSTR(&(re)->content) #define mio_htre_getcontentbcs(re) MIO_BECS_BCS(&(re)->content)
#define mio_htre_getcontentcstr(re) MIO_MBS_CSTR(&(re)->content) #define mio_htre_getcontentptr(re) MIO_BECS_PTR(&(re)->content)
#define mio_htre_getcontentptr(re) MIO_MBS_PTR(&(re)->content) #define mio_htre_getcontentlen(re) MIO_BECS_LEN(&(re)->content)
#define mio_htre_getcontentlen(re) MIO_MBS_LEN(&(re)->content)
typedef int (*mio_htre_header_walker_t) ( typedef int (*mio_htre_header_walker_t) (
mio_htre_t* re, mio_htre_t* re,

View File

@ -204,6 +204,13 @@ MIO_EXPORT int mio_comp_http_versions (
const mio_http_version_t* v2 const mio_http_version_t* v2
); );
MIO_EXPORT int mio_comp_http_version_numbers (
const mio_http_version_t* v1,
int v2_major,
int v2_minor
);
MIO_EXPORT const mio_bch_t* mio_http_status_to_bcstr ( MIO_EXPORT const mio_bch_t* mio_http_status_to_bcstr (
int code int code
); );
@ -327,6 +334,17 @@ MIO_EXPORT void mio_svc_htts_fmtgmtime (
mio_oow_t len mio_oow_t len
); );
MIO_EXPORT mio_svc_htts_rsrc_t* mio_svc_htts_rsrc_make (
mio_svc_htts_t* htts,
mio_svc_htts_rsrc_on_write_t on_write,
mio_svc_htts_rsrc_on_kill_t on_kill,
mio_oow_t xtnsize
);
MIO_EXPORT void mio_svc_htts_rsrc_kill (
mio_svc_htts_rsrc_t* rsrc
);
#if defined(__cplusplus) #if defined(__cplusplus)
} }
#endif #endif

View File

@ -405,6 +405,7 @@ static MIO_INLINE void handle_event (mio_t* mio, mio_dev_t* dev, int events, int
if (y <= -1) if (y <= -1)
{ {
MIO_DEBUG1 (mio, "Error returned by on_write() of device %p\n", dev);
mio_dev_halt (dev); mio_dev_halt (dev);
dev = MIO_NULL; dev = MIO_NULL;
break; break;
@ -584,9 +585,18 @@ int mio_exec (mio_t* mio)
{ {
mio_cwq_t* cwq; mio_cwq_t* cwq;
mio_oow_t cwqfl_index; mio_oow_t cwqfl_index;
mio_dev_t* dev_to_halt;
cwq = MIO_CWQ_HEAD(&mio->cwq); cwq = MIO_CWQ_HEAD(&mio->cwq);
if (cwq->dev->dev_evcb->on_write(cwq->dev, cwq->olen, cwq->ctx, &cwq->dstaddr) <= -1) return -1; if (cwq->dev->dev_evcb->on_write(cwq->dev, cwq->olen, cwq->ctx, &cwq->dstaddr) <= -1)
{
MIO_DEBUG1 (mio, "Error returned by on_write() of device %p in cwq\n", cwq->dev);
dev_to_halt = cwq->dev;
}
else
{
dev_to_halt = MIO_NULL;
}
cwq->dev->cw_count--; cwq->dev->cw_count--;
MIO_CWQ_UNLINK (cwq); MIO_CWQ_UNLINK (cwq);
@ -602,6 +612,8 @@ int mio_exec (mio_t* mio)
/* TODO: more reuse of objects of different size? */ /* TODO: more reuse of objects of different size? */
mio_freemem (mio, cwq); mio_freemem (mio, cwq);
} }
if (dev_to_halt) mio_dev_halt (dev_to_halt);
} }
/* execute the scheduled jobs before checking devices with the /* execute the scheduled jobs before checking devices with the

View File

@ -1,4 +1,4 @@
/*m /*
* $Id$ * $Id$
* *
Copyright (c) 2016-2020 Chung, Hyung-Hwan. All rights reserved. Copyright (c) 2016-2020 Chung, Hyung-Hwan. All rights reserved.