enhanced ursd
This commit is contained in:
parent
f27f5a650e
commit
433686b599
@ -67,6 +67,9 @@
|
|||||||
typedef struct urs_hdr_t urs_hdr_t;
|
typedef struct urs_hdr_t urs_hdr_t;
|
||||||
typedef struct urs_pkt_t urs_pkt_t;
|
typedef struct urs_pkt_t urs_pkt_t;
|
||||||
|
|
||||||
|
#define URS_RCODE_OK 0
|
||||||
|
#define URS_RCODE_ERROR 1
|
||||||
|
|
||||||
#include <qse/pack1.h>
|
#include <qse/pack1.h>
|
||||||
struct urs_hdr_t
|
struct urs_hdr_t
|
||||||
{
|
{
|
||||||
@ -83,12 +86,19 @@ struct urs_pkt_t
|
|||||||
};
|
};
|
||||||
#include <qse/unpack.h>
|
#include <qse/unpack.h>
|
||||||
|
|
||||||
|
#define MAX_PACKET_SIZE 65535
|
||||||
|
#define XREQ_BLOCK_SIZE 2048
|
||||||
|
#define XREQ_MAX_BLOCKS 32
|
||||||
|
|
||||||
typedef struct xreq_t xreq_t;
|
typedef struct xreq_t xreq_t;
|
||||||
struct xreq_t
|
struct xreq_t
|
||||||
{
|
{
|
||||||
|
qse_ntime_t timestamp;
|
||||||
|
|
||||||
qse_skad_t from;
|
qse_skad_t from;
|
||||||
qse_sck_len_t fromlen;
|
qse_sck_len_t fromlen;
|
||||||
qse_uint8_t* data;
|
urs_pkt_t* pkt;
|
||||||
|
|
||||||
xreq_t* next;
|
xreq_t* next;
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -111,7 +121,7 @@ struct rewriter_t
|
|||||||
qse_sck_len_t fromlen;
|
qse_sck_len_t fromlen;
|
||||||
qse_uint16_t urllen;
|
qse_uint16_t urllen;
|
||||||
qse_uint16_t urlpos;
|
qse_uint16_t urlpos;
|
||||||
qse_uint8_t buf[65535];
|
qse_uint8_t buf[MAX_PACKET_SIZE];
|
||||||
qse_uint32_t outlen; /* length of output read from the rewriter */
|
qse_uint32_t outlen; /* length of output read from the rewriter */
|
||||||
} req;
|
} req;
|
||||||
|
|
||||||
@ -125,8 +135,8 @@ struct ursd_t
|
|||||||
qse_mmgr_t* mmgr;
|
qse_mmgr_t* mmgr;
|
||||||
qse_char_t* cmdline;
|
qse_char_t* cmdline;
|
||||||
|
|
||||||
qse_size_t total;
|
qse_size_t total_rewriter_count;
|
||||||
qse_size_t nfree;
|
qse_size_t free_rewriter_count;
|
||||||
|
|
||||||
rewriter_t* rewriters;
|
rewriter_t* rewriters;
|
||||||
rewriter_t* free_rewriter;
|
rewriter_t* free_rewriter;
|
||||||
@ -135,8 +145,18 @@ struct ursd_t
|
|||||||
qse_sck_hnd_t sck;
|
qse_sck_hnd_t sck;
|
||||||
qse_mux_t* mux;
|
qse_mux_t* mux;
|
||||||
|
|
||||||
xreq_t* head;
|
|
||||||
xreq_t* tail;
|
struct
|
||||||
|
{
|
||||||
|
qse_uint8_t buf[MAX_PACKET_SIZE]; /* temporary buffer */
|
||||||
|
|
||||||
|
qse_size_t count; /* number of packets in the request queue */
|
||||||
|
xreq_t* head; /* head of the request queue */
|
||||||
|
xreq_t* tail; /* tail of the request queue */
|
||||||
|
|
||||||
|
xreq_t* free[XREQ_MAX_BLOCKS]; /* xreq avaialble chains per block size */
|
||||||
|
} xreq; /* request queue */
|
||||||
|
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
@ -168,7 +188,7 @@ static qse_sck_hnd_t open_server_socket (int proto, const qse_nwad_t* bindnwad)
|
|||||||
s = socket (family, type, proto);
|
s = socket (family, type, proto);
|
||||||
if (!qse_isvalidsckhnd(s))
|
if (!qse_isvalidsckhnd(s))
|
||||||
{
|
{
|
||||||
fprintf (stderr, "cannot create a socket\n");
|
qse_fprintf (QSE_STDERR, QSE_T("cannot create a socket\n"));
|
||||||
goto oops;
|
goto oops;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -195,7 +215,7 @@ static qse_sck_hnd_t open_server_socket (int proto, const qse_nwad_t* bindnwad)
|
|||||||
}
|
}
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
fprintf (stderr, "cannot bind a socket\n");
|
qse_fprintf (QSE_STDERR, QSE_T("cannot bind a socket\n"));
|
||||||
goto oops;
|
goto oops;
|
||||||
|
|
||||||
}
|
}
|
||||||
@ -214,7 +234,7 @@ bind_ok:
|
|||||||
|
|
||||||
if (setsockopt (s, SOL_SCTP, SCTP_INITMSG, &im, QSE_SIZEOF(im)) <= -1)
|
if (setsockopt (s, SOL_SCTP, SCTP_INITMSG, &im, QSE_SIZEOF(im)) <= -1)
|
||||||
{
|
{
|
||||||
fprintf (stderr, "cannot set sctp initmsg option\n");
|
qse_fprintf (QSE_STDERR, QSE_T("cannot set sctp initmsg option\n"));
|
||||||
goto oops;
|
goto oops;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -228,7 +248,7 @@ bind_ok:
|
|||||||
|
|
||||||
if (listen (s, 99) <= -1)
|
if (listen (s, 99) <= -1)
|
||||||
{
|
{
|
||||||
fprintf (stderr, "cannot set listen on sctp socket\n");
|
qse_fprintf (QSE_STDERR, QSE_T("cannot set listen on sctp socket\n"));
|
||||||
goto oops;
|
goto oops;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -264,12 +284,13 @@ static int delete_from_mux (qse_mux_t* mux, qse_mux_hnd_t handle, int type, int
|
|||||||
|
|
||||||
static void chain_rewriter_to_free_list (ursd_t* ursd, rewriter_t* rewriter)
|
static void chain_rewriter_to_free_list (ursd_t* ursd, rewriter_t* rewriter)
|
||||||
{
|
{
|
||||||
|
/* attach a rewriter to the head of the list */
|
||||||
rewriter->free = 1;
|
rewriter->free = 1;
|
||||||
rewriter->prev = QSE_NULL;
|
rewriter->prev = QSE_NULL;
|
||||||
rewriter->next = ursd->free_rewriter;
|
rewriter->next = ursd->free_rewriter;
|
||||||
if (ursd->free_rewriter) ursd->free_rewriter->prev = rewriter;
|
if (ursd->free_rewriter) ursd->free_rewriter->prev = rewriter;
|
||||||
ursd->free_rewriter = rewriter;
|
ursd->free_rewriter = rewriter;
|
||||||
ursd->nfree++;
|
ursd->free_rewriter_count++;
|
||||||
}
|
}
|
||||||
|
|
||||||
static rewriter_t* dechain_rewriter_from_free_list (ursd_t* ursd, rewriter_t* rewriter)
|
static rewriter_t* dechain_rewriter_from_free_list (ursd_t* ursd, rewriter_t* rewriter)
|
||||||
@ -280,7 +301,7 @@ static rewriter_t* dechain_rewriter_from_free_list (ursd_t* ursd, rewriter_t* re
|
|||||||
else rewriter->prev->next = rewriter->next;
|
else rewriter->prev->next = rewriter->next;
|
||||||
|
|
||||||
rewriter->free = 0;
|
rewriter->free = 0;
|
||||||
ursd->nfree--;
|
ursd->free_rewriter_count--;
|
||||||
return rewriter;
|
return rewriter;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -365,7 +386,9 @@ static rewriter_t* get_free_rewriter (ursd_t* ursd)
|
|||||||
{
|
{
|
||||||
rewriter_t* rewriter;
|
rewriter_t* rewriter;
|
||||||
|
|
||||||
for (rewriter = ursd->free_rewriter; rewriter; rewriter = rewriter->next)
|
rewriter = ursd->free_rewriter;
|
||||||
|
|
||||||
|
if (rewriter)
|
||||||
{
|
{
|
||||||
QSE_ASSERT (!rewriter->busy);
|
QSE_ASSERT (!rewriter->busy);
|
||||||
QSE_ASSERT (!rewriter->faulty);
|
QSE_ASSERT (!rewriter->faulty);
|
||||||
@ -373,15 +396,11 @@ static rewriter_t* get_free_rewriter (ursd_t* ursd)
|
|||||||
|
|
||||||
if (!rewriter->pio) start_rewriter (ursd, rewriter);
|
if (!rewriter->pio) start_rewriter (ursd, rewriter);
|
||||||
|
|
||||||
if (rewriter->pio)
|
dechain_rewriter_from_free_list (ursd, rewriter);
|
||||||
{
|
reset_rewriter_data (rewriter);
|
||||||
dechain_rewriter_from_free_list (ursd, rewriter);
|
|
||||||
reset_rewriter_data (rewriter);
|
|
||||||
return rewriter;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return QSE_NULL;
|
return rewriter;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void release_rewriter (ursd_t* ursd, rewriter_t* rewriter, int send_empty_response)
|
static void release_rewriter (ursd_t* ursd, rewriter_t* rewriter, int send_empty_response)
|
||||||
@ -396,13 +415,14 @@ static void release_rewriter (ursd_t* ursd, rewriter_t* rewriter, int send_empty
|
|||||||
|
|
||||||
if (rewriter->pio_in_in_mux)
|
if (rewriter->pio_in_in_mux)
|
||||||
{
|
{
|
||||||
|
QSE_ASSERT (rewriter->pio);
|
||||||
delete_from_mux (ursd->mux, qse_pio_gethandle(rewriter->pio, QSE_PIO_IN), TYPE_PIO_IN, rewriter->index);
|
delete_from_mux (ursd->mux, qse_pio_gethandle(rewriter->pio, QSE_PIO_IN), TYPE_PIO_IN, rewriter->index);
|
||||||
rewriter->pio_in_in_mux = 0;
|
rewriter->pio_in_in_mux = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (rewriter->busy) dechain_rewriter_from_busy_list (ursd, rewriter);
|
if (rewriter->busy) dechain_rewriter_from_busy_list (ursd, rewriter);
|
||||||
|
|
||||||
if (rewriter->faulty)
|
if (rewriter->faulty || !rewriter->pio)
|
||||||
{
|
{
|
||||||
restart_rewriter (ursd, rewriter);
|
restart_rewriter (ursd, rewriter);
|
||||||
|
|
||||||
@ -470,6 +490,124 @@ static int feed_rewriter (ursd_t* ursd, rewriter_t* rewriter)
|
|||||||
return 1; /* the full url has been passed to the rewriter */
|
return 1; /* the full url has been passed to the rewriter */
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int enqueue_request (ursd_t* ursd, urs_pkt_t* pkt, const qse_skad_t* from, qse_sck_len_t fromlen)
|
||||||
|
{
|
||||||
|
qse_uint16_t blkidx;
|
||||||
|
qse_size_t memsize;
|
||||||
|
xreq_t* xreq;
|
||||||
|
|
||||||
|
/* TODO: if (ursd->xreq.count > MAX_QUEUE_SIZE) return -1; */
|
||||||
|
|
||||||
|
blkidx = (pkt->hdr.pktlen - 1) / XREQ_BLOCK_SIZE; /* 0 based */
|
||||||
|
if (blkidx < XREQ_MAX_BLOCKS)
|
||||||
|
{
|
||||||
|
xreq = ursd->xreq.free[blkidx];
|
||||||
|
if (xreq)
|
||||||
|
{
|
||||||
|
ursd->xreq.free[blkidx] = xreq->next;
|
||||||
|
goto copy_packet;
|
||||||
|
}
|
||||||
|
memsize = (blkidx + 1) * XREQ_BLOCK_SIZE;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
memsize = pkt->hdr.pktlen;
|
||||||
|
}
|
||||||
|
|
||||||
|
xreq = QSE_MMGR_ALLOC (ursd->mmgr, QSE_SIZEOF(*xreq) + memsize);
|
||||||
|
if (xreq == QSE_NULL) return -1;
|
||||||
|
|
||||||
|
copy_packet:
|
||||||
|
/* TODOO: xreq->timestamp */
|
||||||
|
xreq->next = QSE_NULL;
|
||||||
|
xreq->from = *from;
|
||||||
|
xreq->fromlen = fromlen;
|
||||||
|
xreq->pkt = (urs_pkt_t*)(xreq + 1);
|
||||||
|
qse_memcpy (xreq->pkt, pkt, pkt->hdr.pktlen);
|
||||||
|
|
||||||
|
if (ursd->xreq.count > 0)
|
||||||
|
{
|
||||||
|
ursd->xreq.tail->next = xreq;
|
||||||
|
ursd->xreq.tail = xreq;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
ursd->xreq.head = xreq;
|
||||||
|
ursd->xreq.tail = xreq;
|
||||||
|
}
|
||||||
|
|
||||||
|
ursd->xreq.count++;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static xreq_t* dequeue_request (ursd_t* ursd)
|
||||||
|
{
|
||||||
|
xreq_t* xreq = QSE_NULL;
|
||||||
|
|
||||||
|
if (ursd->xreq.count > 0)
|
||||||
|
{
|
||||||
|
xreq = ursd->xreq.head;
|
||||||
|
|
||||||
|
ursd->xreq.head = xreq->next;
|
||||||
|
if (ursd->xreq.count == 1)
|
||||||
|
ursd->xreq.tail = ursd->xreq.head;
|
||||||
|
|
||||||
|
ursd->xreq.count--;
|
||||||
|
}
|
||||||
|
|
||||||
|
return xreq;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void release_request (ursd_t* ursd, xreq_t* xreq)
|
||||||
|
{
|
||||||
|
qse_uint16_t blkidx;
|
||||||
|
|
||||||
|
blkidx = (xreq->pkt->hdr.pktlen - 1) / XREQ_BLOCK_SIZE;
|
||||||
|
if (blkidx < XREQ_MAX_BLOCKS)
|
||||||
|
{
|
||||||
|
xreq->next = ursd->xreq.free[blkidx];
|
||||||
|
ursd->xreq.free[blkidx] = xreq;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
QSE_MMGR_FREE (ursd->mmgr, xreq);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void handle_pending_requests (ursd_t* ursd)
|
||||||
|
{
|
||||||
|
rewriter_t* rewriter;
|
||||||
|
xreq_t* xreq;
|
||||||
|
|
||||||
|
while (ursd->xreq.count > 0)
|
||||||
|
{
|
||||||
|
rewriter = get_free_rewriter (ursd);
|
||||||
|
if (rewriter)
|
||||||
|
{
|
||||||
|
qse_printf (QSE_T("HANDLING PENDING REQUEST.... %d\n"), (int)ursd->xreq.count);
|
||||||
|
xreq = dequeue_request (ursd);
|
||||||
|
QSE_ASSERT (xreq);
|
||||||
|
|
||||||
|
rewriter->req.fromlen = xreq->fromlen;
|
||||||
|
rewriter->req.from = xreq->from;
|
||||||
|
qse_memcpy (rewriter->req.buf, xreq->pkt, xreq->pkt->hdr.pktlen);
|
||||||
|
rewriter->req.buf[xreq->pkt->hdr.pktlen] = QSE_MT('\n'); /* put a new line at the end */
|
||||||
|
rewriter->req.urlpos = 0;
|
||||||
|
rewriter->req.urllen = xreq->pkt->hdr.pktlen - QSE_SIZEOF(urs_hdr_t) + 1; /* +1 for '\n' */
|
||||||
|
|
||||||
|
release_request (ursd, xreq);
|
||||||
|
|
||||||
|
if (rewriter->pio) feed_rewriter (ursd, rewriter);
|
||||||
|
else release_rewriter (ursd, rewriter, 1);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
qse_printf (QSE_T("NO REWRITER AVAILABLE FOR PENDING REQUEST....%d\n"), (int)ursd->xreq.count);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static void receive_request_from_socket (ursd_t* ursd, const qse_mux_evt_t* evt)
|
static void receive_request_from_socket (ursd_t* ursd, const qse_mux_evt_t* evt)
|
||||||
{
|
{
|
||||||
qse_ssize_t x;
|
qse_ssize_t x;
|
||||||
@ -496,30 +634,56 @@ static void receive_request_from_socket (ursd_t* ursd, const qse_mux_evt_t* evt)
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
rewriter->req.buf[x] = QSE_MT('\n'); /* put a new line at the end */
|
if (rewriter->pio)
|
||||||
rewriter->req.urlpos = 0;
|
{
|
||||||
rewriter->req.urllen = pkt->hdr.pktlen - QSE_SIZEOF(urs_hdr_t) + 1; /* +1 for '\n' */
|
rewriter->req.buf[x] = QSE_MT('\n'); /* put a new line at the end */
|
||||||
|
rewriter->req.urlpos = 0;
|
||||||
|
rewriter->req.urllen = pkt->hdr.pktlen - QSE_SIZEOF(urs_hdr_t) + 1; /* +1 for '\n' */
|
||||||
|
|
||||||
printf ("%d [[[%.*s]]]\n", (int)x, (int)rewriter->req.urllen, pkt->url);
|
qse_printf (QSE_T("%d [[[%.*hs]]]\n"), (int)x, (int)rewriter->req.urllen, pkt->url);
|
||||||
feed_rewriter (ursd, rewriter);
|
feed_rewriter (ursd, rewriter);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
/* the actual rewriter is defunct. */
|
||||||
|
/* TODO: error logging */
|
||||||
|
qse_printf (QSE_T("rewriter->pio is NULL. ....\n"));
|
||||||
|
release_rewriter (ursd, rewriter, 1);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
|
||||||
/* TODO */
|
|
||||||
qse_sck_len_t fromlen;
|
qse_sck_len_t fromlen;
|
||||||
qse_skad_t from;
|
qse_skad_t from;
|
||||||
qse_uint8_t buf[65535];
|
|
||||||
urs_pkt_t* pkt;
|
urs_pkt_t* pkt;
|
||||||
|
|
||||||
fromlen = QSE_SIZEOF(from);
|
fromlen = QSE_SIZEOF(from);
|
||||||
x = recvfrom (evt->hnd, buf, QSE_SIZEOF(buf) - 1, 0, (struct sockaddr*)&from, &fromlen);
|
x = recvfrom (evt->hnd, ursd->xreq.buf, QSE_SIZEOF(ursd->xreq.buf) - 1, 0, (struct sockaddr*)&from, &fromlen);
|
||||||
|
|
||||||
/* TODO: queue up in the internal queue instead of returnign empty response */
|
if (x < QSE_SIZEOF(urs_hdr_t))
|
||||||
|
{
|
||||||
|
/* TODO: message logging */
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
pkt = (urs_pkt_t*)buf;
|
pkt = (urs_pkt_t*)ursd->xreq.buf;
|
||||||
pkt->hdr.pktlen = qse_hton16(QSE_SIZEOF(urs_hdr_t));
|
pkt->hdr.pktlen = qse_ntoh16(pkt->hdr.pktlen); /* change the byte order */
|
||||||
sendto (evt->hnd, pkt, QSE_SIZEOF(urs_hdr_t), 0, (struct sockaddr*)&from, fromlen);
|
if (pkt->hdr.pktlen != x)
|
||||||
|
{
|
||||||
|
/* TOOD: message logging */
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (enqueue_request (ursd, pkt, &from, fromlen) <= -1)
|
||||||
|
{
|
||||||
|
/* TODO: error logging for failed enqueuing */
|
||||||
|
|
||||||
|
/* if enqueuing fails, send an empty response */
|
||||||
|
pkt->hdr.rcode = qse_hton16(URS_RCODE_ERROR);
|
||||||
|
pkt->hdr.pktlen = qse_hton16(QSE_SIZEOF(urs_hdr_t));
|
||||||
|
sendto (evt->hnd, pkt, QSE_SIZEOF(urs_hdr_t), 0, (struct sockaddr*)&from, fromlen);
|
||||||
|
/* TODO: error logging for sendto failure */
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -572,7 +736,7 @@ static void dispatch_mux_event (qse_mux_t* mux, const qse_mux_evt_t* evt)
|
|||||||
release_rewriter (mux_xtn->ursd, rewriter, 1);
|
release_rewriter (mux_xtn->ursd, rewriter, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
printf ("READ %d, %d bytes from pipes [%.*s]\n", (int)x, (int)rewriter->req.outlen, (int)rewriter->req.outlen, pkt->url);
|
qse_printf (QSE_T("READ %d, %d bytes from pipes [%.*hs]\n"), (int)x, (int)rewriter->req.outlen, (int)rewriter->req.outlen, pkt->url);
|
||||||
|
|
||||||
if (pkt->url[rewriter->req.outlen - 1] == QSE_MT('\n'))
|
if (pkt->url[rewriter->req.outlen - 1] == QSE_MT('\n'))
|
||||||
{
|
{
|
||||||
@ -590,10 +754,8 @@ printf ("READ %d, %d bytes from pipes [%.*s]\n", (int)x, (int)rewriter->req.outl
|
|||||||
* it can wait a while. think about it. */
|
* it can wait a while. think about it. */
|
||||||
|
|
||||||
release_rewriter (mux_xtn->ursd, rewriter, 0);
|
release_rewriter (mux_xtn->ursd, rewriter, 0);
|
||||||
/* TODO: if there is a pending request, schedule another rewriter here. */
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/* TODO: is the complete output is not received within time, some actions must be taken. timer based... rewrite timeout */
|
/* TODO: is the complete output is not received within time, some actions must be taken. timer based... rewrite timeout */
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
@ -629,26 +791,28 @@ static int init_ursd (ursd_t* ursd, int npios, const qse_char_t* cmdline, const
|
|||||||
qse_nwad_t bindnwad;
|
qse_nwad_t bindnwad;
|
||||||
mux_xtn_t* mux_xtn;
|
mux_xtn_t* mux_xtn;
|
||||||
|
|
||||||
|
if (npios <= 0) npios = 1;
|
||||||
|
|
||||||
qse_memset (ursd, 0, sizeof(*ursd));
|
qse_memset (ursd, 0, sizeof(*ursd));
|
||||||
ursd->mmgr = QSE_MMGR_GETDFL();
|
ursd->mmgr = QSE_MMGR_GETDFL();
|
||||||
|
|
||||||
ursd->cmdline = qse_strdup (cmdline, ursd->mmgr);
|
ursd->cmdline = qse_strdup (cmdline, ursd->mmgr);
|
||||||
if (ursd->cmdline == QSE_NULL)
|
if (ursd->cmdline == QSE_NULL)
|
||||||
{
|
{
|
||||||
fprintf (stderr, "cannot copy cmdline\n");
|
qse_fprintf (QSE_STDERR, QSE_T("cannot duplicate cmdline\n"));
|
||||||
goto oops;
|
goto oops;
|
||||||
}
|
}
|
||||||
|
|
||||||
ursd->mux = qse_mux_open (ursd->mmgr, QSE_SIZEOF(mux_xtn_t), dispatch_mux_event, 100, QSE_NULL);
|
ursd->mux = qse_mux_open (ursd->mmgr, QSE_SIZEOF(mux_xtn_t), dispatch_mux_event, 100, QSE_NULL);
|
||||||
if (ursd->mux == QSE_NULL)
|
if (ursd->mux == QSE_NULL)
|
||||||
{
|
{
|
||||||
fprintf (stderr, "cannot create a multiplexer\n");
|
qse_fprintf (QSE_STDERR, QSE_T("cannot create a multiplexer\n"));
|
||||||
goto oops;
|
goto oops;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (qse_strtonwad (bindaddr, &bindnwad) <= -1)
|
if (qse_strtonwad (bindaddr, &bindnwad) <= -1)
|
||||||
{
|
{
|
||||||
fprintf (stderr, "invalid binding address\n");
|
qse_fprintf (QSE_STDERR, QSE_T("invalid binding address\n"));
|
||||||
goto oops;
|
goto oops;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -658,7 +822,7 @@ static int init_ursd (ursd_t* ursd, int npios, const qse_char_t* cmdline, const
|
|||||||
ursd->rewriters = QSE_MMGR_ALLOC (ursd->mmgr, npios * QSE_SIZEOF(rewriter_t));
|
ursd->rewriters = QSE_MMGR_ALLOC (ursd->mmgr, npios * QSE_SIZEOF(rewriter_t));
|
||||||
if (ursd->rewriters == QSE_NULL)
|
if (ursd->rewriters == QSE_NULL)
|
||||||
{
|
{
|
||||||
fprintf (stderr, "cannot callocate pipes\n");
|
qse_fprintf (QSE_STDERR, QSE_T("cannot allocate rewriters\n"));
|
||||||
goto oops;
|
goto oops;
|
||||||
}
|
}
|
||||||
qse_memset (ursd->rewriters, 0, npios * QSE_SIZEOF(rewriter_t));
|
qse_memset (ursd->rewriters, 0, npios * QSE_SIZEOF(rewriter_t));
|
||||||
@ -672,11 +836,11 @@ static int init_ursd (ursd_t* ursd, int npios, const qse_char_t* cmdline, const
|
|||||||
|
|
||||||
if (insert_to_mux (ursd->mux, ursd->sck, TYPE_SOCKET, 0) <= -1)
|
if (insert_to_mux (ursd->mux, ursd->sck, TYPE_SOCKET, 0) <= -1)
|
||||||
{
|
{
|
||||||
fprintf (stderr, "cannot add socket to multiplexer\n");
|
qse_fprintf (QSE_STDERR, QSE_T("cannot add socket to multiplexer\n"));
|
||||||
goto oops;
|
goto oops;
|
||||||
}
|
}
|
||||||
|
|
||||||
ursd->total = npios;
|
ursd->total_rewriter_count = npios;
|
||||||
|
|
||||||
mux_xtn = qse_mux_getxtn (ursd->mux);
|
mux_xtn = qse_mux_getxtn (ursd->mux);
|
||||||
mux_xtn->ursd = ursd;
|
mux_xtn->ursd = ursd;
|
||||||
@ -698,20 +862,55 @@ oops:
|
|||||||
static void fini_ursd (ursd_t* ursd)
|
static void fini_ursd (ursd_t* ursd)
|
||||||
{
|
{
|
||||||
qse_size_t i;
|
qse_size_t i;
|
||||||
|
xreq_t* xreq;
|
||||||
|
|
||||||
|
for (i = 0; i < ursd->total_rewriter_count; i++)
|
||||||
|
stop_rewriter (ursd, &ursd->rewriters[i]);
|
||||||
|
|
||||||
for (i = 0; i < ursd->total; i++) stop_rewriter (ursd, &ursd->rewriters[i]);
|
|
||||||
QSE_MMGR_FREE (ursd->mmgr, ursd->rewriters);
|
QSE_MMGR_FREE (ursd->mmgr, ursd->rewriters);
|
||||||
|
|
||||||
delete_from_mux (ursd->mux, ursd->sck, TYPE_SOCKET, 0);
|
delete_from_mux (ursd->mux, ursd->sck, TYPE_SOCKET, 0);
|
||||||
qse_closesckhnd (ursd->sck);
|
qse_closesckhnd (ursd->sck);
|
||||||
qse_mux_close (ursd->mux);
|
qse_mux_close (ursd->mux);
|
||||||
QSE_MMGR_FREE (ursd->mmgr, ursd->cmdline);
|
QSE_MMGR_FREE (ursd->mmgr, ursd->cmdline);
|
||||||
|
|
||||||
|
|
||||||
|
/* destroy the request queue */
|
||||||
|
xreq = ursd->xreq.head;
|
||||||
|
while (xreq)
|
||||||
|
{
|
||||||
|
xreq_t* next = xreq->next;
|
||||||
|
QSE_MMGR_FREE (ursd->mmgr, xreq);
|
||||||
|
xreq = next;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (i = 0; i < QSE_COUNTOF(ursd->xreq.free); i++)
|
||||||
|
{
|
||||||
|
while (ursd->xreq.free[i])
|
||||||
|
{
|
||||||
|
xreq = ursd->xreq.free[i];
|
||||||
|
ursd->xreq.free[i] = xreq->next;
|
||||||
|
QSE_MMGR_FREE (ursd->mmgr, xreq);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static int g_stop_requested = 0;
|
static int g_stop_requested = 0;
|
||||||
static void handle_signal (int sig)
|
static void handle_signal (int sig)
|
||||||
{
|
{
|
||||||
if (sig == SIGINT) g_stop_requested = 1;
|
switch (sig)
|
||||||
|
{
|
||||||
|
case SIGINT:
|
||||||
|
case SIGTERM:
|
||||||
|
case SIGHUP:
|
||||||
|
g_stop_requested = 1;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void print_usage (qse_char_t* argv0)
|
||||||
|
{
|
||||||
|
qse_fprintf (QSE_STDERR, QSE_T("Usage: %s rewriter-path rewriter-count binding-address\n"), qse_basename(argv0));
|
||||||
}
|
}
|
||||||
|
|
||||||
static int httpd_main (int argc, qse_char_t* argv[])
|
static int httpd_main (int argc, qse_char_t* argv[])
|
||||||
@ -719,18 +918,40 @@ static int httpd_main (int argc, qse_char_t* argv[])
|
|||||||
ursd_t ursd;
|
ursd_t ursd;
|
||||||
int ursd_inited = 0;
|
int ursd_inited = 0;
|
||||||
|
|
||||||
|
|
||||||
|
if (argc < 4)
|
||||||
|
{
|
||||||
|
print_usage (argv[0]);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
signal (SIGINT, handle_signal);
|
signal (SIGINT, handle_signal);
|
||||||
|
signal (SIGTERM, handle_signal);
|
||||||
|
signal (SIGHUP, handle_signal);
|
||||||
signal (SIGPIPE, SIG_IGN);
|
signal (SIGPIPE, SIG_IGN);
|
||||||
|
|
||||||
if (init_ursd (&ursd, 10, QSE_T("/tmp/urs.awk"), QSE_T("[::]:97]")) <= -1) goto oops;
|
if (init_ursd (&ursd, qse_strtoi(argv[2]), argv[1], argv[3]) <= -1) goto oops;
|
||||||
ursd_inited = 1;
|
ursd_inited = 1;
|
||||||
|
|
||||||
while (!g_stop_requested)
|
while (!g_stop_requested)
|
||||||
{
|
{
|
||||||
qse_ntime_t tmout;
|
qse_ntime_t tmout;
|
||||||
qse_cleartime (&tmout);
|
qse_cleartime (&tmout);
|
||||||
tmout.sec += 1;
|
|
||||||
|
/* if there are pending requests, use timeout of 0.
|
||||||
|
* this way, multiplexer events and pending requests can be
|
||||||
|
* handled together. note if there's no free rewriter while
|
||||||
|
* there's a pending request, the request can't be performed.
|
||||||
|
* the free rewriter check is simpler than the actual check
|
||||||
|
* in get_free_rewriter(). it's also possible that no free
|
||||||
|
* rewriter is available after qse_mux_poll(). this timeout
|
||||||
|
* calculation is on the best-effort basis. */
|
||||||
|
if (ursd.xreq.count <= 0 || !ursd.free_rewriter) tmout.sec += 1;
|
||||||
|
/* TODO: add timer also... and consider that in calculating in tmout */
|
||||||
|
|
||||||
qse_mux_poll (ursd.mux, &tmout);
|
qse_mux_poll (ursd.mux, &tmout);
|
||||||
|
|
||||||
|
handle_pending_requests (&ursd);
|
||||||
}
|
}
|
||||||
|
|
||||||
fini_ursd (&ursd);
|
fini_ursd (&ursd);
|
||||||
@ -772,7 +993,7 @@ int qse_main (int argc, qse_achar_t* argv[])
|
|||||||
|
|
||||||
if (WSAStartup (MAKEWORD(2,0), &wsadata) != 0)
|
if (WSAStartup (MAKEWORD(2,0), &wsadata) != 0)
|
||||||
{
|
{
|
||||||
qse_fprintf (QSE_STDERR, QSE_T("Failed to start up winsock\n"));
|
qse_fprintf (QSE_STDERR, QSE_T("failed to start up winsock\n"));
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user