fixed httpd to react to request immediate when threaded

This commit is contained in:
hyung-hwan 2012-02-03 14:30:45 +00:00
parent 681e50822c
commit 17d05bb1e0
4 changed files with 120 additions and 99 deletions

View File

@ -25,6 +25,7 @@
#include "httpd.h" #include "httpd.h"
#include "../cmn/mem.h" #include "../cmn/mem.h"
#include "../cmn/syscall.h"
#include <qse/cmn/chr.h> #include <qse/cmn/chr.h>
#include <qse/cmn/str.h> #include <qse/cmn/str.h>
#include <qse/cmn/mbwc.h> #include <qse/cmn/mbwc.h>
@ -627,51 +628,78 @@ httpd->cbs.on_error (httpd, l).... */
static int make_fd_set_from_client_array ( static int make_fd_set_from_client_array (
qse_httpd_t* httpd, fd_set* r, fd_set* w, int for_rdwr) qse_httpd_t* httpd, fd_set* r, fd_set* w, int for_rdwr)
{ {
/* qse_http_loop() sets for_rdwr to true.
* response_thread() sets for_rdwr to false.
*
* qse_http_loop()
* - accepts a new client connection
* - reads a client request
* - writes back a response to a client request if not threaded.
*
* response_thread()
* - writes back a response to a client request if threaded.
*/
int fd, max = -1; int fd, max = -1;
client_array_t* ca = &httpd->client.array; client_array_t* ca = &httpd->client.array;
if (r && for_rdwr) if (for_rdwr)
{ {
/* qse_http_loop() needs to monitor listner handles
* to handle a new client connection. */
max = httpd->listener.max; max = httpd->listener.max;
*r = httpd->listener.set; *r = httpd->listener.set;
} }
if (w) else
{ {
FD_ZERO (w); FD_ZERO (r);
#if defined(HAVE_PTHREAD)
/* select() in response_thread() needs to be aborted
* if it's blocking on a fd_set previously composed
* when a new task is enqueued. it can select() on new
* fd_set quickly.
*/
QSE_ASSERT (httpd->threaded);
FD_SET (httpd->client.pfd[0], r);
max = httpd->client.pfd[0];
#endif
} }
FD_ZERO (w);
for (fd = 0; fd < ca->capa; fd++) for (fd = 0; fd < ca->capa; fd++)
{ {
if (ca->data[fd].htrd) if (ca->data[fd].htrd)
{ {
if (r && !ca->data[fd].bad) if (!ca->data[fd].bad)
{ {
if (for_rdwr) if (for_rdwr)
{ {
/* add a client-side handle to the read set */ /* add a client-side handle to the read set
* only for qse_httpd_loop(). */
FD_SET (ca->data[fd].handle.i, r); FD_SET (ca->data[fd].handle.i, r);
if (ca->data[fd].handle.i > max) max = ca->data[fd].handle.i; if (ca->data[fd].handle.i > max) max = ca->data[fd].handle.i;
} }
if (!httpd->threaded || !for_rdwr)
{
/* a trigger is a handle to monitor to check
* if there is data avaiable to write back to the client.
* if it is not threaded, qse_httpd_loop() needs to
* monitor trigger handles. if it is threaded,
* response_thread() needs to monitor these handles */
if (ca->data[fd].task.queue.head && if (ca->data[fd].task.queue.head &&
ca->data[fd].task.queue.head->task.trigger.i >= 0) ca->data[fd].task.queue.head->task.trigger.i >= 0)
{ {
/* if a trigger is available, add it to the read set also */ /* if a trigger is available, add it to the read set also. */
FD_SET (ca->data[fd].task.queue.head->task.trigger.i, r); FD_SET (ca->data[fd].task.queue.head->task.trigger.i, r);
if (ca->data[fd].task.queue.head->task.trigger.i > max) if (ca->data[fd].task.queue.head->task.trigger.i > max)
max = ca->data[fd].task.queue.head->task.trigger.i; max = ca->data[fd].task.queue.head->task.trigger.i;
} }
} }
#if 0
if (w && (ca->data[fd].task.queue.count > 0 || ca->data[fd].bad))
{
/* add it to the set if it has a response to send */
FD_SET (ca->data[fd].handle.i, w);
if (ca->data[fd].handle.i > max) max = ca->data[fd].handle.i;
} }
#endif
if (w)
{
if (ca->data[fd].bad || if (ca->data[fd].bad ||
(ca->data[fd].task.queue.head && (ca->data[fd].task.queue.head &&
ca->data[fd].task.queue.head->task.trigger.i <= -1)) ca->data[fd].task.queue.head->task.trigger.i <= -1))
@ -689,7 +717,6 @@ static int make_fd_set_from_client_array (
} }
} }
} }
}
return max; return max;
} }
@ -727,9 +754,6 @@ static void* response_thread (void* arg)
fd_set r, w; fd_set r, w;
struct timeval tv; struct timeval tv;
tv.tv_sec = 1;
tv.tv_usec = 0;
pthread_mutex_lock (&httpd->client.mutex); pthread_mutex_lock (&httpd->client.mutex);
max = make_fd_set_from_client_array (httpd, &r, &w, 0); max = make_fd_set_from_client_array (httpd, &r, &w, 0);
pthread_mutex_unlock (&httpd->client.mutex); pthread_mutex_unlock (&httpd->client.mutex);
@ -742,10 +766,11 @@ static void* response_thread (void* arg)
pthread_mutex_lock (&httpd->client.mutex); pthread_mutex_lock (&httpd->client.mutex);
gettimeofday (&now, QSE_NULL); gettimeofday (&now, QSE_NULL);
timeout.tv_sec = now.tv_sec + 2; timeout.tv_sec = now.tv_sec + 1;
timeout.tv_nsec = now.tv_usec * 1000; timeout.tv_nsec = now.tv_usec * 1000;
pthread_cond_timedwait (&httpd->client.cond, &httpd->client.mutex, &timeout); pthread_cond_timedwait (
&httpd->client.cond, &httpd->client.mutex, &timeout);
max = make_fd_set_from_client_array (httpd, &r, &w, 0); max = make_fd_set_from_client_array (httpd, &r, &w, 0);
pthread_mutex_unlock (&httpd->client.mutex); pthread_mutex_unlock (&httpd->client.mutex);
@ -753,6 +778,9 @@ static void* response_thread (void* arg)
if (httpd->stopreq) break; if (httpd->stopreq) break;
tv.tv_sec = 1;
tv.tv_usec = 0;
n = select (max + 1, &r, &w, QSE_NULL, &tv); n = select (max + 1, &r, &w, QSE_NULL, &tv);
if (n <= -1) if (n <= -1)
{ {
@ -766,29 +794,18 @@ qse_fprintf (QSE_STDERR, QSE_T("Error: select returned failure - %hs\n"), strerr
continue; continue;
} }
if (FD_ISSET (httpd->client.pfd[0], &r))
{
qse_mchar_t dummy;
QSE_READ (httpd->client.pfd[0], &dummy, 1);
}
for (fd = 0; fd < httpd->client.array.capa; fd++) for (fd = 0; fd < httpd->client.array.capa; fd++)
{ {
qse_httpd_client_t* client = &httpd->client.array.data[fd]; qse_httpd_client_t* client = &httpd->client.array.data[fd];
if (!client->htrd) continue; if (!client->htrd) continue;
/* ---------------------------------------------- */
#if 0
if (FD_ISSET(client->handle.i, &w))
{
if (client->bad)
{
}
else if (client->task.queue.count > 0)
{
if (client->task.queue.head->task.trigger.i <= -1 ||
FD_ISSET(client->task.queue.head->task.trigger.i, &r))
{
perform_task (httpd, client);
}
}
}
#endif
if (client->bad) if (client->bad)
{ {
/*shutdown (client->handle.i, SHUT_RDWR);*/ /*shutdown (client->handle.i, SHUT_RDWR);*/
@ -796,7 +813,7 @@ qse_fprintf (QSE_STDERR, QSE_T("Error: select returned failure - %hs\n"), strerr
delete_from_client_array (httpd, fd); delete_from_client_array (httpd, fd);
pthread_mutex_unlock (&httpd->client.mutex); pthread_mutex_unlock (&httpd->client.mutex);
} }
else else if (client->task.queue.head)
{ {
if (client->task.queue.head->task.trigger.i <= -1 || if (client->task.queue.head->task.trigger.i <= -1 ||
FD_ISSET(client->task.queue.head->task.trigger.i, &r)) FD_ISSET(client->task.queue.head->task.trigger.i, &r))
@ -805,12 +822,13 @@ qse_fprintf (QSE_STDERR, QSE_T("Error: select returned failure - %hs\n"), strerr
tv.tv_usec = 0; tv.tv_usec = 0;
FD_ZERO (&w); FD_ZERO (&w);
FD_SET (client->handle.i, &w); FD_SET (client->handle.i, &w);
n = select (max + 1, QSE_NULL, &w, QSE_NULL, &tv); n = select (client->handle.i + 1, QSE_NULL, &w, QSE_NULL, &tv);
if (n > 0 && FD_ISSET(client->handle.i, &w)) if (n > 0 && FD_ISSET(client->handle.i, &w))
{
perform_task (httpd, client); perform_task (httpd, client);
} }
} }
/* ---------------------------------------------- */ }
} }
} }
@ -903,6 +921,16 @@ int qse_httpd_loop (qse_httpd_t* httpd, int threaded)
/* start the response sender as a thread */ /* start the response sender as a thread */
if (threaded) if (threaded)
{ {
if (QSE_PIPE(httpd->client.pfd) == 0)
{
int i;
for (i = 0; i < 2; i++)
{
int flags = QSE_FCNTL (httpd->client.pfd[i], F_GETFD, 0);
if (flags >= 0)
QSE_FCNTL (httpd->client.pfd[i], F_SETFD, flags | FD_CLOEXEC);
}
pthread_mutex_init (&httpd->client.mutex, QSE_NULL); pthread_mutex_init (&httpd->client.mutex, QSE_NULL);
pthread_cond_init (&httpd->client.cond, QSE_NULL); pthread_cond_init (&httpd->client.cond, QSE_NULL);
@ -912,9 +940,12 @@ int qse_httpd_loop (qse_httpd_t* httpd, int threaded)
{ {
pthread_cond_destroy (&httpd->client.cond); pthread_cond_destroy (&httpd->client.cond);
pthread_mutex_destroy (&httpd->client.mutex); pthread_mutex_destroy (&httpd->client.mutex);
QSE_CLOSE (httpd->client.pfd[1]);
QSE_CLOSE (httpd->client.pfd[0]);
} }
else httpd->threaded = 1; else httpd->threaded = 1;
} }
}
#endif #endif
while (!httpd->stopreq) while (!httpd->stopreq)
@ -935,7 +966,6 @@ int qse_httpd_loop (qse_httpd_t* httpd, int threaded)
if (httpd->threaded) pthread_mutex_unlock (&httpd->client.mutex); if (httpd->threaded) pthread_mutex_unlock (&httpd->client.mutex);
#endif #endif
/*n = select (max + 1, &r, &w, QSE_NULL, &tv);*/
n = select (max + 1, &r, &w, QSE_NULL, &tv); n = select (max + 1, &r, &w, QSE_NULL, &tv);
if (n <= -1) if (n <= -1)
{ {
@ -984,31 +1014,6 @@ qse_fprintf (QSE_STDERR, QSE_T("Error: select returned failure\n"));
} }
/*----------------------------------------------------------*/
#if 0
if (!httpd->threaded && FD_ISSET(client->handle.i, &w))
{
/* output is handled in the main loop if and
* only if it is not threaded */
if (client->bad)
{
/*send (client->handle, i, "INTERNAL SERVER ERROR..", ...);*/
/*shutdown (client->handle.i, SHUT_RDWR);*/
/*pthread_mutex_lock (&httpd->client.mutex);*/
delete_from_client_array (httpd, fd);
/*pthread_mutex_unlock (&httpd->client.mutex);*/
}
else if (client->task.queue.count > 0)
{
if (client->task.queue.head->task.trigger.i <= -1 ||
FD_ISSET(client->task.queue.head->task.trigger.i, &r))
{
perform_task (httpd, client);
}
}
}
#endif
if (!httpd->threaded) if (!httpd->threaded)
{ {
if (client->bad) if (client->bad)
@ -1018,7 +1023,7 @@ qse_fprintf (QSE_STDERR, QSE_T("Error: select returned failure\n"));
delete_from_client_array (httpd, fd); delete_from_client_array (httpd, fd);
/*pthread_mutex_unlock (&httpd->client.mutex);*/ /*pthread_mutex_unlock (&httpd->client.mutex);*/
} }
else if (client->task.queue.count > 0) else if (client->task.queue.head)
{ {
if (client->task.queue.head->task.trigger.i <= -1 || if (client->task.queue.head->task.trigger.i <= -1 ||
FD_ISSET(client->task.queue.head->task.trigger.i, &r)) FD_ISSET(client->task.queue.head->task.trigger.i, &r))
@ -1027,7 +1032,7 @@ qse_fprintf (QSE_STDERR, QSE_T("Error: select returned failure\n"));
tv.tv_usec = 0; tv.tv_usec = 0;
FD_ZERO (&w); FD_ZERO (&w);
FD_SET (client->handle.i, &w); FD_SET (client->handle.i, &w);
n = select (max + 1, QSE_NULL, &w, QSE_NULL, &tv); n = select (client->handle.i + 1, QSE_NULL, &w, QSE_NULL, &tv);
/* TODO: logging if n == -1 */ /* TODO: logging if n == -1 */
@ -1036,8 +1041,6 @@ qse_fprintf (QSE_STDERR, QSE_T("Error: select returned failure\n"));
} }
} }
} }
/*----------------------------------------------------------*/
} }
} }
@ -1047,6 +1050,8 @@ qse_fprintf (QSE_STDERR, QSE_T("Error: select returned failure\n"));
pthread_join (response_thread_id, QSE_NULL); pthread_join (response_thread_id, QSE_NULL);
pthread_cond_destroy (&httpd->client.cond); pthread_cond_destroy (&httpd->client.cond);
pthread_mutex_destroy (&httpd->client.mutex); pthread_mutex_destroy (&httpd->client.mutex);
QSE_CLOSE (httpd->client.pfd[1]);
QSE_CLOSE (httpd->client.pfd[0]);
} }
#endif #endif
@ -1287,7 +1292,15 @@ qse_httpd_task_t* qse_httpd_entask (
ret = enqueue_task_locked (httpd, client, pred, task, xtnsize); ret = enqueue_task_locked (httpd, client, pred, task, xtnsize);
if (ret == QSE_NULL) client->bad = 1; /* mark this client bad */ if (ret == QSE_NULL) client->bad = 1; /* mark this client bad */
#if defined(HAVE_PTHREAD) #if defined(HAVE_PTHREAD)
else if (httpd->threaded) pthread_cond_signal (&httpd->client.cond); else if (httpd->threaded)
{
static qse_byte_t dummy = 0x01;
/* write to the pipe to wake up select() in
* the response thread if it was blocking. */
QSE_WRITE (httpd->client.pfd[1], &dummy, 1);
pthread_cond_signal (&httpd->client.cond);
}
#endif #endif
return ret; return ret;
} }

View File

@ -114,11 +114,13 @@ struct qse_httpd_t
int option; int option;
int stopreq; int stopreq;
int threaded; int threaded;
struct struct
{ {
#if defined(HAVE_PTHREAD) #if defined(HAVE_PTHREAD)
int pfd[2];
pthread_mutex_t mutex; pthread_mutex_t mutex;
pthread_cond_t cond; pthread_cond_t cond;
#endif #endif
@ -149,6 +151,7 @@ void qse_httpd_fini (
qse_httpd_t* httpd qse_httpd_t* httpd
); );
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -1622,7 +1622,9 @@ qse_printf (QSE_T("CGI FUCKED UP...RETURNING TOO MUCH DATA\n"));
return -1; return -1;
} }
#if 0
qse_printf (QSE_T("CGI SEND [%.*hs]\n"), (int)cgi->buflen, cgi->buf); qse_printf (QSE_T("CGI SEND [%.*hs]\n"), (int)cgi->buflen, cgi->buf);
#endif
n = send (client->handle.i, cgi->buf, cgi->buflen, 0); n = send (client->handle.i, cgi->buf, cgi->buflen, 0);
if (n <= -1) if (n <= -1)
{ {
@ -1634,7 +1636,9 @@ qse_printf (QSE_T("CGI SEND [%.*hs]\n"), (int)cgi->buflen, cgi->buf);
QSE_MEMCPY (&cgi->buf[0], &cgi->buf[n], cgi->buflen - n); QSE_MEMCPY (&cgi->buf[0], &cgi->buf[n], cgi->buflen - n);
cgi->buflen -= n; cgi->buflen -= n;
#if 0
qse_printf (QSE_T("CGI SEND DONE\n")); qse_printf (QSE_T("CGI SEND DONE\n"));
#endif
return 1; return 1;
} }

View File

@ -37,7 +37,8 @@ static int handle_request (
#endif #endif
qse_printf (QSE_T("================================\n")); qse_printf (QSE_T("================================\n"));
qse_printf (QSE_T("REQUEST ==> [%hs] version[%d.%d] method[%d]\n"), qse_printf (QSE_T("[%lu] REQUEST ==> [%hs] version[%d.%d] method[%d]\n"),
(unsigned long)time(NULL),
qse_htre_getqpathptr(req), qse_htre_getqpathptr(req),
qse_htre_getmajorversion(req), qse_htre_getmajorversion(req),
qse_htre_getminorversion(req), qse_htre_getminorversion(req),