written more ursd code
This commit is contained in:
parent
4195be2c84
commit
f27f5a650e
@ -18,8 +18,6 @@
|
||||
#include <signal.h>
|
||||
#include <locale.h>
|
||||
#include <stdio.h>
|
||||
#include <string.h>
|
||||
#include <stdlib.h>
|
||||
|
||||
#if defined(_WIN32)
|
||||
# include <winsock2.h>
|
||||
@ -94,32 +92,45 @@ struct xreq_t
|
||||
xreq_t* next;
|
||||
};
|
||||
|
||||
typedef struct xpio_t xpio_t;
|
||||
struct xpio_t
|
||||
typedef struct rewriter_t rewriter_t;
|
||||
struct rewriter_t
|
||||
{
|
||||
int index; /* index in ursd->rewriters */
|
||||
qse_pio_t* pio;
|
||||
int busy;
|
||||
|
||||
/* ------------------ */
|
||||
|
||||
unsigned int free: 1;
|
||||
unsigned int busy: 1;
|
||||
unsigned int faulty: 1;
|
||||
unsigned int pio_in_in_mux: 1;
|
||||
|
||||
struct
|
||||
{
|
||||
qse_skad_t from;
|
||||
qse_sck_len_t fromlen;
|
||||
qse_uint16_t urllen;
|
||||
qse_uint16_t urlpos;
|
||||
qse_uint8_t buf[65535];
|
||||
qse_uint32_t outlen; /* length of output read from the rewriter */
|
||||
} req;
|
||||
|
||||
xpio_t* prev;
|
||||
xpio_t* next;
|
||||
rewriter_t* prev;
|
||||
rewriter_t* next;
|
||||
};
|
||||
|
||||
typedef struct ursd_t ursd_t;
|
||||
struct ursd_t
|
||||
{
|
||||
qse_mmgr_t* mmgr;
|
||||
qse_char_t* cmdline;
|
||||
|
||||
qse_size_t total;
|
||||
qse_size_t nfree;
|
||||
|
||||
xpio_t* xpios;
|
||||
xpio_t* free_xpio;
|
||||
xpio_t* busy_xpio;
|
||||
rewriter_t* rewriters;
|
||||
rewriter_t* free_rewriter;
|
||||
rewriter_t* busy_rewriter;
|
||||
|
||||
qse_sck_hnd_t sck;
|
||||
qse_mux_t* mux;
|
||||
@ -130,7 +141,8 @@ struct ursd_t
|
||||
|
||||
|
||||
#define TYPE_SOCKET 0
|
||||
#define TYPE_PIO 1
|
||||
#define TYPE_PIO_OUT 1
|
||||
#define TYPE_PIO_IN 2
|
||||
|
||||
#define MAKE_MUX_DATA(type,index) ((qse_uintptr_t)type | ((qse_uintptr_t)index << 4))
|
||||
#define GET_TYPE_FROM_MUX_DATA(md) ((md) & 0xF)
|
||||
@ -142,71 +154,6 @@ struct mux_xtn_t
|
||||
};
|
||||
typedef struct mux_xtn_t mux_xtn_t;
|
||||
|
||||
static void chain_to_free_list (ursd_t* ursd, xpio_t* xpio)
|
||||
{
|
||||
xpio->busy = 0;
|
||||
xpio->prev = QSE_NULL;
|
||||
xpio->next = ursd->free_xpio;
|
||||
if (ursd->free_xpio) ursd->free_xpio->prev = xpio;
|
||||
ursd->free_xpio = xpio;
|
||||
ursd->nfree++;
|
||||
}
|
||||
|
||||
static xpio_t* dechain_from_free_list (ursd_t* ursd, xpio_t* xpio)
|
||||
{
|
||||
if (xpio->next) xpio->next->prev = xpio->prev;
|
||||
if (xpio == ursd->free_xpio) ursd->free_xpio = xpio->next;
|
||||
else xpio->prev->next = xpio->next;
|
||||
ursd->nfree--;
|
||||
return xpio;
|
||||
}
|
||||
|
||||
static void chain_to_busy_list (ursd_t* ursd, xpio_t* xpio)
|
||||
{
|
||||
xpio->busy = 1;
|
||||
xpio->prev = QSE_NULL;
|
||||
xpio->next = ursd->busy_xpio;
|
||||
if (ursd->busy_xpio) ursd->busy_xpio->prev = xpio;
|
||||
ursd->busy_xpio = xpio;
|
||||
}
|
||||
|
||||
static xpio_t* dechain_from_busy_list (ursd_t* ursd, xpio_t* xpio)
|
||||
{
|
||||
if (xpio->next) xpio->next->prev = xpio->prev;
|
||||
|
||||
if (xpio == ursd->busy_xpio) ursd->busy_xpio = xpio->next;
|
||||
else xpio->prev->next = xpio->next;
|
||||
|
||||
return xpio;
|
||||
}
|
||||
|
||||
static xreq_t* enqueue_request (ursd_t* ursd, urs_pkt_t* pkt)
|
||||
{
|
||||
return QSE_NULL;
|
||||
}
|
||||
|
||||
static int insert_to_mux (qse_mux_t* mux, qse_mux_hnd_t handle, int type, int index)
|
||||
{
|
||||
qse_mux_evt_t evt;
|
||||
|
||||
memset (&evt, 0, QSE_SIZEOF(evt));
|
||||
evt.hnd = handle;
|
||||
evt.mask = QSE_MUX_IN;
|
||||
evt.data = MAKE_MUX_DATA(type, index);
|
||||
return qse_mux_insert (mux, &evt);
|
||||
}
|
||||
|
||||
static int delete_from_mux (qse_mux_t* mux, qse_mux_hnd_t handle, int type, int index)
|
||||
{
|
||||
qse_mux_evt_t evt;
|
||||
|
||||
memset (&evt, 0, QSE_SIZEOF(evt));
|
||||
evt.hnd = handle;
|
||||
evt.mask = QSE_MUX_IN;
|
||||
evt.data = MAKE_MUX_DATA(type, index);
|
||||
return qse_mux_delete (mux, &evt);
|
||||
}
|
||||
|
||||
static qse_sck_hnd_t open_server_socket (int proto, const qse_nwad_t* bindnwad)
|
||||
{
|
||||
qse_sck_hnd_t s = QSE_INVALID_SCKHND;
|
||||
@ -225,6 +172,8 @@ static qse_sck_hnd_t open_server_socket (int proto, const qse_nwad_t* bindnwad)
|
||||
goto oops;
|
||||
}
|
||||
|
||||
/* TODO: increase the socket buffer size, especially the output buffer size */
|
||||
|
||||
#if defined(FD_CLOEXEC)
|
||||
flag = fcntl (s, F_GETFD);
|
||||
if (flag >= 0) fcntl (s, F_SETFD, flag | FD_CLOEXEC);
|
||||
@ -291,25 +240,286 @@ oops:
|
||||
return QSE_INVALID_SCKHND;
|
||||
}
|
||||
|
||||
static void schedule_request (ursd_t* ursd, urs_pkt_t* pkt, const qse_skad_t* skad, qse_sck_len_t skadlen)
|
||||
static int insert_to_mux (qse_mux_t* mux, qse_mux_hnd_t handle, int type, int index)
|
||||
{
|
||||
if (ursd->free_xpio)
|
||||
qse_mux_evt_t evt;
|
||||
|
||||
qse_memset (&evt, 0, QSE_SIZEOF(evt));
|
||||
evt.hnd = handle;
|
||||
evt.mask = (type == TYPE_PIO_IN? QSE_MUX_OUT: QSE_MUX_IN);
|
||||
evt.data = MAKE_MUX_DATA(type, index);
|
||||
return qse_mux_insert (mux, &evt);
|
||||
}
|
||||
|
||||
static int delete_from_mux (qse_mux_t* mux, qse_mux_hnd_t handle, int type, int index)
|
||||
{
|
||||
qse_mux_evt_t evt;
|
||||
|
||||
qse_memset (&evt, 0, QSE_SIZEOF(evt));
|
||||
evt.hnd = handle;
|
||||
evt.mask = (type == TYPE_PIO_IN? QSE_MUX_OUT: QSE_MUX_IN);
|
||||
evt.data = MAKE_MUX_DATA(type, index);
|
||||
return qse_mux_delete (mux, &evt);
|
||||
}
|
||||
|
||||
static void chain_rewriter_to_free_list (ursd_t* ursd, rewriter_t* rewriter)
|
||||
{
|
||||
rewriter->free = 1;
|
||||
rewriter->prev = QSE_NULL;
|
||||
rewriter->next = ursd->free_rewriter;
|
||||
if (ursd->free_rewriter) ursd->free_rewriter->prev = rewriter;
|
||||
ursd->free_rewriter = rewriter;
|
||||
ursd->nfree++;
|
||||
}
|
||||
|
||||
static rewriter_t* dechain_rewriter_from_free_list (ursd_t* ursd, rewriter_t* rewriter)
|
||||
{
|
||||
if (rewriter->next) rewriter->next->prev = rewriter->prev;
|
||||
|
||||
if (rewriter == ursd->free_rewriter) ursd->free_rewriter = rewriter->next;
|
||||
else rewriter->prev->next = rewriter->next;
|
||||
|
||||
rewriter->free = 0;
|
||||
ursd->nfree--;
|
||||
return rewriter;
|
||||
}
|
||||
|
||||
static void chain_rewriter_to_busy_list (ursd_t* ursd, rewriter_t* rewriter)
|
||||
{
|
||||
rewriter->busy = 1;
|
||||
rewriter->prev = QSE_NULL;
|
||||
rewriter->next = ursd->busy_rewriter;
|
||||
if (ursd->busy_rewriter) ursd->busy_rewriter->prev = rewriter;
|
||||
ursd->busy_rewriter = rewriter;
|
||||
}
|
||||
|
||||
static rewriter_t* dechain_rewriter_from_busy_list (ursd_t* ursd, rewriter_t* rewriter)
|
||||
{
|
||||
if (rewriter->next) rewriter->next->prev = rewriter->prev;
|
||||
|
||||
if (rewriter == ursd->busy_rewriter) ursd->busy_rewriter = rewriter->next;
|
||||
else rewriter->prev->next = rewriter->next;
|
||||
|
||||
rewriter->busy = 0;
|
||||
return rewriter;
|
||||
}
|
||||
|
||||
static void start_rewriter (ursd_t* ursd, rewriter_t* rewriter)
|
||||
{
|
||||
QSE_ASSERT (rewriter->pio == QSE_NULL);
|
||||
|
||||
rewriter->pio = qse_pio_open (
|
||||
ursd->mmgr, 0, ursd->cmdline, QSE_NULL,
|
||||
QSE_PIO_WRITEIN | QSE_PIO_READOUT | QSE_PIO_ERRTONUL |
|
||||
QSE_PIO_INNOBLOCK | QSE_PIO_OUTNOBLOCK
|
||||
);
|
||||
|
||||
if (rewriter->pio)
|
||||
{
|
||||
xpio_t* xpio = dechain_from_free_list (ursd, ursd->free_xpio);
|
||||
if (insert_to_mux (ursd->mux, qse_pio_gethandle(rewriter->pio, QSE_PIO_OUT), TYPE_PIO_OUT, rewriter->index) <= -1)
|
||||
{
|
||||
/* error logging */
|
||||
qse_pio_kill (rewriter->pio);
|
||||
qse_pio_close (rewriter->pio);
|
||||
rewriter->pio = QSE_NULL;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
xpio->req.from = *skad;
|
||||
xpio->req.fromlen = skadlen;
|
||||
qse_memcpy (xpio->req.buf, pkt, QSE_SIZEOF(urs_hdr_t)); /* copy header */
|
||||
static void stop_rewriter (ursd_t* ursd, rewriter_t* rewriter)
|
||||
{
|
||||
if (rewriter->pio)
|
||||
{
|
||||
if (rewriter->pio_in_in_mux)
|
||||
{
|
||||
delete_from_mux (ursd->mux, qse_pio_gethandle(rewriter->pio, QSE_PIO_IN), TYPE_PIO_IN, rewriter->index);
|
||||
rewriter->pio_in_in_mux = 0;
|
||||
}
|
||||
|
||||
printf ("XPIO WRITNG TO PIPE %p %d\n", xpio, qse_skadfamily(skad));
|
||||
qse_pio_write (xpio->pio, QSE_PIO_IN, pkt->url, pkt->hdr.pktlen - QSE_SIZEOF(urs_hdr_t)); /* TODO: error handling */
|
||||
chain_to_busy_list (ursd, xpio);
|
||||
delete_from_mux (ursd->mux, qse_pio_gethandle(rewriter->pio, QSE_PIO_OUT), TYPE_PIO_OUT, rewriter->index);
|
||||
|
||||
qse_pio_kill (rewriter->pio);
|
||||
qse_pio_close (rewriter->pio);
|
||||
rewriter->pio = QSE_NULL;
|
||||
}
|
||||
}
|
||||
|
||||
static void restart_rewriter (ursd_t* ursd, rewriter_t* rewriter)
|
||||
{
|
||||
stop_rewriter (ursd, rewriter);
|
||||
start_rewriter (ursd, rewriter);
|
||||
}
|
||||
|
||||
static void reset_rewriter_data (rewriter_t* rewriter)
|
||||
{
|
||||
int index = rewriter->index;
|
||||
qse_pio_t* pio = rewriter->pio;
|
||||
|
||||
qse_memset (rewriter, 0, QSE_SIZEOF(*rewriter));
|
||||
|
||||
rewriter->index = index;
|
||||
rewriter->pio = pio;
|
||||
}
|
||||
|
||||
static rewriter_t* get_free_rewriter (ursd_t* ursd)
|
||||
{
|
||||
rewriter_t* rewriter;
|
||||
|
||||
for (rewriter = ursd->free_rewriter; rewriter; rewriter = rewriter->next)
|
||||
{
|
||||
QSE_ASSERT (!rewriter->busy);
|
||||
QSE_ASSERT (!rewriter->faulty);
|
||||
QSE_ASSERT (!rewriter->pio_in_in_mux);
|
||||
|
||||
if (!rewriter->pio) start_rewriter (ursd, rewriter);
|
||||
|
||||
if (rewriter->pio)
|
||||
{
|
||||
dechain_rewriter_from_free_list (ursd, rewriter);
|
||||
reset_rewriter_data (rewriter);
|
||||
return rewriter;
|
||||
}
|
||||
}
|
||||
|
||||
return QSE_NULL;
|
||||
}
|
||||
|
||||
static void release_rewriter (ursd_t* ursd, rewriter_t* rewriter, int send_empty_response)
|
||||
{
|
||||
if (send_empty_response)
|
||||
{
|
||||
urs_pkt_t* pkt = (urs_pkt_t*)rewriter->req.buf;
|
||||
pkt->hdr.pktlen = qse_ntoh16(QSE_SIZEOF(urs_hdr_t));
|
||||
sendto (ursd->sck, pkt, QSE_SIZEOF(urs_hdr_t), 0, (struct sockaddr*)&rewriter->req.from, rewriter->req.fromlen);
|
||||
/* TOOD: error logging. if this fails, the client side should resend a request or just time out. */
|
||||
}
|
||||
|
||||
if (rewriter->pio_in_in_mux)
|
||||
{
|
||||
delete_from_mux (ursd->mux, qse_pio_gethandle(rewriter->pio, QSE_PIO_IN), TYPE_PIO_IN, rewriter->index);
|
||||
rewriter->pio_in_in_mux = 0;
|
||||
}
|
||||
|
||||
if (rewriter->busy) dechain_rewriter_from_busy_list (ursd, rewriter);
|
||||
|
||||
if (rewriter->faulty)
|
||||
{
|
||||
restart_rewriter (ursd, rewriter);
|
||||
|
||||
/* NOTE: start may fail in restart_rewriter(),
|
||||
* meaning rewrite->pio can still be null. */
|
||||
|
||||
rewriter->faulty = 0;
|
||||
}
|
||||
|
||||
chain_rewriter_to_free_list (ursd, rewriter);
|
||||
}
|
||||
|
||||
|
||||
static void seize_rewriter (ursd_t* ursd, rewriter_t* rewriter)
|
||||
{
|
||||
/* call this function once the request from the socket has been
|
||||
* fully written to the rewriter. */
|
||||
|
||||
if (rewriter->pio_in_in_mux)
|
||||
{
|
||||
delete_from_mux (ursd->mux, qse_pio_gethandle(rewriter->pio, QSE_PIO_IN), TYPE_PIO_IN, rewriter->index);
|
||||
rewriter->pio_in_in_mux = 0;
|
||||
}
|
||||
|
||||
QSE_ASSERT (!rewriter->faulty);
|
||||
QSE_ASSERT (!rewriter->busy);
|
||||
|
||||
chain_rewriter_to_busy_list (ursd, rewriter);
|
||||
}
|
||||
|
||||
static int feed_rewriter (ursd_t* ursd, rewriter_t* rewriter)
|
||||
{
|
||||
qse_ssize_t x;
|
||||
urs_pkt_t* pkt = (urs_pkt_t*)rewriter->req.buf;
|
||||
|
||||
while (rewriter->req.urlpos < rewriter->req.urllen)
|
||||
{
|
||||
x = qse_pio_write (rewriter->pio, QSE_PIO_IN, &pkt->url[rewriter->req.urlpos], rewriter->req.urllen - rewriter->req.urlpos);
|
||||
if (x <= -1)
|
||||
{
|
||||
if (qse_pio_geterrnum(rewriter->pio) == QSE_PIO_EAGAIN)
|
||||
{
|
||||
if (rewriter->pio_in_in_mux ||
|
||||
insert_to_mux (ursd->mux, qse_pio_gethandle(rewriter->pio, QSE_PIO_IN), TYPE_PIO_IN, rewriter->index) >= 0)
|
||||
{
|
||||
/* this is partial success. the request has not
|
||||
been passed to the rewriter in its entirety yet. */
|
||||
rewriter->pio_in_in_mux = 1;
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
/* reclaim the rewriter since it seems faulty */
|
||||
rewriter->faulty = 1;
|
||||
release_rewriter (ursd, rewriter, 1);
|
||||
return -1;
|
||||
}
|
||||
|
||||
rewriter->req.urlpos += x;
|
||||
}
|
||||
|
||||
/* full feeding is completed - the entire request has
|
||||
* been passed to the rewriter */
|
||||
seize_rewriter (ursd, rewriter);
|
||||
return 1; /* the full url has been passed to the rewriter */
|
||||
}
|
||||
|
||||
static void receive_request_from_socket (ursd_t* ursd, const qse_mux_evt_t* evt)
|
||||
{
|
||||
qse_ssize_t x;
|
||||
urs_pkt_t* pkt;
|
||||
rewriter_t* rewriter;
|
||||
|
||||
rewriter = get_free_rewriter (ursd);
|
||||
if (rewriter)
|
||||
{
|
||||
rewriter->req.fromlen = QSE_SIZEOF(rewriter->req.from);
|
||||
x = recvfrom (evt->hnd, rewriter->req.buf, QSE_SIZEOF(rewriter->req.buf) - 1, 0, (struct sockaddr*)&rewriter->req.from, &rewriter->req.fromlen);
|
||||
/*TODO: improve error handling */
|
||||
if (x < QSE_SIZEOF(urs_hdr_t))
|
||||
{
|
||||
/* TODO: message logging */
|
||||
return;
|
||||
}
|
||||
|
||||
pkt = (urs_pkt_t*)rewriter->req.buf;
|
||||
pkt->hdr.pktlen = qse_ntoh16(pkt->hdr.pktlen); /* change the byte order */
|
||||
if (pkt->hdr.pktlen != x)
|
||||
{
|
||||
/* TOOD: message logging */
|
||||
return;
|
||||
}
|
||||
|
||||
rewriter->req.buf[x] = QSE_MT('\n'); /* put a new line at the end */
|
||||
rewriter->req.urlpos = 0;
|
||||
rewriter->req.urllen = pkt->hdr.pktlen - QSE_SIZEOF(urs_hdr_t) + 1; /* +1 for '\n' */
|
||||
|
||||
printf ("%d [[[%.*s]]]\n", (int)x, (int)rewriter->req.urllen, pkt->url);
|
||||
feed_rewriter (ursd, rewriter);
|
||||
}
|
||||
else
|
||||
{
|
||||
/* queue up in the internal queue... */
|
||||
|
||||
/* TODO */
|
||||
qse_sck_len_t fromlen;
|
||||
qse_skad_t from;
|
||||
qse_uint8_t buf[65535];
|
||||
urs_pkt_t* pkt;
|
||||
|
||||
fromlen = QSE_SIZEOF(from);
|
||||
x = recvfrom (evt->hnd, buf, QSE_SIZEOF(buf) - 1, 0, (struct sockaddr*)&from, &fromlen);
|
||||
|
||||
/* TODO: queue up in the internal queue instead of returnign empty response */
|
||||
|
||||
pkt = (urs_pkt_t*)buf;
|
||||
pkt->hdr.pktlen = qse_hton16(QSE_SIZEOF(urs_hdr_t));
|
||||
sendto (evt->hnd, pkt, QSE_SIZEOF(urs_hdr_t), 0, (struct sockaddr*)&from, fromlen);
|
||||
}
|
||||
}
|
||||
|
||||
@ -317,72 +527,98 @@ static void dispatch_mux_event (qse_mux_t* mux, const qse_mux_evt_t* evt)
|
||||
{
|
||||
mux_xtn_t* mux_xtn;
|
||||
int type, index;
|
||||
qse_skad_t skad;
|
||||
qse_sck_len_t skad_len;
|
||||
|
||||
unsigned char buf[65535];
|
||||
|
||||
mux_xtn = (mux_xtn_t*)qse_mux_getxtn(mux);
|
||||
|
||||
type = GET_TYPE_FROM_MUX_DATA((qse_uintptr_t)evt->data);
|
||||
index = GET_INDEX_FROM_MUX_DATA((qse_uintptr_t)evt->data);
|
||||
|
||||
if (type == TYPE_SOCKET)
|
||||
switch (type)
|
||||
{
|
||||
ssize_t x;
|
||||
|
||||
skad_len = QSE_SIZEOF(skad);
|
||||
x = recvfrom (evt->hnd, buf, QSE_SIZEOF(buf), 0, (struct sockaddr*)&skad, &skad_len);
|
||||
|
||||
printf ("XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXx\n");
|
||||
/* TODO: error handling */
|
||||
if (x >= QSE_SIZEOF(urs_hdr_t))
|
||||
case TYPE_SOCKET:
|
||||
{
|
||||
urs_pkt_t* pkt = (urs_pkt_t*)buf;
|
||||
|
||||
pkt->hdr.pktlen = qse_ntoh16(pkt->hdr.pktlen); /* change the byte order */
|
||||
if (pkt->hdr.pktlen == x)
|
||||
{
|
||||
printf ("%d [[[%.*s]]]\n", (int)x, (int)(pkt->hdr.pktlen - QSE_SIZEOF(urs_hdr_t)), pkt->url);
|
||||
schedule_request (mux_xtn->ursd, pkt, &skad, skad_len);
|
||||
receive_request_from_socket (mux_xtn->ursd, evt);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
else
|
||||
|
||||
case TYPE_PIO_OUT:
|
||||
{
|
||||
/* the rewriter has produced some data */
|
||||
|
||||
qse_ssize_t x;
|
||||
urs_pkt_t* pkt;
|
||||
xpio_t* xpio = &mux_xtn->ursd->xpios[index];
|
||||
qse_size_t maxoutlen;
|
||||
|
||||
if (xpio->busy)
|
||||
rewriter_t* rewriter = &mux_xtn->ursd->rewriters[index];
|
||||
|
||||
if (rewriter->busy)
|
||||
{
|
||||
pkt = (urs_pkt_t*)xpio->req.buf;
|
||||
pkt = (urs_pkt_t*)rewriter->req.buf;
|
||||
maxoutlen = QSE_SIZEOF(rewriter->req.buf) - QSE_SIZEOF(urs_hdr_t);
|
||||
|
||||
x = qse_pio_read (xpio->pio, QSE_PIO_OUT, pkt->url, QSE_SIZEOF(xpio->req.buf) - QSE_SIZEOF(urs_hdr_t));
|
||||
printf ("READ %d bytes from pipes [%.*s]\n", (int)x, (int)x, pkt->url);
|
||||
|
||||
x += QSE_SIZEOF(urs_hdr_t); /* add up the header size */
|
||||
if (x > QSE_TYPE_MAX(qse_uint16_t))
|
||||
x = qse_pio_read (rewriter->pio, QSE_PIO_OUT, &pkt->url[rewriter->req.outlen], maxoutlen - rewriter->req.outlen);
|
||||
if (x <= 0)
|
||||
{
|
||||
/* ERROR HANDLING - it's returning too long data */
|
||||
/* read failure or end of input */
|
||||
rewriter->faulty = 1;
|
||||
release_rewriter (mux_xtn->ursd, rewriter, 1);
|
||||
}
|
||||
|
||||
pkt->hdr.pktlen = qse_hton16(x); /* change the byte order */
|
||||
sendto (mux_xtn->ursd->sck, pkt, x, 0, (struct sockaddr*)&xpio->req.from, xpio->req.fromlen);
|
||||
rewriter->req.outlen += x;
|
||||
if (rewriter->req.outlen > maxoutlen)
|
||||
{
|
||||
/* the rewriter returns too long a result */
|
||||
rewriter->faulty = 1;
|
||||
release_rewriter (mux_xtn->ursd, rewriter, 1);
|
||||
}
|
||||
|
||||
/* TODO: error handling */
|
||||
printf ("READ %d, %d bytes from pipes [%.*s]\n", (int)x, (int)rewriter->req.outlen, (int)rewriter->req.outlen, pkt->url);
|
||||
|
||||
/* TODO: if there is a pending request, use this xpio to send request... */
|
||||
if (pkt->url[rewriter->req.outlen - 1] == QSE_MT('\n'))
|
||||
{
|
||||
/* the last byte is a new line. i don't really care about
|
||||
* new lines in the middle of data. the rewriter must
|
||||
* keep to the protocol. */
|
||||
|
||||
dechain_from_busy_list (mux_xtn->ursd, xpio);
|
||||
chain_to_free_list (mux_xtn->ursd, xpio);
|
||||
/* add up the header size. -1 to exclude '\n' */
|
||||
rewriter->req.outlen += QSE_SIZEOF(urs_hdr_t) - 1;
|
||||
|
||||
pkt->hdr.pktlen = qse_hton16(rewriter->req.outlen); /* change the byte order */
|
||||
sendto (mux_xtn->ursd->sck, pkt, rewriter->req.outlen, 0, (struct sockaddr*)&rewriter->req.from, rewriter->req.fromlen);
|
||||
/* TODO: error logging */
|
||||
/* sendto() to the socket can be blocking. if the socket side is too busy, there's no reason for rewriter to be as busy.
|
||||
* it can wait a while. think about it. */
|
||||
|
||||
release_rewriter (mux_xtn->ursd, rewriter, 0);
|
||||
/* TODO: if there is a pending request, schedule another rewriter here. */
|
||||
}
|
||||
|
||||
|
||||
/* TODO: is the complete output is not received within time, some actions must be taken. timer based... rewrite timeout */
|
||||
}
|
||||
else
|
||||
{
|
||||
/* something is wrong. if the child process writes something
|
||||
* while it's not given any input. restart this process */
|
||||
/* something is wrong. if the rewriter process writes something
|
||||
* while it's not given the full input. reclaim it */
|
||||
rewriter->faulty = 1;
|
||||
release_rewriter (mux_xtn->ursd, rewriter, 1);
|
||||
}
|
||||
|
||||
/* TODO: */
|
||||
break;
|
||||
}
|
||||
|
||||
case TYPE_PIO_IN:
|
||||
{
|
||||
/* the pipe to the rewriter is writable.
|
||||
* pass the leftover to the rewriter */
|
||||
|
||||
rewriter_t* rewriter = &mux_xtn->ursd->rewriters[index];
|
||||
|
||||
QSE_ASSERT (rewriter->pio_in_in_mux);
|
||||
QSE_ASSERT (!rewriter->free && !rewriter->busy);
|
||||
|
||||
feed_rewriter (mux_xtn->ursd, rewriter);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -393,9 +629,17 @@ static int init_ursd (ursd_t* ursd, int npios, const qse_char_t* cmdline, const
|
||||
qse_nwad_t bindnwad;
|
||||
mux_xtn_t* mux_xtn;
|
||||
|
||||
memset (ursd, 0, sizeof(*ursd));
|
||||
qse_memset (ursd, 0, sizeof(*ursd));
|
||||
ursd->mmgr = QSE_MMGR_GETDFL();
|
||||
|
||||
ursd->mux = qse_mux_open (QSE_MMGR_GETDFL(), QSE_SIZEOF(mux_xtn_t), dispatch_mux_event, 100, QSE_NULL);
|
||||
ursd->cmdline = qse_strdup (cmdline, ursd->mmgr);
|
||||
if (ursd->cmdline == QSE_NULL)
|
||||
{
|
||||
fprintf (stderr, "cannot copy cmdline\n");
|
||||
goto oops;
|
||||
}
|
||||
|
||||
ursd->mux = qse_mux_open (ursd->mmgr, QSE_SIZEOF(mux_xtn_t), dispatch_mux_event, 100, QSE_NULL);
|
||||
if (ursd->mux == QSE_NULL)
|
||||
{
|
||||
fprintf (stderr, "cannot create a multiplexer\n");
|
||||
@ -409,26 +653,21 @@ static int init_ursd (ursd_t* ursd, int npios, const qse_char_t* cmdline, const
|
||||
}
|
||||
|
||||
ursd->sck = open_server_socket (/*IPPROTO_SCTP*/IPPROTO_UDP, &bindnwad);
|
||||
if (ursd->sck == QSE_INVALID_SCKHND) goto oops;
|
||||
|
||||
ursd->xpios = calloc (npios, QSE_SIZEOF(xpio_t));
|
||||
if (ursd->xpios == QSE_NULL)
|
||||
ursd->rewriters = QSE_MMGR_ALLOC (ursd->mmgr, npios * QSE_SIZEOF(rewriter_t));
|
||||
if (ursd->rewriters == QSE_NULL)
|
||||
{
|
||||
fprintf (stderr, "cannot callocate pipes\n");
|
||||
goto oops;
|
||||
}
|
||||
qse_memset (ursd->rewriters, 0, npios * QSE_SIZEOF(rewriter_t));
|
||||
|
||||
for (i = 0; i < npios; i++)
|
||||
{
|
||||
qse_pio_t* pio;
|
||||
|
||||
pio = qse_pio_open (
|
||||
QSE_MMGR_GETDFL(), 0, cmdline, QSE_NULL,
|
||||
QSE_PIO_WRITEIN | QSE_PIO_READOUT | QSE_PIO_ERRTONUL |
|
||||
QSE_PIO_INNOBLOCK | QSE_PIO_OUTNOBLOCK
|
||||
);
|
||||
if (pio == QSE_NULL) goto oops;
|
||||
|
||||
ursd->xpios[i].pio = pio;
|
||||
ursd->rewriters[i].index = i;
|
||||
start_rewriter (ursd, &ursd->rewriters[i]);
|
||||
release_rewriter (ursd, &ursd->rewriters[i], 0);
|
||||
}
|
||||
|
||||
if (insert_to_mux (ursd->mux, ursd->sck, TYPE_SOCKET, 0) <= -1)
|
||||
@ -436,16 +675,6 @@ static int init_ursd (ursd_t* ursd, int npios, const qse_char_t* cmdline, const
|
||||
fprintf (stderr, "cannot add socket to multiplexer\n");
|
||||
goto oops;
|
||||
}
|
||||
for (i = 0; i < npios; i++)
|
||||
{
|
||||
if (insert_to_mux (ursd->mux, qse_pio_gethandle(ursd->xpios[i].pio, QSE_PIO_OUT), TYPE_PIO, i) <= -1)
|
||||
{
|
||||
fprintf (stderr, "cannot add pipe to multiplexer\n");
|
||||
goto oops;
|
||||
}
|
||||
}
|
||||
|
||||
for (i = 0; i < npios; i++) chain_to_free_list (ursd, &ursd->xpios[i]);
|
||||
|
||||
ursd->total = npios;
|
||||
|
||||
@ -454,13 +683,14 @@ static int init_ursd (ursd_t* ursd, int npios, const qse_char_t* cmdline, const
|
||||
return 0;
|
||||
|
||||
oops:
|
||||
if (ursd->mux) qse_mux_close (ursd->mux);
|
||||
|
||||
for (i = 0; i < npios; i++)
|
||||
if (ursd->rewriters)
|
||||
{
|
||||
if (ursd->xpios[i].pio) qse_pio_close (ursd->xpios[i].pio);
|
||||
for (i = 0; i < npios; i++) stop_rewriter (ursd, &ursd->rewriters[i]);
|
||||
QSE_MMGR_FREE (ursd->mmgr, ursd->rewriters);
|
||||
}
|
||||
if (ursd->xpios) free (ursd->xpios);
|
||||
if (qse_isvalidsckhnd(ursd->sck)) qse_closesckhnd (ursd->sck);
|
||||
if (ursd->mux) qse_mux_close (ursd->mux);
|
||||
if (ursd->cmdline) QSE_MMGR_FREE(ursd->mmgr, ursd->cmdline);
|
||||
|
||||
return -1;
|
||||
}
|
||||
@ -469,16 +699,19 @@ static void fini_ursd (ursd_t* ursd)
|
||||
{
|
||||
qse_size_t i;
|
||||
|
||||
/* destroy the multiplex first. i don't want to delete handles explicitly */
|
||||
qse_mux_close (ursd->mux);
|
||||
|
||||
for (i = 0; i < ursd->total; i++)
|
||||
{
|
||||
if (ursd->xpios[i].pio) qse_pio_close (ursd->xpios[i].pio);
|
||||
}
|
||||
if (ursd->xpios) free (ursd->xpios);
|
||||
for (i = 0; i < ursd->total; i++) stop_rewriter (ursd, &ursd->rewriters[i]);
|
||||
QSE_MMGR_FREE (ursd->mmgr, ursd->rewriters);
|
||||
|
||||
delete_from_mux (ursd->mux, ursd->sck, TYPE_SOCKET, 0);
|
||||
qse_closesckhnd (ursd->sck);
|
||||
qse_mux_close (ursd->mux);
|
||||
QSE_MMGR_FREE (ursd->mmgr, ursd->cmdline);
|
||||
}
|
||||
|
||||
static int g_stop_requested = 0;
|
||||
static void handle_signal (int sig)
|
||||
{
|
||||
if (sig == SIGINT) g_stop_requested = 1;
|
||||
}
|
||||
|
||||
static int httpd_main (int argc, qse_char_t* argv[])
|
||||
@ -486,10 +719,13 @@ static int httpd_main (int argc, qse_char_t* argv[])
|
||||
ursd_t ursd;
|
||||
int ursd_inited = 0;
|
||||
|
||||
if (init_ursd (&ursd, 10, QSE_T("/bin/cat"), QSE_T("[::]:97]")) <= -1) goto oops;
|
||||
signal (SIGINT, handle_signal);
|
||||
signal (SIGPIPE, SIG_IGN);
|
||||
|
||||
if (init_ursd (&ursd, 10, QSE_T("/tmp/urs.awk"), QSE_T("[::]:97]")) <= -1) goto oops;
|
||||
ursd_inited = 1;
|
||||
|
||||
while (1)
|
||||
while (!g_stop_requested)
|
||||
{
|
||||
qse_ntime_t tmout;
|
||||
qse_cleartime (&tmout);
|
||||
@ -545,7 +781,6 @@ int qse_main (int argc, qse_achar_t* argv[])
|
||||
qse_setdflcmgrbyid (QSE_CMGR_SLMB);
|
||||
#endif
|
||||
|
||||
|
||||
qse_openstdsios ();
|
||||
ret = qse_runmain (argc, argv, httpd_main);
|
||||
qse_closestdsios ();
|
||||
|
@ -1946,6 +1946,7 @@ static void on_url_rewritten (qse_httpd_t* httpd, const qse_mchar_t* url, const
|
||||
|
||||
proxy->flags &= ~PROXY_REWRITE_URL;
|
||||
|
||||
printf ("XXXXXXXXXXXXXXXXXXXXXXXXXX URL REWRITTEN TO [%s].....\n", new_url);
|
||||
if (new_url[0] == QSE_MT('\0'))
|
||||
{
|
||||
/* no change. carry on */
|
||||
|
@ -221,10 +221,11 @@ static int urs_recv (qse_httpd_t* httpd, qse_httpd_urs_t* urs, qse_ubi_t handle)
|
||||
urs_pkt_t* pkt;
|
||||
urs_req_t* req;
|
||||
|
||||
printf ("URS_RECV....\n");
|
||||
printf ("URS_RECV............................................\n");
|
||||
|
||||
httpd_xtn = qse_httpd_getxtn (httpd);
|
||||
|
||||
/* TODO: use recvmsg with MSG_ERRQUEUE... set socket option IP_RECVERR... */
|
||||
fromlen = QSE_SIZEOF(fromaddr);
|
||||
len = recvfrom (handle.i, dc->rcvbuf, QSE_SIZEOF(dc->rcvbuf) - 1, 0, (struct sockaddr*)&fromaddr, &fromlen);
|
||||
|
||||
@ -348,6 +349,7 @@ static int urs_send (qse_httpd_t* httpd, qse_httpd_urs_t* urs, const qse_mchar_t
|
||||
qse_tmr_event_t tmout_event;
|
||||
|
||||
|
||||
printf ("... URS_SEND.....................\n");
|
||||
httpd_xtn = qse_httpd_getxtn (httpd);
|
||||
|
||||
if (dc->req_count >= QSE_COUNTOF(dc->reqs))
|
||||
|
Loading…
Reference in New Issue
Block a user