added code to selective turn on/off an output thread

This commit is contained in:
hyung-hwan 2011-07-28 21:57:39 +00:00
parent f4ad3e3b66
commit 5e85ab6613
7 changed files with 229 additions and 73 deletions

View File

@ -120,6 +120,11 @@ typedef struct qse_http_range_t qse_http_range_t;
extern "C" {
#endif
int qse_comparehttpversions (
const qse_http_version_t* v1,
const qse_http_version_t* v2
);
const qse_mchar_t* qse_gethttpmethodname (
qse_http_method_t type
);

View File

@ -111,8 +111,18 @@ void qse_httpd_setcbs (
qse_httpd_cbs_t* cbs
);
/**
* The qse_httpd_loop() function starts a httpd server loop.
* If @a threaded is non-zero, it creates a separate output thread.
* If no thread support is available, it is ignored.
*
* @note
* In the future, the @a threaded parameter will be extended to
* specify the number of output threads.
*/
int qse_httpd_loop (
qse_httpd_t* httpd
qse_httpd_t* httpd,
int threaded
);
/**
@ -149,6 +159,12 @@ int qse_httpd_entasktext (
const qse_mchar_t* text
);
int qse_httpd_entaskstatictext (
qse_httpd_t* httpd,
qse_httpd_client_t* client,
const qse_mchar_t* text
);
int qse_httpd_entaskformat (
qse_httpd_t* httpd,
qse_httpd_client_t* client,

View File

@ -23,6 +23,14 @@
#include <qse/cmn/chr.h>
#include <qse/cmn/htb.h>
int qse_comparehttpversions (
const qse_http_version_t* v1,
const qse_http_version_t* v2)
{
if (v1->major == v2->major) return v1->minor - v2->minor;
return v1->major - v2->major;
}
const qse_mchar_t* qse_gethttpmethodname (qse_http_method_t type)
{
static qse_mchar_t* names[] =

View File

@ -206,6 +206,9 @@ static int enqueue_task_unlocked (
static int enqueue_task_locked (
qse_httpd_t* httpd, qse_httpd_client_t* client,
const qse_httpd_task_t* task, qse_size_t xtnsize)
{
#if defined(HAVE_PTHREAD)
if (httpd->threaded)
{
int ret;
pthread_mutex_lock (&client->task.mutex);
@ -213,8 +216,16 @@ static int enqueue_task_locked (
pthread_mutex_unlock (&client->task.mutex);
return ret;
}
else
{
#endif
return enqueue_task_unlocked (httpd, client, task, xtnsize);
#if defined(HAVE_PTHREAD)
}
#endif
}
static int dequeue_task_unlocked (
static QSE_INLINE int dequeue_task_unlocked (
qse_httpd_t* httpd, qse_httpd_client_t* client)
{
task_queue_node_t* node;
@ -242,6 +253,9 @@ static int dequeue_task_unlocked (
}
static int dequeue_task_locked (qse_httpd_t* httpd, qse_httpd_client_t* client)
{
#if defined(HAVE_PTHREAD)
if (httpd->threaded)
{
int ret;
pthread_mutex_lock (&client->task.mutex);
@ -249,12 +263,24 @@ static int dequeue_task_locked (qse_httpd_t* httpd, qse_httpd_client_t* client)
pthread_mutex_unlock (&client->task.mutex);
return ret;
}
else
{
#endif
return dequeue_task_unlocked (httpd, client);
#if defined(HAVE_PTHREAD)
}
#endif
}
static void purge_tasks_locked (qse_httpd_t* httpd, qse_httpd_client_t* client)
{
pthread_mutex_lock (&client->task.mutex);
#if defined(HAVE_PTHREAD)
if (httpd->threaded) pthread_mutex_lock (&client->task.mutex);
#endif
while (dequeue_task_unlocked (httpd, client) == 0);
pthread_mutex_unlock (&client->task.mutex);
#if defined(HAVE_PTHREAD)
if (httpd->threaded) pthread_mutex_unlock (&client->task.mutex);
#endif
}
static int capture_param (qse_htrd_t* http, const qse_mcstr_t* key, const qse_mcstr_t* val)
@ -488,7 +514,7 @@ static void deactivate_listener (qse_httpd_t* httpd, listener_t* l)
l->handle = -1;
}
static int activate_listner (qse_httpd_t* httpd, listener_t* l)
static int activate_listener (qse_httpd_t* httpd, listener_t* l)
{
/* TODO: suport https... */
sockaddr_t addr;
@ -571,7 +597,7 @@ static void deactivate_listeners (qse_httpd_t* httpd)
httpd->listener.max = -1;
}
static int activate_listners (qse_httpd_t* httpd)
static int activate_listeners (qse_httpd_t* httpd)
{
listener_t* l;
fd_set listener_set;
@ -582,7 +608,7 @@ static int activate_listners (qse_httpd_t* httpd)
{
if (l->handle <= -1)
{
if (activate_listner (httpd, l) <= -1) goto oops;
if (activate_listener (httpd, l) <= -1) goto oops;
/*TODO: callback httpd->cbs.listener_activated (httpd, l);*/
}
@ -613,7 +639,9 @@ static void delete_from_client_array (qse_httpd_t* httpd, int fd)
if (array->data[fd].htrd)
{
purge_tasks_locked (httpd, &array->data[fd]);
pthread_mutex_destroy (&array->data[fd].task.mutex);
#if defined(HAVE_PTHREAD)
if (httpd->threaded) pthread_mutex_destroy (&array->data[fd].task.mutex);
#endif
qse_htrd_close (array->data[fd].htrd);
array->data[fd].htrd = QSE_NULL;
@ -674,7 +702,10 @@ static qse_httpd_client_t* insert_into_client_array (qse_httpd_t* httpd, int fd,
array->data[fd].handle.i = fd;
array->data[fd].addr = *addr;
pthread_mutex_init (&array->data[fd].task.mutex, NULL);
#if defined(HAVE_PTHREAD)
if (httpd->threaded)
pthread_mutex_init (&array->data[fd].task.mutex, QSE_NULL);
#endif
xtn = (htrd_xtn_t*)qse_htrd_getxtn (array->data[fd].htrd);
xtn->client_index = fd;
@ -726,9 +757,15 @@ qse_fprintf (QSE_STDERR, QSE_T("Error: too many client?\n"));
if (flag >= 0) fcntl (c, F_SETFL, flag | O_NONBLOCK);
fcntl (c, F_SETFD, FD_CLOEXEC);
pthread_mutex_lock (&httpd->client.mutex);
#if defined(HAVE_PTHREAD)
if (httpd->threaded) pthread_mutex_lock (&httpd->client.mutex);
#endif
client = insert_into_client_array (httpd, c, &addr);
pthread_mutex_unlock (&httpd->client.mutex);
#if defined(HAVE_PTHREAD)
if (httpd->threaded) pthread_mutex_unlock (&httpd->client.mutex);
#endif
if (client == QSE_NULL)
{
close (c);
@ -759,7 +796,8 @@ httpd->cbs.on_error (httpd, l).... */
}
static int make_fd_set_from_client_array (qse_httpd_t* httpd, fd_set* r, fd_set* w)
static int make_fd_set_from_client_array (
qse_httpd_t* httpd, fd_set* r, fd_set* w)
{
int fd, max = -1;
client_array_t* ca = &httpd->client.array;
@ -816,6 +854,7 @@ static void perform_task (qse_httpd_t* httpd, qse_httpd_client_t* client)
}
}
#if defined(HAVE_PTHREAD)
static void* response_thread (void* arg)
{
qse_httpd_t* httpd = (qse_httpd_t*)arg;
@ -830,7 +869,7 @@ static void* response_thread (void* arg)
tv.tv_usec = 0;
pthread_mutex_lock (&httpd->client.mutex);
max = make_fd_set_from_client_array (httpd, NULL, &w);
max = make_fd_set_from_client_array (httpd, QSE_NULL, &w);
pthread_mutex_unlock (&httpd->client.mutex);
while (max == -1 && !httpd->stopreq)
@ -840,22 +879,22 @@ static void* response_thread (void* arg)
pthread_mutex_lock (&httpd->client.mutex);
gettimeofday (&now, NULL);
gettimeofday (&now, QSE_NULL);
timeout.tv_sec = now.tv_sec + 2;
timeout.tv_nsec = now.tv_usec * 1000;
pthread_cond_timedwait (&httpd->client.cond, &httpd->client.mutex, &timeout);
max = make_fd_set_from_client_array (httpd, NULL, &w);
max = make_fd_set_from_client_array (httpd, QSE_NULL, &w);
pthread_mutex_unlock (&httpd->client.mutex);
}
if (httpd->stopreq) break;
n = select (max + 1, NULL, &w, NULL, &tv);
n = select (max + 1, QSE_NULL, &w, QSE_NULL, &tv);
if (n <= -1)
{
if (errno == EINTR) continue;
/*if (errno == EINTR) continue; */
qse_fprintf (QSE_STDERR, QSE_T("Error: select returned failure - %S\n"), strerror(errno));
/* break; */
continue;
@ -886,13 +925,13 @@ qse_fprintf (QSE_STDERR, QSE_T("Error: select returned failure - %S\n"), strerro
perform_task (httpd, client);
}
}
}
}
pthread_exit (NULL);
return NULL;
pthread_exit (QSE_NULL);
return QSE_NULL;
}
#endif
static int read_from_client (qse_httpd_t* httpd, qse_httpd_client_t* client)
{
@ -939,9 +978,11 @@ qse_fprintf (QSE_STDERR, QSE_T("Error: http error while processing \n"));
return 0;
}
int qse_httpd_loop (qse_httpd_t* httpd)
int qse_httpd_loop (qse_httpd_t* httpd, int threaded)
{
#if defined(HAVE_PTHREAD)
pthread_t response_thread_id;
#endif
httpd->stopreq = 0;
@ -951,48 +992,52 @@ int qse_httpd_loop (qse_httpd_t* httpd)
if (httpd->listener.list == QSE_NULL)
{
/* no listener specified */
httpd->errnum = QSE_HTTPD_EINVAL;
goto oops;
return -1;
}
if (activate_listners (httpd) <= -1)
{
qse_fprintf (QSE_STDERR, QSE_T("Error: failed to make a server socket\n"));
goto oops;
}
/* data receiver main logic */
pthread_mutex_init (&httpd->client.mutex, NULL);
pthread_cond_init (&httpd->client.cond, NULL);
if (activate_listeners (httpd) <= -1) return -1;
init_client_array (httpd);
#if defined(HAVE_PTHREAD)
/* start the response sender as a thread */
pthread_create (&response_thread_id, NULL, response_thread, httpd);
/* TODO: error check */
if (threaded)
{
pthread_mutex_init (&httpd->client.mutex, QSE_NULL);
pthread_cond_init (&httpd->client.cond, QSE_NULL);
if (pthread_create (&response_thread_id, QSE_NULL, response_thread, httpd) != 0)
{
pthread_cond_destroy (&httpd->client.cond);
pthread_mutex_destroy (&httpd->client.mutex);
httpd->threaded = 0;
}
else httpd->threaded = 1;
}
#endif
while (!httpd->stopreq)
{
int n, max, fd;
fd_set r;
fd_set w;
struct timeval tv;
tv.tv_sec = 1;
tv.tv_usec = 0;
pthread_mutex_lock (&httpd->client.mutex);
max = make_fd_set_from_client_array (httpd, &r, QSE_NULL);
max = make_fd_set_from_client_array (httpd, &r, &w);
pthread_mutex_unlock (&httpd->client.mutex);
n = select (max + 1, &r, NULL, NULL, &tv);
n = select (max + 1, &r, &w, QSE_NULL, &tv);
if (n <= -1)
{
httpd->errnum = QSE_HTTPD_EIOMUX;
/* TODO: call user callback for this multiplexer error */
if (errno == EINTR) continue;
/*if (errno == EINTR) continue;*/
qse_fprintf (QSE_STDERR, QSE_T("Error: select returned failure\n"));
/* break; */
continue;
@ -1014,33 +1059,57 @@ qse_fprintf (QSE_STDERR, QSE_T("Error: select returned failure\n"));
/* got input */
if (read_from_client (httpd, client) <= -1)
{
if (httpd->threaded)
{
/* let the writing part handle it,
* probably in the next iteration */
qse_httpd_markclientbad (httpd, client);
shutdown (client->handle.i, 0);
#if 0
pthread_mutex_lock (&httpd->client.mutex);
}
else
{
/*pthread_mutex_lock (&httpd->client.mutex);*/
delete_from_client_array (httpd, fd);
pthread_mutex_unlock (&httpd->client.mutex);
#endif
/*pthread_mutex_unlock (&httpd->client.mutex);*/
continue; /* don't need to go to the writing part */
}
}
}
if (!httpd->threaded && FD_ISSET(client->handle.i, &w))
{
/* output is handled in the main loop if and
* only if it is not threaded */
if (client->bad)
{
/*send (client->handle, i, "INTERNAL SERVER ERROR..", ...);*/
/*shutdown (client->handle.i, 0);*/
/*pthread_mutex_lock (&httpd->client.mutex);*/
delete_from_client_array (httpd, fd);
/*pthread_mutex_unlock (&httpd->client.mutex);*/
}
else if (client->task.queue.count > 0)
{
perform_task (httpd, client);
}
}
}
}
#if defined(HAVE_PTHREAD)
pthread_join (response_thread_id, NULL);
if (httpd->threaded)
{
pthread_join (response_thread_id, QSE_NULL);
pthread_cond_destroy (&httpd->client.cond);
pthread_mutex_destroy (&httpd->client.mutex);
}
#endif
fini_client_array (httpd);
pthread_cond_destroy (&httpd->client.cond);
pthread_mutex_destroy (&httpd->client.mutex);
deactivate_listeners (httpd);
return 0;
oops:
deactivate_listeners (httpd);
return -1;
}
static void free_listener (qse_httpd_t* httpd, listener_t* l)
@ -1267,7 +1336,7 @@ void qse_httpd_clearlisteners (qse_httpd_t* httpd)
pthread_mutex_lock (&httpd->listener.mutex);
deactivate_listeners (httpd);
free_listener_list (httpd, httpd->listener.list);
httpd->listener.list = NULL;
httpd->listener.list = QSE_NULL;
pthread_mutex_unlock (&httpd->listener.mutex);
}
#endif
@ -1279,7 +1348,9 @@ int qse_httpd_entask (
int ret;
ret = enqueue_task_locked (httpd, client, task, xtnsize);
if (ret <= -1) client->bad = 1; /* mark this client bad */
else pthread_cond_signal (&httpd->client.cond);
#if defined(HAVE_PTHREAD)
else if (httpd->threaded) pthread_cond_signal (&httpd->client.cond);
#endif
return ret;
}

View File

@ -26,11 +26,14 @@
#include <qse/net/httpd.h>
#include <qse/net/htrd.h>
#include <pthread.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#if defined(HAVE_PTHREAD)
# include <pthread.h>
#endif
typedef struct client_array_t client_array_t;
union sockaddr_t
@ -58,7 +61,9 @@ struct qse_httpd_client_t
struct
{
#if defined(HAVE_PTHREAD)
pthread_mutex_t mutex;
#endif
struct
{
int count;
@ -100,17 +105,24 @@ struct qse_httpd_t
qse_httpd_cbs_t* cbs;
int stopreq;
#if defined(HAVE_PTHREAD)
int threaded;
#endif
struct
{
#if defined(HAVE_PTHREAD)
pthread_mutex_t mutex;
pthread_cond_t cond;
#endif
client_array_t array;
} client;
struct
{
#if defined(HAVE_PTHREAD)
pthread_mutex_t mutex;
#endif
listener_t* list;
fd_set set;
int max;

View File

@ -56,6 +56,49 @@ int qse_httpd_entaskdisconnect (qse_httpd_t* httpd, qse_httpd_client_t* client)
/*------------------------------------------------------------------------*/
static int task_main_statictext (
qse_httpd_t* httpd, qse_httpd_client_t* client, qse_httpd_task_t* task)
{
qse_ssize_t n;
qse_size_t count = 0;
const qse_mchar_t* ptr = (const qse_mchar_t*)task->ctx;
while (*ptr != QSE_MT('\0') && count < MAX_SENDFILE_SIZE)
{
ptr++; count++;
}
/* TODO: do i need to add code to skip this send if count is 0? */
n = send (
client->handle.i,
task->ctx,
count,
0
);
if (n <= -1) return -1;
ptr = (const qse_mchar_t*)task->ctx + n;
if (*ptr == QSE_MT('\0')) return 0;
task->ctx = (void*)ptr;
return 1; /* more work to do */
}
int qse_httpd_entaskstatictext (
qse_httpd_t* httpd, qse_httpd_client_t* client, const qse_mchar_t* text)
{
qse_httpd_task_t task;
QSE_MEMSET (&task, 0, QSE_SIZEOF(task));
task.main = task_main_statictext;
task.ctx = (void*)text;
return qse_httpd_entask (httpd, client, &task, 0);
}
/*------------------------------------------------------------------------*/
typedef struct task_text_t task_text_t;
struct task_text_t
{
@ -79,13 +122,14 @@ static int task_init_text (
static int task_main_text (
qse_httpd_t* httpd, qse_httpd_client_t* client, qse_httpd_task_t* task)
{
ssize_t n;
size_t count;
qse_ssize_t n;
qse_size_t count;
task_text_t* ctx = (task_text_t*)task->ctx;
count = MAX_SENDFILE_SIZE;
if (count >= ctx->left) count = ctx->left;
/* TODO: do i need to add code to skip this send if count is 0? */
n = send (
client->handle.i,
ctx->ptr,
@ -155,8 +199,8 @@ static void task_fini_format (
static int task_main_format (
qse_httpd_t* httpd, qse_httpd_client_t* client, qse_httpd_task_t* task)
{
ssize_t n;
size_t count;
qse_ssize_t n;
qse_size_t count;
task_format_t* ctx = (task_format_t*)task->ctx;
count = MAX_SENDFILE_SIZE;
@ -299,8 +343,8 @@ static void task_fini_file (
static int task_main_file (
qse_httpd_t* httpd, qse_httpd_client_t* client, qse_httpd_task_t* task)
{
ssize_t n;
size_t count;
qse_ssize_t n;
qse_size_t count;
task_file_t* ctx = (task_file_t*)task->ctx;
count = MAX_SENDFILE_SIZE;

View File

@ -156,7 +156,7 @@ int httpd_main (int argc, qse_char_t* argv[])
signal (SIGINT, sigint);
signal (SIGPIPE, SIG_IGN);
n = qse_httpd_loop (httpd);
n = qse_httpd_loop (httpd, 0);
signal (SIGINT, SIG_DFL);
signal (SIGPIPE, SIG_DFL);