From 5e85ab6613e367275b438897e7f32a11811b86b2 Mon Sep 17 00:00:00 2001 From: hyung-hwan Date: Thu, 28 Jul 2011 21:57:39 +0000 Subject: [PATCH] added code to selective turn on/off an output thread --- qse/include/qse/net/http.h | 5 + qse/include/qse/net/httpd.h | 18 +++- qse/lib/net/http.c | 8 ++ qse/lib/net/httpd.c | 199 ++++++++++++++++++++++++------------ qse/lib/net/httpd.h | 14 ++- qse/lib/net/httpd_task.c | 56 ++++++++-- qse/samples/net/http01.c | 2 +- 7 files changed, 229 insertions(+), 73 deletions(-) diff --git a/qse/include/qse/net/http.h b/qse/include/qse/net/http.h index a89d080a..c9c34ac7 100644 --- a/qse/include/qse/net/http.h +++ b/qse/include/qse/net/http.h @@ -120,6 +120,11 @@ typedef struct qse_http_range_t qse_http_range_t; extern "C" { #endif +int qse_comparehttpversions ( + const qse_http_version_t* v1, + const qse_http_version_t* v2 +); + const qse_mchar_t* qse_gethttpmethodname ( qse_http_method_t type ); diff --git a/qse/include/qse/net/httpd.h b/qse/include/qse/net/httpd.h index c356670c..6a8168ce 100644 --- a/qse/include/qse/net/httpd.h +++ b/qse/include/qse/net/httpd.h @@ -111,8 +111,18 @@ void qse_httpd_setcbs ( qse_httpd_cbs_t* cbs ); +/** + * The qse_httpd_loop() function starts a httpd server loop. + * If @a threaded is non-zero, it creates a separate output thread. + * If no thread support is available, it is ignored. + * + * @note + * In the future, the @a threaded parameter will be extended to + * specify the number of output threads. + */ int qse_httpd_loop ( - qse_httpd_t* httpd + qse_httpd_t* httpd, + int threaded ); /** @@ -149,6 +159,12 @@ int qse_httpd_entasktext ( const qse_mchar_t* text ); +int qse_httpd_entaskstatictext ( + qse_httpd_t* httpd, + qse_httpd_client_t* client, + const qse_mchar_t* text +); + int qse_httpd_entaskformat ( qse_httpd_t* httpd, qse_httpd_client_t* client, diff --git a/qse/lib/net/http.c b/qse/lib/net/http.c index d1605557..808c604f 100644 --- a/qse/lib/net/http.c +++ b/qse/lib/net/http.c @@ -23,6 +23,14 @@ #include #include +int qse_comparehttpversions ( + const qse_http_version_t* v1, + const qse_http_version_t* v2) +{ + if (v1->major == v2->major) return v1->minor - v2->minor; + return v1->major - v2->major; +} + const qse_mchar_t* qse_gethttpmethodname (qse_http_method_t type) { static qse_mchar_t* names[] = diff --git a/qse/lib/net/httpd.c b/qse/lib/net/httpd.c index 097d12fc..73422e92 100644 --- a/qse/lib/net/httpd.c +++ b/qse/lib/net/httpd.c @@ -207,14 +207,25 @@ static int enqueue_task_locked ( qse_httpd_t* httpd, qse_httpd_client_t* client, const qse_httpd_task_t* task, qse_size_t xtnsize) { - int ret; - pthread_mutex_lock (&client->task.mutex); - ret = enqueue_task_unlocked (httpd, client, task, xtnsize); - pthread_mutex_unlock (&client->task.mutex); - return ret; +#if defined(HAVE_PTHREAD) + if (httpd->threaded) + { + int ret; + pthread_mutex_lock (&client->task.mutex); + ret = enqueue_task_unlocked (httpd, client, task, xtnsize); + pthread_mutex_unlock (&client->task.mutex); + return ret; + } + else + { +#endif + return enqueue_task_unlocked (httpd, client, task, xtnsize); +#if defined(HAVE_PTHREAD) + } +#endif } -static int dequeue_task_unlocked ( +static QSE_INLINE int dequeue_task_unlocked ( qse_httpd_t* httpd, qse_httpd_client_t* client) { task_queue_node_t* node; @@ -243,18 +254,33 @@ static int dequeue_task_unlocked ( static int dequeue_task_locked (qse_httpd_t* httpd, qse_httpd_client_t* client) { - int ret; - pthread_mutex_lock (&client->task.mutex); - ret = dequeue_task_unlocked (httpd, client); - pthread_mutex_unlock (&client->task.mutex); - return ret; +#if defined(HAVE_PTHREAD) + if (httpd->threaded) + { + int ret; + pthread_mutex_lock (&client->task.mutex); + ret = dequeue_task_unlocked (httpd, client); + pthread_mutex_unlock (&client->task.mutex); + return ret; + } + else + { +#endif + return dequeue_task_unlocked (httpd, client); +#if defined(HAVE_PTHREAD) + } +#endif } static void purge_tasks_locked (qse_httpd_t* httpd, qse_httpd_client_t* client) { - pthread_mutex_lock (&client->task.mutex); +#if defined(HAVE_PTHREAD) + if (httpd->threaded) pthread_mutex_lock (&client->task.mutex); +#endif while (dequeue_task_unlocked (httpd, client) == 0); - pthread_mutex_unlock (&client->task.mutex); +#if defined(HAVE_PTHREAD) + if (httpd->threaded) pthread_mutex_unlock (&client->task.mutex); +#endif } static int capture_param (qse_htrd_t* http, const qse_mcstr_t* key, const qse_mcstr_t* val) @@ -488,7 +514,7 @@ static void deactivate_listener (qse_httpd_t* httpd, listener_t* l) l->handle = -1; } -static int activate_listner (qse_httpd_t* httpd, listener_t* l) +static int activate_listener (qse_httpd_t* httpd, listener_t* l) { /* TODO: suport https... */ sockaddr_t addr; @@ -571,7 +597,7 @@ static void deactivate_listeners (qse_httpd_t* httpd) httpd->listener.max = -1; } -static int activate_listners (qse_httpd_t* httpd) +static int activate_listeners (qse_httpd_t* httpd) { listener_t* l; fd_set listener_set; @@ -582,7 +608,7 @@ static int activate_listners (qse_httpd_t* httpd) { if (l->handle <= -1) { - if (activate_listner (httpd, l) <= -1) goto oops; + if (activate_listener (httpd, l) <= -1) goto oops; /*TODO: callback httpd->cbs.listener_activated (httpd, l);*/ } @@ -613,7 +639,9 @@ static void delete_from_client_array (qse_httpd_t* httpd, int fd) if (array->data[fd].htrd) { purge_tasks_locked (httpd, &array->data[fd]); - pthread_mutex_destroy (&array->data[fd].task.mutex); +#if defined(HAVE_PTHREAD) + if (httpd->threaded) pthread_mutex_destroy (&array->data[fd].task.mutex); +#endif qse_htrd_close (array->data[fd].htrd); array->data[fd].htrd = QSE_NULL; @@ -674,7 +702,10 @@ static qse_httpd_client_t* insert_into_client_array (qse_httpd_t* httpd, int fd, array->data[fd].handle.i = fd; array->data[fd].addr = *addr; - pthread_mutex_init (&array->data[fd].task.mutex, NULL); +#if defined(HAVE_PTHREAD) + if (httpd->threaded) + pthread_mutex_init (&array->data[fd].task.mutex, QSE_NULL); +#endif xtn = (htrd_xtn_t*)qse_htrd_getxtn (array->data[fd].htrd); xtn->client_index = fd; @@ -726,9 +757,15 @@ qse_fprintf (QSE_STDERR, QSE_T("Error: too many client?\n")); if (flag >= 0) fcntl (c, F_SETFL, flag | O_NONBLOCK); fcntl (c, F_SETFD, FD_CLOEXEC); - pthread_mutex_lock (&httpd->client.mutex); +#if defined(HAVE_PTHREAD) + if (httpd->threaded) pthread_mutex_lock (&httpd->client.mutex); +#endif client = insert_into_client_array (httpd, c, &addr); - pthread_mutex_unlock (&httpd->client.mutex); + +#if defined(HAVE_PTHREAD) + if (httpd->threaded) pthread_mutex_unlock (&httpd->client.mutex); +#endif + if (client == QSE_NULL) { close (c); @@ -759,7 +796,8 @@ httpd->cbs.on_error (httpd, l).... */ } -static int make_fd_set_from_client_array (qse_httpd_t* httpd, fd_set* r, fd_set* w) +static int make_fd_set_from_client_array ( + qse_httpd_t* httpd, fd_set* r, fd_set* w) { int fd, max = -1; client_array_t* ca = &httpd->client.array; @@ -816,6 +854,7 @@ static void perform_task (qse_httpd_t* httpd, qse_httpd_client_t* client) } } +#if defined(HAVE_PTHREAD) static void* response_thread (void* arg) { qse_httpd_t* httpd = (qse_httpd_t*)arg; @@ -830,7 +869,7 @@ static void* response_thread (void* arg) tv.tv_usec = 0; pthread_mutex_lock (&httpd->client.mutex); - max = make_fd_set_from_client_array (httpd, NULL, &w); + max = make_fd_set_from_client_array (httpd, QSE_NULL, &w); pthread_mutex_unlock (&httpd->client.mutex); while (max == -1 && !httpd->stopreq) @@ -840,22 +879,22 @@ static void* response_thread (void* arg) pthread_mutex_lock (&httpd->client.mutex); - gettimeofday (&now, NULL); + gettimeofday (&now, QSE_NULL); timeout.tv_sec = now.tv_sec + 2; timeout.tv_nsec = now.tv_usec * 1000; pthread_cond_timedwait (&httpd->client.cond, &httpd->client.mutex, &timeout); - max = make_fd_set_from_client_array (httpd, NULL, &w); + max = make_fd_set_from_client_array (httpd, QSE_NULL, &w); pthread_mutex_unlock (&httpd->client.mutex); } if (httpd->stopreq) break; - n = select (max + 1, NULL, &w, NULL, &tv); + n = select (max + 1, QSE_NULL, &w, QSE_NULL, &tv); if (n <= -1) { - if (errno == EINTR) continue; + /*if (errno == EINTR) continue; */ qse_fprintf (QSE_STDERR, QSE_T("Error: select returned failure - %S\n"), strerror(errno)); /* break; */ continue; @@ -886,13 +925,13 @@ qse_fprintf (QSE_STDERR, QSE_T("Error: select returned failure - %S\n"), strerro perform_task (httpd, client); } } - } } - pthread_exit (NULL); - return NULL; + pthread_exit (QSE_NULL); + return QSE_NULL; } +#endif static int read_from_client (qse_httpd_t* httpd, qse_httpd_client_t* client) { @@ -939,9 +978,11 @@ qse_fprintf (QSE_STDERR, QSE_T("Error: http error while processing \n")); return 0; } -int qse_httpd_loop (qse_httpd_t* httpd) +int qse_httpd_loop (qse_httpd_t* httpd, int threaded) { +#if defined(HAVE_PTHREAD) pthread_t response_thread_id; +#endif httpd->stopreq = 0; @@ -951,48 +992,52 @@ int qse_httpd_loop (qse_httpd_t* httpd) if (httpd->listener.list == QSE_NULL) { + /* no listener specified */ httpd->errnum = QSE_HTTPD_EINVAL; - goto oops; + return -1; } - if (activate_listners (httpd) <= -1) - { - qse_fprintf (QSE_STDERR, QSE_T("Error: failed to make a server socket\n")); - goto oops; - } - - /* data receiver main logic */ - pthread_mutex_init (&httpd->client.mutex, NULL); - pthread_cond_init (&httpd->client.cond, NULL); + if (activate_listeners (httpd) <= -1) return -1; init_client_array (httpd); #if defined(HAVE_PTHREAD) /* start the response sender as a thread */ - pthread_create (&response_thread_id, NULL, response_thread, httpd); -/* TODO: error check */ + if (threaded) + { + pthread_mutex_init (&httpd->client.mutex, QSE_NULL); + pthread_cond_init (&httpd->client.cond, QSE_NULL); + + if (pthread_create (&response_thread_id, QSE_NULL, response_thread, httpd) != 0) + { + pthread_cond_destroy (&httpd->client.cond); + pthread_mutex_destroy (&httpd->client.mutex); + httpd->threaded = 0; + } + else httpd->threaded = 1; + } #endif while (!httpd->stopreq) { - int n, max, fd; fd_set r; + fd_set w; struct timeval tv; tv.tv_sec = 1; tv.tv_usec = 0; pthread_mutex_lock (&httpd->client.mutex); - max = make_fd_set_from_client_array (httpd, &r, QSE_NULL); + max = make_fd_set_from_client_array (httpd, &r, &w); pthread_mutex_unlock (&httpd->client.mutex); - n = select (max + 1, &r, NULL, NULL, &tv); + n = select (max + 1, &r, &w, QSE_NULL, &tv); if (n <= -1) { httpd->errnum = QSE_HTTPD_EIOMUX; /* TODO: call user callback for this multiplexer error */ - if (errno == EINTR) continue; + /*if (errno == EINTR) continue;*/ qse_fprintf (QSE_STDERR, QSE_T("Error: select returned failure\n")); /* break; */ continue; @@ -1014,33 +1059,57 @@ qse_fprintf (QSE_STDERR, QSE_T("Error: select returned failure\n")); /* got input */ if (read_from_client (httpd, client) <= -1) { - qse_httpd_markclientbad (httpd, client); - shutdown (client->handle.i, 0); - #if 0 - pthread_mutex_lock (&httpd->client.mutex); - delete_from_client_array (httpd, fd); - pthread_mutex_unlock (&httpd->client.mutex); - #endif + if (httpd->threaded) + { + /* let the writing part handle it, + * probably in the next iteration */ + qse_httpd_markclientbad (httpd, client); + shutdown (client->handle.i, 0); + } + else + { + /*pthread_mutex_lock (&httpd->client.mutex);*/ + delete_from_client_array (httpd, fd); + /*pthread_mutex_unlock (&httpd->client.mutex);*/ + continue; /* don't need to go to the writing part */ + } } } + + if (!httpd->threaded && FD_ISSET(client->handle.i, &w)) + { + /* output is handled in the main loop if and + * only if it is not threaded */ + if (client->bad) + { + /*send (client->handle, i, "INTERNAL SERVER ERROR..", ...);*/ + /*shutdown (client->handle.i, 0);*/ + + /*pthread_mutex_lock (&httpd->client.mutex);*/ + delete_from_client_array (httpd, fd); + /*pthread_mutex_unlock (&httpd->client.mutex);*/ + } + else if (client->task.queue.count > 0) + { + perform_task (httpd, client); + } + } + } } #if defined(HAVE_PTHREAD) - pthread_join (response_thread_id, NULL); + if (httpd->threaded) + { + pthread_join (response_thread_id, QSE_NULL); + pthread_cond_destroy (&httpd->client.cond); + pthread_mutex_destroy (&httpd->client.mutex); + } #endif fini_client_array (httpd); - - pthread_cond_destroy (&httpd->client.cond); - pthread_mutex_destroy (&httpd->client.mutex); - deactivate_listeners (httpd); return 0; - -oops: - deactivate_listeners (httpd); - return -1; } static void free_listener (qse_httpd_t* httpd, listener_t* l) @@ -1267,7 +1336,7 @@ void qse_httpd_clearlisteners (qse_httpd_t* httpd) pthread_mutex_lock (&httpd->listener.mutex); deactivate_listeners (httpd); free_listener_list (httpd, httpd->listener.list); - httpd->listener.list = NULL; + httpd->listener.list = QSE_NULL; pthread_mutex_unlock (&httpd->listener.mutex); } #endif @@ -1279,7 +1348,9 @@ int qse_httpd_entask ( int ret; ret = enqueue_task_locked (httpd, client, task, xtnsize); if (ret <= -1) client->bad = 1; /* mark this client bad */ - else pthread_cond_signal (&httpd->client.cond); +#if defined(HAVE_PTHREAD) + else if (httpd->threaded) pthread_cond_signal (&httpd->client.cond); +#endif return ret; } diff --git a/qse/lib/net/httpd.h b/qse/lib/net/httpd.h index aebb90dc..5ef4df4a 100644 --- a/qse/lib/net/httpd.h +++ b/qse/lib/net/httpd.h @@ -26,11 +26,14 @@ #include #include -#include #include #include #include +#if defined(HAVE_PTHREAD) +# include +#endif + typedef struct client_array_t client_array_t; union sockaddr_t @@ -58,7 +61,9 @@ struct qse_httpd_client_t struct { +#if defined(HAVE_PTHREAD) pthread_mutex_t mutex; +#endif struct { int count; @@ -100,17 +105,24 @@ struct qse_httpd_t qse_httpd_cbs_t* cbs; int stopreq; +#if defined(HAVE_PTHREAD) + int threaded; +#endif struct { +#if defined(HAVE_PTHREAD) pthread_mutex_t mutex; pthread_cond_t cond; +#endif client_array_t array; } client; struct { +#if defined(HAVE_PTHREAD) pthread_mutex_t mutex; +#endif listener_t* list; fd_set set; int max; diff --git a/qse/lib/net/httpd_task.c b/qse/lib/net/httpd_task.c index 4d7a9313..1af22ae8 100644 --- a/qse/lib/net/httpd_task.c +++ b/qse/lib/net/httpd_task.c @@ -56,6 +56,49 @@ int qse_httpd_entaskdisconnect (qse_httpd_t* httpd, qse_httpd_client_t* client) /*------------------------------------------------------------------------*/ +static int task_main_statictext ( + qse_httpd_t* httpd, qse_httpd_client_t* client, qse_httpd_task_t* task) +{ + qse_ssize_t n; + qse_size_t count = 0; + const qse_mchar_t* ptr = (const qse_mchar_t*)task->ctx; + + while (*ptr != QSE_MT('\0') && count < MAX_SENDFILE_SIZE) + { + ptr++; count++; + } + +/* TODO: do i need to add code to skip this send if count is 0? */ + n = send ( + client->handle.i, + task->ctx, + count, + 0 + ); + + if (n <= -1) return -1; + + ptr = (const qse_mchar_t*)task->ctx + n; + if (*ptr == QSE_MT('\0')) return 0; + + task->ctx = (void*)ptr; + return 1; /* more work to do */ +} + +int qse_httpd_entaskstatictext ( + qse_httpd_t* httpd, qse_httpd_client_t* client, const qse_mchar_t* text) +{ + qse_httpd_task_t task; + + QSE_MEMSET (&task, 0, QSE_SIZEOF(task)); + task.main = task_main_statictext; + task.ctx = (void*)text; + + return qse_httpd_entask (httpd, client, &task, 0); +} + +/*------------------------------------------------------------------------*/ + typedef struct task_text_t task_text_t; struct task_text_t { @@ -79,13 +122,14 @@ static int task_init_text ( static int task_main_text ( qse_httpd_t* httpd, qse_httpd_client_t* client, qse_httpd_task_t* task) { - ssize_t n; - size_t count; + qse_ssize_t n; + qse_size_t count; task_text_t* ctx = (task_text_t*)task->ctx; count = MAX_SENDFILE_SIZE; if (count >= ctx->left) count = ctx->left; +/* TODO: do i need to add code to skip this send if count is 0? */ n = send ( client->handle.i, ctx->ptr, @@ -155,8 +199,8 @@ static void task_fini_format ( static int task_main_format ( qse_httpd_t* httpd, qse_httpd_client_t* client, qse_httpd_task_t* task) { - ssize_t n; - size_t count; + qse_ssize_t n; + qse_size_t count; task_format_t* ctx = (task_format_t*)task->ctx; count = MAX_SENDFILE_SIZE; @@ -299,8 +343,8 @@ static void task_fini_file ( static int task_main_file ( qse_httpd_t* httpd, qse_httpd_client_t* client, qse_httpd_task_t* task) { - ssize_t n; - size_t count; + qse_ssize_t n; + qse_size_t count; task_file_t* ctx = (task_file_t*)task->ctx; count = MAX_SENDFILE_SIZE; diff --git a/qse/samples/net/http01.c b/qse/samples/net/http01.c index 4039434e..11be3b18 100644 --- a/qse/samples/net/http01.c +++ b/qse/samples/net/http01.c @@ -156,7 +156,7 @@ int httpd_main (int argc, qse_char_t* argv[]) signal (SIGINT, sigint); signal (SIGPIPE, SIG_IGN); - n = qse_httpd_loop (httpd); + n = qse_httpd_loop (httpd, 0); signal (SIGINT, SIG_DFL); signal (SIGPIPE, SIG_DFL);