diff --git a/qse/cmd/http/ursd.c b/qse/cmd/http/ursd.c index 33ebacad..c73e62a0 100644 --- a/qse/cmd/http/ursd.c +++ b/qse/cmd/http/ursd.c @@ -67,6 +67,9 @@ typedef struct urs_hdr_t urs_hdr_t; typedef struct urs_pkt_t urs_pkt_t; +#define URS_RCODE_OK 0 +#define URS_RCODE_ERROR 1 + #include struct urs_hdr_t { @@ -83,12 +86,19 @@ struct urs_pkt_t }; #include +#define MAX_PACKET_SIZE 65535 +#define XREQ_BLOCK_SIZE 2048 +#define XREQ_MAX_BLOCKS 32 + typedef struct xreq_t xreq_t; struct xreq_t { + qse_ntime_t timestamp; + qse_skad_t from; qse_sck_len_t fromlen; - qse_uint8_t* data; + urs_pkt_t* pkt; + xreq_t* next; }; @@ -111,7 +121,7 @@ struct rewriter_t qse_sck_len_t fromlen; qse_uint16_t urllen; qse_uint16_t urlpos; - qse_uint8_t buf[65535]; + qse_uint8_t buf[MAX_PACKET_SIZE]; qse_uint32_t outlen; /* length of output read from the rewriter */ } req; @@ -125,8 +135,8 @@ struct ursd_t qse_mmgr_t* mmgr; qse_char_t* cmdline; - qse_size_t total; - qse_size_t nfree; + qse_size_t total_rewriter_count; + qse_size_t free_rewriter_count; rewriter_t* rewriters; rewriter_t* free_rewriter; @@ -135,8 +145,18 @@ struct ursd_t qse_sck_hnd_t sck; qse_mux_t* mux; - xreq_t* head; - xreq_t* tail; + + struct + { + qse_uint8_t buf[MAX_PACKET_SIZE]; /* temporary buffer */ + + qse_size_t count; /* number of packets in the request queue */ + xreq_t* head; /* head of the request queue */ + xreq_t* tail; /* tail of the request queue */ + + xreq_t* free[XREQ_MAX_BLOCKS]; /* xreq avaialble chains per block size */ + } xreq; /* request queue */ + }; @@ -168,7 +188,7 @@ static qse_sck_hnd_t open_server_socket (int proto, const qse_nwad_t* bindnwad) s = socket (family, type, proto); if (!qse_isvalidsckhnd(s)) { - fprintf (stderr, "cannot create a socket\n"); + qse_fprintf (QSE_STDERR, QSE_T("cannot create a socket\n")); goto oops; } @@ -195,7 +215,7 @@ static qse_sck_hnd_t open_server_socket (int proto, const qse_nwad_t* bindnwad) } #endif - fprintf (stderr, "cannot bind a socket\n"); + qse_fprintf (QSE_STDERR, QSE_T("cannot bind a socket\n")); goto oops; } @@ -214,7 +234,7 @@ bind_ok: if (setsockopt (s, SOL_SCTP, SCTP_INITMSG, &im, QSE_SIZEOF(im)) <= -1) { - fprintf (stderr, "cannot set sctp initmsg option\n"); + qse_fprintf (QSE_STDERR, QSE_T("cannot set sctp initmsg option\n")); goto oops; } @@ -228,7 +248,7 @@ bind_ok: if (listen (s, 99) <= -1) { - fprintf (stderr, "cannot set listen on sctp socket\n"); + qse_fprintf (QSE_STDERR, QSE_T("cannot set listen on sctp socket\n")); goto oops; } } @@ -264,12 +284,13 @@ static int delete_from_mux (qse_mux_t* mux, qse_mux_hnd_t handle, int type, int static void chain_rewriter_to_free_list (ursd_t* ursd, rewriter_t* rewriter) { + /* attach a rewriter to the head of the list */ rewriter->free = 1; rewriter->prev = QSE_NULL; rewriter->next = ursd->free_rewriter; if (ursd->free_rewriter) ursd->free_rewriter->prev = rewriter; ursd->free_rewriter = rewriter; - ursd->nfree++; + ursd->free_rewriter_count++; } static rewriter_t* dechain_rewriter_from_free_list (ursd_t* ursd, rewriter_t* rewriter) @@ -280,7 +301,7 @@ static rewriter_t* dechain_rewriter_from_free_list (ursd_t* ursd, rewriter_t* re else rewriter->prev->next = rewriter->next; rewriter->free = 0; - ursd->nfree--; + ursd->free_rewriter_count--; return rewriter; } @@ -365,23 +386,21 @@ static rewriter_t* get_free_rewriter (ursd_t* ursd) { rewriter_t* rewriter; - for (rewriter = ursd->free_rewriter; rewriter; rewriter = rewriter->next) + rewriter = ursd->free_rewriter; + + if (rewriter) { QSE_ASSERT (!rewriter->busy); QSE_ASSERT (!rewriter->faulty); QSE_ASSERT (!rewriter->pio_in_in_mux); - if (!rewriter->pio) start_rewriter (ursd, rewriter); + if (!rewriter->pio) start_rewriter (ursd, rewriter); - if (rewriter->pio) - { - dechain_rewriter_from_free_list (ursd, rewriter); - reset_rewriter_data (rewriter); - return rewriter; - } + dechain_rewriter_from_free_list (ursd, rewriter); + reset_rewriter_data (rewriter); } - return QSE_NULL; + return rewriter; } static void release_rewriter (ursd_t* ursd, rewriter_t* rewriter, int send_empty_response) @@ -396,13 +415,14 @@ static void release_rewriter (ursd_t* ursd, rewriter_t* rewriter, int send_empty if (rewriter->pio_in_in_mux) { + QSE_ASSERT (rewriter->pio); delete_from_mux (ursd->mux, qse_pio_gethandle(rewriter->pio, QSE_PIO_IN), TYPE_PIO_IN, rewriter->index); rewriter->pio_in_in_mux = 0; } if (rewriter->busy) dechain_rewriter_from_busy_list (ursd, rewriter); - if (rewriter->faulty) + if (rewriter->faulty || !rewriter->pio) { restart_rewriter (ursd, rewriter); @@ -470,6 +490,124 @@ static int feed_rewriter (ursd_t* ursd, rewriter_t* rewriter) return 1; /* the full url has been passed to the rewriter */ } +static int enqueue_request (ursd_t* ursd, urs_pkt_t* pkt, const qse_skad_t* from, qse_sck_len_t fromlen) +{ + qse_uint16_t blkidx; + qse_size_t memsize; + xreq_t* xreq; + + /* TODO: if (ursd->xreq.count > MAX_QUEUE_SIZE) return -1; */ + + blkidx = (pkt->hdr.pktlen - 1) / XREQ_BLOCK_SIZE; /* 0 based */ + if (blkidx < XREQ_MAX_BLOCKS) + { + xreq = ursd->xreq.free[blkidx]; + if (xreq) + { + ursd->xreq.free[blkidx] = xreq->next; + goto copy_packet; + } + memsize = (blkidx + 1) * XREQ_BLOCK_SIZE; + } + else + { + memsize = pkt->hdr.pktlen; + } + + xreq = QSE_MMGR_ALLOC (ursd->mmgr, QSE_SIZEOF(*xreq) + memsize); + if (xreq == QSE_NULL) return -1; + +copy_packet: + /* TODOO: xreq->timestamp */ + xreq->next = QSE_NULL; + xreq->from = *from; + xreq->fromlen = fromlen; + xreq->pkt = (urs_pkt_t*)(xreq + 1); + qse_memcpy (xreq->pkt, pkt, pkt->hdr.pktlen); + + if (ursd->xreq.count > 0) + { + ursd->xreq.tail->next = xreq; + ursd->xreq.tail = xreq; + } + else + { + ursd->xreq.head = xreq; + ursd->xreq.tail = xreq; + } + + ursd->xreq.count++; + return 0; +} + +static xreq_t* dequeue_request (ursd_t* ursd) +{ + xreq_t* xreq = QSE_NULL; + + if (ursd->xreq.count > 0) + { + xreq = ursd->xreq.head; + + ursd->xreq.head = xreq->next; + if (ursd->xreq.count == 1) + ursd->xreq.tail = ursd->xreq.head; + + ursd->xreq.count--; + } + + return xreq; +} + +static void release_request (ursd_t* ursd, xreq_t* xreq) +{ + qse_uint16_t blkidx; + + blkidx = (xreq->pkt->hdr.pktlen - 1) / XREQ_BLOCK_SIZE; + if (blkidx < XREQ_MAX_BLOCKS) + { + xreq->next = ursd->xreq.free[blkidx]; + ursd->xreq.free[blkidx] = xreq; + } + else + { + QSE_MMGR_FREE (ursd->mmgr, xreq); + } +} + +static void handle_pending_requests (ursd_t* ursd) +{ + rewriter_t* rewriter; + xreq_t* xreq; + + while (ursd->xreq.count > 0) + { + rewriter = get_free_rewriter (ursd); + if (rewriter) + { + qse_printf (QSE_T("HANDLING PENDING REQUEST.... %d\n"), (int)ursd->xreq.count); + xreq = dequeue_request (ursd); + QSE_ASSERT (xreq); + + rewriter->req.fromlen = xreq->fromlen; + rewriter->req.from = xreq->from; + qse_memcpy (rewriter->req.buf, xreq->pkt, xreq->pkt->hdr.pktlen); + rewriter->req.buf[xreq->pkt->hdr.pktlen] = QSE_MT('\n'); /* put a new line at the end */ + rewriter->req.urlpos = 0; + rewriter->req.urllen = xreq->pkt->hdr.pktlen - QSE_SIZEOF(urs_hdr_t) + 1; /* +1 for '\n' */ + + release_request (ursd, xreq); + + if (rewriter->pio) feed_rewriter (ursd, rewriter); + else release_rewriter (ursd, rewriter, 1); + } + else + { + qse_printf (QSE_T("NO REWRITER AVAILABLE FOR PENDING REQUEST....%d\n"), (int)ursd->xreq.count); + break; + } + } +} + static void receive_request_from_socket (ursd_t* ursd, const qse_mux_evt_t* evt) { qse_ssize_t x; @@ -496,30 +634,56 @@ static void receive_request_from_socket (ursd_t* ursd, const qse_mux_evt_t* evt) return; } - rewriter->req.buf[x] = QSE_MT('\n'); /* put a new line at the end */ - rewriter->req.urlpos = 0; - rewriter->req.urllen = pkt->hdr.pktlen - QSE_SIZEOF(urs_hdr_t) + 1; /* +1 for '\n' */ + if (rewriter->pio) + { + rewriter->req.buf[x] = QSE_MT('\n'); /* put a new line at the end */ + rewriter->req.urlpos = 0; + rewriter->req.urllen = pkt->hdr.pktlen - QSE_SIZEOF(urs_hdr_t) + 1; /* +1 for '\n' */ - printf ("%d [[[%.*s]]]\n", (int)x, (int)rewriter->req.urllen, pkt->url); - feed_rewriter (ursd, rewriter); +qse_printf (QSE_T("%d [[[%.*hs]]]\n"), (int)x, (int)rewriter->req.urllen, pkt->url); + feed_rewriter (ursd, rewriter); + } + else + { + /* the actual rewriter is defunct. */ + /* TODO: error logging */ +qse_printf (QSE_T("rewriter->pio is NULL. ....\n")); + release_rewriter (ursd, rewriter, 1); + } } else { - - /* TODO */ qse_sck_len_t fromlen; qse_skad_t from; - qse_uint8_t buf[65535]; urs_pkt_t* pkt; fromlen = QSE_SIZEOF(from); - x = recvfrom (evt->hnd, buf, QSE_SIZEOF(buf) - 1, 0, (struct sockaddr*)&from, &fromlen); + x = recvfrom (evt->hnd, ursd->xreq.buf, QSE_SIZEOF(ursd->xreq.buf) - 1, 0, (struct sockaddr*)&from, &fromlen); - /* TODO: queue up in the internal queue instead of returnign empty response */ + if (x < QSE_SIZEOF(urs_hdr_t)) + { + /* TODO: message logging */ + return; + } - pkt = (urs_pkt_t*)buf; - pkt->hdr.pktlen = qse_hton16(QSE_SIZEOF(urs_hdr_t)); - sendto (evt->hnd, pkt, QSE_SIZEOF(urs_hdr_t), 0, (struct sockaddr*)&from, fromlen); + pkt = (urs_pkt_t*)ursd->xreq.buf; + pkt->hdr.pktlen = qse_ntoh16(pkt->hdr.pktlen); /* change the byte order */ + if (pkt->hdr.pktlen != x) + { + /* TOOD: message logging */ + return; + } + + if (enqueue_request (ursd, pkt, &from, fromlen) <= -1) + { + /* TODO: error logging for failed enqueuing */ + + /* if enqueuing fails, send an empty response */ + pkt->hdr.rcode = qse_hton16(URS_RCODE_ERROR); + pkt->hdr.pktlen = qse_hton16(QSE_SIZEOF(urs_hdr_t)); + sendto (evt->hnd, pkt, QSE_SIZEOF(urs_hdr_t), 0, (struct sockaddr*)&from, fromlen); + /* TODO: error logging for sendto failure */ + } } } @@ -572,7 +736,7 @@ static void dispatch_mux_event (qse_mux_t* mux, const qse_mux_evt_t* evt) release_rewriter (mux_xtn->ursd, rewriter, 1); } -printf ("READ %d, %d bytes from pipes [%.*s]\n", (int)x, (int)rewriter->req.outlen, (int)rewriter->req.outlen, pkt->url); +qse_printf (QSE_T("READ %d, %d bytes from pipes [%.*hs]\n"), (int)x, (int)rewriter->req.outlen, (int)rewriter->req.outlen, pkt->url); if (pkt->url[rewriter->req.outlen - 1] == QSE_MT('\n')) { @@ -590,10 +754,8 @@ printf ("READ %d, %d bytes from pipes [%.*s]\n", (int)x, (int)rewriter->req.outl * it can wait a while. think about it. */ release_rewriter (mux_xtn->ursd, rewriter, 0); -/* TODO: if there is a pending request, schedule another rewriter here. */ } - /* TODO: is the complete output is not received within time, some actions must be taken. timer based... rewrite timeout */ } else @@ -629,26 +791,28 @@ static int init_ursd (ursd_t* ursd, int npios, const qse_char_t* cmdline, const qse_nwad_t bindnwad; mux_xtn_t* mux_xtn; + if (npios <= 0) npios = 1; + qse_memset (ursd, 0, sizeof(*ursd)); ursd->mmgr = QSE_MMGR_GETDFL(); ursd->cmdline = qse_strdup (cmdline, ursd->mmgr); if (ursd->cmdline == QSE_NULL) { - fprintf (stderr, "cannot copy cmdline\n"); + qse_fprintf (QSE_STDERR, QSE_T("cannot duplicate cmdline\n")); goto oops; } ursd->mux = qse_mux_open (ursd->mmgr, QSE_SIZEOF(mux_xtn_t), dispatch_mux_event, 100, QSE_NULL); if (ursd->mux == QSE_NULL) { - fprintf (stderr, "cannot create a multiplexer\n"); + qse_fprintf (QSE_STDERR, QSE_T("cannot create a multiplexer\n")); goto oops; } if (qse_strtonwad (bindaddr, &bindnwad) <= -1) { - fprintf (stderr, "invalid binding address\n"); + qse_fprintf (QSE_STDERR, QSE_T("invalid binding address\n")); goto oops; } @@ -658,7 +822,7 @@ static int init_ursd (ursd_t* ursd, int npios, const qse_char_t* cmdline, const ursd->rewriters = QSE_MMGR_ALLOC (ursd->mmgr, npios * QSE_SIZEOF(rewriter_t)); if (ursd->rewriters == QSE_NULL) { - fprintf (stderr, "cannot callocate pipes\n"); + qse_fprintf (QSE_STDERR, QSE_T("cannot allocate rewriters\n")); goto oops; } qse_memset (ursd->rewriters, 0, npios * QSE_SIZEOF(rewriter_t)); @@ -672,11 +836,11 @@ static int init_ursd (ursd_t* ursd, int npios, const qse_char_t* cmdline, const if (insert_to_mux (ursd->mux, ursd->sck, TYPE_SOCKET, 0) <= -1) { - fprintf (stderr, "cannot add socket to multiplexer\n"); + qse_fprintf (QSE_STDERR, QSE_T("cannot add socket to multiplexer\n")); goto oops; } - ursd->total = npios; + ursd->total_rewriter_count = npios; mux_xtn = qse_mux_getxtn (ursd->mux); mux_xtn->ursd = ursd; @@ -698,20 +862,55 @@ oops: static void fini_ursd (ursd_t* ursd) { qse_size_t i; + xreq_t* xreq; + + for (i = 0; i < ursd->total_rewriter_count; i++) + stop_rewriter (ursd, &ursd->rewriters[i]); - for (i = 0; i < ursd->total; i++) stop_rewriter (ursd, &ursd->rewriters[i]); QSE_MMGR_FREE (ursd->mmgr, ursd->rewriters); delete_from_mux (ursd->mux, ursd->sck, TYPE_SOCKET, 0); qse_closesckhnd (ursd->sck); qse_mux_close (ursd->mux); QSE_MMGR_FREE (ursd->mmgr, ursd->cmdline); + + + /* destroy the request queue */ + xreq = ursd->xreq.head; + while (xreq) + { + xreq_t* next = xreq->next; + QSE_MMGR_FREE (ursd->mmgr, xreq); + xreq = next; + } + + for (i = 0; i < QSE_COUNTOF(ursd->xreq.free); i++) + { + while (ursd->xreq.free[i]) + { + xreq = ursd->xreq.free[i]; + ursd->xreq.free[i] = xreq->next; + QSE_MMGR_FREE (ursd->mmgr, xreq); + } + } } static int g_stop_requested = 0; static void handle_signal (int sig) { - if (sig == SIGINT) g_stop_requested = 1; + switch (sig) + { + case SIGINT: + case SIGTERM: + case SIGHUP: + g_stop_requested = 1; + break; + } +} + +static void print_usage (qse_char_t* argv0) +{ + qse_fprintf (QSE_STDERR, QSE_T("Usage: %s rewriter-path rewriter-count binding-address\n"), qse_basename(argv0)); } static int httpd_main (int argc, qse_char_t* argv[]) @@ -719,18 +918,40 @@ static int httpd_main (int argc, qse_char_t* argv[]) ursd_t ursd; int ursd_inited = 0; + + if (argc < 4) + { + print_usage (argv[0]); + return -1; + } + signal (SIGINT, handle_signal); + signal (SIGTERM, handle_signal); + signal (SIGHUP, handle_signal); signal (SIGPIPE, SIG_IGN); - if (init_ursd (&ursd, 10, QSE_T("/tmp/urs.awk"), QSE_T("[::]:97]")) <= -1) goto oops; + if (init_ursd (&ursd, qse_strtoi(argv[2]), argv[1], argv[3]) <= -1) goto oops; ursd_inited = 1; while (!g_stop_requested) { qse_ntime_t tmout; qse_cleartime (&tmout); - tmout.sec += 1; + + /* if there are pending requests, use timeout of 0. + * this way, multiplexer events and pending requests can be + * handled together. note if there's no free rewriter while + * there's a pending request, the request can't be performed. + * the free rewriter check is simpler than the actual check + * in get_free_rewriter(). it's also possible that no free + * rewriter is available after qse_mux_poll(). this timeout + * calculation is on the best-effort basis. */ + if (ursd.xreq.count <= 0 || !ursd.free_rewriter) tmout.sec += 1; +/* TODO: add timer also... and consider that in calculating in tmout */ + qse_mux_poll (ursd.mux, &tmout); + + handle_pending_requests (&ursd); } fini_ursd (&ursd); @@ -772,7 +993,7 @@ int qse_main (int argc, qse_achar_t* argv[]) if (WSAStartup (MAKEWORD(2,0), &wsadata) != 0) { - qse_fprintf (QSE_STDERR, QSE_T("Failed to start up winsock\n")); + qse_fprintf (QSE_STDERR, QSE_T("failed to start up winsock\n")); return -1; }