written some code for proxy handling

This commit is contained in:
hyung-hwan 2012-03-27 15:45:10 +00:00
parent 5a0bd73990
commit 3d07790370
5 changed files with 643 additions and 73 deletions

View File

@ -1,19 +1,12 @@
QSE is a code library that implements various Unix utilities in an The QSE library implements AWK, SED, and other Unix commands in an embeddable
embeddable form and provides a set of APIs to embed them into an application. form and defines data types, functions, and classes that you can use when you
The APIs have been designed to be flexible enough to access various aspects of embed them into an application. It also provides more fundamental data types
a hosting application and an embedded object from each other. By embedding and functions needed when you deal with memory, streams, data structures. The
a Unix utility into an application, a developer is relieved of problems caused interface has been designed to be flexible enough to access various aspects of
by interacting with external programs and is given tighter control over it. embedding application and an embedded object from each other.
Currently the library implements the following utilities: * The library is licensed under the GNU Lesser General Public License
- SED Stream Editor version 3: http://www.gnu.org/licenses/
- CUT Text Cutter * The project webpage is at @QSE_PROJECT_URL@
- AWK Interpreter
As the library grows, more utilities will be added.
The library is licensed under LGPLv3.
The project webpage: @QSE_PROJECT_URL@
For further information, contact: @QSE_PROJECT_AUTHOR@ For further information, contact: @QSE_PROJECT_AUTHOR@

View File

@ -83,6 +83,13 @@ struct qse_httpd_server_t
qse_ubi_t handle; qse_ubi_t handle;
}; };
typedef struct qse_httpd_peer_t qse_httpd_peer_t;
struct qse_httpd_peer_t
{
qse_nwad_t nwad;
qse_ubi_t handle;
};
enum qse_httpd_mux_mask_t enum qse_httpd_mux_mask_t
{ {
QSE_HTTPD_MUX_READ = (1 << 0), QSE_HTTPD_MUX_READ = (1 << 0),
@ -107,6 +114,23 @@ struct qse_httpd_cbs_t
int (*accept) (qse_httpd_t* httpd, qse_httpd_server_t* server, qse_httpd_client_t* client); int (*accept) (qse_httpd_t* httpd, qse_httpd_server_t* server, qse_httpd_client_t* client);
} server; } server;
struct
{
int (*open) (qse_httpd_t* httpd, qse_httpd_peer_t* peer);
void (*close) (qse_httpd_t* httpd, qse_httpd_peer_t* peer);
int (*connected) (qse_httpd_t* httpd, qse_httpd_peer_t* peer);
qse_ssize_t (*recv) (
qse_httpd_t* httpd,
qse_httpd_peer_t* peer,
qse_mchar_t* buf, qse_size_t bufsize);
qse_ssize_t (*send) (
qse_httpd_t* httpd,
qse_httpd_peer_t* peer,
const qse_mchar_t* buf, qse_size_t bufsize);
} peer;
struct struct
{ {
void* (*open) (qse_httpd_t* httpd); void* (*open) (qse_httpd_t* httpd);
@ -454,6 +478,14 @@ qse_httpd_task_t* qse_httpd_entasknph (
qse_htre_t* req qse_htre_t* req
); );
qse_httpd_task_t* qse_httpd_entaskproxy (
qse_httpd_t* httpd,
qse_httpd_client_t* client,
const qse_httpd_task_t* pred,
const qse_nwad_t* nwad,
const qse_htre_t* req
);
/* -------------------------------------------- */ /* -------------------------------------------- */
void* qse_httpd_allocmem ( void* qse_httpd_allocmem (

View File

@ -1484,6 +1484,9 @@ create_process:
goto oops; goto oops;
} }
/* prepare some data before vforking for vfork limitation.
* the child in vfork should not make function calls or
* change data shared with the parent. */
if (!(flags & QSE_PIO_NOCLOEXEC)) if (!(flags & QSE_PIO_NOCLOEXEC))
highest_fd = get_highest_fd (); highest_fd = get_highest_fd ();
envarr = env? qse_env_getarr(env): environ; envarr = env? qse_env_getarr(env): environ;
@ -1498,7 +1501,11 @@ create_process:
if (pid == 0) if (pid == 0)
{ {
/* child */ /* the child after vfork should not make function calls.
* since the system call like close() are also normal
* functions, i have to use assembly macros to make
* system calls. */
qse_pio_hnd_t devnull = -1; qse_pio_hnd_t devnull = -1;
if (!(flags & QSE_PIO_NOCLOEXEC)) if (!(flags & QSE_PIO_NOCLOEXEC))
@ -1522,18 +1529,15 @@ create_process:
{ {
/* child should read */ /* child should read */
QSE_SYSCALL1 (dummy, SYS_close, handle[1]); QSE_SYSCALL1 (dummy, SYS_close, handle[1]);
/*handle[1] = QSE_PIO_HND_NIL;*/
QSE_SYSCALL2 (dummy, SYS_dup2, handle[0], 0); QSE_SYSCALL2 (dummy, SYS_dup2, handle[0], 0);
if (dummy <= -1) goto child_oops; if (dummy <= -1) goto child_oops;
QSE_SYSCALL1 (dummy, SYS_close, handle[0]); QSE_SYSCALL1 (dummy, SYS_close, handle[0]);
/*handle[0] = QSE_PIO_HND_NIL;*/
} }
if (flags & QSE_PIO_READOUT) if (flags & QSE_PIO_READOUT)
{ {
/* child should write */ /* child should write */
QSE_SYSCALL1 (dummy, SYS_close, handle[2]); QSE_SYSCALL1 (dummy, SYS_close, handle[2]);
/*handle[2] = QSE_PIO_HND_NIL;*/
QSE_SYSCALL2 (dummy, SYS_dup2, handle[3], 1); QSE_SYSCALL2 (dummy, SYS_dup2, handle[3], 1);
if (dummy <= -1) goto child_oops; if (dummy <= -1) goto child_oops;
@ -1544,14 +1548,12 @@ create_process:
} }
QSE_SYSCALL1 (dummy, SYS_close, handle[3]); QSE_SYSCALL1 (dummy, SYS_close, handle[3]);
/*handle[3] = QSE_PIO_HND_NIL;*/
} }
if (flags & QSE_PIO_READERR) if (flags & QSE_PIO_READERR)
{ {
/* child should write */ /* child should write */
QSE_SYSCALL1 (dummy, SYS_close, handle[4]); QSE_SYSCALL1 (dummy, SYS_close, handle[4]);
/*handle[4] = QSE_PIO_HND_NIL;*/
QSE_SYSCALL2 (dummy, SYS_dup2, handle[5], 2); QSE_SYSCALL2 (dummy, SYS_dup2, handle[5], 2);
if (dummy <= -1) goto child_oops; if (dummy <= -1) goto child_oops;
@ -1562,7 +1564,6 @@ create_process:
} }
QSE_SYSCALL1 (dummy, SYS_close, handle[5]); QSE_SYSCALL1 (dummy, SYS_close, handle[5]);
/*handle[5] = QSE_PIO_HND_NIL;*/
} }
if ((flags & QSE_PIO_INTONUL) || if ((flags & QSE_PIO_INTONUL) ||
@ -1606,11 +1607,7 @@ create_process:
if (flags & QSE_PIO_DROPERR) QSE_SYSCALL1 (dummy, SYS_close, 2); if (flags & QSE_PIO_DROPERR) QSE_SYSCALL1 (dummy, SYS_close, 2);
QSE_SYSCALL3 (dummy, SYS_execve, param.argv[0], param.argv, envarr); QSE_SYSCALL3 (dummy, SYS_execve, param.argv[0], param.argv, envarr);
if (dummy == -1) /*free_param (pio, &param); don't free this in the vfork version */
{
printf ("hello\n");
}
/*free_param (pio, &param); */
child_oops: child_oops:
if (devnull >= 0) QSE_SYSCALL1 (dummy, SYS_close, devnull); if (devnull >= 0) QSE_SYSCALL1 (dummy, SYS_close, devnull);

View File

@ -1719,7 +1719,6 @@ static int task_init_cgi (
if (arg->req->state & QSE_HTRE_DISCARDED) if (arg->req->state & QSE_HTRE_DISCARDED)
{ {
qse_printf (QSE_T("XXXXXXXXXXXXXXXXX\n"));
content_length = 0; content_length = 0;
goto done; goto done;
} }
@ -1727,7 +1726,6 @@ qse_printf (QSE_T("XXXXXXXXXXXXXXXXX\n"));
len = qse_htre_getcontentlen(arg->req); len = qse_htre_getcontentlen(arg->req);
if ((arg->req->state & QSE_HTRE_COMPLETED) && len <= 0) if ((arg->req->state & QSE_HTRE_COMPLETED) && len <= 0)
{ {
qse_printf (QSE_T("YYYYYYYYYYYYYYYYy\n"));
/* the content part is completed and no content /* the content part is completed and no content
* in the content buffer. there is nothing to forward */ * in the content buffer. there is nothing to forward */
content_length = 0; content_length = 0;
@ -1737,7 +1735,6 @@ qse_printf (QSE_T("YYYYYYYYYYYYYYYYy\n"));
if (!(arg->req->state & QSE_HTRE_COMPLETED) && if (!(arg->req->state & QSE_HTRE_COMPLETED) &&
!arg->req->attr.content_length_set) !arg->req->attr.content_length_set)
{ {
qse_printf (QSE_T("ZZZZZZZZZZZZZZZ\n"));
/* if the request is not completed and doesn't have /* if the request is not completed and doesn't have
* content-length set, it's not really possible to * content-length set, it's not really possible to
* pass the content. this function, however, allows * pass the content. this function, however, allows
@ -1770,7 +1767,6 @@ qse_printf (QSE_T("ZZZZZZZZZZZZZZZ\n"));
QSE_ASSERT (len > 0); QSE_ASSERT (len > 0);
QSE_ASSERT (!arg->req->attr.content_length_set || QSE_ASSERT (!arg->req->attr.content_length_set ||
(arg->req->attr.content_length_set && arg->req->attr.content_length == len)); (arg->req->attr.content_length_set && arg->req->attr.content_length == len));
qse_printf (QSE_T("HHHHHHHHHHHHHHHHhh %d\n"), (int)len);
content_length = len; content_length = len;
} }
else else
@ -1792,7 +1788,6 @@ qse_printf (QSE_T("HHHHHHHHHHHHHHHHhh %d\n"), (int)len);
QSE_ASSERT (arg->req->attr.content_length_set); QSE_ASSERT (arg->req->attr.content_length_set);
content_length = arg->req->attr.content_length; content_length = arg->req->attr.content_length;
qse_printf (QSE_T("TTTTTTTTTTTTTTTTTTTT %d\n"), (int)content_length);
} }
} }
@ -2093,7 +2088,11 @@ static int task_main_cgi_2 (
QSE_ASSERT (cgi->pio_inited); QSE_ASSERT (cgi->pio_inited);
qse_printf (QSE_T("[cgi_2 ]\n")); {
qse_ntime_t now;
qse_gettime(&now);
qse_printf (QSE_T("[cgi_2 at %lld]\n"), now);
}
if (task->trigger[2].mask & QSE_HTTPD_TASK_TRIGGER_READABLE) if (task->trigger[2].mask & QSE_HTTPD_TASK_TRIGGER_READABLE)
{ {
qse_printf (QSE_T("[cgi_2 write]\n")); qse_printf (QSE_T("[cgi_2 write]\n"));
@ -2354,7 +2353,7 @@ qse_httpd_task_t* qse_httpd_entasknph (
typedef struct task_proxy_arg_t task_proxy_arg_t; typedef struct task_proxy_arg_t task_proxy_arg_t;
struct task_proxy_arg_t struct task_proxy_arg_t
{ {
const qse_mchar_t* host; qse_nwad_t peer_nwad;
qse_htre_t* req; qse_htre_t* req;
int nph; int nph;
}; };
@ -2370,6 +2369,11 @@ struct task_proxy_t
qse_htrd_t* htrd; qse_htrd_t* htrd;
qse_httpd_peer_t peer;
#define PEER_OPEN (1 << 0)
#define PEER_CONNECTED (1 << 1)
int peer_status;
qse_htre_t* req; /* original request associated with this */ qse_htre_t* req; /* original request associated with this */
qse_mbs_t* reqfwdbuf; /* content from the request */ qse_mbs_t* reqfwdbuf; /* content from the request */
int reqfwderr; int reqfwderr;
@ -2377,6 +2381,20 @@ struct task_proxy_t
qse_mbs_t* res; qse_mbs_t* res;
qse_mchar_t* res_ptr; qse_mchar_t* res_ptr;
qse_size_t res_left; qse_size_t res_left;
/* if true, close connection after response is sent out */
int disconnect;
/* if true, the content of response is chunked */
int content_chunked;
/* if true, content_length is set. */
int content_length_set;
/* content-length that CGI returned */
qse_size_t content_length;
/* the number of octets in the contents received */
qse_size_t content_received;
qse_mchar_t buf[MAX_SEND_SIZE];
qse_size_t buflen;
}; };
typedef struct proxy_htrd_xtn_t proxy_htrd_xtn_t; typedef struct proxy_htrd_xtn_t proxy_htrd_xtn_t;
@ -2415,22 +2433,20 @@ else qse_printf (QSE_T("!!!PROXY SNATCHING DONE\n"));
* the relay trigger is not needed any more. */ * the relay trigger is not needed any more. */
task->trigger[2].mask = 0; task->trigger[2].mask = 0;
#if 0 if (QSE_MBS_LEN(proxy->reqfwdbuf) > 0 &&
if (QSE_MBS_LEN(proxy->reqfwdbuf) > 0 && proxy->pio_inited && (proxy->peer_status & PEER_CONNECTED) &&
!(task->trigger[1].mask & QSE_HTTPD_TASK_TRIGGER_WRITE)) !(task->trigger[0].mask & QSE_HTTPD_TASK_TRIGGER_WRITE))
{ {
/* there's nothing more to read from the client side. /* there's nothing more to read from the client side.
* there's something to forward in the forwarding buffer. * there's something to forward in the forwarding buffer.
* but no write trigger is set. add the write trigger * but no write trigger is set. add the write trigger
* for task invocation. */ * for task invocation. */
task->trigger[1].mask = QSE_HTTPD_TASK_TRIGGER_WRITE; task->trigger[0].mask |= QSE_HTTPD_TASK_TRIGGER_WRITE;
task->trigger[1].handle = qse_pio_gethandleasubi (&proxy->pio, QSE_PIO_IN);
} }
#endif
} }
else if (!proxy->reqfwderr) else if (!proxy->reqfwderr)
{ {
/* we can write to the child process if a forwarding error /* we can write to the peer if a forwarding error
* didn't occur previously. we store data from the client side * didn't occur previously. we store data from the client side
* to the forwaring buffer only if there's no such previous * to the forwaring buffer only if there's no such previous
* error. if an error occurred, we simply drop the data. */ * error. if an error occurred, we simply drop the data. */
@ -2446,7 +2462,16 @@ qse_printf (QSE_T("!!!PROXY SNATCHED [%.*hs]\n"), len, ptr);
static int proxy_htrd_peek_request (qse_htrd_t* htrd, qse_htre_t* req) static int proxy_htrd_peek_request (qse_htrd_t* htrd, qse_htre_t* req)
{ {
return -1; proxy_htrd_xtn_t* xtn = (proxy_htrd_xtn_t*) qse_htrd_getxtn (htrd);
task_proxy_t* proxy = xtn->proxy;
/* add initial line and headers to proxy->res */
if (qse_mbs_cat (proxy->res, QSE_MT("\r\n")) == (qse_size_t)-1) return -1;
/* add any contents received so far to cgi->res */
if (qse_mbs_ncat (proxy->res, qse_htre_getcontentptr(req), qse_htre_getcontentlen(req)) == (qse_size_t)-1) return -1;
return 0;
} }
static qse_htrd_recbs_t proxy_htrd_cbs = static qse_htrd_recbs_t proxy_htrd_cbs =
@ -2455,6 +2480,79 @@ static qse_htrd_recbs_t proxy_htrd_cbs =
QSE_NULL /* not needed for proxy */ QSE_NULL /* not needed for proxy */
}; };
static void proxy_forward_content (
qse_httpd_t* httpd, qse_httpd_task_t* task, int writable)
{
task_proxy_t* proxy = (task_proxy_t*)task->ctx;
QSE_ASSERT (proxy->reqfwdbuf != QSE_NULL);
if (QSE_MBS_LEN(proxy->reqfwdbuf) > 0)
{
/* there is something to forward in the forwarding buffer. */
if (proxy->reqfwderr)
{
/* a forwarding error has occurred previously.
* clear the forwarding buffer */
qse_printf (QSE_T("FORWARD: CLEARING REQCON FOR ERROR\n"));
qse_mbs_clear (proxy->reqfwdbuf);
}
else
{
/* normal forwarding */
qse_ssize_t n;
if (writable) goto forward;
n = httpd->cbs->mux.writable (httpd, proxy->peer.handle, 0);
if (n == 0) qse_printf (QSE_T("PROXY FORWARD: @@@@@@@@@NOT WRITABLE\n"));
if (n >= 1)
{
forward:
/* writable */
qse_printf (QSE_T("PROXY FORWARD: @@@@@@@@@@WRITING[%.*hs]\n"),
(int)QSE_MBS_LEN(proxy->reqfwdbuf),
QSE_MBS_PTR(proxy->reqfwdbuf));
n = httpd->cbs->peer.send (
httpd, &proxy->peer,
QSE_MBS_PTR(proxy->reqfwdbuf),
QSE_MBS_LEN(proxy->reqfwdbuf)
);
/* TODO: improve performance.. instead of copying the remaing part
to the head all the time.. grow the buffer to a certain limit. */
if (n > 0) qse_mbs_del (proxy->reqfwdbuf, 0, n);
}
if (n <= -1)
{
qse_printf (QSE_T("PROXY FORWARD: @@@@@@@@WRITE TO PROXY FAILED\n"));
/* TODO: logging ... */
proxy->reqfwderr = 1;
qse_mbs_clear (proxy->reqfwdbuf);
if (proxy->req)
{
qse_htre_discardcontent (proxy->req);
/* NOTE: proxy->req may be set to QSE_NULL
* in proxy_snatch_content() triggered by
* qse_htre_discardcontent() */
}
}
}
}
else if (proxy->req == QSE_NULL)
{
/* there is nothing to read from the client side and
* there is nothing more to forward in the forwarding buffer.
* clear the relay and write triggers.
*/
qse_printf (QSE_T("FORWARD: @@@@@@@@NOTHING MORE TO WRITE TO PROXY\n"));
task->trigger[2].mask = 0;
}
}
static int task_init_proxy ( static int task_init_proxy (
qse_httpd_t* httpd, qse_httpd_client_t* client, qse_httpd_task_t* task) qse_httpd_t* httpd, qse_httpd_client_t* client, qse_httpd_task_t* task)
{ {
@ -2473,14 +2571,17 @@ static int task_init_proxy (
*/ */
QSE_MEMSET (proxy, 0, QSE_SIZEOF(*proxy)); QSE_MEMSET (proxy, 0, QSE_SIZEOF(*proxy));
qse_mbscpy ((qse_mchar_t*)(proxy + 1), arg->host);
proxy->host = (qse_mchar_t*)(proxy + 1);
proxy->version = *qse_htre_getversion(arg->req); proxy->version = *qse_htre_getversion(arg->req);
proxy->keepalive = arg->req->attr.keepalive; proxy->keepalive = arg->req->attr.keepalive;
proxy->peer.nwad = arg->peer_nwad;
/* --------------------------------------------------------------------
* TODO: compose headers to send to peer and push them to fwdbuf...
* TODO: also change the content length check logic below...
* -------------------------------------------------------------------- */
if (arg->req->state & QSE_HTRE_DISCARDED) if (arg->req->state & QSE_HTRE_DISCARDED)
{ {
qse_printf (QSE_T("XXXXXXXXXXXXXXXXX\n"));
content_length = 0; content_length = 0;
goto done; goto done;
} }
@ -2488,7 +2589,6 @@ qse_printf (QSE_T("XXXXXXXXXXXXXXXXX\n"));
len = qse_htre_getcontentlen(arg->req); len = qse_htre_getcontentlen(arg->req);
if ((arg->req->state & QSE_HTRE_COMPLETED) && len <= 0) if ((arg->req->state & QSE_HTRE_COMPLETED) && len <= 0)
{ {
qse_printf (QSE_T("YYYYYYYYYYYYYYYYy\n"));
/* the content part is completed and no content /* the content part is completed and no content
* in the content buffer. there is nothing to forward */ * in the content buffer. there is nothing to forward */
content_length = 0; content_length = 0;
@ -2498,7 +2598,6 @@ qse_printf (QSE_T("YYYYYYYYYYYYYYYYy\n"));
if (!(arg->req->state & QSE_HTRE_COMPLETED) && if (!(arg->req->state & QSE_HTRE_COMPLETED) &&
!arg->req->attr.content_length_set) !arg->req->attr.content_length_set)
{ {
qse_printf (QSE_T("ZZZZZZZZZZZZZZZ\n"));
/* if the request is not completed and doesn't have /* if the request is not completed and doesn't have
* content-length set, it's not really possible to * content-length set, it's not really possible to
* pass the content. this function, however, allows * pass the content. this function, however, allows
@ -2522,6 +2621,7 @@ qse_printf (QSE_T("ZZZZZZZZZZZZZZZ\n"));
goto oops; goto oops;
} }
if (arg->req->state & QSE_HTRE_COMPLETED) if (arg->req->state & QSE_HTRE_COMPLETED)
{ {
/* no furthur forwarding is needed. /* no furthur forwarding is needed.
@ -2531,7 +2631,6 @@ qse_printf (QSE_T("ZZZZZZZZZZZZZZZ\n"));
QSE_ASSERT (len > 0); QSE_ASSERT (len > 0);
QSE_ASSERT (!arg->req->attr.content_length_set || QSE_ASSERT (!arg->req->attr.content_length_set ||
(arg->req->attr.content_length_set && arg->req->attr.content_length == len)); (arg->req->attr.content_length_set && arg->req->attr.content_length == len));
qse_printf (QSE_T("HHHHHHHHHHHHHHHHhh %d\n"), (int)len);
content_length = len; content_length = len;
} }
else else
@ -2553,7 +2652,6 @@ qse_printf (QSE_T("HHHHHHHHHHHHHHHHhh %d\n"), (int)len);
QSE_ASSERT (arg->req->attr.content_length_set); QSE_ASSERT (arg->req->attr.content_length_set);
content_length = arg->req->attr.content_length; content_length = arg->req->attr.content_length;
qse_printf (QSE_T("TTTTTTTTTTTTTTTTTTTT %d\n"), (int)content_length);
} }
} }
@ -2578,6 +2676,312 @@ oops:
static void task_fini_proxy ( static void task_fini_proxy (
qse_httpd_t* httpd, qse_httpd_client_t* client, qse_httpd_task_t* task) qse_httpd_t* httpd, qse_httpd_client_t* client, qse_httpd_task_t* task)
{ {
task_proxy_t* proxy = (task_proxy_t*)task->ctx;
if (proxy->peer_status & PEER_OPEN)
httpd->cbs->peer.close (httpd, &proxy->peer);
if (proxy->res) qse_mbs_close (proxy->res);
if (proxy->htrd) qse_htrd_close (proxy->htrd);
if (proxy->reqfwdbuf) qse_mbs_close (proxy->reqfwdbuf);
if (proxy->req) qse_htre_unsetconcb (proxy->req);
}
static int task_main_proxy_5 (
qse_httpd_t* httpd, qse_httpd_client_t* client, qse_httpd_task_t* task)
{
task_proxy_t* proxy = (task_proxy_t*)task->ctx;
qse_ssize_t n;
QSE_ASSERT (proxy->pio_inited);
if (task->trigger[2].mask & QSE_HTTPD_TASK_TRIGGER_READABLE)
{
proxy_forward_content (httpd, task, 0);
}
else if (task->trigger[0].mask & QSE_HTTPD_TASK_TRIGGER_WRITABLE)
{
proxy_forward_content (httpd, task, 1);
}
qse_printf (QSE_T("task_main_proxy_5\n"));
if (proxy->buflen > 0)
{
/* TODO: check if proxy outputs more than content-length if it is set... */
httpd->errnum = QSE_HTTPD_ENOERR;
n = httpd->cbs->client.send (httpd, client, proxy->buf, proxy->buflen);
if (n <= -1)
{
/* can't return internal server error any more... */
/* TODO: logging ... */
return -1;
}
QSE_MEMCPY (&proxy->buf[0], &proxy->buf[n], proxy->buflen - n);
proxy->buflen -= n;
}
/* if forwarding didn't finish, something is not really right...
* so long as the output from CGI is finished, no more forwarding
* is performed */
return (proxy->buflen > 0 || proxy->req ||
(proxy->reqfwdbuf && QSE_MBS_LEN(proxy->reqfwdbuf) > 0))? 1: 0;
}
static int task_main_proxy_4 (
qse_httpd_t* httpd, qse_httpd_client_t* client, qse_httpd_task_t* task)
{
task_proxy_t* proxy = (task_proxy_t*)task->ctx;
qse_ssize_t n;
QSE_ASSERT (proxy->pio_inited);
qse_printf (QSE_T("task_main_proxy_4 trigger[0].mask=%d trigger[1].mask=%d trigger[2].mask=%d\n"),
task->trigger[0].mask, task->trigger[1].mask, task->trigger[2].mask);
if (task->trigger[2].mask & QSE_HTTPD_TASK_TRIGGER_READABLE)
{
proxy_forward_content (httpd, task, 0);
}
else if (task->trigger[0].mask & QSE_HTTPD_TASK_TRIGGER_WRITABLE)
{
proxy_forward_content (httpd, task, 1);
}
if (task->trigger[0].mask & QSE_HTTPD_TASK_TRIGGER_READABLE)
{
/* this function assumes that the chunk length does not exceeded
* 4 hexadecimal digits. */
QSE_ASSERT (QSE_SIZEOF(proxy->buf) <= 0xFFFF);
qse_printf (QSE_T("task_main_proxy_4\n"));
n = httpd->cbs->peer.recv (
httpd, &proxy->peer,
&proxy->buf[proxy->buflen],
QSE_SIZEOF(proxy->buf) - proxy->buflen
);
if (n <= -1)
{
/* can't return internal server error any more... */
/* TODO: loggig ... */
return -1;
}
if (n == 0)
{
task->trigger[0].mask = 0;
task->main = task_main_proxy_5;
/* ok to chain-call since this task is called
* if the client-side is writable */
return task_main_proxy_5 (httpd, client, task);
}
proxy->buflen += n;
proxy->content_received += n;
if (proxy->content_length_set &&
proxy->content_received > proxy->content_length)
{
/* TODO: proxy returning too much data... something is wrong in CGI */
qse_printf (QSE_T("CGI FUCKED UP...RETURNING TOO MUCH DATA\n"));
return -1;
}
#if 0
qse_printf (QSE_T("CGI_4 SEND [%.*hs]\n"), (int)proxy->buflen, proxy->buf);
#endif
httpd->errnum = QSE_HTTPD_ENOERR;
n = httpd->cbs->client.send (httpd, client, proxy->buf, proxy->buflen);
if (n <= -1)
{
/* can't return internal server error any more... */
/* TODO: logging ... */
qse_printf (QSE_T("CGI SEND FAILURE\n"));
return -1;
}
QSE_MEMCPY (&proxy->buf[0], &proxy->buf[n], proxy->buflen - n);
proxy->buflen -= n;
#if 0
qse_printf (QSE_T("CGI SEND DONE\n"));
#endif
}
return 1;
}
static int task_main_proxy_3 (
qse_httpd_t* httpd, qse_httpd_client_t* client, qse_httpd_task_t* task)
{
/* send the http initial line and headers built using the headers
* returned by peer. it may include some contents as well */
task_proxy_t* proxy = (task_proxy_t*)task->ctx;
qse_ssize_t n;
qse_size_t count;
if (task->trigger[2].mask & QSE_HTTPD_TASK_TRIGGER_READABLE)
{
proxy_forward_content (httpd, task, 0);
}
else if (task->trigger[0].mask & QSE_HTTPD_TASK_TRIGGER_WRITABLE)
{
proxy_forward_content (httpd, task, 1);
}
if (task->trigger[0].mask & QSE_HTTPD_TASK_TRIGGER_READABLE)
{
count = MAX_SEND_SIZE;
if (count >= proxy->res_left) count = proxy->res_left;
qse_printf (QSE_T("[proxy_3 sending %d bytes]\n"), (int)count);
httpd->errnum = QSE_HTTPD_ENOERR;
n = httpd->cbs->client.send (httpd, client, proxy->res_ptr, count);
if (n <= -1)
{
qse_printf (QSE_T("[proxy-3 send failure....\n"));
return -1;
}
proxy->res_left -= n;
if (proxy->res_left <= 0)
{
qse_printf (QSE_T("[switching to proxy-4....\n"));
task->main = task_main_proxy_4;
/* don't chain-call task_main_proxy_4 since it has another send
* and it has already been sent here. so the writability must
* be checked again in the main loop.
* => return task_main_proxy_4 (httpd, client, task);*/
return 1;
}
proxy->res_ptr += n;
}
return 1; /* more work to do */
}
static int task_main_proxy_2 (
qse_httpd_t* httpd, qse_httpd_client_t* client, qse_httpd_task_t* task)
{
task_proxy_t* proxy = (task_proxy_t*)task->ctx;
qse_ssize_t n;
qse_size_t count;
if (task->trigger[2].mask & QSE_HTTPD_TASK_TRIGGER_READABLE)
{
proxy_forward_content (httpd, task, 0);
}
else if (task->trigger[0].mask & QSE_HTTPD_TASK_TRIGGER_WRITABLE)
{
proxy_forward_content (httpd, task, 1);
}
if (task->trigger[0].mask & QSE_HTTPD_TASK_TRIGGER_READABLE)
{
/* there is something to read from peer */
n = httpd->cbs->peer.recv (
httpd, &proxy->peer,
&proxy->buf[proxy->buflen],
QSE_SIZEOF(proxy->buf) - proxy->buflen
);
if (n <= -1)
{
/* can't return internal server error any more... */
/* TODO: logging ... */
goto oops;
}
if (n == 0)
{
/* end of output from peer before it has seen a header.
* the proxy script must be crooked. */
/* TODO: logging */
qse_printf (QSE_T("#####PREMATURE EOF FROM PEER\n"));
goto oops;
}
proxy->buflen += n;
if (qse_htrd_feed (proxy->htrd, proxy->buf, proxy->buflen) <= -1)
{
/* TODO: logging */
qse_printf (QSE_T("#####INVALID HEADER FROM FROM PEER [%.*hs]\n"), (int)proxy->buflen, proxy->buf);
goto oops;
}
proxy->buflen = 0;
if (QSE_MBS_LEN(proxy->res) > 0)
{
if (proxy->disconnect &&
qse_httpd_entaskdisconnect (httpd, client, task) == QSE_NULL)
{
goto oops;
}
proxy->res_ptr = QSE_MBS_PTR(proxy->res);
proxy->res_left = QSE_MBS_LEN(proxy->res);
qse_printf (QSE_T("TRAILING DATA=[%.*hs]\n"), (int)QSE_MBS_LEN(proxy->res), QSE_MBS_PTR(proxy->res));
task->main = task_main_proxy_3;
/* ok to chain-call since this task is called
* only if the client-side is writable */
return task_main_proxy_3 (httpd, client, task);
}
}
/* complete headers not seen yet. i need to be called again */
return 1;
oops:
return (entask_error (httpd, client, task, 500, &proxy->version, proxy->keepalive) == QSE_NULL)? -1: 0;
}
static int task_main_proxy_0 (
qse_httpd_t* httpd, qse_httpd_client_t* client, qse_httpd_task_t* task)
{
task_proxy_t* proxy = (task_proxy_t*)task->ctx;
int n;
/* wait for peer to get connected */
if (task->trigger[0].mask & QSE_HTTPD_TASK_TRIGGER_READABLE ||
task->trigger[0].mask & QSE_HTTPD_TASK_TRIGGER_WRITABLE)
{
n = httpd->cbs->peer.connected (httpd, &proxy->peer);
if (n <= -1) return -1;
if (n >= 1)
{
proxy->peer_status |= PEER_CONNECTED;
task->trigger[0].mask &= ~QSE_HTTPD_TASK_TRIGGER_WRITE;
if (proxy->reqfwdbuf)
{
if (proxy->req)
{
task->trigger[2].mask = QSE_HTTPD_TASK_TRIGGER_READ;
task->trigger[2].handle = client->handle;
}
else if (QSE_MBS_LEN(proxy->reqfwdbuf) > 0)
{
proxy_forward_content (httpd, task, 0);
if (QSE_MBS_LEN(proxy->reqfwdbuf) > 0)
{
task->trigger[0].mask |= QSE_HTTPD_TASK_TRIGGER_WRITE;
}
}
}
qse_printf (QSE_T("FINALLY connected to peer ...............................\n"));
qse_printf (QSE_T("FINALLY connected to peer ...............................\n"));
qse_printf (QSE_T("FINALLY connected to peer ...............................\n"));
qse_printf (QSE_T("FINALLY connected to peer ...............................\n"));
task->main = task_main_proxy_2;
}
}
return 1;
} }
static int task_main_proxy ( static int task_main_proxy (
@ -2586,6 +2990,7 @@ static int task_main_proxy (
task_proxy_t* proxy = (task_proxy_t*)task->ctx; task_proxy_t* proxy = (task_proxy_t*)task->ctx;
proxy_htrd_xtn_t* xtn; proxy_htrd_xtn_t* xtn;
int http_errnum = 500; int http_errnum = 500;
int n;
if (proxy->init_failed) goto oops; if (proxy->init_failed) goto oops;
@ -2596,7 +3001,6 @@ static int task_main_proxy (
qse_htrd_setrecbs (proxy->htrd, &proxy_htrd_cbs); qse_htrd_setrecbs (proxy->htrd, &proxy_htrd_cbs);
qse_htrd_setoption ( qse_htrd_setoption (
proxy->htrd, proxy->htrd,
QSE_HTRD_SKIPINITIALLINE |
QSE_HTRD_PEEKONLY | QSE_HTRD_PEEKONLY |
QSE_HTRD_REQUEST QSE_HTRD_REQUEST
); );
@ -2604,7 +3008,49 @@ static int task_main_proxy (
proxy->res = qse_mbs_open (httpd->mmgr, 0, 256); proxy->res = qse_mbs_open (httpd->mmgr, 0, 256);
if (proxy->res == QSE_NULL) goto oops; if (proxy->res == QSE_NULL) goto oops;
/////////////////////// httpd->errnum = QSE_HTTPD_ENOERR;
n = httpd->cbs->peer.open (httpd, &proxy->peer);
if (n <= -1)
{
/* TODO: translate error code to http error... */
if (httpd->errnum == QSE_HTTPD_ENOENT) http_errnum = 404;
else if (httpd->errnum == QSE_HTTPD_EACCES) http_errnum = 403;
goto oops;
}
proxy->peer_status |= PEER_OPEN;
task->trigger[0].mask = QSE_HTTPD_TASK_TRIGGER_READ;
task->trigger[0].handle = proxy->peer.handle;
if (n == 0)
{
/* peer not connected yet */
task->trigger[0].mask |= QSE_HTTPD_TASK_TRIGGER_WRITE;
task->main = task_main_proxy_0;
}
else
{
/* peer connected already */
proxy->peer_status |= PEER_CONNECTED;
if (proxy->reqfwdbuf)
{
if (proxy->req)
{
task->trigger[2].mask = QSE_HTTPD_TASK_TRIGGER_READ;
task->trigger[2].handle = client->handle;
}
else if (QSE_MBS_LEN(proxy->reqfwdbuf) > 0)
{
proxy_forward_content (httpd, task, 0);
if (QSE_MBS_LEN(proxy->reqfwdbuf) > 0)
{
task->trigger[0].mask |= QSE_HTTPD_TASK_TRIGGER_WRITE;
}
}
}
task->main = task_main_proxy_2;
}
return 1; return 1;
@ -2629,13 +3075,13 @@ qse_httpd_task_t* qse_httpd_entaskproxy (
qse_httpd_t* httpd, qse_httpd_t* httpd,
qse_httpd_client_t* client, qse_httpd_client_t* client,
const qse_httpd_task_t* pred, const qse_httpd_task_t* pred,
const qse_mchar_t* host, const qse_nwad_t* nwad,
const qse_htre_t* req) const qse_htre_t* req)
{ {
qse_httpd_task_t task; qse_httpd_task_t task;
task_proxy_arg_t arg; task_proxy_arg_t arg;
arg.host = host; arg.peer_nwad = *nwad;
arg.req = req; arg.req = req;
QSE_MEMSET (&task, 0, QSE_SIZEOF(task)); QSE_MEMSET (&task, 0, QSE_SIZEOF(task));
@ -2645,8 +3091,7 @@ qse_httpd_task_t* qse_httpd_entaskproxy (
task.ctx = &arg; task.ctx = &arg;
return qse_httpd_entask ( return qse_httpd_entask (
httpd, client, pred, &task, httpd, client, pred, &task, QSE_SIZEOF(task_proxy_t)
QSE_SIZEOF(task_proxy_t) + ((qse_mbslen(host) + 1) * QSE_SIZEOF(*host))
); );
} }

View File

@ -320,6 +320,9 @@ static int server_open (qse_httpd_t* httpd, qse_httpd_server_t* server)
fd = socket (addr.ss_family, SOCK_STREAM, IPPROTO_TCP); fd = socket (addr.ss_family, SOCK_STREAM, IPPROTO_TCP);
if (fd <= -1) goto oops; if (fd <= -1) goto oops;
flag = fcntl (fd, F_GETFD);
if (flag >= 0) fcntl (fd, F_SETFD, flag | FD_CLOEXEC);
flag = 1; flag = 1;
setsockopt (fd, SOL_SOCKET, SO_REUSEADDR, &flag, QSE_SIZEOF(flag)); setsockopt (fd, SOL_SOCKET, SO_REUSEADDR, &flag, QSE_SIZEOF(flag));
@ -332,9 +335,6 @@ static int server_open (qse_httpd_t* httpd, qse_httpd_server_t* server)
flag = fcntl (fd, F_GETFL); flag = fcntl (fd, F_GETFL);
if (flag >= 0) fcntl (fd, F_SETFL, flag | O_NONBLOCK); if (flag >= 0) fcntl (fd, F_SETFL, flag | O_NONBLOCK);
flag = fcntl (fd, F_GETFD);
if (flag >= 0) fcntl (fd, F_SETFD, flag | FD_CLOEXEC);
server->handle.i = fd; server->handle.i = fd;
return 0; return 0;
@ -379,12 +379,12 @@ qse_fprintf (QSE_STDERR, QSE_T("Error: too many client?\n"));
return -1; return -1;
} }
#endif #endif
flag = fcntl (fd, F_GETFD);
if (flag >= 0) fcntl (fd, F_SETFD, flag | FD_CLOEXEC);
flag = fcntl (fd, F_GETFL); flag = fcntl (fd, F_GETFL);
if (flag >= 0) fcntl (fd, F_SETFL, flag | O_NONBLOCK); if (flag >= 0) fcntl (fd, F_SETFL, flag | O_NONBLOCK);
flag = fcntl (fd, F_GETFD);
if (flag >= 0) fcntl (fd, F_SETFD, flag | FD_CLOEXEC);
if (sockaddr_to_nwad (&addr, &client->remote_addr) <= -1) if (sockaddr_to_nwad (&addr, &client->remote_addr) <= -1)
{ {
@ -406,6 +406,92 @@ qse_fprintf (QSE_STDERR, QSE_T("Error: too many client?\n"));
/* ------------------------------------------------------------------- */ /* ------------------------------------------------------------------- */
static int peer_open (qse_httpd_t* httpd, qse_httpd_peer_t* peer)
{
int fd = -1, flag;
/* TODO: if AF_INET6 is not defined sockaddr_storage is not available...
* create your own union or somehting similar... */
struct sockaddr_storage addr;
int addrsize;
int connected = 1;
addrsize = nwad_to_sockaddr (&peer->nwad, &addr);
if (addrsize <= -1)
{
qse_httpd_seterrnum (httpd, QSE_HTTPD_ENOIMPL);
return -1;
}
fd = socket (addr.ss_family, SOCK_STREAM, IPPROTO_TCP);
if (fd <= -1) goto oops;
flag = fcntl (fd, F_GETFD);
if (flag >= 0) fcntl (fd, F_SETFD, flag | FD_CLOEXEC);
flag = fcntl (fd, F_GETFL);
if (flag >= 0) fcntl (fd, F_SETFL, flag | O_NONBLOCK);
if (connect (fd, (struct sockaddr*)&addr, addrsize) <= -1)
{
if (errno != EINPROGRESS) goto oops;
connected = 0;
}
/* restore flags */
if (fcntl (fd, F_SETFL, flag) <= -1) goto oops;
peer->handle.i = fd;
return connected;
oops:
qse_httpd_seterrnum (httpd, syserr_to_errnum(errno));
if (fd >= 0) close (fd);
return -1;
}
static void peer_close (qse_httpd_t* httpd, qse_httpd_peer_t* peer)
{
close (peer->handle.i);
}
static int peer_connected (qse_httpd_t* httpd, qse_httpd_peer_t* peer)
{
#ifdef HAVE_SOCKLEN_T
socklen_t len;
#else
int len;
#endif
int ret;
len = QSE_SIZEOF(ret);
if (getsockopt (peer->handle.i, SOL_SOCKET, SO_ERROR, &ret, &len) <= -1) return -1;
if (ret == EINPROGRESS) return 0;
if (ret != 0) return -1;
return 1; /* connection completed */
}
static qse_ssize_t peer_recv (
qse_httpd_t* httpd, qse_httpd_peer_t* peer,
qse_mchar_t* buf, qse_size_t bufsize)
{
ssize_t ret = read (peer->handle.i, buf, bufsize);
if (ret <= -1) qse_httpd_seterrnum (httpd, syserr_to_errnum(errno));
return ret;
}
static qse_ssize_t peer_send (
qse_httpd_t* httpd, qse_httpd_peer_t* peer,
const qse_mchar_t* buf, qse_size_t bufsize)
{
ssize_t ret = write (peer->handle.i, buf, bufsize);
if (ret <= -1) qse_httpd_seterrnum (httpd, syserr_to_errnum(errno));
return ret;
}
/* ------------------------------------------------------------------- */
struct mux_ev_t struct mux_ev_t
{ {
qse_ubi_t handle; qse_ubi_t handle;
@ -934,6 +1020,11 @@ if (qse_htre_getcontentlen(req) > 0)
else else
{ {
/* TODO: determine if to return 100-continue or other errors */ /* TODO: determine if to return 100-continue or other errors */
{
qse_ntime_t now;
qse_gettime (&now);
qse_printf (QSE_T("entasking continue at %lld\n"), (long long)now);
}
if (qse_httpd_entaskcontinue ( if (qse_httpd_entaskcontinue (
httpd, client, QSE_NULL, req) == QSE_NULL) return -1; httpd, client, QSE_NULL, req) == QSE_NULL) return -1;
} }
@ -1062,10 +1153,12 @@ oops:
return -1; return -1;
} }
#if 0
static int proxy_request ( static int proxy_request (
qse_httpd_t* httpd, qse_httpd_client_t* client, qse_htre_t* req, int peek) qse_httpd_t* httpd, qse_httpd_client_t* client, qse_htre_t* req, int peek)
{ {
qse_httpd_task_t* task;
#if 0
const qse_mchar_t* qpath; const qse_mchar_t* qpath;
qpath = qse_htre_qpathptr (eq); qpath = qse_htre_qpathptr (eq);
@ -1083,6 +1176,7 @@ qse_printf (QSE_T("Host not included....\n"));
const qse_mchar_t* host; const qse_mchar_t* host;
qse_parseuri (); qse_parseuri ();
} }
#endif
#if 0 #if 0
@ -1114,11 +1208,13 @@ qse_printf (QSE_T("Host not included....\n"));
} }
#endif #endif
if (qse_htre_getqparamlen(req) > 0) qse_printf (QSE_T("PARAMS ==> [%hs]\n"), qse_htre_getqparamptr(req)); if (peek)
{
qse_nwad_t nwad;
task = qse_httpd_entaskproxy (httpd, client, QSE_NULL, qpath, req); qse_strtonwad (QSE_T("192.168.1.3:80"), &nwad);
task = qse_httpd_entaskproxy (httpd, client, QSE_NULL, &nwad, req);
if (task == QSE_NULL) goto oops; if (task == QSE_NULL) goto oops;
}
if (!req->attr.keepalive) if (!req->attr.keepalive)
{ {
@ -1134,18 +1230,19 @@ if (qse_htre_getqparamlen(req) > 0) qse_printf (QSE_T("PARAMS ==> [%hs]\n"), qse
oops: oops:
return -1; return -1;
} }
#endif
static int peek_request ( static int peek_request (
qse_httpd_t* httpd, qse_httpd_client_t* client, qse_htre_t* req) qse_httpd_t* httpd, qse_httpd_client_t* client, qse_htre_t* req)
{ {
return process_request (httpd, client, req, 1); //return process_request (httpd, client, req, 1);
return proxy_request (httpd, client, req, 1);
} }
static int handle_request ( static int handle_request (
qse_httpd_t* httpd, qse_httpd_client_t* client, qse_htre_t* req) qse_httpd_t* httpd, qse_httpd_client_t* client, qse_htre_t* req)
{ {
return process_request (httpd, client, req, 0); //return process_request (httpd, client, req, 0);
return proxy_request (httpd, client, req, 0);
} }
int list_directory (qse_httpd_t* httpd, const qse_mchar_t* path) int list_directory (qse_httpd_t* httpd, const qse_mchar_t* path)
@ -1158,6 +1255,12 @@ static qse_httpd_cbs_t httpd_cbs =
/* server */ /* server */
{ server_open, server_close, server_accept }, { server_open, server_close, server_accept },
{ peer_open,
peer_close,
peer_connected,
peer_recv,
peer_send },
/* multiplexer */ /* multiplexer */
{ mux_open, { mux_open,
mux_close, mux_close,