enhanced task trigger for httpd.

added cleanup logic for idle httpd clients
This commit is contained in:
hyung-hwan 2012-03-20 15:31:10 +00:00
parent c55bceb220
commit 7c224230f2
4 changed files with 305 additions and 188 deletions

View File

@ -220,8 +220,18 @@ enum qse_httpd_task_trigger_mask_t
QSE_HTTPD_TASK_TRIGGER_READABLE = (1 << 3), QSE_HTTPD_TASK_TRIGGER_READABLE = (1 << 3),
QSE_HTTPD_TASK_TRIGGER_RELAYABLE = (1 << 4), QSE_HTTPD_TASK_TRIGGER_RELAYABLE = (1 << 4),
QSE_HTTPD_TASK_TRIGGER_WRITABLE = (1 << 5) QSE_HTTPD_TASK_TRIGGER_WRITABLE = (1 << 5)
}; };
typedef struct qse_httpd_task_trigger_t qse_httpd_task_trigger_t;
struct qse_httpd_task_trigger_t
{
int mask; /* QSE_HTTPD_TASK_TRIGGER_READ | QSE_HTTPD_TASK_TRIGGER_WRITE */
qse_ubi_t handle;
};
#define QSE_HTTPD_TASK_TRIGGER_MAX 3
struct qse_httpd_task_t struct qse_httpd_task_t
{ {
/* == PUBLIC == */ /* == PUBLIC == */
@ -229,22 +239,11 @@ struct qse_httpd_task_t
/* you must not call another entask functions from within /* you must not call another entask functions from within
* an initailizer. you can call entask functions from within * an initailizer. you can call entask functions from within
* a finalizer and a main function. */ * a finalizer and a main function. */
qse_httpd_task_init_t init; qse_httpd_task_init_t init;
qse_httpd_task_fini_t fini; qse_httpd_task_fini_t fini;
qse_httpd_task_main_t main; qse_httpd_task_main_t main;
qse_httpd_task_trigger_t trigger[QSE_HTTPD_TASK_TRIGGER_MAX];
int trigger_mask; void* ctx;
qse_ubi_t trigger[3];
#if 0
struct
{
int mask; /* QSE_HTTPD_TASK_TRIGGER_READ | QSE_HTTPD_TASK_TRIGGER_WRITE */
qse_ubi_t handle;
} trigger[3];
#endif
void* ctx;
/* == PRIVATE == */ /* == PRIVATE == */
qse_httpd_task_t* prev; qse_httpd_task_t* prev;
@ -256,23 +255,25 @@ struct qse_httpd_client_t
{ {
/* == PUBLIC == */ /* == PUBLIC == */
qse_ubi_t handle; qse_ubi_t handle;
qse_ubi_t handle2; qse_ubi_t handle2;
qse_nwad_t local_addr; qse_nwad_t local_addr;
qse_nwad_t remote_addr; qse_nwad_t remote_addr;
/* == PRIVATE == */ /* == PRIVATE == */
qse_htrd_t* htrd; qse_htrd_t* htrd;
int secure; int secure;
int status; int status;
qse_httpd_task_trigger_t trigger[QSE_HTTPD_TASK_TRIGGER_MAX];
qse_ntime_t last_active;
qse_httpd_client_t* prev; qse_httpd_client_t* prev;
qse_httpd_client_t* next; qse_httpd_client_t* next;
qse_httpd_client_t* bad_next;
qse_httpd_client_t* bad_next; qse_httpd_client_t* prev_tasked;
qse_httpd_client_t* next_tasked;
qse_httpd_client_t* prev_tasked;
qse_httpd_client_t* next_tasked;
struct struct
{ {

View File

@ -1591,20 +1591,17 @@ else qse_printf (QSE_T("!!!SNATCHING DONE\n"));
/* since there is no more to read from the client side. /* since there is no more to read from the client side.
* the relay trigger is not needed any more. */ * the relay trigger is not needed any more. */
task->trigger_mask &= task->trigger[2].mask = 0;
~(QSE_HTTPD_TASK_TRIGGER_RELAY |
QSE_HTTPD_TASK_TRIGGER_RELAYABLE);
if (QSE_MBS_LEN(cgi->reqfwdbuf) > 0 && cgi->pio_inited && if (QSE_MBS_LEN(cgi->reqfwdbuf) > 0 && cgi->pio_inited &&
!(task->trigger_mask & QSE_HTTPD_TASK_TRIGGER_WRITE)) !(task->trigger[1].mask & QSE_HTTPD_TASK_TRIGGER_WRITE))
{ {
/* there's nothing more to read from the client side. /* there's nothing more to read from the client side.
* there's something to forward in the forwarding buffer. * there's something to forward in the forwarding buffer.
* but no write trigger is set. add the write trigger * but no write trigger is set. add the write trigger
* for task invocation. */ * for task invocation. */
task->trigger_mask |= QSE_HTTPD_TASK_TRIGGER_WRITE; task->trigger[1].mask = QSE_HTTPD_TASK_TRIGGER_WRITE;
task->trigger_mask &= ~QSE_HTTPD_TASK_TRIGGER_WRITABLE; task->trigger[1].handle = qse_pio_gethandleasubi (&cgi->pio, QSE_PIO_IN);
task->trigger[2] = qse_pio_gethandleasubi (&cgi->pio, QSE_PIO_IN);
} }
} }
else if (!cgi->reqfwderr) else if (!cgi->reqfwderr)
@ -1691,11 +1688,8 @@ qse_printf (QSE_T("FORWARD: @@@@@@@@WRITE TO CGI FAILED\n"));
* clear the relay and write triggers. * clear the relay and write triggers.
*/ */
qse_printf (QSE_T("FORWARD: @@@@@@@@NOTHING MORE TO WRITE TO CGI\n")); qse_printf (QSE_T("FORWARD: @@@@@@@@NOTHING MORE TO WRITE TO CGI\n"));
task->trigger_mask &= task->trigger[1].mask = 0;
~(QSE_HTTPD_TASK_TRIGGER_RELAY | task->trigger[2].mask = 0;
QSE_HTTPD_TASK_TRIGGER_RELAYABLE |
QSE_HTTPD_TASK_TRIGGER_WRITE |
QSE_HTTPD_TASK_TRIGGER_WRITABLE);
} }
} }
@ -1855,11 +1849,11 @@ static int task_main_cgi_5 (
QSE_ASSERT (cgi->pio_inited); QSE_ASSERT (cgi->pio_inited);
if (task->trigger_mask & QSE_HTTPD_TASK_TRIGGER_RELAYABLE) if (task->trigger[2].mask & QSE_HTTPD_TASK_TRIGGER_READABLE)
{ {
cgi_forward_content (httpd, task, 0); cgi_forward_content (httpd, task, 0);
} }
else if (task->trigger_mask & QSE_HTTPD_TASK_TRIGGER_WRITABLE) else if (task->trigger[1].mask & QSE_HTTPD_TASK_TRIGGER_WRITABLE)
{ {
cgi_forward_content (httpd, task, 1); cgi_forward_content (httpd, task, 1);
} }
@ -1896,17 +1890,19 @@ static int task_main_cgi_4 (
QSE_ASSERT (cgi->pio_inited); QSE_ASSERT (cgi->pio_inited);
qse_printf (QSE_T("task_main_cgi_4 trigger_mask = %d\n"), task->trigger_mask); qse_printf (QSE_T("task_main_cgi_4 trigger[0].mask=%d trigger[1].mask=%d trigger[2].mask=%d\n"),
if (task->trigger_mask & QSE_HTTPD_TASK_TRIGGER_RELAYABLE) task->trigger[0].mask, task->trigger[1].mask, task->trigger[2].mask);
if (task->trigger[2].mask & QSE_HTTPD_TASK_TRIGGER_READABLE)
{ {
cgi_forward_content (httpd, task, 0); cgi_forward_content (httpd, task, 0);
} }
else if (task->trigger_mask & QSE_HTTPD_TASK_TRIGGER_WRITABLE) else if (task->trigger[1].mask & QSE_HTTPD_TASK_TRIGGER_WRITABLE)
{ {
cgi_forward_content (httpd, task, 1); cgi_forward_content (httpd, task, 1);
} }
if (task->trigger_mask & QSE_HTTPD_TASK_TRIGGER_READABLE) if (task->trigger[0].mask & QSE_HTTPD_TASK_TRIGGER_READABLE)
{ {
/* this function assumes that the chunk length does not exceeded /* this function assumes that the chunk length does not exceeded
* 4 hexadecimal digits. */ * 4 hexadecimal digits. */
@ -1942,9 +1938,7 @@ qse_printf (QSE_T("task_main_cgi_4 trigger_mask = %d\n"), task->trigger_mask);
if (n == 0) if (n == 0)
{ {
/* the cgi script closed the output */ /* the cgi script closed the output */
task->trigger_mask &= task->trigger[0].mask = 0;
~(QSE_HTTPD_TASK_TRIGGER_READ |
QSE_HTTPD_TASK_TRIGGER_READABLE);
cgi->buf[cgi->buflen++] = QSE_MT('0'); cgi->buf[cgi->buflen++] = QSE_MT('0');
cgi->buf[cgi->buflen++] = QSE_MT('\r'); cgi->buf[cgi->buflen++] = QSE_MT('\r');
@ -1988,9 +1982,7 @@ qse_printf (QSE_T("task_main_cgi_4 trigger_mask = %d\n"), task->trigger_mask);
} }
if (n == 0) if (n == 0)
{ {
task->trigger_mask &= task->trigger[0].mask = 0;
~(QSE_HTTPD_TASK_TRIGGER_READ |
QSE_HTTPD_TASK_TRIGGER_READABLE);
task->main = task_main_cgi_5; task->main = task_main_cgi_5;
/* ok to chain-call since this task is called /* ok to chain-call since this task is called
* if the client-side is writable */ * if the client-side is writable */
@ -2043,16 +2035,16 @@ static int task_main_cgi_3 (
qse_ssize_t n; qse_ssize_t n;
qse_size_t count; qse_size_t count;
if (task->trigger_mask & QSE_HTTPD_TASK_TRIGGER_RELAYABLE) if (task->trigger[2].mask & QSE_HTTPD_TASK_TRIGGER_READABLE)
{ {
cgi_forward_content (httpd, task, 0); cgi_forward_content (httpd, task, 0);
} }
else if (task->trigger_mask & QSE_HTTPD_TASK_TRIGGER_WRITABLE) else if (task->trigger[1].mask & QSE_HTTPD_TASK_TRIGGER_WRITABLE)
{ {
cgi_forward_content (httpd, task, 1); cgi_forward_content (httpd, task, 1);
} }
if (task->trigger_mask & QSE_HTTPD_TASK_TRIGGER_READABLE) if (task->trigger[0].mask & QSE_HTTPD_TASK_TRIGGER_READABLE)
{ {
count = MAX_SEND_SIZE; count = MAX_SEND_SIZE;
if (count >= cgi->res_left) count = cgi->res_left; if (count >= cgi->res_left) count = cgi->res_left;
@ -2100,17 +2092,17 @@ static int task_main_cgi_2 (
QSE_ASSERT (cgi->pio_inited); QSE_ASSERT (cgi->pio_inited);
qse_printf (QSE_T("[cgi_2 ]\n")); qse_printf (QSE_T("[cgi_2 ]\n"));
if (task->trigger_mask & QSE_HTTPD_TASK_TRIGGER_RELAYABLE) if (task->trigger[2].mask & QSE_HTTPD_TASK_TRIGGER_READABLE)
{ {
qse_printf (QSE_T("[cgi_2 write]\n")); qse_printf (QSE_T("[cgi_2 write]\n"));
cgi_forward_content (httpd, task, 0); cgi_forward_content (httpd, task, 0);
} }
else if (task->trigger_mask & QSE_HTTPD_TASK_TRIGGER_WRITABLE) else if (task->trigger[1].mask & QSE_HTTPD_TASK_TRIGGER_WRITABLE)
{ {
cgi_forward_content (httpd, task, 1); cgi_forward_content (httpd, task, 1);
} }
if (task->trigger_mask & QSE_HTTPD_TASK_TRIGGER_READABLE) if (task->trigger[0].mask & QSE_HTTPD_TASK_TRIGGER_READABLE)
{ {
qse_printf (QSE_T("[cgi_2 read]\n")); qse_printf (QSE_T("[cgi_2 read]\n"));
/* <- can i make it non-block?? or use select??? pio_tryread()? */ /* <- can i make it non-block?? or use select??? pio_tryread()? */
@ -2248,8 +2240,8 @@ static int task_main_cgi (
* it the output from the child is available, this task * it the output from the child is available, this task
* writes it back to the client. so add a trigger for * writes it back to the client. so add a trigger for
* checking the data availability from the child process */ * checking the data availability from the child process */
task->trigger_mask = QSE_HTTPD_TASK_TRIGGER_READ; task->trigger[0].mask = QSE_HTTPD_TASK_TRIGGER_READ;
task->trigger[0] = qse_pio_gethandleasubi (&cgi->pio, QSE_PIO_OUT); task->trigger[0].handle = qse_pio_gethandleasubi (&cgi->pio, QSE_PIO_OUT);
if (cgi->reqfwdbuf) if (cgi->reqfwdbuf)
{ {
/* the existence of the forwarding buffer leads to a trigger /* the existence of the forwarding buffer leads to a trigger
@ -2259,8 +2251,8 @@ static int task_main_cgi (
{ {
/* there are still things to forward from the client-side. /* there are still things to forward from the client-side.
* i can rely on this relay trigger for task invocation. */ * i can rely on this relay trigger for task invocation. */
task->trigger_mask |= QSE_HTTPD_TASK_TRIGGER_RELAY; task->trigger[2].mask = QSE_HTTPD_TASK_TRIGGER_READ;
task->trigger[1].i = client->handle.i; task->trigger[2].handle = client->handle;
} }
else if (QSE_MBS_LEN(cgi->reqfwdbuf) > 0) else if (QSE_MBS_LEN(cgi->reqfwdbuf) > 0)
{ {
@ -2284,8 +2276,8 @@ qse_printf (QSE_T("FORWARDING INITIAL PART OF CONTENT...\n"));
* a trigger for invocation. ask the main loop to * a trigger for invocation. ask the main loop to
* invoke this task so long as it is able to write * invoke this task so long as it is able to write
* to the child process */ * to the child process */
task->trigger_mask |= QSE_HTTPD_TASK_TRIGGER_WRITE; task->trigger[1].mask = QSE_HTTPD_TASK_TRIGGER_WRITE;
task->trigger[2] = qse_pio_gethandleasubi (&cgi->pio, QSE_PIO_IN); task->trigger[1].handle = qse_pio_gethandleasubi (&cgi->pio, QSE_PIO_IN);
} }
} }
} }

View File

@ -46,24 +46,13 @@ QSE_IMPLEMENT_COMMON_FUNCTIONS (httpd)
#define DEFAULT_PORT 80 #define DEFAULT_PORT 80
#define DEFAULT_SECURE_PORT 443 #define DEFAULT_SECURE_PORT 443
enum client_status_t #define CLIENT_BAD (1 << 0)
{ #define CLIENT_READY (1 << 1)
CLIENT_BAD = (1 << 0), #define CLIENT_SECURE (1 << 2)
CLIENT_READY = (1 << 1), #define CLIENT_HANDLE_READ_IN_MUX (1 << 3)
CLIENT_SECURE = (1 << 2), #define CLIENT_HANDLE_WRITE_IN_MUX (1 << 4)
#define CLIENT_HANDLE_IN_MUX (CLIENT_HANDLE_READ_IN_MUX|CLIENT_HANDLE_WRITE_IN_MUX)
CLIENT_HANDLE_READ_IN_MUX = (1 << 3), #define CLIENT_TASK_TRIGGER_IN_MUX(i) (1 << ((i) + 5))
CLIENT_HANDLE_WRITE_IN_MUX = (1 << 4),
CLIENT_HANDLE_IN_MUX = (CLIENT_HANDLE_READ_IN_MUX |
CLIENT_HANDLE_WRITE_IN_MUX),
CLIENT_TASK_TRIGGER_READ_IN_MUX = (1 << 5),
CLIENT_TASK_TRIGGER_RELAY_IN_MUX = (1 << 6),
CLIENT_TASK_TRIGGER_WRITE_IN_MUX = (1 << 7),
CLIENT_TASK_TRIGGER_IN_MUX = (CLIENT_TASK_TRIGGER_READ_IN_MUX |
CLIENT_TASK_TRIGGER_RELAY_IN_MUX |
CLIENT_TASK_TRIGGER_WRITE_IN_MUX)
};
static void free_server_list ( static void free_server_list (
qse_httpd_t* httpd, qse_httpd_server_t* server); qse_httpd_t* httpd, qse_httpd_server_t* server);
@ -217,27 +206,22 @@ static QSE_INLINE int dequeue_task (
qse_httpd_t* httpd, qse_httpd_client_t* client) qse_httpd_t* httpd, qse_httpd_client_t* client)
{ {
qse_httpd_task_t* task; qse_httpd_task_t* task;
qse_size_t i;
if (client->task.count <= 0) return -1; if (client->task.count <= 0) return -1;
task = client->task.head; task = client->task.head;
/* clear task triggers from mux if they are registered */ /* clear task triggers from mux if they are registered */
if (client->status & CLIENT_TASK_TRIGGER_READ_IN_MUX) for (i = 0; i < QSE_COUNTOF(task->trigger); i++)
{ {
httpd->cbs->mux.delhnd (httpd, httpd->mux, task->trigger[0]); if (client->status & CLIENT_TASK_TRIGGER_IN_MUX(i))
client->status &= ~CLIENT_TASK_TRIGGER_READ_IN_MUX; {
} httpd->cbs->mux.delhnd (httpd, httpd->mux, task->trigger[i].handle);
if (client->status & CLIENT_TASK_TRIGGER_RELAY_IN_MUX) client->status &= ~CLIENT_TASK_TRIGGER_IN_MUX(i);
{ }
httpd->cbs->mux.delhnd (httpd, httpd->mux, task->trigger[1]);
client->status &= ~CLIENT_TASK_TRIGGER_RELAY_IN_MUX;
}
if (client->status & CLIENT_TASK_TRIGGER_WRITE_IN_MUX)
{
httpd->cbs->mux.delhnd (httpd, httpd->mux, task->trigger[2]);
client->status &= ~CLIENT_TASK_TRIGGER_WRITE_IN_MUX;
} }
/* --------------------------------------------------- */ /* --------------------------------------------------- */
if (task == client->task.tail) if (task == client->task.tail)
@ -388,6 +372,28 @@ static void purge_client_list (qse_httpd_t* httpd)
purge_client (httpd, httpd->client.list.tail); purge_client (httpd, httpd->client.list.tail);
} }
static void move_client_to_tail (qse_httpd_t* httpd, qse_httpd_client_t* client)
{
if (httpd->client.list.tail != client)
{
qse_httpd_client_t* prev;
qse_httpd_client_t* next;
prev = client->prev;
next = client->next;
if (prev) prev->next = next;
else httpd->client.list.head = next;
if (next) next->prev = prev;
else httpd->client.list.tail = prev;
client->next = QSE_NULL;
client->prev = httpd->client.list.tail;
httpd->client.list.tail->next = client;
httpd->client.list.tail = client;
}
}
static int accept_client ( static int accept_client (
qse_httpd_t* httpd, void* mux, qse_ubi_t handle, int mask, void* cbarg) qse_httpd_t* httpd, void* mux, qse_ubi_t handle, int mask, void* cbarg)
{ {
@ -433,7 +439,8 @@ qse_printf (QSE_T("failed to accept from server %s\n"), tmp);
} }
client->status |= CLIENT_HANDLE_READ_IN_MUX; client->status |= CLIENT_HANDLE_READ_IN_MUX;
/* link the new client to the back of the client list. */ qse_gettime (&client->last_active); /* TODO: error check */
/* link the new client to the tail of the client list. */
if (httpd->client.list.tail) if (httpd->client.list.tail)
{ {
QSE_ASSERT (httpd->client.list.head); QSE_ASSERT (httpd->client.list.head);
@ -713,10 +720,11 @@ static int invoke_client_task (
qse_ubi_t handle, int mask) qse_ubi_t handle, int mask)
{ {
qse_httpd_task_t* task; qse_httpd_task_t* task;
int n; qse_size_t i;
int n, trigger_fired, client_handle_writable;
/* TODO: handle comparison callback ... */ /* TODO: handle comparison callback ... */
if (handle.i == client->handle.i && (mask & QSE_HTTPD_MUX_READ)) if (handle.i == client->handle.i && (mask & QSE_HTTPD_MUX_READ)) /* TODO: no direct comparision */
{ {
if (read_from_client (httpd, client) <= -1) if (read_from_client (httpd, client) <= -1)
{ {
@ -732,31 +740,30 @@ static int invoke_client_task (
task = client->task.head; task = client->task.head;
if (task == QSE_NULL) return 0; if (task == QSE_NULL) return 0;
task->trigger_mask &= trigger_fired = 0;
~(QSE_HTTPD_TASK_TRIGGER_READABLE | client_handle_writable = 0;
QSE_HTTPD_TASK_TRIGGER_RELAYABLE |
QSE_HTTPD_TASK_TRIGGER_WRITABLE);
qse_printf (QSE_T("handle.i %d mask %d\n"), handle.i, mask); for (i = 0; i < QSE_COUNTOF(task->trigger); i++)
if (task->trigger_mask & QSE_HTTPD_TASK_TRIGGER_READ)
{ {
if ((mask & QSE_HTTPD_MUX_READ) && task->trigger[0].i == handle.i) task->trigger[i].mask &= ~(QSE_HTTPD_TASK_TRIGGER_READABLE |
task->trigger_mask |= QSE_HTTPD_TASK_TRIGGER_READABLE; QSE_HTTPD_TASK_TRIGGER_WRITABLE);
}
if (task->trigger_mask & QSE_HTTPD_TASK_TRIGGER_RELAY)
{
if ((mask & QSE_HTTPD_MUX_READ) && task->trigger[1].i == handle.i)
task->trigger_mask |= QSE_HTTPD_TASK_TRIGGER_RELAYABLE;
}
if (task->trigger_mask & QSE_HTTPD_TASK_TRIGGER_WRITE)
{
if ((mask & QSE_HTTPD_MUX_WRITE) && task->trigger[2].i == handle.i)
task->trigger_mask |= QSE_HTTPD_TASK_TRIGGER_WRITABLE;
}
if (task->trigger_mask & (QSE_HTTPD_TASK_TRIGGER_READABLE | if (task->trigger[i].handle.i == handle.i) /* TODO: no direct comparision */
QSE_HTTPD_TASK_TRIGGER_RELAYABLE | {
QSE_HTTPD_TASK_TRIGGER_WRITABLE)) if (task->trigger[i].mask & QSE_HTTPD_TASK_TRIGGER_READ)
{
trigger_fired = 1;
task->trigger[i].mask |= QSE_HTTPD_TASK_TRIGGER_READABLE;
}
if (task->trigger[i].mask & QSE_HTTPD_TASK_TRIGGER_WRITE)
{
trigger_fired = 1;
task->trigger[i].mask |= QSE_HTTPD_TASK_TRIGGER_WRITABLE;
if (handle.i == client->handle.i) client_handle_writable = 1; /* TODO: no direct comparison */
}
}
}
if (trigger_fired && !client_handle_writable)
{ {
/* the task is invoked for triggers. /* the task is invoked for triggers.
* check if the client handle is writable */ * check if the client handle is writable */
@ -768,6 +775,10 @@ qse_printf (QSE_T("handle.i %d mask %d\n"), handle.i, mask);
} }
} }
/* locate an active client to the tail of the client list */
qse_gettime (&client->last_active); /* TODO: error check??? */
move_client_to_tail (httpd, client);
n = task->main (httpd, client, task); n = task->main (httpd, client, task);
qse_printf (QSE_T("task returend %d\n"), n); qse_printf (QSE_T("task returend %d\n"), n);
if (n <= -1) return -1; if (n <= -1) return -1;
@ -792,77 +803,118 @@ qse_printf (QSE_T("task returend %d\n"), n);
mux_status |= CLIENT_HANDLE_WRITE_IN_MUX; mux_status |= CLIENT_HANDLE_WRITE_IN_MUX;
} }
QSE_ASSERT (client->status & CLIENT_HANDLE_IN_MUX); if ((client->status & CLIENT_HANDLE_IN_MUX) !=
(mux_status & CLIENT_HANDLE_IN_MUX))
httpd->cbs->mux.delhnd (httpd, httpd->mux, client->handle);
client->status &= ~CLIENT_HANDLE_IN_MUX;
if (httpd->cbs->mux.addhnd (
httpd, httpd->mux, client->handle,
mux_mask, perform_client_task, client) <= -1) return -1;
client->status |= mux_status;
return 0;
}
else
{
/* TODO: if there are no changes in the triggers, do do delhnd/addhnd() */
static int mux_status[] =
{
CLIENT_TASK_TRIGGER_READ_IN_MUX,
CLIENT_TASK_TRIGGER_RELAY_IN_MUX,
CLIENT_TASK_TRIGGER_WRITE_IN_MUX
};
static int trigger_mask[] =
{
QSE_HTTPD_TASK_TRIGGER_READ,
QSE_HTTPD_TASK_TRIGGER_RELAY,
QSE_HTTPD_TASK_TRIGGER_WRITE
};
static int mux_mask[] =
{
QSE_HTTPD_MUX_READ,
QSE_HTTPD_MUX_READ,
QSE_HTTPD_MUX_WRITE
};
int i;
for (i = 0; i < 3; i++)
{
if (client->status & mux_status[i])
{
httpd->cbs->mux.delhnd (httpd, httpd->mux, task->trigger[i]);
client->status &= ~mux_status[i];
}
if (task->trigger_mask & trigger_mask[i])
{
if (client->handle.i != task->trigger[i].i)
{
if (httpd->cbs->mux.addhnd (
httpd, httpd->mux, task->trigger[i],
mux_mask[i], perform_client_task, client) <= -1) return -1;
client->status |= mux_status[i];
}
}
}
QSE_ASSERT (client->status & CLIENT_HANDLE_READ_IN_MUX);
if ((client->status & CLIENT_TASK_TRIGGER_IN_MUX) &&
(client->status & CLIENT_HANDLE_WRITE_IN_MUX))
{ {
httpd->cbs->mux.delhnd (httpd, httpd->mux, client->handle); httpd->cbs->mux.delhnd (httpd, httpd->mux, client->handle);
client->status &= ~CLIENT_HANDLE_IN_MUX; client->status &= ~CLIENT_HANDLE_IN_MUX;
if (httpd->cbs->mux.addhnd ( if (httpd->cbs->mux.addhnd (
httpd, httpd->mux, client->handle, httpd, httpd->mux, client->handle,
QSE_HTTPD_MUX_READ, perform_client_task, client) <= -1) return -1; mux_mask, perform_client_task, client) <= -1) return -1;
client->status |= CLIENT_HANDLE_READ_IN_MUX; client->status |= mux_status;
} }
QSE_MEMSET (client->trigger, 0, QSE_SIZEOF(client->trigger));
return 0;
}
else
{
/* the code here is pretty fragile. there is a high chance
* that something can go wrong if the task handler plays
* with the trigger field in an unexpected mannger.
*/
for (i = 0; i < QSE_COUNTOF(task->trigger); i++)
{
task->trigger[i].mask &= ~(QSE_HTTPD_TASK_TRIGGER_READABLE |
QSE_HTTPD_TASK_TRIGGER_WRITABLE);
}
if (QSE_MEMCMP (client->trigger, task->trigger, QSE_SIZEOF(client->trigger)) != 0)
{
/* manipulate muxtiplexer settings if there are trigger changes */
int has_trigger;
int trigger_mux_mask;
int client_handle_mux_mask;
int client_handle_mux_status;
/* delete previous trigger handles */
for (i = 0; i < QSE_COUNTOF(task->trigger); i++)
{
if (client->status & CLIENT_TASK_TRIGGER_IN_MUX(i))
{
httpd->cbs->mux.delhnd (httpd, httpd->mux, client->trigger[i].handle);
client->status &= ~CLIENT_TASK_TRIGGER_IN_MUX(i);
}
}
has_trigger = 0;
client_handle_mux_mask = QSE_HTTPD_MUX_READ;
/* add new trigger handles */
for (i = 0; i < QSE_COUNTOF(task->trigger); i++)
{
trigger_mux_mask = 0;
if (task->trigger[i].mask & QSE_HTTPD_TASK_TRIGGER_READ)
trigger_mux_mask |= QSE_HTTPD_MUX_READ;
if (task->trigger[i].mask & QSE_HTTPD_TASK_TRIGGER_WRITE)
trigger_mux_mask |= QSE_HTTPD_MUX_WRITE;
if (trigger_mux_mask)
{
has_trigger = 1;
if (task->trigger[i].handle.i == client->handle.i) /* TODO: no direct comparsion */
{
/* if the client handle is included in the trigger,
* delay its manipulation until the loop is over.
* instead, just remember what mask is requested */
client_handle_mux_mask |= trigger_mux_mask;
}
else
{
if (httpd->cbs->mux.addhnd (
httpd, httpd->mux, task->trigger[i].handle,
trigger_mux_mask, perform_client_task, client) <= -1) return -1;
client->status |= CLIENT_TASK_TRIGGER_IN_MUX(i);
}
}
}
/* manipulate the client handle. reading is always enabled
* on the cleint handle */
client_handle_mux_status = CLIENT_HANDLE_READ_IN_MUX;
if (client_handle_mux_mask)
{
/* if the client handle is included in the trigger
* and writing is requested, arrange writing to be
* enabled */
if (client_handle_mux_mask & QSE_HTTPD_MUX_WRITE)
client_handle_mux_status |= CLIENT_HANDLE_WRITE_IN_MUX;
}
else if (!has_trigger)
{
/* if there is no trigger, writing should be enabled */
client_handle_mux_status |= CLIENT_HANDLE_WRITE_IN_MUX;
client_handle_mux_mask |= QSE_HTTPD_MUX_WRITE;
}
if ((client->status & CLIENT_HANDLE_IN_MUX) !=
(client_handle_mux_status & CLIENT_HANDLE_IN_MUX))
{
httpd->cbs->mux.delhnd (httpd, httpd->mux, client->handle);
client->status &= ~CLIENT_HANDLE_IN_MUX;
if (httpd->cbs->mux.addhnd (
httpd, httpd->mux, client->handle,
client_handle_mux_mask, perform_client_task, client) <= -1) return -1;
client->status |= client_handle_mux_status;
}
QSE_MEMCPY (client->trigger, task->trigger, QSE_SIZEOF(client->trigger));
}
return 0; return 0;
} }
} }
@ -881,7 +933,13 @@ static int perform_client_task (
int x; int x;
x = httpd->cbs->client.accepted (httpd, client); x = httpd->cbs->client.accepted (httpd, client);
if (x <= -1) goto oops; if (x <= -1) goto oops;
if (x >= 1) client->status |= CLIENT_READY; if (x >= 1)
{
client->status |= CLIENT_READY;
qse_gettime (&client->last_active);
move_client_to_tail (httpd, client);
}
} }
else else
{ {
@ -912,6 +970,29 @@ qse_printf (QSE_T("PURGING BAD CLIENT XXXXXXXXXXXXXX\n"));
} }
} }
static void purge_idle_clients (qse_httpd_t* httpd)
{
qse_httpd_client_t* client;
qse_httpd_client_t* next_client;
qse_ntime_t now;
qse_gettime (&now);
client = httpd->client.list.head;
while (client)
{
next_client = client->next;
if (now <= client->last_active) break;
if (now - client->last_active < 30000) break; /* TODO: make this time configurable... */
qse_printf (QSE_T("PURGING IDLE CLIENT XXXXXXXXXXXXXX %d\n"), (int)(now - client->last_active));
purge_client (httpd, client);
client = next_client;
}
/* TODO: */
}
qse_httpd_task_t* qse_httpd_entask ( qse_httpd_task_t* qse_httpd_entask (
qse_httpd_t* httpd, qse_httpd_client_t* client, qse_httpd_t* httpd, qse_httpd_client_t* client,
qse_httpd_task_t* pred, const qse_httpd_task_t* task, qse_httpd_task_t* pred, const qse_httpd_task_t* task,
@ -1003,6 +1084,7 @@ qse_fprintf (QSE_STDERR, QSE_T("Error: mux returned failure\n"));
} }
purge_bad_clients (httpd); purge_bad_clients (httpd);
purge_idle_clients (httpd);
} }
purge_client_list (httpd); purge_client_list (httpd);

View File

@ -412,7 +412,6 @@ struct mux_ev_t
int reqmask; int reqmask;
qse_httpd_muxcb_t cbfun; qse_httpd_muxcb_t cbfun;
void* cbarg; void* cbarg;
struct mux_ee_t* next;
}; };
struct mux_t struct mux_t
@ -425,8 +424,20 @@ struct mux_t
qse_size_t len; qse_size_t len;
qse_size_t capa; qse_size_t capa;
} ee; } ee;
struct
{
struct mux_ev_t* ptr;
qse_size_t capa;
} mev;
#if 0
qse_fma_t* fma;
#endif
}; };
#define MUX_EV_ALIGN 64
static void* mux_open (qse_httpd_t* httpd) static void* mux_open (qse_httpd_t* httpd)
{ {
struct mux_t* mux; struct mux_t* mux;
@ -444,6 +455,13 @@ static void* mux_open (qse_httpd_t* httpd)
return QSE_NULL; return QSE_NULL;
} }
#if 0
mux->fma = qse_fma_open (qse_getmmgr(httpd), QSE_NULL);
if (mux->fma == QSE_NULL)
{
}
#endif
return mux; return mux;
} }
@ -451,6 +469,7 @@ static void mux_close (qse_httpd_t* httpd, void* vmux)
{ {
struct mux_t* mux = (struct mux_t*)vmux; struct mux_t* mux = (struct mux_t*)vmux;
if (mux->ee.ptr) qse_httpd_freemem (httpd, mux->ee.ptr); if (mux->ee.ptr) qse_httpd_freemem (httpd, mux->ee.ptr);
if (mux->mev.ptr) qse_httpd_freemem (httpd, mux->mev.ptr);
close (mux->fd); close (mux->fd);
qse_httpd_freemem (httpd, mux); qse_httpd_freemem (httpd, mux);
} }
@ -473,8 +492,27 @@ static int mux_addhnd (
return -1; return -1;
} }
#if 0
mev = qse_httpd_allocmem (httpd, QSE_SIZEOF(*mev)); mev = qse_httpd_allocmem (httpd, QSE_SIZEOF(*mev));
if (mev == QSE_NULL) return -1; if (mev == QSE_NULL) return -1;
#endif
if (handle.i >= mux->mev.capa)
{
struct mux_ev_t* tmp;
qse_size_t tmpcapa;
tmpcapa = (((handle.i + MUX_EV_ALIGN - 1) / MUX_EV_ALIGN) * MUX_EV_ALIGN) + 1;
/* TODO: allocate this from fma ... */
tmp = qse_httpd_reallocmem (
httpd, mux->mev.ptr,
QSE_SIZEOF(*mux->mev.ptr) * tmpcapa); /* TODO: round up handle.i ... */
if (tmp == QSE_NULL) return -1;
mux->mev.ptr = tmp;
mux->mev.capa = tmpcapa;
}
if (mux->ee.len >= mux->ee.capa) if (mux->ee.len >= mux->ee.capa)
{ {
@ -485,7 +523,7 @@ static int mux_addhnd (
QSE_SIZEOF(*mux->ee.ptr) * (mux->ee.capa + 1) * 2); QSE_SIZEOF(*mux->ee.ptr) * (mux->ee.capa + 1) * 2);
if (tmp == QSE_NULL) if (tmp == QSE_NULL)
{ {
qse_httpd_freemem (httpd, mev); /*qse_httpd_freemem (httpd, mev);*/
return -1; return -1;
} }
@ -493,6 +531,8 @@ static int mux_addhnd (
mux->ee.capa = (mux->ee.capa + 1) * 2; mux->ee.capa = (mux->ee.capa + 1) * 2;
} }
mev = &mux->mev.ptr[handle.i];
mev->handle = handle; mev->handle = handle;
mev->reqmask = mask; mev->reqmask = mask;
mev->cbfun = cbfun; mev->cbfun = cbfun;
@ -504,7 +544,7 @@ static int mux_addhnd (
{ {
/* don't rollback ee.ptr */ /* don't rollback ee.ptr */
qse_httpd_seterrnum (httpd, syserr_to_errnum(errno)); qse_httpd_seterrnum (httpd, syserr_to_errnum(errno));
qse_httpd_freemem (httpd, mev); /*qse_httpd_freemem (httpd, mev);*/
return -1; return -1;
} }
@ -516,6 +556,8 @@ static int mux_delhnd (qse_httpd_t* httpd, void* vmux, qse_ubi_t handle)
{ {
struct mux_t* mux = (struct mux_t*)vmux; struct mux_t* mux = (struct mux_t*)vmux;
/* TODO: delete mev associated with handle.i */
if (epoll_ctl (mux->fd, EPOLL_CTL_DEL, handle.i, QSE_NULL) <= -1) if (epoll_ctl (mux->fd, EPOLL_CTL_DEL, handle.i, QSE_NULL) <= -1)
{ {
qse_httpd_seterrnum (httpd, syserr_to_errnum(errno)); qse_httpd_seterrnum (httpd, syserr_to_errnum(errno));