squeezed in raw proxying code into normal proxy

This commit is contained in:
hyung-hwan 2014-07-15 16:22:24 +00:00
parent a7ca23fa50
commit a0e2a7067c
5 changed files with 397 additions and 252 deletions

View File

@ -480,9 +480,9 @@ struct qse_httpd_rsrc_cgi_t
typedef struct qse_httpd_rsrc_proxy_t qse_httpd_rsrc_proxy_t;
struct qse_httpd_rsrc_proxy_t
{
qse_nwad_t dst;
qse_nwad_t src;
int raw;
qse_nwad_t dst; /* remote destination address to connect to */
qse_nwad_t src; /* local binding address */
int raw; /* raw or normal */
};
typedef struct qse_httpd_rsrc_dir_t qse_httpd_rsrc_dir_t;

View File

@ -37,8 +37,6 @@ struct task_proxy_t
int init_failed;
qse_httpd_t* httpd;
const qse_mchar_t* host;
int method;
qse_http_version_t version;
int keepalive; /* taken from the request */
@ -197,12 +195,18 @@ static int proxy_snatch_client_input_raw (
task = (qse_httpd_task_t*)ctx;
proxy = (task_proxy_t*)task->ctx;
/* this function is never called with ptr of QSE_NULL
* because this callback is set manually after the request
* has been discarded or completed in task_init_proxy() and
* qse_htre_completecontent or qse-htre_discardcontent() is
* not called again. Unlinkw proxy_snatch_client_input(),
* it doesn't care about EOF indicated by ptr of QSE_NULL. */
if (ptr && !(proxy->reqflags & PROXY_REQ_FWDERR))
{
if (qse_mbs_ncat (proxy->reqfwdbuf, ptr, len) == (qse_size_t)-1)
{
proxy->httpd->errnum = QSE_HTTPD_ENOMEM;
return -1;
return -1;
}
task->trigger[0].mask |= QSE_HTTPD_TASK_TRIGGER_WRITE;
@ -732,6 +736,8 @@ to the head all the time.. grow the buffer to a certain limit. */
}
}
/* ------------------------------------------------------------------------ */
static int task_init_proxy (
qse_httpd_t* httpd, qse_httpd_client_t* client, qse_httpd_task_t* task)
{
@ -776,7 +782,6 @@ static int task_init_proxy (
/* the caller must make sure that the actual content is discarded or completed
* and the following data is treated as contents */
printf ("proxy req = %p %d %d\n", arg->req, (arg->req->state & QSE_HTRE_DISCARDED), (arg->req->state& QSE_HTRE_COMPLETED));
QSE_ASSERT (arg->req->state & (QSE_HTRE_DISCARDED | QSE_HTRE_COMPLETED));
QSE_ASSERT (qse_htrd_getoption(client->htrd) & QSE_HTRD_DUMMY);
@ -946,6 +951,8 @@ qse_printf (QSE_T("GOING TO PROXY [%hs]\n"), QSE_MBS_PTR(proxy->reqfwdbuf));
return 0;
oops:
printf ("init_proxy failed...........................................\n");
/* since a new task can't be added in the initializer,
* i mark that initialization failed and let task_main_proxy()
* add an error task */
@ -961,11 +968,14 @@ oops:
return 0;
}
/* ------------------------------------------------------------------------ */
static void task_fini_proxy (
qse_httpd_t* httpd, qse_httpd_client_t* client, qse_httpd_task_t* task)
{
task_proxy_t* proxy = (task_proxy_t*)task->ctx;
printf ("task_fini_proxy.................\n");
if (proxy->peer_status & PROXY_PEER_OPEN)
httpd->opt.scb.peer.close (httpd, &proxy->peer);
@ -975,6 +985,8 @@ static void task_fini_proxy (
if (proxy->req) qse_htre_unsetconcb (proxy->req);
}
/* ------------------------------------------------------------------------ */
static int task_main_proxy_5 (
qse_httpd_t* httpd, qse_httpd_client_t* client, qse_httpd_task_t* task)
{
@ -982,7 +994,7 @@ static int task_main_proxy_5 (
qse_ssize_t n;
#if 0
qse_printf (QSE_T("task_main_proxy_5 trigger[0].mask=%d trigger[1].mask=%d trigger[2].mask=%d\n"),
printf ("task_main_proxy_5 trigger[0].mask=%d trigger[1].mask=%d trigger[2].mask=%d\n",
task->trigger[0].mask, task->trigger[1].mask, task->trigger[2].mask);
#endif
@ -993,7 +1005,7 @@ qse_printf (QSE_T("task_main_proxy_5 trigger[0].mask=%d trigger[1].mask=%d trigg
}
else if (task->trigger[0].mask & QSE_HTTPD_TASK_TRIGGER_WRITABLE)
{
/* if the peer side is writable */
/* if the peer side is writable while the client side is not readable*/
proxy_forward_client_input_to_peer (httpd, task, 1);
}
@ -1030,8 +1042,8 @@ static int task_main_proxy_4 (
{
task_proxy_t* proxy = (task_proxy_t*)task->ctx;
#if 0
qse_printf (QSE_T("task_main_proxy_4 trigger[0].mask=%d trigger[1].mask=%d trigger[2].mask=%d\n"),
#if 1
printf ("task_main_proxy_4 trigger[0].mask=%d trigger[1].mask=%d trigger[2].mask=%d\n",
task->trigger[0].mask, task->trigger[1].mask, task->trigger[2].mask);
#endif
@ -1067,8 +1079,11 @@ qse_printf (QSE_T("task_main_proxy_4 trigger[0].mask=%d trigger[1].mask=%d trigg
}
if (n == 0)
{
/* peer closed connection */
if (proxy->resflags & PROXY_RES_PEER_LENGTH)
{
QSE_ASSERT (!proxy->raw);
if (proxy->peer_output_received < proxy->peer_output_length)
{
if (httpd->opt.trait & QSE_HTTPD_LOGACT)
@ -1078,8 +1093,23 @@ qse_printf (QSE_T("task_main_proxy_4 trigger[0].mask=%d trigger[1].mask=%d trigg
}
task->main = task_main_proxy_5;
/* nothing to read from peer. set the mask to 0 */
task->trigger[0].mask = 0;
/* arrange to be called if the client side is writable */
task->trigger[2].mask |= QSE_HTTPD_TASK_TRIGGER_WRITE;
if (proxy->raw)
{
/* peer connection has been closed.
* so no more forwarding from the client to the peer
* is possible. get rid of the content callback on the
* client side. */
qse_htre_unsetconcb (proxy->req);
proxy->req = QSE_NULL;
}
return 1;
}
@ -1088,6 +1118,8 @@ qse_printf (QSE_T("task_main_proxy_4 trigger[0].mask=%d trigger[1].mask=%d trigg
if (proxy->resflags & PROXY_RES_PEER_LENGTH)
{
QSE_ASSERT (!proxy->raw);
if (proxy->peer_output_received > proxy->peer_output_length)
{
/* proxy returning too much data... something is wrong in PROXY */
@ -1208,16 +1240,18 @@ static int task_main_proxy_2 (
int http_errnum = 500;
#if 0
qse_printf (QSE_T("task_main_proxy_2 trigger[0].mask=%d trigger[1].mask=%d trigger[2].mask=%d\n"),
printf ("task_main_proxy_2 trigger[0].mask=%d trigger[1].mask=%d trigger[2].mask=%d\n",
task->trigger[0].mask, task->trigger[1].mask, task->trigger[2].mask);
#endif
if (task->trigger[2].mask & QSE_HTTPD_TASK_TRIGGER_READABLE)
{
/* client is readable */
proxy_forward_client_input_to_peer (httpd, task, 0);
}
else if (task->trigger[0].mask & QSE_HTTPD_TASK_TRIGGER_WRITABLE)
{
/* client is not readable but peer is writable */
proxy_forward_client_input_to_peer (httpd, task, 1);
}
@ -1342,7 +1376,6 @@ for (i = 0; i < proxy->buflen; i++) qse_printf (QSE_T("%hc"), proxy->buf[i]);
qse_printf (QSE_T("]\n"));
#endif
if (qse_htrd_feed (proxy->peer_htrd, proxy->buf, proxy->buflen) <= -1)
{
if (httpd->opt.trait & QSE_HTTPD_LOGACT)
@ -1413,7 +1446,6 @@ static int task_main_proxy_1 (
int http_errnum = 500;
/* wait for peer to get connected */
if (task->trigger[0].mask & QSE_HTTPD_TASK_TRIGGER_READABLE ||
task->trigger[0].mask & QSE_HTTPD_TASK_TRIGGER_WRITABLE)
{
@ -1438,7 +1470,6 @@ static int task_main_proxy_1 (
if (n >= 1)
{
/* connected to the peer now */
proxy->peer_status |= PROXY_PEER_CONNECTED;
if (proxy->req)
@ -1461,9 +1492,25 @@ static int task_main_proxy_1 (
}
if (proxy->raw)
task->main = task_main_proxy_4;
{
printf ("SWITCHING TO PROXY 3...%p\n", proxy->req);
if (qse_mbs_fmt (proxy->res, QSE_MT("HTTP/%d.%d 200 Connection established\r\n\r\n"),
(int)proxy->version.major, (int)proxy->version.minor) == (qse_size_t)-1)
{
proxy->httpd->errnum = QSE_HTTPD_ENOMEM;
goto oops;
}
proxy->res_pending = QSE_MBS_LEN(proxy->res) - proxy->res_consumed;
/* arrange to be called if the client side is writable.
* it must write the injected response. */
task->trigger[2].mask |= QSE_HTTPD_TASK_TRIGGER_WRITE;
task->main = task_main_proxy_3;
}
else
{
task->main = task_main_proxy_2;
}
}
}
@ -1544,7 +1591,28 @@ static int task_main_proxy (
task->trigger[0].mask |= QSE_HTTPD_TASK_TRIGGER_WRITE;
}
}
task->main = task_main_proxy_2;
if (proxy->raw)
{
/* TODO: write response */
/* inject http response */
if (qse_mbs_fmt (proxy->res, QSE_MT("HTTP/%d.%d 200 Connection established\r\n\r\n"),
(int)proxy->version.major, (int)proxy->version.minor) == (qse_size_t)-1)
{
proxy->httpd->errnum = QSE_HTTPD_ENOMEM;
goto oops;
}
proxy->res_pending = QSE_MBS_LEN(proxy->res) - proxy->res_consumed;
/* arrange to be called if the client side is writable.
* it must write the injected response. */
task->trigger[2].mask |= QSE_HTTPD_TASK_TRIGGER_WRITE;
task->main = task_main_proxy_3;
}
else
{
task->main = task_main_proxy_2;
}
}
return 1;
@ -1566,6 +1634,8 @@ oops:
proxy->method, &proxy->version, proxy->keepalive) == QSE_NULL)? -1: 0;
}
/* ------------------------------------------------------------------------ */
qse_httpd_task_t* qse_httpd_entaskproxy (
qse_httpd_t* httpd,
qse_httpd_client_t* client,

View File

@ -29,9 +29,8 @@
typedef struct task_resol_arg_t task_resol_arg_t;
struct task_resol_arg_t
{
const qse_mchar_t* path;
qse_htre_t* req;
int nph;
const qse_mchar_t* host;
const qse_htre_t* req;
};
typedef struct task_resol_t task_resol_t;
@ -40,21 +39,32 @@ struct task_resol_t
int init_failed;
qse_httpd_t* httpd;
const qse_mchar_t* path;
int method;
qse_http_version_t version;
int keepalive; /* taken from the request */
qse_mchar_t* host;
};
static int task_init_resol (
qse_httpd_t* httpd, qse_httpd_client_t* client, qse_httpd_task_t* task)
{
task_resol_t* resol;
task_resol_arg_t* arg;
resol = (task_resol_t*)qse_httpd_gettaskxtn (httpd, task);
arg = (task_resol_arg_t*)task->ctx;
QSE_MEMSET (resol, 0, QSE_SIZEOF(*resol));
resol->httpd = httpd;
resol->method = qse_htre_getqmethodtype(arg->req);
resol->version = *qse_htre_getversion(arg->req);
resol->keepalive = (arg->req->attr.flags & QSE_HTRE_ATTR_KEEPALIVE);
resol->host = (qse_mchar_t*)(resol + 1);
qse_mbscpy (resol->host, arg->host);
task->ctx = resol;
return 0;
}
@ -68,6 +78,11 @@ static void task_fini_resol (
static int task_main_resol (
qse_httpd_t* httpd, qse_httpd_client_t* client, qse_httpd_task_t* task)
{
/* dns.open ();
dns.send (...);
dns.close ();*/
return 0;
}
@ -75,11 +90,15 @@ qse_httpd_task_t* qse_httpd_entaskresol (
qse_httpd_t* httpd,
qse_httpd_client_t* client,
qse_httpd_task_t* pred,
const qse_mchar_t* host)
const qse_mchar_t* host,
qse_htre_t* req)
{
qse_httpd_task_t task;
task_resol_arg_t arg;
arg.host = host;
arg.req = req;
QSE_MEMSET (&task, 0, QSE_SIZEOF(task));
task.init = task_init_resol;
task.fini = task_fini_resol;
@ -87,7 +106,7 @@ qse_httpd_task_t* qse_httpd_entaskresol (
task.ctx = &arg;
return qse_httpd_entask (
httpd, client, pred, &task, QSE_SIZEOF(task_resol_t)
httpd, client, pred, &task, QSE_SIZEOF(task_resol_t) + qse_mbslen(host) + 1
);
}

View File

@ -1126,10 +1126,15 @@ static void dispatch_muxcb (qse_mux_t* mux, const qse_mux_evt_t* evt)
{
mux_xtn_t* xtn;
qse_ubi_t ubi;
int mask = 0;
xtn = qse_mux_getxtn (mux);
ubi.i = evt->hnd;
xtn->cbfun (xtn->httpd, mux, ubi, evt->mask, evt->data);
if (evt->mask & QSE_MUX_IN) mask |= QSE_HTTPD_MUX_READ;
if (evt->mask & QSE_MUX_OUT) mask |= QSE_HTTPD_MUX_WRITE;
xtn->cbfun (xtn->httpd, mux, ubi, mask, evt->data);
}
static void* mux_open (qse_httpd_t* httpd, qse_httpd_muxcb_t cbfun)
@ -2041,53 +2046,57 @@ if (qse_htre_getcontentlen(req) > 0)
* 'Expect: 100-continue' and 'Connection: keep-alive'. */
qse_httpd_discardcontent (httpd, req);
}
else if (mth == QSE_HTTP_POST &&
!(req->attr.flags & QSE_HTRE_ATTR_LENGTH) &&
!(req->attr.flags & QSE_HTRE_ATTR_CHUNKED))
{
/* POST without Content-Length nor not chunked */
req->attr.flags &= ~QSE_HTRE_ATTR_KEEPALIVE;
qse_httpd_discardcontent (httpd, req);
task = qse_httpd_entaskerr (httpd, client, QSE_NULL, 411, req);
if (task)
{
/* 411 Length Required - can't keep alive. Force disconnect */
task = qse_httpd_entaskdisconnect (httpd, client, QSE_NULL);
}
}
else if (server_xtn->makersrc (httpd, client, req, &rsrc) <= -1)
{
/* failed to make a resource. just send the internal server error.
* the makersrc handler can return a negative number to return
* '500 Internal Server Error'. If it wants to return a specific
* error code, it should return 0 with the QSE_HTTPD_RSRC_ERR
* resource. */
qse_httpd_discardcontent (httpd, req);
task = qse_httpd_entaskerr (httpd, client, QSE_NULL, 500, req);
}
else
{
task = QSE_NULL;
if ((rsrc.flags & QSE_HTTPD_RSRC_100_CONTINUE) &&
(task = qse_httpd_entaskcontinue (httpd, client, task, req)) == QSE_NULL)
{
/* inject '100 continue' first if it is needed */
goto oops;
}
/* arrange the actual resource to be returned */
task = qse_httpd_entaskrsrc (httpd, client, task, &rsrc, req);
server_xtn->freersrc (httpd, client, req, &rsrc);
/* if the resource is indicating to return an error,
* discard the contents since i won't return them */
if (rsrc.type == QSE_HTTPD_RSRC_ERR)
if (mth == QSE_HTTP_POST &&
!(req->attr.flags & QSE_HTRE_ATTR_LENGTH) &&
!(req->attr.flags & QSE_HTRE_ATTR_CHUNKED))
{
/* POST without Content-Length nor not chunked */
req->attr.flags &= ~QSE_HTRE_ATTR_KEEPALIVE;
qse_httpd_discardcontent (httpd, req);
task = qse_httpd_entaskerr (httpd, client, QSE_NULL, 411, req);
if (task)
{
/* 411 Length Required - can't keep alive. Force disconnect */
task = qse_httpd_entaskdisconnect (httpd, client, QSE_NULL);
}
}
else if (server_xtn->makersrc (httpd, client, req, &rsrc) <= -1)
{
/* failed to make a resource. just send the internal server error.
* the makersrc handler can return a negative number to return
* '500 Internal Server Error'. If it wants to return a specific
* error code, it should return 0 with the QSE_HTTPD_RSRC_ERR
* resource. */
qse_httpd_discardcontent (httpd, req);
task = qse_httpd_entaskerr (httpd, client, QSE_NULL, 500, req);
}
else
{
task = QSE_NULL;
if ((rsrc.flags & QSE_HTTPD_RSRC_100_CONTINUE) &&
(task = qse_httpd_entaskcontinue (httpd, client, task, req)) == QSE_NULL)
{
/* inject '100 continue' first if it is needed */
goto oops;
}
/* arrange the actual resource to be returned */
task = qse_httpd_entaskrsrc (httpd, client, task, &rsrc, req);
server_xtn->freersrc (httpd, client, req, &rsrc);
/* if the resource is indicating to return an error,
* discard the contents since i won't return them */
if (rsrc.type == QSE_HTTPD_RSRC_ERR)
{
qse_httpd_discardcontent (httpd, req);
}
}
if (task == QSE_NULL) goto oops;
}
if (task == QSE_NULL) goto oops;
}
else
{
@ -2095,13 +2104,14 @@ if (qse_htre_getcontentlen(req) > 0)
if (mth == QSE_HTTP_CONNECT)
{
printf ("SWITCHING HTRD TO DUMMY....\n");
printf ("SWITCHING HTRD TO DUMMY.... %s\n", qse_htre_getqpath(req));
/* Switch the http read to a dummy mode so that the subsqeuent
* input is just treaet as connects to the request just completed */
qse_htrd_setoption (client->htrd, qse_htrd_getoption(client->htrd) | QSE_HTRD_DUMMY);
if (server_xtn->makersrc (httpd, client, req, &rsrc) <= -1)
{
printf ("CANOT MAKE RESOURCE.... %s\n", qse_htre_getqpath(req));
/* failed to make a resource. just send the internal server error.
* the makersrc handler can return a negative number to return
* '500 Internal Server Error'. If it wants to return a specific
@ -2236,7 +2246,7 @@ static qse_httpd_scb_t httpd_system_callbacks =
/* directory operation */
{ dir_stat,
dir_make,
dir_purge,
dir_purge,
dir_open,
dir_close,
dir_read
@ -2547,7 +2557,7 @@ static int make_resource (
target->u.proxy.src.type = target->u.proxy.dst.type;
/* mark that this request is going to be proxied. */
/*req->attr.flags |= QSE_HTRE_ATTR_PROXIED;*/
req->attr.flags |= QSE_HTRE_ATTR_PROXIED;
return 0;
}

View File

@ -176,7 +176,7 @@ int qse_httpd_setopt (qse_httpd_t* httpd, qse_httpd_opt_t id, const void* value)
return -1;
}
/* --------------------------------------------------- */
/* ----------------------------------------------------------------------- */
qse_httpd_ecb_t* qse_httpd_popecb (qse_httpd_t* httpd)
{
@ -191,7 +191,7 @@ void qse_httpd_pushecb (qse_httpd_t* httpd, qse_httpd_ecb_t* ecb)
httpd->ecb = ecb;
}
/* --------------------------------------------------- */
/* ----------------------------------------------------------------------- */
QSE_INLINE void* qse_httpd_allocmem (qse_httpd_t* httpd, qse_size_t size)
{
@ -249,7 +249,7 @@ qse_mchar_t* qse_httpd_strntombsdup (qse_httpd_t* httpd, const qse_char_t* str,
return mptr;
}
/* --------------------------------------------------- */
/* ----------------------------------------------------------------------- */
static qse_httpd_real_task_t* enqueue_task (
qse_httpd_t* httpd, qse_httpd_client_t* client,
@ -357,7 +357,7 @@ static QSE_INLINE void purge_tasks (
while (dequeue_task (httpd, client) == 0);
}
/* --------------------------------------------------- */
/* ----------------------------------------------------------------------- */
static int htrd_peek_request (qse_htrd_t* htrd, qse_htre_t* req)
{
@ -377,7 +377,7 @@ static qse_htrd_recbs_t htrd_recbs =
QSE_STRUCT_FIELD (poke, htrd_poke_request)
};
/* --------------------------------------------------- */
/* ----------------------------------------------------------------------- */
static qse_httpd_client_t* new_client (
qse_httpd_t* httpd, qse_httpd_client_t* tmpl)
@ -580,7 +580,7 @@ qse_printf (QSE_T("MUX ADDHND CLIENT READ %d\n"), client->handle.i);
return 0;
}
/* --------------------------------------------------- */
/* ----------------------------------------------------------------------- */
static void deactivate_servers (qse_httpd_t* httpd)
{
@ -726,7 +726,37 @@ qse_httpd_server_t* qse_httpd_getprevserver (qse_httpd_t* httpd, qse_httpd_serve
return server->prev;
}
/* --------------------------------------------------- */
/* ----------------------------------------------------------------------- */
#if 0
qse_httpd_dns_t* qse_httpd_attachdns (qse_httpd_t* httpd, qse_httpd_dns_dope_t* dns, qse_size_t xtnsize)
{
qse_httpd_dns_t* dns;
dns = qse_httpd_callocmem (httpd, QSE_SIZEOF(*dns) + xtnsize);
if (dns == QSE_NULL) return QSE_NULL;
dns->type = QSE_HTTPD_SERVER;
/* copy the dns dope */
dns->dope = *dope;
/* and correct some fields in case the dope contains invalid stuffs */
dns->dope.flags &= ~QSE_HTTPD_SERVER_ACTIVE;
/* chain the dns to the tail of the list */
dns->prev = httpd->dns.list.tail;
dns->next = QSE_NULL;
if (httpd->dns.list.tail)
httpd->dns.list.tail->next = dns;
else
httpd->dns.list.head = dns;
httpd->dns.list.tail = dns;
httpd->dns.navail++;
return dns;
}
#endif
/* ----------------------------------------------------------------------- */
static int read_from_client (qse_httpd_t* httpd, qse_httpd_client_t* client)
{
@ -858,6 +888,181 @@ qse_printf (QSE_T("!!!!!FEEDING OK OK OK OK %d from %d\n"), (int)m, (int)client-
return 0;
}
static int update_mux_for_current_task (qse_httpd_t* httpd, qse_httpd_client_t* client, qse_httpd_task_t* task)
{
qse_size_t i;
/* the code here is pretty fragile. there is a high chance
* that something can go wrong if the task handler plays
* with the trigger field in an unexpected manner.
*/
for (i = 0; i < QSE_COUNTOF(task->trigger); i++)
{
task->trigger[i].mask &= ~(QSE_HTTPD_TASK_TRIGGER_READABLE |
QSE_HTTPD_TASK_TRIGGER_WRITABLE);
}
if (QSE_MEMCMP (client->trigger, task->trigger, QSE_SIZEOF(client->trigger)) != 0 ||
((client->status & CLIENT_MUTE) && !(client->status & CLIENT_MUTE_DELETED)))
{
/* manipulate muxtiplexer settings if there are trigger changes */
int has_trigger;
int trigger_mux_mask;
int client_handle_mux_mask;
int client_handle_mux_status;
/* delete previous trigger handles */
for (i = 0; i < QSE_COUNTOF(task->trigger); i++)
{
if (client->status & CLIENT_TASK_TRIGGER_IN_MUX(i))
{
httpd->opt.scb.mux.delhnd (httpd, httpd->mux, client->trigger[i].handle);
client->status &= ~CLIENT_TASK_TRIGGER_IN_MUX(i);
}
}
has_trigger = 0;
client_handle_mux_mask = 0;
client_handle_mux_status = 0;
if (client->status & CLIENT_MUTE)
{
client->status |= CLIENT_MUTE_DELETED;
}
else
{
client_handle_mux_mask |= QSE_HTTPD_MUX_READ;
client_handle_mux_status |= CLIENT_HANDLE_READ_IN_MUX;
}
/* add new trigger handles */
for (i = 0; i < QSE_COUNTOF(task->trigger); i++)
{
trigger_mux_mask = 0;
if (task->trigger[i].mask & QSE_HTTPD_TASK_TRIGGER_READ)
{
if (task->trigger[i].handle.i != client->handle.i ||
!(client->status & CLIENT_MUTE))
{
trigger_mux_mask |= QSE_HTTPD_MUX_READ;
}
}
if (task->trigger[i].mask & QSE_HTTPD_TASK_TRIGGER_WRITE)
trigger_mux_mask |= QSE_HTTPD_MUX_WRITE;
if (trigger_mux_mask)
{
has_trigger = 1;
if (task->trigger[i].handle.i == client->handle.i) /* TODO: no direct comparsion */
{
/* if the client handle is included in the trigger,
* delay its manipulation until the loop is over.
* instead, just remember what mask is requested */
client_handle_mux_mask |= trigger_mux_mask;
}
else
{
if (httpd->opt.scb.mux.addhnd (
httpd, httpd->mux, task->trigger[i].handle,
trigger_mux_mask, client) <= -1)
{
return -1;
}
client->status |= CLIENT_TASK_TRIGGER_IN_MUX(i);
}
}
}
if (client_handle_mux_mask)
{
/* if the client handle is included in the trigger
* and writing is requested, arrange writing to be
* enabled */
if (client_handle_mux_mask & QSE_HTTPD_MUX_WRITE)
client_handle_mux_status |= CLIENT_HANDLE_WRITE_IN_MUX;
}
else if (!has_trigger)
{
/* if there is no trigger, writing should be enabled */
client_handle_mux_status |= CLIENT_HANDLE_WRITE_IN_MUX;
client_handle_mux_mask |= QSE_HTTPD_MUX_WRITE;
}
if ((client->status & CLIENT_HANDLE_IN_MUX) !=
(client_handle_mux_status & CLIENT_HANDLE_IN_MUX))
{
httpd->opt.scb.mux.delhnd (httpd, httpd->mux, client->handle);
client->status &= ~CLIENT_HANDLE_IN_MUX;
if (client_handle_mux_mask)
{
if (httpd->opt.scb.mux.addhnd (
httpd, httpd->mux, client->handle,
client_handle_mux_mask, client) <= -1)
{
return -1;
}
client->status |= client_handle_mux_status;
}
}
QSE_MEMCPY (client->trigger, task->trigger, QSE_SIZEOF(client->trigger));
}
return 0;
}
static int update_mux_for_next_task (qse_httpd_t* httpd, qse_httpd_client_t* client)
{
int mux_mask;
int mux_status;
mux_mask = QSE_HTTPD_MUX_READ;
mux_status = CLIENT_HANDLE_READ_IN_MUX;
if (client->task.head)
{
/* there is a pending task. arrange to
* trigger it as if it is just entasked */
mux_mask |= QSE_HTTPD_MUX_WRITE;
mux_status |= CLIENT_HANDLE_WRITE_IN_MUX;
if (client->status & CLIENT_MUTE)
{
mux_mask &= ~QSE_HTTPD_MUX_READ;
mux_status &= ~CLIENT_HANDLE_READ_IN_MUX;
}
}
else
{
if (client->status & CLIENT_MUTE)
{
/* no more task. but this client
* has closed connection previously */
return -1;
}
}
if ((client->status & CLIENT_HANDLE_IN_MUX) !=
(mux_status & CLIENT_HANDLE_IN_MUX))
{
httpd->opt.scb.mux.delhnd (httpd, httpd->mux, client->handle);
client->status &= ~CLIENT_HANDLE_IN_MUX;
if (mux_status)
{
if (httpd->opt.scb.mux.addhnd (
httpd, httpd->mux, client->handle, mux_mask, client) <= -1)
{
return -1;
}
client->status |= mux_status;
}
}
QSE_MEMSET (client->trigger, 0, QSE_SIZEOF(client->trigger));
return 0;
}
static int invoke_client_task (
qse_httpd_t* httpd, qse_httpd_client_t* client,
qse_ubi_t handle, int mask)
@ -903,13 +1108,15 @@ static int invoke_client_task (
if (task->trigger[i].handle.i == handle.i) /* TODO: no direct comparision */
{
if (task->trigger[i].mask & QSE_HTTPD_TASK_TRIGGER_READ)
if (mask & QSE_HTTPD_MUX_READ)
{
QSE_ASSERT (task->trigger[i].mask & QSE_HTTPD_TASK_TRIGGER_READ);
trigger_fired = 1;
task->trigger[i].mask |= QSE_HTTPD_TASK_TRIGGER_READABLE;
}
if (task->trigger[i].mask & QSE_HTTPD_TASK_TRIGGER_WRITE)
if (mask & QSE_HTTPD_MUX_WRITE)
{
QSE_ASSERT (task->trigger[i].mask & QSE_HTTPD_TASK_TRIGGER_WRITE);
trigger_fired = 1;
task->trigger[i].mask |= QSE_HTTPD_TASK_TRIGGER_WRITABLE;
if (handle.i == client->handle.i) client_handle_writable = 1; /* TODO: no direct comparison */
@ -932,183 +1139,22 @@ static int invoke_client_task (
}
n = task->main (httpd, client, task);
if (n <= -1) return -1;
if (n <= -1)
{
/* task error */
return -1;
}
else if (n == 0)
{
int mux_mask;
int mux_status;
/* the current task is over. remove the task
* from the queue. dequeue_task() clears task triggers
* from the mux. so i don't clear them explicitly here */
dequeue_task (httpd, client);
mux_mask = QSE_HTTPD_MUX_READ;
mux_status = CLIENT_HANDLE_READ_IN_MUX;
if (client->task.head)
{
/* there is a pending task. arrange to
* trigger it as if it is just entasked */
mux_mask |= QSE_HTTPD_MUX_WRITE;
mux_status |= CLIENT_HANDLE_WRITE_IN_MUX;
if (client->status & CLIENT_MUTE)
{
mux_mask &= ~QSE_HTTPD_MUX_READ;
mux_status &= ~CLIENT_HANDLE_READ_IN_MUX;
}
}
else
{
if (client->status & CLIENT_MUTE)
{
/* no more task. but this client
* has closed connection previously */
return -1;
}
}
if ((client->status & CLIENT_HANDLE_IN_MUX) !=
(mux_status & CLIENT_HANDLE_IN_MUX))
{
httpd->opt.scb.mux.delhnd (httpd, httpd->mux, client->handle);
client->status &= ~CLIENT_HANDLE_IN_MUX;
if (mux_status)
{
if (httpd->opt.scb.mux.addhnd (
httpd, httpd->mux, client->handle, mux_mask, client) <= -1)
{
return -1;
}
client->status |= mux_status;
}
}
QSE_MEMSET (client->trigger, 0, QSE_SIZEOF(client->trigger));
return 0;
return update_mux_for_next_task (httpd, client);
}
else
{
/* the code here is pretty fragile. there is a high chance
* that something can go wrong if the task handler plays
* with the trigger field in an unexpected manner.
*/
for (i = 0; i < QSE_COUNTOF(task->trigger); i++)
{
task->trigger[i].mask &= ~(QSE_HTTPD_TASK_TRIGGER_READABLE |
QSE_HTTPD_TASK_TRIGGER_WRITABLE);
}
if (QSE_MEMCMP (client->trigger, task->trigger, QSE_SIZEOF(client->trigger)) != 0 ||
((client->status & CLIENT_MUTE) && !(client->status & CLIENT_MUTE_DELETED)))
{
/* manipulate muxtiplexer settings if there are trigger changes */
int has_trigger;
int trigger_mux_mask;
int client_handle_mux_mask;
int client_handle_mux_status;
/* delete previous trigger handles */
for (i = 0; i < QSE_COUNTOF(task->trigger); i++)
{
if (client->status & CLIENT_TASK_TRIGGER_IN_MUX(i))
{
httpd->opt.scb.mux.delhnd (httpd, httpd->mux, client->trigger[i].handle);
client->status &= ~CLIENT_TASK_TRIGGER_IN_MUX(i);
}
}
has_trigger = 0;
client_handle_mux_mask = 0;
client_handle_mux_status = 0;
if (client->status & CLIENT_MUTE)
{
client->status |= CLIENT_MUTE_DELETED;
}
else
{
client_handle_mux_mask |= QSE_HTTPD_MUX_READ;
client_handle_mux_status |= CLIENT_HANDLE_READ_IN_MUX;
}
/* add new trigger handles */
for (i = 0; i < QSE_COUNTOF(task->trigger); i++)
{
trigger_mux_mask = 0;
if (task->trigger[i].mask & QSE_HTTPD_TASK_TRIGGER_READ)
{
if (task->trigger[i].handle.i != client->handle.i ||
!(client->status & CLIENT_MUTE))
{
trigger_mux_mask |= QSE_HTTPD_MUX_READ;
}
}
if (task->trigger[i].mask & QSE_HTTPD_TASK_TRIGGER_WRITE)
trigger_mux_mask |= QSE_HTTPD_MUX_WRITE;
if (trigger_mux_mask)
{
has_trigger = 1;
if (task->trigger[i].handle.i == client->handle.i) /* TODO: no direct comparsion */
{
/* if the client handle is included in the trigger,
* delay its manipulation until the loop is over.
* instead, just remember what mask is requested */
client_handle_mux_mask |= trigger_mux_mask;
}
else
{
if (httpd->opt.scb.mux.addhnd (
httpd, httpd->mux, task->trigger[i].handle,
trigger_mux_mask, client) <= -1)
{
return -1;
}
client->status |= CLIENT_TASK_TRIGGER_IN_MUX(i);
}
}
}
if (client_handle_mux_mask)
{
/* if the client handle is included in the trigger
* and writing is requested, arrange writing to be
* enabled */
if (client_handle_mux_mask & QSE_HTTPD_MUX_WRITE)
client_handle_mux_status |= CLIENT_HANDLE_WRITE_IN_MUX;
}
else if (!has_trigger)
{
/* if there is no trigger, writing should be enabled */
client_handle_mux_status |= CLIENT_HANDLE_WRITE_IN_MUX;
client_handle_mux_mask |= QSE_HTTPD_MUX_WRITE;
}
if ((client->status & CLIENT_HANDLE_IN_MUX) !=
(client_handle_mux_status & CLIENT_HANDLE_IN_MUX))
{
httpd->opt.scb.mux.delhnd (httpd, httpd->mux, client->handle);
client->status &= ~CLIENT_HANDLE_IN_MUX;
if (client_handle_mux_mask)
{
if (httpd->opt.scb.mux.addhnd (
httpd, httpd->mux, client->handle,
client_handle_mux_mask, client) <= -1)
{
return -1;
}
client->status |= client_handle_mux_status;
}
}
QSE_MEMCPY (client->trigger, task->trigger, QSE_SIZEOF(client->trigger));
}
return 0;
return update_mux_for_current_task (httpd, client, task);
}
}
@ -1219,7 +1265,7 @@ qse_httpd_task_t* qse_httpd_entask (
client->status &= ~CLIENT_HANDLE_IN_MUX;
#if 0
qse_printf (QSE_T("MUX ADDHND CLIENT RW(ENTASK) %d\n"), client->handle.i);
printf ("MUX ADDHND CLIENT RW(ENTASK) %d\n", client->handle.i);
#endif
if (httpd->opt.scb.mux.addhnd (
httpd, httpd->mux, client->handle,