From c55bceb220f5064054ed80d8087821df58fab70b Mon Sep 17 00:00:00 2001 From: hyung-hwan Date: Sun, 18 Mar 2012 14:24:54 +0000 Subject: [PATCH] abstracted out most of muplexing code from httpd --- qse/include/qse/net/httpd.h | 117 ++-- qse/lib/net/httpd-task.c | 40 +- qse/lib/net/httpd.c | 1078 +++++++++++++++++++---------------- qse/lib/net/httpd.h | 70 +-- qse/samples/net/http01.c | 178 +++++- 5 files changed, 890 insertions(+), 593 deletions(-) diff --git a/qse/include/qse/net/httpd.h b/qse/include/qse/net/httpd.h index 7a50018c..cadf8140 100644 --- a/qse/include/qse/net/httpd.h +++ b/qse/include/qse/net/httpd.h @@ -24,6 +24,7 @@ #include #include #include +#include #include #include @@ -73,6 +74,7 @@ typedef struct qse_httpd_server_t qse_httpd_server_t; struct qse_httpd_server_t { qse_httpd_server_t* next; + int active; qse_nwad_t nwad; int secure; @@ -81,6 +83,20 @@ struct qse_httpd_server_t qse_ubi_t handle; }; +enum qse_httpd_mux_mask_t +{ + QSE_HTTPD_MUX_READ = (1 << 0), + QSE_HTTPD_MUX_WRITE = (1 << 1) +}; + +typedef int (*qse_httpd_muxcb_t) ( + qse_httpd_t* httpd, + void* mux, + qse_ubi_t handle, + int mask, /* ORed of qse_httpd_mux_mask_t */ + void* cbarg +); + typedef struct qse_httpd_cbs_t qse_httpd_cbs_t; struct qse_httpd_cbs_t { @@ -93,6 +109,14 @@ struct qse_httpd_cbs_t struct { + void* (*open) (qse_httpd_t* httpd); + void (*close) (qse_httpd_t* httpd, void* mux); + int (*addhnd) ( + qse_httpd_t* httpd, void* mux, qse_ubi_t handle, + int mask, qse_httpd_muxcb_t cbfun, void* cbarg); + int (*delhnd) (qse_httpd_t* httpd, void* mux, qse_ubi_t handle); + int (*poll) (qse_httpd_t* httpd, void* mux, qse_ntime_t timeout); + int (*readable) ( qse_httpd_t* httpd, qse_ubi_t handle, qse_ntoff_t timeout); int (*writable) ( @@ -200,6 +224,8 @@ enum qse_httpd_task_trigger_mask_t struct qse_httpd_task_t { + /* == PUBLIC == */ + /* you must not call another entask functions from within * an initailizer. you can call entask functions from within * a finalizer and a main function. */ @@ -210,7 +236,50 @@ struct qse_httpd_task_t int trigger_mask; qse_ubi_t trigger[3]; +#if 0 + struct + { + int mask; /* QSE_HTTPD_TASK_TRIGGER_READ | QSE_HTTPD_TASK_TRIGGER_WRITE */ + qse_ubi_t handle; + } trigger[3]; +#endif + void* ctx; + + /* == PRIVATE == */ + qse_httpd_task_t* prev; + qse_httpd_task_t* next; +}; + + +struct qse_httpd_client_t +{ + /* == PUBLIC == */ + + qse_ubi_t handle; + qse_ubi_t handle2; + qse_nwad_t local_addr; + qse_nwad_t remote_addr; + + /* == PRIVATE == */ + qse_htrd_t* htrd; + int secure; + int status; + + qse_httpd_client_t* prev; + qse_httpd_client_t* next; + + qse_httpd_client_t* bad_next; + + qse_httpd_client_t* prev_tasked; + qse_httpd_client_t* next_tasked; + + struct + { + int count; + qse_httpd_task_t* head; + qse_httpd_task_t* tail; + } task; }; #ifdef __cplusplus @@ -257,7 +326,8 @@ void qse_httpd_setoption ( */ int qse_httpd_loop ( qse_httpd_t* httpd, - qse_httpd_cbs_t* cbs + qse_httpd_cbs_t* cbs, + qse_ntime_t timeout ); /** @@ -288,7 +358,7 @@ void qse_httpd_completecontent ( qse_httpd_task_t* qse_httpd_entask ( qse_httpd_t* httpd, qse_httpd_client_t* client, - const qse_httpd_task_t* pred, + qse_httpd_task_t* pred, const qse_httpd_task_t* task, qse_size_t xtnsize ); @@ -298,58 +368,37 @@ qse_httpd_task_t* qse_httpd_entask ( qse_httpd_task_t* qse_httpd_entaskdisconnect ( qse_httpd_t* httpd, qse_httpd_client_t* client, - const qse_httpd_task_t* pred + qse_httpd_task_t* pred ); qse_httpd_task_t* qse_httpd_entasktext ( qse_httpd_t* httpd, qse_httpd_client_t* client, - const qse_httpd_task_t* pred, + qse_httpd_task_t* pred, const qse_mchar_t* text ); qse_httpd_task_t* qse_httpd_entaskstatictext ( qse_httpd_t* httpd, qse_httpd_client_t* client, - const qse_httpd_task_t* pred, + qse_httpd_task_t* pred, const qse_mchar_t* text ); qse_httpd_task_t* qse_httpd_entaskformat ( qse_httpd_t* httpd, qse_httpd_client_t* client, - const qse_httpd_task_t* pred, + qse_httpd_task_t* pred, const qse_mchar_t* fmt, ... ); /* -------------------------------------------- */ -#if 0 -qse_httpd_task_t* qse_httpd_entaskfile ( - qse_httpd_t* httpd, - qse_httpd_client_t* client, - const qse_httpd_task_t* pred, - qse_ubi_t handle, - qse_foff_t offset, - qse_foff_t size -); - -qse_httpd_task_t* qse_httpd_entaskdir ( - qse_httpd_t* httpd, - qse_httpd_client_t* client, - const qse_httpd_task_t* pred, - qse_ubi_t handle, - int chunked -); -#endif - -/* -------------------------------------------- */ - qse_httpd_task_t* qse_httpd_entaskerror ( qse_httpd_t* httpd, qse_httpd_client_t* client, - const qse_httpd_task_t* task, + qse_httpd_task_t* pred, int code, qse_htre_t* req ); @@ -357,7 +406,7 @@ qse_httpd_task_t* qse_httpd_entaskerror ( qse_httpd_task_t* qse_httpd_entaskcontinue ( qse_httpd_t* httpd, qse_httpd_client_t* client, - const qse_httpd_task_t* task, + qse_httpd_task_t* pred, qse_htre_t* req ); @@ -367,7 +416,7 @@ qse_httpd_task_t* qse_httpd_entaskcontinue ( qse_httpd_task_t* qse_httpd_entaskauth ( qse_httpd_t* httpd, qse_httpd_client_t* client, - const qse_httpd_task_t* task, + qse_httpd_task_t* pred, const qse_mchar_t* realm, qse_htre_t* req ); @@ -375,7 +424,7 @@ qse_httpd_task_t* qse_httpd_entaskauth ( qse_httpd_task_t* qse_httpd_entaskdir ( qse_httpd_t* httpd, qse_httpd_client_t* client, - const qse_httpd_task_t* pred, + qse_httpd_task_t* pred, const qse_mchar_t* name, qse_htre_t* req ); @@ -383,7 +432,7 @@ qse_httpd_task_t* qse_httpd_entaskdir ( qse_httpd_task_t* qse_httpd_entaskfile ( qse_httpd_t* httpd, qse_httpd_client_t* client, - const qse_httpd_task_t* pred, + qse_httpd_task_t* pred, const qse_mchar_t* name, qse_htre_t* req ); @@ -391,7 +440,7 @@ qse_httpd_task_t* qse_httpd_entaskfile ( qse_httpd_task_t* qse_httpd_entaskcgi ( qse_httpd_t* httpd, qse_httpd_client_t* client, - const qse_httpd_task_t* pred, + qse_httpd_task_t* pred, const qse_mchar_t* path, qse_htre_t* req ); @@ -399,7 +448,7 @@ qse_httpd_task_t* qse_httpd_entaskcgi ( qse_httpd_task_t* qse_httpd_entasknph ( qse_httpd_t* httpd, qse_httpd_client_t* client, - const qse_httpd_task_t* pred, + qse_httpd_task_t* pred, const qse_mchar_t* path, qse_htre_t* req ); diff --git a/qse/lib/net/httpd-task.c b/qse/lib/net/httpd-task.c index b889198f..e807da39 100644 --- a/qse/lib/net/httpd-task.c +++ b/qse/lib/net/httpd-task.c @@ -48,7 +48,7 @@ static int task_main_disconnect ( qse_httpd_task_t* qse_httpd_entaskdisconnect ( qse_httpd_t* httpd, qse_httpd_client_t* client, - const qse_httpd_task_t* pred) + qse_httpd_task_t* pred) { qse_httpd_task_t task; @@ -86,7 +86,7 @@ static int task_main_statictext ( qse_httpd_task_t* qse_httpd_entaskstatictext ( qse_httpd_t* httpd, qse_httpd_client_t* client, - const qse_httpd_task_t* pred, + qse_httpd_task_t* pred, const qse_mchar_t* text) { qse_httpd_task_t task; @@ -144,7 +144,7 @@ static int task_main_text ( qse_httpd_task_t* qse_httpd_entasktext ( qse_httpd_t* httpd, qse_httpd_client_t* client, - const qse_httpd_task_t* pred, + qse_httpd_task_t* pred, const qse_mchar_t* text) { qse_httpd_task_t task; @@ -217,7 +217,7 @@ static int task_main_format ( qse_httpd_task_t* qse_httpd_entaskformat ( qse_httpd_t* httpd, qse_httpd_client_t* client, - const qse_httpd_task_t* pred, + qse_httpd_task_t* pred, const qse_mchar_t* fmt, ...) { qse_httpd_task_t task; @@ -315,7 +315,7 @@ qse_printf (QSE_T("SEND: [%.*hs]\n"), (int)l, buf); static qse_httpd_task_t* entask_error ( qse_httpd_t* httpd, qse_httpd_client_t* client, - const qse_httpd_task_t* task, int code, + qse_httpd_task_t* pred, int code, const qse_http_version_t* version, int keepalive) { const qse_mchar_t* smsg; @@ -380,7 +380,7 @@ static qse_httpd_task_t* entask_error ( } return qse_httpd_entaskformat ( - httpd, client, task, + httpd, client, pred, QSE_MT("HTTP/%d.%d %d %s\r\nConnection: %s\r\nContent-Type: text/html\r\nContent-Length: %lu\r\n\r\n%s\r\n\r\n"), version->major, version->minor, code, smsg, (keepalive? QSE_MT("keep-alive"): QSE_MT("close")), @@ -390,21 +390,21 @@ static qse_httpd_task_t* entask_error ( qse_httpd_task_t* qse_httpd_entaskerror ( qse_httpd_t* httpd, qse_httpd_client_t* client, - const qse_httpd_task_t* task, int code, qse_htre_t* req) + qse_httpd_task_t* pred, int code, qse_htre_t* req) { return entask_error ( - httpd, client, task, code, + httpd, client, pred, code, qse_htre_getversion(req), req->attr.keepalive); } /*------------------------------------------------------------------------*/ qse_httpd_task_t* qse_httpd_entaskcontinue ( qse_httpd_t* httpd, qse_httpd_client_t* client, - const qse_httpd_task_t* task, qse_htre_t* req) + qse_httpd_task_t* pred, qse_htre_t* req) { const qse_http_version_t* version = qse_htre_getversion(req); return qse_httpd_entaskformat ( - httpd, client, task, + httpd, client, pred, QSE_MT("HTTP/%d.%d 100 Continue\r\n\r\n"), version->major, version->minor); } @@ -413,7 +413,7 @@ qse_httpd_task_t* qse_httpd_entaskcontinue ( qse_httpd_task_t* qse_httpd_entaskauth ( qse_httpd_t* httpd, qse_httpd_client_t* client, - const qse_httpd_task_t* task, const qse_mchar_t* realm, qse_htre_t* req) + qse_httpd_task_t* pred, const qse_mchar_t* realm, qse_htre_t* req) { const qse_http_version_t* version; const qse_mchar_t* lmsg; @@ -422,7 +422,7 @@ qse_httpd_task_t* qse_httpd_entaskauth ( lmsg = QSE_MT("UnauthorizedUNAUTHORIZED"); return qse_httpd_entaskformat ( - httpd, client, task, + httpd, client, pred, QSE_MT("HTTP/%d.%d 401 Unauthorized\r\nConnection: %s\r\nWWW-Authenticate: Basic realm=\"%s\"\r\nContent-Type: text/html\r\nContent-Length: %lu\r\n\r\n%s\r\n\r\n"), version->major, version->minor, (req->attr.keepalive? QSE_MT("keep-alive"): QSE_MT("close")), @@ -764,7 +764,7 @@ send_dirlist: qse_httpd_task_t* qse_httpd_entaskdir ( qse_httpd_t* httpd, qse_httpd_client_t* client, - const qse_httpd_task_t* pred, + qse_httpd_task_t* pred, qse_ubi_t handle, int chunked) { qse_httpd_task_t task; @@ -888,7 +888,7 @@ static int task_main_path ( qse_httpd_task_t* qse_httpd_entaskpath ( qse_httpd_t* httpd, qse_httpd_client_t* client, - const qse_httpd_task_t* pred, + qse_httpd_task_t* pred, const qse_mchar_t* name, qse_htre_t* req) { @@ -996,7 +996,7 @@ static int task_main_fseg ( static qse_httpd_task_t* entask_file_segment ( qse_httpd_t* httpd, qse_httpd_client_t* client, - const qse_httpd_task_t* pred, + qse_httpd_task_t* pred, qse_ubi_t handle, qse_foff_t offset, qse_foff_t size) { qse_httpd_task_t task; @@ -1179,7 +1179,7 @@ no_file_send: qse_httpd_task_t* qse_httpd_entaskfile ( qse_httpd_t* httpd, qse_httpd_client_t* client, - const qse_httpd_task_t* pred, + qse_httpd_task_t* pred, const qse_mchar_t* path, qse_htre_t* req) { @@ -1896,6 +1896,7 @@ static int task_main_cgi_4 ( QSE_ASSERT (cgi->pio_inited); +qse_printf (QSE_T("task_main_cgi_4 trigger_mask = %d\n"), task->trigger_mask); if (task->trigger_mask & QSE_HTTPD_TASK_TRIGGER_RELAYABLE) { cgi_forward_content (httpd, task, 0); @@ -2069,6 +2070,7 @@ qse_printf (QSE_T("[cgi-3 send failure....\n")); cgi->res_left -= n; if (cgi->res_left <= 0) { +qse_printf (QSE_T("[switching to cgi-4....\n")); task->main = task_main_cgi_4; /* don't chain-call task_main_cgi_4 since it has another send * and it has already been sent here. so the writability must @@ -2317,7 +2319,7 @@ oops: static QSE_INLINE qse_httpd_task_t* entask_cgi ( qse_httpd_t* httpd, qse_httpd_client_t* client, - const qse_httpd_task_t* pred, const qse_mchar_t* path, + qse_httpd_task_t* pred, const qse_mchar_t* path, qse_htre_t* req, int nph) { qse_httpd_task_t task; @@ -2341,14 +2343,14 @@ static QSE_INLINE qse_httpd_task_t* entask_cgi ( qse_httpd_task_t* qse_httpd_entaskcgi ( qse_httpd_t* httpd, qse_httpd_client_t* client, - const qse_httpd_task_t* pred, const qse_mchar_t* path, qse_htre_t* req) + qse_httpd_task_t* pred, const qse_mchar_t* path, qse_htre_t* req) { return entask_cgi (httpd, client, pred, path, req, 0); } qse_httpd_task_t* qse_httpd_entasknph ( qse_httpd_t* httpd, qse_httpd_client_t* client, - const qse_httpd_task_t* pred, const qse_mchar_t* path, qse_htre_t* req) + qse_httpd_task_t* pred, const qse_mchar_t* path, qse_htre_t* req) { return entask_cgi (httpd, client, pred, path, req, 1); } diff --git a/qse/lib/net/httpd.c b/qse/lib/net/httpd.c index d6cad7b4..9c4fdcde 100644 --- a/qse/lib/net/httpd.c +++ b/qse/lib/net/httpd.c @@ -37,8 +37,8 @@ typedef struct htrd_xtn_t htrd_xtn_t; struct htrd_xtn_t { - qse_size_t client_index; - qse_httpd_t* httpd; + qse_httpd_t* httpd; + qse_httpd_client_t* client; }; QSE_IMPLEMENT_COMMON_FUNCTIONS (httpd) @@ -46,8 +46,29 @@ QSE_IMPLEMENT_COMMON_FUNCTIONS (httpd) #define DEFAULT_PORT 80 #define DEFAULT_SECURE_PORT 443 +enum client_status_t +{ + CLIENT_BAD = (1 << 0), + CLIENT_READY = (1 << 1), + CLIENT_SECURE = (1 << 2), + + CLIENT_HANDLE_READ_IN_MUX = (1 << 3), + CLIENT_HANDLE_WRITE_IN_MUX = (1 << 4), + CLIENT_HANDLE_IN_MUX = (CLIENT_HANDLE_READ_IN_MUX | + CLIENT_HANDLE_WRITE_IN_MUX), + + CLIENT_TASK_TRIGGER_READ_IN_MUX = (1 << 5), + CLIENT_TASK_TRIGGER_RELAY_IN_MUX = (1 << 6), + CLIENT_TASK_TRIGGER_WRITE_IN_MUX = (1 << 7), + CLIENT_TASK_TRIGGER_IN_MUX = (CLIENT_TASK_TRIGGER_READ_IN_MUX | + CLIENT_TASK_TRIGGER_RELAY_IN_MUX | + CLIENT_TASK_TRIGGER_WRITE_IN_MUX) +}; + static void free_server_list ( qse_httpd_t* httpd, qse_httpd_server_t* server); +static int perform_client_task ( + qse_httpd_t* httpd, void* mux, qse_ubi_t handle, int mask, void* cbarg); qse_httpd_t* qse_httpd_open (qse_mmgr_t* mmgr, qse_size_t xtnsize) { @@ -77,17 +98,14 @@ int qse_httpd_init (qse_httpd_t* httpd, qse_mmgr_t* mmgr) { QSE_MEMSET (httpd, 0, QSE_SIZEOF(*httpd)); httpd->mmgr = mmgr; - -/* TODO: abstract away this field */ - httpd->server.max = -1; - return 0; } void qse_httpd_fini (qse_httpd_t* httpd) { - /* TODO */ +/* TODO */ free_server_list (httpd, httpd->server.list); + QSE_ASSERT (httpd->server.navail == 0); httpd->server.list = QSE_NULL; } @@ -119,7 +137,7 @@ void qse_httpd_setoption (qse_httpd_t* httpd, int option) QSE_INLINE void* qse_httpd_allocmem (qse_httpd_t* httpd, qse_size_t size) { void* ptr = QSE_MMGR_ALLOC (httpd->mmgr, size); - if (ptr == QSE_NULL) httpd->errnum = QSE_HTTPD_ENOMEM; + if (ptr == QSE_NULL) httpd->errnum = QSE_HTTPD_ENOMEM; return ptr; } @@ -127,7 +145,7 @@ QSE_INLINE void* qse_httpd_reallocmem ( qse_httpd_t* httpd, void* ptr, qse_size_t size) { void* nptr = QSE_MMGR_REALLOC (httpd->mmgr, ptr, size); - if (nptr == QSE_NULL) httpd->errnum = QSE_HTTPD_ENOMEM; + if (nptr == QSE_NULL) httpd->errnum = QSE_HTTPD_ENOMEM; return nptr; } @@ -136,92 +154,106 @@ QSE_INLINE void qse_httpd_freemem (qse_httpd_t* httpd, void* ptr) QSE_MMGR_FREE (httpd->mmgr, ptr); } +/* --------------------------------------------------- */ + static qse_httpd_task_t* enqueue_task ( qse_httpd_t* httpd, qse_httpd_client_t* client, - const qse_httpd_task_t* pred, const qse_httpd_task_t* task, + qse_httpd_task_t* pred, const qse_httpd_task_t* task, qse_size_t xtnsize) { - task_queue_node_t* node; + qse_httpd_task_t* new_task; /* TODO: limit check - if (client->task.queue.count >= httpd->limit.client_task_queue) + if (client->task.count >= httpd->limit.client_task_queue) { httpd->errnum = QSE_HTTPD_ETASK; return -1; } */ - node = (task_queue_node_t*) - qse_httpd_allocmem (httpd, QSE_SIZEOF(*node) + xtnsize); - if (node == QSE_NULL) return QSE_NULL; + new_task = (qse_httpd_task_t*) + qse_httpd_allocmem (httpd, QSE_SIZEOF(*new_task) + xtnsize); + if (new_task == QSE_NULL) return QSE_NULL; - node->task = *task; + QSE_MEMCPY (new_task, task, QSE_SIZEOF(*new_task)); - if (task->init) + if (new_task->init) { httpd->errnum = QSE_HTTPD_ENOERR; - if (task->init (httpd, client, &node->task) <= -1) + if (new_task->init (httpd, client, new_task) <= -1) { if (httpd->errnum == QSE_HTTPD_ENOERR) httpd->errnum = QSE_HTTPD_ETASK; - qse_httpd_freemem (httpd, node); + qse_httpd_freemem (httpd, new_task); return QSE_NULL; } } if (pred) { - task_queue_node_t* prev; + new_task->next = pred->next; + new_task->prev = pred; - /* TODO: confirm if this calculation works all the time, - * especially regarding structure alignment */ - prev = (task_queue_node_t*) - ((qse_byte_t*)pred - (QSE_SIZEOF(*prev) - QSE_SIZEOF(*pred))); - - node->next = prev->next; - node->prev = prev; - - if (prev->next) prev->next->prev = node; - else client->task.queue.tail = node; - prev->next = node; + if (pred->next) pred->next->prev = new_task; + else client->task.tail = new_task; + pred->next = new_task; } else { - node->next = QSE_NULL; - node->prev = client->task.queue.tail; - if (client->task.queue.tail) - client->task.queue.tail->next = node; - else - client->task.queue.head = node; - client->task.queue.tail = node; - } - client->task.queue.count++; + new_task->next = QSE_NULL; + new_task->prev = client->task.tail; - return &node->task; + if (client->task.tail) + client->task.tail->next = new_task; + else + client->task.head = new_task; + client->task.tail = new_task; + } + client->task.count++; + + return new_task; } static QSE_INLINE int dequeue_task ( qse_httpd_t* httpd, qse_httpd_client_t* client) { - task_queue_node_t* node; + qse_httpd_task_t* task; - if (client->task.queue.count <= 0) return -1; + if (client->task.count <= 0) return -1; - node = client->task.queue.head; + task = client->task.head; - if (node == client->task.queue.tail) + /* clear task triggers from mux if they are registered */ + if (client->status & CLIENT_TASK_TRIGGER_READ_IN_MUX) { - client->task.queue.head = QSE_NULL; - client->task.queue.tail = QSE_NULL; + httpd->cbs->mux.delhnd (httpd, httpd->mux, task->trigger[0]); + client->status &= ~CLIENT_TASK_TRIGGER_READ_IN_MUX; + } + if (client->status & CLIENT_TASK_TRIGGER_RELAY_IN_MUX) + { + httpd->cbs->mux.delhnd (httpd, httpd->mux, task->trigger[1]); + client->status &= ~CLIENT_TASK_TRIGGER_RELAY_IN_MUX; + } + if (client->status & CLIENT_TASK_TRIGGER_WRITE_IN_MUX) + { + httpd->cbs->mux.delhnd (httpd, httpd->mux, task->trigger[2]); + client->status &= ~CLIENT_TASK_TRIGGER_WRITE_IN_MUX; + } + /* --------------------------------------------------- */ + + if (task == client->task.tail) + { + client->task.head = QSE_NULL; + client->task.tail = QSE_NULL; } else { - node->next->prev = QSE_NULL; - client->task.queue.head = node->next; + task->next->prev = QSE_NULL; + client->task.head = task->next; } - client->task.queue.count--; + client->task.count--; - if (node->task.fini) node->task.fini (httpd, client, &node->task); - qse_httpd_freemem (httpd, node); + if (task->fini) task->fini (httpd, client, task); + qse_httpd_freemem (httpd, task); return 0; } @@ -232,20 +264,18 @@ static QSE_INLINE void purge_tasks ( while (dequeue_task (httpd, client) == 0); } +/* --------------------------------------------------- */ + static int htrd_peek_request (qse_htrd_t* htrd, qse_htre_t* req) { htrd_xtn_t* xtn = (htrd_xtn_t*) qse_htrd_getxtn (htrd); - qse_httpd_client_t* client = - &xtn->httpd->client.array.data[xtn->client_index]; - return xtn->httpd->cbs->peek_request (xtn->httpd, client, req); + return xtn->httpd->cbs->peek_request (xtn->httpd, xtn->client, req); } static int htrd_handle_request (qse_htrd_t* htrd, qse_htre_t* req) { htrd_xtn_t* xtn = (htrd_xtn_t*) qse_htrd_getxtn (htrd); - qse_httpd_client_t* client = - &xtn->httpd->client.array.data[xtn->client_index]; - return xtn->httpd->cbs->handle_request (xtn->httpd, client, req); + return xtn->httpd->cbs->handle_request (xtn->httpd, xtn->client, req); } static qse_htrd_recbs_t htrd_recbs = @@ -256,43 +286,259 @@ static qse_htrd_recbs_t htrd_recbs = /* --------------------------------------------------- */ +static qse_httpd_client_t* new_client ( + qse_httpd_t* httpd, qse_httpd_client_t* tmpl) +{ + qse_httpd_client_t* client; + htrd_xtn_t* xtn; + int opt; + + client = qse_httpd_allocmem (httpd, QSE_SIZEOF(*client)); + if (client == QSE_NULL) return QSE_NULL; + + QSE_MEMSET (client, 0, QSE_SIZEOF(*client)); + + client->htrd = qse_htrd_open (httpd->mmgr, QSE_SIZEOF(*xtn)); + if (client->htrd == QSE_NULL) + { + httpd->errnum = QSE_HTTPD_ENOMEM; + qse_httpd_freemem (httpd, client); + return QSE_NULL; + } + + opt = qse_htrd_getoption (client->htrd); + opt |= QSE_HTRD_REQUEST; + opt &= ~QSE_HTRD_RESPONSE; + qse_htrd_setoption (client->htrd, opt); + + if (httpd->cbs->client.accepted == QSE_NULL) + client->status |= CLIENT_READY; + + client->status = tmpl->status; + client->handle = tmpl->handle; + client->handle2 = tmpl->handle2; + client->local_addr = tmpl->local_addr; + client->remote_addr = tmpl->remote_addr; + + xtn = (htrd_xtn_t*)qse_htrd_getxtn (client->htrd); + xtn->httpd = httpd; + xtn->client = client; + + qse_htrd_setrecbs (client->htrd, &htrd_recbs); + + return client; +} + +static void free_client ( + qse_httpd_t* httpd, qse_httpd_client_t* client) +{ + QSE_ASSERT (client->htrd != QSE_NULL); + + purge_tasks (httpd, client); + + qse_htrd_close (client->htrd); + +qse_fprintf (QSE_STDERR, QSE_T("Debug: closing socket %d\n"), client->handle.i); + + if (client->status & CLIENT_HANDLE_IN_MUX) + { + httpd->cbs->mux.delhnd (httpd, httpd->mux, client->handle); + client->status &= ~CLIENT_HANDLE_IN_MUX; + } + + /* note that client.closed is not a counterpart to client.accepted. + * so it is called even if client.close() failed. */ + if (httpd->cbs->client.closed) + httpd->cbs->client.closed (httpd, client); + + httpd->cbs->client.close (httpd, client); + + qse_httpd_freemem (httpd, client); +} + +static void purge_client (qse_httpd_t* httpd, qse_httpd_client_t* client) +{ + qse_httpd_client_t* prev; + qse_httpd_client_t* next; + qse_httpd_client_t* prev_tasked; + qse_httpd_client_t* next_tasked; + + prev = client->prev; + next = client->next; + + prev_tasked = client->prev_tasked; + next_tasked = client->next_tasked; + + free_client (httpd, client); + + if (prev) prev->next = next; + else httpd->client.list.head = next; + if (next) next->prev = prev; + else httpd->client.list.tail = prev; + + if (prev_tasked) prev_tasked->next_tasked = next_tasked; + else httpd->client.tasked.head = next_tasked; + if (next_tasked) next_tasked->prev_tasked = prev_tasked; + else httpd->client.tasked.tail = prev_tasked; +} + +static void purge_client_list (qse_httpd_t* httpd) +{ + while (httpd->client.list.tail) + purge_client (httpd, httpd->client.list.tail); +} + +static int accept_client ( + qse_httpd_t* httpd, void* mux, qse_ubi_t handle, int mask, void* cbarg) +{ + qse_httpd_server_t* server; + qse_httpd_client_t clibuf; + qse_httpd_client_t* client; + + if (mask & QSE_HTTPD_MUX_READ) + { + server = (qse_httpd_server_t*)cbarg; + + /*QSE_ASSERT (handle == server->handle);*/ + + QSE_MEMSET (&clibuf, 0, QSE_SIZEOF(clibuf)); + + if (httpd->cbs->server.accept (httpd, server, &clibuf) <= -1) + { +/* TODO: proper logging */ +qse_char_t tmp[128]; +qse_nwadtostr (&server->nwad, tmp, QSE_COUNTOF(tmp), QSE_NWADTOSTR_ALL); +qse_printf (QSE_T("failed to accept from server %s\n"), tmp); + + return -1; + } + +/* TODO: check maximum number of client. if exceed call client.close */ + + if (server->secure) clibuf.status |= CLIENT_SECURE; + + client = new_client (httpd, &clibuf); + if (client == QSE_NULL) + { + httpd->cbs->client.close (httpd, &clibuf); + return -1; + } + + if (httpd->cbs->mux.addhnd ( + httpd, mux, client->handle, QSE_HTTPD_MUX_READ, + perform_client_task, client) <= -1) + { + free_client (httpd, client); + return -1; + } + client->status |= CLIENT_HANDLE_READ_IN_MUX; + + /* link the new client to the back of the client list. */ + if (httpd->client.list.tail) + { + QSE_ASSERT (httpd->client.list.head); + client->prev = httpd->client.list.tail; + httpd->client.list.tail->next = client; + httpd->client.list.tail = client; + } + else + { + httpd->client.list.head = client; + httpd->client.list.tail = client; + } + +{ +/* TODO: proper logging */ +qse_char_t tmp[128], tmp2[128]; +qse_nwadtostr (&client->local_addr, tmp, QSE_COUNTOF(tmp), QSE_NWADTOSTR_ALL); +qse_nwadtostr (&client->remote_addr, tmp2, QSE_COUNTOF(tmp2), QSE_NWADTOSTR_ALL); +qse_printf (QSE_T("connection %d accepted %s from %s\n"), client->handle.i, tmp, tmp2); +} + } + return 0; +} + +static void insert_client_to_tasked_list ( + qse_httpd_t* httpd, qse_httpd_client_t* client) +{ + QSE_ASSERT (client->prev_tasked == QSE_NULL); + QSE_ASSERT (client->next_tasked == QSE_NULL); + + if (httpd->client.tasked.tail) + { + QSE_ASSERT (httpd->client.tasked.head); + client->prev_tasked = httpd->client.tasked.tail; + httpd->client.tasked.tail->next_tasked = client; + httpd->client.tasked.tail = client; + } + else + { + httpd->client.tasked.head = client; + httpd->client.tasked.tail = client; + } +} + +static void delete_client_from_tasked_list ( + qse_httpd_t* httpd, qse_httpd_client_t* client) +{ + qse_httpd_client_t* prev_tasked; + qse_httpd_client_t* next_tasked; + + prev_tasked = client->prev_tasked; + next_tasked = client->next_tasked; + + if (prev_tasked) prev_tasked->next_tasked = next_tasked; + else httpd->client.tasked.head = next_tasked; + if (next_tasked) next_tasked->prev_tasked = prev_tasked; + else httpd->client.tasked.tail = prev_tasked; + + client->prev_tasked = QSE_NULL; + client->next_tasked = QSE_NULL; +} + +/* --------------------------------------------------- */ + static void deactivate_servers (qse_httpd_t* httpd) { qse_httpd_server_t* server; for (server = httpd->server.list; server; server = server->next) { - httpd->cbs->server.close (httpd, server); + if (server->active) + { + httpd->cbs->mux.delhnd (httpd, httpd->mux, server->handle); + httpd->cbs->server.close (httpd, server); + server->active = 0; + httpd->server.nactive--; + } } - - FD_ZERO (&httpd->server.set); - httpd->server.max = -1; } static int activate_servers (qse_httpd_t* httpd) { qse_httpd_server_t* server; -/* TODO: delete these two variables */ - fd_set server_set; - int server_max = -1; - - FD_ZERO (&server_set); - for (server = httpd->server.list; server; server = server->next) { if (httpd->cbs->server.open (httpd, server) <= -1) { +qse_printf (QSE_T("FAILED TO ACTIVATE SERVER....\n")); + continue; } -/* TODO: abstract away these 2 lines to a callback ... */ - FD_SET (server->handle.i, &server_set); - if (server->handle.i >= server_max) server_max = server->handle.i; + if (httpd->cbs->mux.addhnd ( + httpd, httpd->mux, server->handle, QSE_HTTPD_MUX_READ, + accept_client, server) <= -1) + { +qse_printf (QSE_T("FAILED TO ADD SERVER HANDLE TO MUX....\n")); + httpd->cbs->server.close (httpd, server); + continue; + } + + server->active = 1; + httpd->server.nactive++; } -/* abstract away these 2 lines also */ - httpd->server.set = server_set; - httpd->server.max = server_max; return 0; } @@ -304,6 +550,7 @@ static void free_server_list (qse_httpd_t* httpd, qse_httpd_server_t* server) httpd->cbs->server.close (httpd, server); qse_httpd_freemem (httpd, server); + httpd->server.navail--; server = next; } @@ -393,280 +640,18 @@ int qse_httpd_addserver (qse_httpd_t* httpd, const qse_char_t* uri) server->next = httpd->server.list; httpd->server.list = server; + httpd->server.navail++; return 0; } /* --------------------------------------------------- */ -static void init_client_array (qse_httpd_t* httpd) -{ - client_array_t* array = &httpd->client.array; - array->capa = 0; - array->size = 0; - array->data = QSE_NULL; -} - -static void delete_from_client_array (qse_httpd_t* httpd, int fd) -{ - client_array_t* array = &httpd->client.array; - if (array->data[fd].htrd) - { - purge_tasks (httpd, &array->data[fd]); - - qse_htrd_close (array->data[fd].htrd); - array->data[fd].htrd = QSE_NULL; -qse_fprintf (QSE_STDERR, QSE_T("Debug: closing socket %d\n"), array->data[fd].handle.i); - - /* note that client.closed is not a counterpart to client.accepted. - * so it is called even if client.closed failed. */ - if (httpd->cbs->client.closed) - httpd->cbs->client.closed (httpd, &array->data[fd]); - - httpd->cbs->client.close (httpd, &array->data[fd]); - array->size--; - } -} - -static void fini_client_array (qse_httpd_t* httpd) -{ - client_array_t* array = &httpd->client.array; - if (array->data) - { - int fd; - - for (fd = 0; fd < array->capa; fd++) - delete_from_client_array (httpd, fd); - - qse_httpd_freemem (httpd, array->data); - array->capa = 0; - array->size = 0; - array->data = QSE_NULL; - } -} - -static qse_httpd_client_t* insert_into_client_array ( - qse_httpd_t* httpd, qse_httpd_client_t* client) -{ - htrd_xtn_t* xtn; - client_array_t* array = &httpd->client.array; - int opt, fd = client->handle.i; - -/* TODO: is an array is the best??? - * i do use an array for direct access by fd. */ - if (fd >= array->capa) - { - #define ALIGN 512 - qse_httpd_client_t* tmp; - qse_size_t capa = ((fd + ALIGN) / ALIGN) * ALIGN; - - tmp = qse_httpd_reallocmem ( - httpd, array->data, capa * QSE_SIZEOF(*tmp)); - if (tmp == QSE_NULL) return QSE_NULL; - - QSE_MEMSET (&tmp[array->capa], 0, - QSE_SIZEOF(*tmp) * (capa - array->capa)); - - array->data = tmp; - array->capa = capa; - } - - QSE_ASSERT (array->data[fd].htrd == QSE_NULL); - - array->data[fd].htrd = qse_htrd_open (httpd->mmgr, QSE_SIZEOF(*xtn)); - if (array->data[fd].htrd == QSE_NULL) return QSE_NULL; - opt = qse_htrd_getoption (array->data[fd].htrd); - opt |= QSE_HTRD_REQUEST; - opt &= ~QSE_HTRD_RESPONSE; - qse_htrd_setoption (array->data[fd].htrd, opt); - - array->data[fd].ready = httpd->cbs->client.accepted? 0 : 1; - array->data[fd].bad = 0; - array->data[fd].secure = client->secure; - array->data[fd].handle = client->handle; - array->data[fd].handle2 = client->handle2; - array->data[fd].local_addr = client->local_addr; - array->data[fd].remote_addr = client->remote_addr; - - xtn = (htrd_xtn_t*)qse_htrd_getxtn (array->data[fd].htrd); - xtn->client_index = fd; - xtn->httpd = httpd; - - qse_htrd_setrecbs (array->data[fd].htrd, &htrd_recbs); - array->size++; - return &array->data[fd]; -} - -static int accept_client_from_server ( - qse_httpd_t* httpd, qse_httpd_server_t* server) -{ - qse_httpd_client_t clibuf; - qse_httpd_client_t* client; - - QSE_MEMSET (&clibuf, 0, QSE_SIZEOF(clibuf)); - - if (httpd->cbs->server.accept (httpd, server, &clibuf) <= -1) return -1; - -/* TODO: check maximum number of client. if exceed call client.close */ - - clibuf.secure = server->secure; - client = insert_into_client_array (httpd, &clibuf); - if (client == QSE_NULL) - { - httpd->cbs->client.close (httpd, &clibuf); - return -1; - } - -qse_printf (QSE_T("connection %d accepted\n"), clibuf.handle.i); - return 0; -} - -static void accept_client_from_servers (qse_httpd_t* httpd, fd_set* r) -{ - qse_httpd_server_t* server; - - for (server = httpd->server.list; server; server = server->next) - { - /* TODO: abstract this part... */ - if (FD_ISSET(server->handle.i, r)) - { - accept_client_from_server (httpd, server); - -/* TODO: if (accept_client_from_listener (httpd, l) <= -1) -httpd->cbs.on_error (httpd, l).... */ - } - } -} - -static int make_fd_set_from_client_array ( - qse_httpd_t* httpd, fd_set* r, fd_set* w) -{ - int fd, max; - client_array_t* ca; - qse_httpd_client_t* client; - - ca = &httpd->client.array; - - /* qse_http_loop() needs to monitor listner handles - * to handle a new client connection. */ - max = httpd->server.max; - *r = httpd->server.set; - FD_ZERO (w); - - for (fd = 0; fd < ca->capa; fd++) - { - client = &ca->data[fd]; - - if (!client->htrd) continue; - - if (client->bad) - { - /* add a client-side handle to the write set - * if the client is already marked bad */ - FD_SET (client->handle.i, w); - if (client->handle.i > max) max = client->handle.i; - } - else - { - /* add a client-side handle to the read set */ - FD_SET (client->handle.i, r); - if (client->handle.i > max) max = client->handle.i; -qse_printf (QSE_T(">>>>ADDING CLIENT HANDLE %d\n"), client->handle.i); - - /* trigger[0] is a handle to monitor to check - * if there is data avaiable to read to write back to - * the client. qse_httpd_loop() needs to monitor - * trigger handles. - * - * trigger[1] is a user-defined handle to monitor to - * check if httpd can post data to. but this is not - * a client-side handle. - */ - if (client->task.queue.head) - { - qse_httpd_task_t* task = &client->task.queue.head->task; - int has_trigger = 0; - - if (task->trigger_mask & QSE_HTTPD_TASK_TRIGGER_READ) - { - /* if a trigger is available, add it to the read set also. */ -qse_printf (QSE_T(">>>>%s ADDING TRIGGER[0] %d\n"), - (task->trigger[0].i == client->handle.i? QSE_T("NOT"): QSE_T("")), - task->trigger[0].i); - if (task->trigger[0].i != client->handle.i) - { - FD_SET (task->trigger[0].i, r); - if (task->trigger[0].i > max) max = task->trigger[0].i; - } - has_trigger = 1; - } - if (task->trigger_mask & QSE_HTTPD_TASK_TRIGGER_RELAY) - { -qse_printf (QSE_T(">>>>%s ADDING TRIGGER[1] %d\n"), - (task->trigger[1].i == client->handle.i? QSE_T("NOT"): QSE_T("")), - task->trigger[1].i); - if (task->trigger[1].i != client->handle.i) - { - FD_SET (task->trigger[1].i, r); - if (task->trigger[1].i > max) max = task->trigger[1].i; - } - has_trigger = 1; - } - if (task->trigger_mask & QSE_HTTPD_TASK_TRIGGER_WRITE) - { - /* if a trigger is available, add it to the read set also. */ -qse_printf (QSE_T(">>>>ADDING TRIGGER[2] %d\n"), task->trigger[2].i); - FD_SET (task->trigger[2].i, w); - if (task->trigger[2].i > max) max = task->trigger[2].i; - has_trigger = 1; - } - - if (!has_trigger) - { - /* there is a task to perform. but no triggers - * were specified. if the client-side handle is - * available for writing, arrange to perform the - * task in the main loop by adding the client-side - * handle to the write set. */ -qse_printf (QSE_T(">>>>ADDING CLIENT CONNECTION %d TO WRITE\n"), client->handle.i); - FD_SET (client->handle.i, w); - if (client->handle.i > max) max = client->handle.i; - } - } - } - - } - - return max; -} - -static void perform_task (qse_httpd_t* httpd, qse_httpd_client_t* client) -{ - task_queue_node_t* node; - int n; - - QSE_ASSERT (client->task.queue.count > 0); - QSE_ASSERT (client->task.queue.head != QSE_NULL); - node = client->task.queue.head; - - n = node->task.main (httpd, client, &node->task); - if (n <= -1) - { - dequeue_task (httpd, client); - /*shutdown (client->handle.i, SHUT_RDWR);*/ - client->bad = 1; - } - else if (n == 0) - { - dequeue_task (httpd, client); - } -} - static int read_from_client (qse_httpd_t* httpd, qse_httpd_client_t* client) { qse_mchar_t buf[2048]; /* TODO: adjust this buffer size */ qse_ssize_t m; - + QSE_ASSERT (httpd->cbs->client.recv != QSE_NULL); reread: @@ -688,6 +673,7 @@ qse_fprintf (QSE_STDERR, QSE_T("Warning: Nothing to read from a client %d\n"), c { /* TOOD: if (httpd->errnum == QSE_HTTPD_ENOERR) httpd->errnum = QSE_HTTPD_ECALLBACK; */ qse_fprintf (QSE_STDERR, QSE_T("Error: failed to read from a client %d\n"), client->handle.i); + /* TODO: find a way to disconnect */ return -1; } } @@ -697,12 +683,12 @@ qse_fprintf (QSE_STDERR, QSE_T("Error: failed to read from a client %d\n"), clie qse_fprintf (QSE_STDERR, QSE_T("Debug: connection closed %d\n"), client->handle.i); return -1; } - + /* feed may have called the request callback multiple times... * that's because we don't know how many valid requests * are included in 'buf' */ qse_fprintf (QSE_STDERR, QSE_T("Debug: read from a client %d\n"), client->handle.i); - + httpd->errnum = QSE_HTTPD_ENOERR; qse_printf (QSE_T("!!!!!FEEDING [%.*hs]\n"), (int)m, buf); if (qse_htrd_feed (client->htrd, buf, m) <= -1) @@ -714,7 +700,7 @@ qse_printf (QSE_T("!!!!!FEEDING [%.*hs]\n"), (int)m, buf); httpd->errnum = QSE_HTTPD_EBADREQ; else httpd->errnum = QSE_HTTPD_ENOMEM; /* TODO: better translate error code */ } - + qse_fprintf (QSE_STDERR, QSE_T("Error: http error while processing \n")); return -1; } @@ -722,18 +708,257 @@ qse_fprintf (QSE_STDERR, QSE_T("Error: http error while processing \n")); return 0; } -int qse_httpd_loop (qse_httpd_t* httpd, qse_httpd_cbs_t* cbs) +static int invoke_client_task ( + qse_httpd_t* httpd, qse_httpd_client_t* client, + qse_ubi_t handle, int mask) +{ + qse_httpd_task_t* task; + int n; + +/* TODO: handle comparison callback ... */ + if (handle.i == client->handle.i && (mask & QSE_HTTPD_MUX_READ)) + { + if (read_from_client (httpd, client) <= -1) + { + /* return failure on disconnection also in order to + * purge the client in perform_client_task(). + * thus the following line isn't necessary. + *if (httpd->errnum == QSE_HTTPD_EDISCON) return 0;*/ + return -1; + } + } + + /* this client doesn't have any task */ + task = client->task.head; + if (task == QSE_NULL) return 0; + + task->trigger_mask &= + ~(QSE_HTTPD_TASK_TRIGGER_READABLE | + QSE_HTTPD_TASK_TRIGGER_RELAYABLE | + QSE_HTTPD_TASK_TRIGGER_WRITABLE); + +qse_printf (QSE_T("handle.i %d mask %d\n"), handle.i, mask); + if (task->trigger_mask & QSE_HTTPD_TASK_TRIGGER_READ) + { + if ((mask & QSE_HTTPD_MUX_READ) && task->trigger[0].i == handle.i) + task->trigger_mask |= QSE_HTTPD_TASK_TRIGGER_READABLE; + } + if (task->trigger_mask & QSE_HTTPD_TASK_TRIGGER_RELAY) + { + if ((mask & QSE_HTTPD_MUX_READ) && task->trigger[1].i == handle.i) + task->trigger_mask |= QSE_HTTPD_TASK_TRIGGER_RELAYABLE; + } + if (task->trigger_mask & QSE_HTTPD_TASK_TRIGGER_WRITE) + { + if ((mask & QSE_HTTPD_MUX_WRITE) && task->trigger[2].i == handle.i) + task->trigger_mask |= QSE_HTTPD_TASK_TRIGGER_WRITABLE; + } + + if (task->trigger_mask & (QSE_HTTPD_TASK_TRIGGER_READABLE | + QSE_HTTPD_TASK_TRIGGER_RELAYABLE | + QSE_HTTPD_TASK_TRIGGER_WRITABLE)) + { + /* the task is invoked for triggers. + * check if the client handle is writable */ + if (httpd->cbs->mux.writable (httpd, client->handle, 0) <= 0) + { + /* it is not writable yet. so just skip + * performing the actual task */ + return 0; + } + } + + n = task->main (httpd, client, task); +qse_printf (QSE_T("task returend %d\n"), n); + if (n <= -1) return -1; + else if (n == 0) + { + int mux_mask; + int mux_status; + + /* the current task is over. remove remove the task + * from the queue. dequeue_task() clears task triggers + * from the mux. so i don't clear them explicitly here */ + + dequeue_task (httpd, client); + + mux_mask = QSE_HTTPD_MUX_READ; + mux_status = CLIENT_HANDLE_READ_IN_MUX; + if (client->task.head) + { + /* there is a pending task. arrange to + * trigger it as if it is just entasked */ + mux_mask |= QSE_HTTPD_MUX_WRITE; + mux_status |= CLIENT_HANDLE_WRITE_IN_MUX; + } + + QSE_ASSERT (client->status & CLIENT_HANDLE_IN_MUX); + + httpd->cbs->mux.delhnd (httpd, httpd->mux, client->handle); + client->status &= ~CLIENT_HANDLE_IN_MUX; + + if (httpd->cbs->mux.addhnd ( + httpd, httpd->mux, client->handle, + mux_mask, perform_client_task, client) <= -1) return -1; + client->status |= mux_status; + + return 0; + } + else + { +/* TODO: if there are no changes in the triggers, do do delhnd/addhnd() */ + static int mux_status[] = + { + CLIENT_TASK_TRIGGER_READ_IN_MUX, + CLIENT_TASK_TRIGGER_RELAY_IN_MUX, + CLIENT_TASK_TRIGGER_WRITE_IN_MUX + }; + + static int trigger_mask[] = + { + QSE_HTTPD_TASK_TRIGGER_READ, + QSE_HTTPD_TASK_TRIGGER_RELAY, + QSE_HTTPD_TASK_TRIGGER_WRITE + }; + + static int mux_mask[] = + { + QSE_HTTPD_MUX_READ, + QSE_HTTPD_MUX_READ, + QSE_HTTPD_MUX_WRITE + }; + + int i; + + for (i = 0; i < 3; i++) + { + if (client->status & mux_status[i]) + { + httpd->cbs->mux.delhnd (httpd, httpd->mux, task->trigger[i]); + client->status &= ~mux_status[i]; + } + if (task->trigger_mask & trigger_mask[i]) + { + if (client->handle.i != task->trigger[i].i) + { + if (httpd->cbs->mux.addhnd ( + httpd, httpd->mux, task->trigger[i], + mux_mask[i], perform_client_task, client) <= -1) return -1; + client->status |= mux_status[i]; + } + } + } + + QSE_ASSERT (client->status & CLIENT_HANDLE_READ_IN_MUX); + + if ((client->status & CLIENT_TASK_TRIGGER_IN_MUX) && + (client->status & CLIENT_HANDLE_WRITE_IN_MUX)) + { + httpd->cbs->mux.delhnd (httpd, httpd->mux, client->handle); + client->status &= ~CLIENT_HANDLE_IN_MUX; + + if (httpd->cbs->mux.addhnd ( + httpd, httpd->mux, client->handle, + QSE_HTTPD_MUX_READ, perform_client_task, client) <= -1) return -1; + client->status |= CLIENT_HANDLE_READ_IN_MUX; + } + + return 0; + } +} + +static int perform_client_task ( + qse_httpd_t* httpd, void* mux, qse_ubi_t handle, int mask, void* cbarg) +{ + qse_httpd_client_t* client; + + client = (qse_httpd_client_t*)cbarg; + + if (client->status & CLIENT_BAD) return 0; + + if (!(client->status & CLIENT_READY)) + { + int x; + x = httpd->cbs->client.accepted (httpd, client); + if (x <= -1) goto oops; + if (x >= 1) client->status |= CLIENT_READY; + } + else + { + if (invoke_client_task (httpd, client, handle, mask) <= -1) goto oops; + } + + return 0; + +oops: +qse_printf (QSE_T("MARKING BAD XXXXXXXXXXXXXX\n")); + /*purge_client (httpd, client);*/ + client->status |= CLIENT_BAD; + client->bad_next = httpd->client.bad; + httpd->client.bad = client; + return -1; +} + +static void purge_bad_clients (qse_httpd_t* httpd) +{ + qse_httpd_client_t* client; + + while (httpd->client.bad) + { + client = httpd->client.bad; + httpd->client.bad = client->bad_next; +qse_printf (QSE_T("PURGING BAD CLIENT XXXXXXXXXXXXXX\n")); + purge_client (httpd, client); + } +} + +qse_httpd_task_t* qse_httpd_entask ( + qse_httpd_t* httpd, qse_httpd_client_t* client, + qse_httpd_task_t* pred, const qse_httpd_task_t* task, + qse_size_t xtnsize) +{ + qse_httpd_task_t* new_task; + + new_task = enqueue_task (httpd, client, pred, task, xtnsize); + if (new_task == QSE_NULL) purge_client (httpd, client); + else if (new_task->prev == QSE_NULL) + { + /* this new task is the first task for a client */ + /*insert_client_to_tasked_list (httpd, client);*/ + + /* arrange to invokde this task so long as + * the client-side handle is writable. */ + QSE_ASSERT (client->status & CLIENT_HANDLE_IN_MUX); + httpd->cbs->mux.delhnd (httpd, httpd->mux, client->handle); + client->status &= ~CLIENT_HANDLE_IN_MUX; + + if (httpd->cbs->mux.addhnd ( + httpd, httpd->mux, client->handle, + QSE_HTTPD_MUX_READ | QSE_HTTPD_MUX_WRITE, + perform_client_task, client) <= -1) + { + purge_client (httpd, client); + new_task = QSE_NULL; + } + client->status |= CLIENT_HANDLE_IN_MUX; /* READ | WRITE */ + } + + return new_task; +} + +int qse_httpd_loop (qse_httpd_t* httpd, qse_httpd_cbs_t* cbs, qse_ntime_t timeout) { httpd->stopreq = 0; httpd->cbs = cbs; QSE_ASSERTX (httpd->server.list != QSE_NULL, - "Add listeners before calling qse_httpd_loop()" - ); + "Add listeners before calling qse_httpd_loop()"); + + QSE_ASSERTX (httpd->client.list.head == QSE_NULL, + "No client should exist when this loop is started"); QSE_ASSERTX (httpd->cbs != QSE_NULL, - "Set httpd callbacks before calling qse_httpd_loop()" - ); + "Set httpd callbacks before calling qse_httpd_loop()"); if (httpd->server.list == QSE_NULL) { @@ -742,161 +967,52 @@ int qse_httpd_loop (qse_httpd_t* httpd, qse_httpd_cbs_t* cbs) return -1; } - if (activate_servers (httpd) <= -1) return -1; + QSE_ASSERT (httpd->server.navail > 0); - init_client_array (httpd); + httpd->mux = httpd->cbs->mux.open (httpd); + if (httpd->mux == QSE_NULL) + { +qse_printf (QSE_T("can't open mux....\n")); + return -1; + } + + if (activate_servers (httpd) <= -1) + { + httpd->cbs->mux.close (httpd, httpd->mux); + return -1; + } + if (httpd->server.nactive <= 0) + { +qse_printf (QSE_T("no servers are active....\n")); + httpd->cbs->mux.close (httpd, httpd->mux); + return -1; + } while (!httpd->stopreq) { - int n, max, fd; - fd_set r; - fd_set w; - struct timeval tv; + int count; - tv.tv_sec = 1; - tv.tv_usec = 0; - - max = make_fd_set_from_client_array (httpd, &r, &w); - n = select (max + 1, &r, &w, QSE_NULL, &tv); - if (n <= -1) + count = httpd->cbs->mux.poll (httpd, httpd->mux, timeout); + if (count <= -1) { httpd->errnum = QSE_HTTPD_EIOMUX; /* TODO: call user callback for this multiplexer error */ /*if (errno == EINTR) continue;*/ -qse_fprintf (QSE_STDERR, QSE_T("Error: select returned failure\n")); +qse_fprintf (QSE_STDERR, QSE_T("Error: mux returned failure\n")); /* break; */ - continue; - } - if (n == 0) - { - continue; } - /* check the listener activity */ - accept_client_from_servers (httpd, &r); - - /* check the client activity */ - for (fd = 0; fd < httpd->client.array.capa; fd++) - { - qse_httpd_client_t* client = &httpd->client.array.data[fd]; - - if (!client->htrd) continue; - - if (FD_ISSET(client->handle.i, &r)) - { - /* got input */ - if (!client->ready) - { - /* if client.accepted() returns 0, it is called - * again next time. */ - int x; - - QSE_ASSERT (httpd->cbs->client.accepted != QSE_NULL); - - x = httpd->cbs->client.accepted (httpd, client); /* is this correct???? what if ssl handshaking got stalled because writing failed in SSL_accept()? */ - if (x >= 1) client->ready = 1; - else if (x <= -1) - { - delete_from_client_array (httpd, fd); - continue; - } - } - else - { -// TODO: any way to suspend read while a request is being processed??? - if (read_from_client (httpd, client) <= -1) - { - delete_from_client_array (httpd, fd); - continue; - } - } - } - - /* perform a client task enqued to a client */ - if (client->task.queue.head) - { - qse_httpd_task_t* task; - int perform = 0; - -qse_printf (QSE_T(".....CLIENT %d HAS TASK\n"), fd); - task = &client->task.queue.head->task; - task->trigger_mask &= - ~(QSE_HTTPD_TASK_TRIGGER_READABLE | - QSE_HTTPD_TASK_TRIGGER_RELAYABLE | - QSE_HTTPD_TASK_TRIGGER_WRITABLE); - - if (!(task->trigger_mask & QSE_HTTPD_TASK_TRIGGER_READ) && - !(task->trigger_mask & QSE_HTTPD_TASK_TRIGGER_RELAY) && - !(task->trigger_mask & QSE_HTTPD_TASK_TRIGGER_WRITE)) - { -qse_printf (QSE_T(".....NO TRIGGER ACTION....\n")); - /* no trigger set. set the flag to - * non-readable and non-writable */ - perform = 1; - } - else - { -if (task->trigger_mask & QSE_HTTPD_TASK_TRIGGER_READ) -qse_printf (QSE_T(".....CLIENT %d HAS READ TREIGGER %d\n"), fd, task->trigger[0].i); -if (task->trigger_mask & QSE_HTTPD_TASK_TRIGGER_RELAY) -qse_printf (QSE_T(".....CLIENT %d HAS RELAY TREIGGER %d\n"), fd, task->trigger[1].i); -if (task->trigger_mask & QSE_HTTPD_TASK_TRIGGER_WRITE) -qse_printf (QSE_T(".....CLIENT %d HAS WRITE TREIGGER %d\n"), fd, task->trigger[2].i); - if ((task->trigger_mask & QSE_HTTPD_TASK_TRIGGER_READ) && - FD_ISSET(task->trigger[0].i, &r)) - { - task->trigger_mask |= QSE_HTTPD_TASK_TRIGGER_READABLE; - perform = 1; -qse_printf (QSE_T(".....TRIGGER READABLE....\n")); - } - - if ((task->trigger_mask & QSE_HTTPD_TASK_TRIGGER_RELAY) && - FD_ISSET(task->trigger[1].i, &r)) - { - task->trigger_mask |= QSE_HTTPD_TASK_TRIGGER_RELAYABLE; - perform = 1; -qse_printf (QSE_T(".....TRIGGER RELAYABLE....\n")); - } - - if ((task->trigger_mask & QSE_HTTPD_TASK_TRIGGER_WRITE) && - FD_ISSET(task->trigger[2].i, &w)) - { - task->trigger_mask |= QSE_HTTPD_TASK_TRIGGER_WRITABLE; - perform = 1; -qse_printf (QSE_T(".....TRIGGER WRITABLE....\n")); - } - } - - if (perform) - { - /* TODO: error handling -> writable() returns <= -1 */ - /* TODO: though the client side is not writable, can't i still exeucte the task? - * if the task needs to transfer anything yet.. it can do that. - * i probably need a new trigger type??? */ - if (httpd->cbs->mux.writable (httpd, client->handle, 0) >= 1) - { - perform_task (httpd, client); - } - } - } - } + purge_bad_clients (httpd); } - fini_client_array (httpd); + purge_client_list (httpd); deactivate_servers (httpd); + httpd->cbs->mux.close (httpd, httpd->mux); return 0; } /* --------------------------------------------------- */ -qse_httpd_task_t* qse_httpd_entask ( - qse_httpd_t* httpd, qse_httpd_client_t* client, - const qse_httpd_task_t* pred, const qse_httpd_task_t* task, - qse_size_t xtnsize) -{ - return enqueue_task (httpd, client, pred, task, xtnsize); -} - void qse_httpd_discardcontent (qse_httpd_t* httpd, qse_htre_t* req) { qse_htre_discardcontent (req); diff --git a/qse/lib/net/httpd.h b/qse/lib/net/httpd.h index 625bff34..47ac5e8b 100644 --- a/qse/lib/net/httpd.h +++ b/qse/lib/net/httpd.h @@ -24,56 +24,6 @@ /* private header file for httpd */ #include -#include -#include - -/* REMOVE THESE headers after abstracting away select()/fd_set */ -#include -#include -#include - - -typedef struct client_array_t client_array_t; - -typedef struct task_queue_node_t task_queue_node_t; -struct task_queue_node_t -{ - task_queue_node_t* next; - task_queue_node_t* prev; - qse_httpd_task_t task; -}; - -struct qse_httpd_client_t -{ - qse_ubi_t handle; - qse_ubi_t handle2; - qse_nwad_t local_addr; - qse_nwad_t remote_addr; - - /* ------------------------------ */ - - int ready; - int secure; - int bad; - qse_htrd_t* htrd; - - struct - { - struct - { - int count; - task_queue_node_t* head; - task_queue_node_t* tail; - } queue; - } task; -}; - -struct client_array_t -{ - int capa; - int size; - qse_httpd_client_t* data; -}; struct qse_httpd_t { @@ -86,15 +36,29 @@ struct qse_httpd_t struct { - client_array_t array; + struct + { + qse_httpd_client_t* head; + qse_httpd_client_t* tail; + } list; + + struct + { + qse_httpd_client_t* head; + qse_httpd_client_t* tail; + } tasked; + + qse_httpd_client_t* bad; } client; struct { qse_httpd_server_t* list; - fd_set set; - int max; + qse_size_t navail; + qse_size_t nactive; } server; + + void* mux; }; #ifdef __cplusplus diff --git a/qse/samples/net/http01.c b/qse/samples/net/http01.c index 2c26dead..28108fdd 100644 --- a/qse/samples/net/http01.c +++ b/qse/samples/net/http01.c @@ -19,16 +19,13 @@ # include # include # include +# include #endif #include #include #include - -// TODO: remove this and export structured needed like qse_httpd_client_t -#include "../../lib/net/httpd.h" - /* ------------------------------------------------------------------- */ #define MAX_SEND_SIZE 4096 @@ -373,6 +370,7 @@ static int server_accept ( return -1; } +#if 0 if (fd >= FD_SETSIZE) { qse_fprintf (QSE_STDERR, QSE_T("Error: too many client?\n")); @@ -380,6 +378,7 @@ qse_fprintf (QSE_STDERR, QSE_T("Error: too many client?\n")); close (fd); return -1; } +#endif flag = fcntl (fd, F_GETFL); if (flag >= 0) fcntl (fd, F_SETFL, flag | O_NONBLOCK); @@ -407,6 +406,161 @@ qse_fprintf (QSE_STDERR, QSE_T("Error: too many client?\n")); /* ------------------------------------------------------------------- */ +struct mux_ev_t +{ + qse_ubi_t handle; + int reqmask; + qse_httpd_muxcb_t cbfun; + void* cbarg; + struct mux_ee_t* next; +}; + +struct mux_t +{ + int fd; + + struct + { + struct epoll_event* ptr; + qse_size_t len; + qse_size_t capa; + } ee; +}; + +static void* mux_open (qse_httpd_t* httpd) +{ + struct mux_t* mux; + + mux = qse_httpd_allocmem (httpd, QSE_SIZEOF(*mux)); + if (mux == QSE_NULL) return QSE_NULL; + + memset (mux, 0, QSE_SIZEOF(*mux)); + + mux->fd = epoll_create (100); + if (mux->fd <= -1) + { + qse_httpd_freemem (httpd, mux); + qse_httpd_seterrnum (httpd, syserr_to_errnum(errno)); + return QSE_NULL; + } + + return mux; +} + +static void mux_close (qse_httpd_t* httpd, void* vmux) +{ + struct mux_t* mux = (struct mux_t*)vmux; + if (mux->ee.ptr) qse_httpd_freemem (httpd, mux->ee.ptr); + close (mux->fd); + qse_httpd_freemem (httpd, mux); +} + +static int mux_addhnd ( + qse_httpd_t* httpd, void* vmux, qse_ubi_t handle, + int mask, qse_httpd_muxcb_t cbfun, void* cbarg) +{ + struct mux_t* mux = (struct mux_t*)vmux; + struct epoll_event ev; + struct mux_ev_t* mev; + + ev.events = 0; + if (mask & QSE_HTTPD_MUX_READ) ev.events |= EPOLLIN; + if (mask & QSE_HTTPD_MUX_WRITE) ev.events |= EPOLLOUT; + + if (ev.events == 0) + { + qse_httpd_seterrnum (httpd, QSE_HTTPD_EINVAL); + return -1; + } + + mev = qse_httpd_allocmem (httpd, QSE_SIZEOF(*mev)); + if (mev == QSE_NULL) return -1; + + if (mux->ee.len >= mux->ee.capa) + { + struct epoll_event* tmp; + + tmp = qse_httpd_reallocmem ( + httpd, mux->ee.ptr, + QSE_SIZEOF(*mux->ee.ptr) * (mux->ee.capa + 1) * 2); + if (tmp == QSE_NULL) + { + qse_httpd_freemem (httpd, mev); + return -1; + } + + mux->ee.ptr = tmp; + mux->ee.capa = (mux->ee.capa + 1) * 2; + } + + mev->handle = handle; + mev->reqmask = mask; + mev->cbfun = cbfun; + mev->cbarg = cbarg; + + ev.data.ptr = mev; + + if (epoll_ctl (mux->fd, EPOLL_CTL_ADD, handle.i, &ev) <= -1) + { + /* don't rollback ee.ptr */ + qse_httpd_seterrnum (httpd, syserr_to_errnum(errno)); + qse_httpd_freemem (httpd, mev); + return -1; + } + + mux->ee.len++; + return 0; +} + +static int mux_delhnd (qse_httpd_t* httpd, void* vmux, qse_ubi_t handle) +{ + struct mux_t* mux = (struct mux_t*)vmux; + + if (epoll_ctl (mux->fd, EPOLL_CTL_DEL, handle.i, QSE_NULL) <= -1) + { + qse_httpd_seterrnum (httpd, syserr_to_errnum(errno)); + return -1; + } + + mux->ee.len--; + return 0; +} + +static int mux_poll (qse_httpd_t* httpd, void* vmux, qse_ntime_t timeout) +{ + struct mux_t* mux = (struct mux_t*)vmux; + struct mux_ev_t* mev; + int mask, nfds, i; + + nfds = epoll_wait (mux->fd, mux->ee.ptr, mux->ee.len, timeout); + if (nfds <= -1) + { + qse_httpd_seterrnum (httpd, syserr_to_errnum(errno)); + return -1; + } + + for (i = 0; i < nfds; i++) + { + mev = mux->ee.ptr[i].data.ptr; + + mask = 0; + + if (mux->ee.ptr[i].events & EPOLLIN) mask |= QSE_HTTPD_MUX_READ; + if (mux->ee.ptr[i].events & EPOLLOUT) mask |= QSE_HTTPD_MUX_WRITE; + + if (mux->ee.ptr[i].events & EPOLLHUP) + { + if (mev->reqmask & QSE_HTTPD_MUX_READ) mask |= QSE_HTTPD_MUX_READ; + if (mev->reqmask & QSE_HTTPD_MUX_WRITE) mask |= QSE_HTTPD_MUX_WRITE; + } + + mev->cbfun (httpd, mux, mev->handle, mask, mev->cbarg); + +//if (cbfun fails and the client is deleted???) other pending events should also be dropped??? + } + return 0; +} + static int mux_readable (qse_httpd_t* httpd, qse_ubi_t handle, qse_ntoff_t msec) { fd_set r; @@ -574,7 +728,11 @@ static void client_close ( static void client_shutdown ( qse_httpd_t* httpd, qse_httpd_client_t* client) { +#if defined(SHUT_RDWR) shutdown (client->handle.i, SHUT_RDWR); +#else + shutdown (client->handle.i, 2); +#endif } static qse_ssize_t client_recv ( @@ -912,7 +1070,15 @@ static qse_httpd_cbs_t httpd_cbs = { server_open, server_close, server_accept }, /* multiplexer */ - { mux_readable, mux_writable }, + { mux_open, + mux_close, + mux_addhnd, + mux_delhnd, + mux_poll, + + mux_readable, + mux_writable + }, /* file operation */ { file_executable, @@ -991,7 +1157,7 @@ int httpd_main (int argc, qse_char_t* argv[]) signal (SIGPIPE, SIG_IGN); qse_httpd_setoption (httpd, QSE_HTTPD_CGIERRTONUL); - ret = qse_httpd_loop (httpd, &httpd_cbs); + ret = qse_httpd_loop (httpd, &httpd_cbs, 10000); signal (SIGINT, SIG_DFL); signal (SIGPIPE, SIG_DFL);