From 187443bb1d45e69b183fc0421f1d2f417d6d7dbe Mon Sep 17 00:00:00 2001 From: hyung-hwan Date: Mon, 25 Aug 2014 16:18:50 +0000 Subject: [PATCH] added ursd.c --- qse/cmd/http/ursd.c | 517 ++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 517 insertions(+) create mode 100644 qse/cmd/http/ursd.c diff --git a/qse/cmd/http/ursd.c b/qse/cmd/http/ursd.c new file mode 100644 index 00000000..28c22d36 --- /dev/null +++ b/qse/cmd/http/ursd.c @@ -0,0 +1,517 @@ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#if defined(_WIN32) +# include +# include +# include +# include +#elif defined(__OS2__) +# define INCL_DOSPROCESS +# define INCL_DOSEXCEPTIONS +# define INCL_ERRORS +# include +# if defined(TCPV40HDRS) +# define BSD_SELECT +# endif +# include +# include +# include +# include +# include +# if defined(TCPV40HDRS) +# define USE_SELECT +# include +# else +# include +# endif +#elif defined(__DOS__) +# include +#else +# include +# include +# include +# include +# include +# if defined(HAVE_NETINET_SCTP_H) +# include +# endif +#endif + +#if defined(HAVE_SYS_PRCTL_H) +# include +#endif + +#if defined(HAVE_SYS_RESOURCE_H) +# include +#endif + +typedef struct urs_hdr_t urs_hdr_t; +typedef struct urs_pkt_t urs_pkt_t; + +#include +struct urs_hdr_t +{ + qse_uint16_t seq; /* in network-byte order */ + qse_uint16_t rcode; /* response code */ + qse_uint32_t urlsum;/* checksum of url in the request */ + qse_uint16_t urllen; /* url length in network-byte order */ +}; + +struct urs_pkt_t +{ + struct urs_hdr_t hdr; + qse_mchar_t url[1]; +}; +#include + +typedef struct xpio_t xpio_t; + +struct xpio_t +{ + qse_pio_t* pio; + int busy; + + struct + { + qse_skad_t from; + qse_sck_len_t fromlen; + qse_uint8_t buf[65535]; + } req; + + xpio_t* prev; + xpio_t* next; +}; + +typedef struct ursd_t ursd_t; +struct ursd_t +{ + qse_size_t total; + qse_size_t nfree; + + xpio_t* xpios; + xpio_t* free; + xpio_t* busy; + + qse_sck_hnd_t sck; + qse_mux_t* mux; +}; + + +#define TYPE_SOCKET 0 +#define TYPE_PIO 1 + +#define MAKE_MUX_DATA(type,index) ((qse_uintptr_t)type | ((qse_uintptr_t)index << 4)) +#define GET_TYPE_FROM_MUX_DATA(md) ((md) & 0xF) +#define GET_INDEX_FROM_MUX_DATA(md) ((md) >> 4) + +struct mux_xtn_t +{ + ursd_t* ursd; +}; +typedef struct mux_xtn_t mux_xtn_t; + +static void chain_to_free_list (ursd_t* ursd, xpio_t* xpio) +{ + xpio->busy = 0; + xpio->prev = QSE_NULL; + xpio->next = ursd->free; + if (ursd->free) ursd->free->prev = xpio; + ursd->free = xpio; + ursd->nfree++; +} + +static xpio_t* dechain_from_free_list (ursd_t* ursd, xpio_t* xpio) +{ + if (xpio->next) xpio->next->prev = xpio->prev; + if (xpio == ursd->free) ursd->free = xpio->next; + else xpio->prev->next = xpio->next; + ursd->nfree--; + return xpio; +} + +static void chain_to_busy_list (ursd_t* ursd, xpio_t* xpio) +{ + xpio->busy = 1; + xpio->prev = QSE_NULL; + xpio->next = ursd->busy; + if (ursd->busy) ursd->busy->prev = xpio; + ursd->busy = xpio; +} + +static xpio_t* dechain_from_busy_list (ursd_t* ursd, xpio_t* xpio) +{ + if (xpio->next) xpio->next->prev = xpio->prev; + + if (xpio == ursd->busy) ursd->busy = xpio->next; + else xpio->prev->next = xpio->next; + + return xpio; +} + +static int insert_to_mux (qse_mux_t* mux, qse_mux_hnd_t handle, int type, int index) +{ + qse_mux_evt_t evt; + + memset (&evt, 0, QSE_SIZEOF(evt)); + evt.hnd = handle; + evt.mask = QSE_MUX_IN; + evt.data = MAKE_MUX_DATA(type, index); + return qse_mux_insert (mux, &evt); +} + +static int delete_from_mux (qse_mux_t* mux, qse_mux_hnd_t handle, int type, int index) +{ + qse_mux_evt_t evt; + + memset (&evt, 0, QSE_SIZEOF(evt)); + evt.hnd = handle; + evt.mask = QSE_MUX_IN; + evt.data = MAKE_MUX_DATA(type, index); + return qse_mux_delete (mux, &evt); +} + +static qse_sck_hnd_t open_server_socket (int type, int proto, const qse_nwad_t* bindnwad) +{ + qse_sck_hnd_t s = QSE_INVALID_SCKHND; + qse_skad_t skad; + qse_sck_len_t skad_len; + int family, flag; + + skad_len = qse_nwadtoskad (bindnwad, &skad); + family = qse_skadfamily(&skad); + + s = socket (family, type, proto); + if (!qse_isvalidsckhnd(s)) + { + fprintf (stderr, "cannot create a socket\n"); + goto oops; + } + + #if defined(FD_CLOEXEC) + flag = fcntl (s, F_GETFD); + if (flag >= 0) fcntl (s, F_SETFD, flag | FD_CLOEXEC); + #endif + + #if defined(SO_REUSEADDR) + flag = 1; + setsockopt (s, SOL_SOCKET, SO_REUSEADDR, (void*)&flag, QSE_SIZEOF(flag)); + #endif + + if (bind (s, (struct sockaddr*)&skad, skad_len) <= -1) + { + #if defined(IPV6_V6ONLY) && defined(EADDRINUSE) + if (errno == EADDRINUSE && family == AF_INET6) + { + int on = 1; + setsockopt (s, IPPROTO_IPV6, IPV6_V6ONLY, &on, sizeof(on)); + if (bind (s, (struct sockaddr*)&skad, skad_len) == 0) goto bind_ok; + } + + #endif + fprintf (stderr, "cannot bind a socket\n"); + goto oops; + + } + +bind_ok: + if (proto == IPPROTO_SCTP) + { +#if 0 + struct sctp_initmsg im; + + memset (&im, 0, QSE_SIZEOF(im)); + im.sinit_num_ostreams = 1; + im.sinit_max_instreams = 1; + im.sinit_max_attempts = 1; + + if (setsockopt (s, SOL_SCTP, SCTP_INITMSG, &im, QSE_SIZEOF(im)) <= -1) + { + fprintf (stderr, "cannot set sctp initmsg option\n"); + goto oops; + } +#endif + + if (listen (s, 99) <= -1) + { + fprintf (stderr, "cannot set listen on sctp socket\n"); + goto oops; + } + } + + return s; + +oops: + if (qse_isvalidsckhnd(s)) qse_closesckhnd (s); + return QSE_INVALID_SCKHND; +} + +static void schedule_request (ursd_t* ursd, urs_pkt_t* pkt, int pktsize, const qse_skad_t* skad, qse_sck_len_t skadlen) +{ + if (ursd->free) + { + xpio_t* xpio = dechain_from_free_list (ursd, ursd->free); + + xpio->req.from = *skad; + xpio->req.fromlen = skadlen; + memcpy (xpio->req.buf, pkt, QSE_SIZEOF(urs_hdr_t)); /* copy header */ +printf ("XPIO WRITNG TO PIPE %p %d\n", xpio, qse_skadfamily(skad)); + qse_pio_write (xpio->pio, QSE_PIO_IN, pkt->url, qse_ntoh16(pkt->hdr.urllen)); /* TODO: erro ahndlig */ + + chain_to_busy_list (ursd, xpio); + } + else + { + /* queue up in the internal queue... */ + + /* TODO */ + } +} + +static void dispatch_mux_event (qse_mux_t* mux, const qse_mux_evt_t* evt) +{ + mux_xtn_t* mux_xtn; + int type, index; + qse_skad_t skad; + qse_sck_len_t skad_len; + + unsigned char buf[65535]; + + mux_xtn = (mux_xtn_t*)qse_mux_getxtn(mux); + + type = GET_TYPE_FROM_MUX_DATA((qse_uintptr_t)evt->data); + index = GET_INDEX_FROM_MUX_DATA((qse_uintptr_t)evt->data); + + if (type == TYPE_SOCKET) + { + ssize_t x; + + x = recvfrom (evt->hnd, buf, QSE_SIZEOF(buf), 0, (struct sockaddr*)&skad, &skad_len); +/* TODO: error handling */ + if (x >= QSE_SIZEOF(urs_hdr_t)) + { + urs_pkt_t* pkt = (urs_pkt_t*)buf; + qse_uint16_t len = qse_ntoh16(pkt->hdr.urllen); + + if (QSE_SIZEOF(urs_hdr_t) + len == x) + { + printf ("%d %d [[[%s]]]\n", len, x, pkt->url); + schedule_request (mux_xtn->ursd, pkt, x, &skad, skad_len); + } + } + } + else + { + qse_ssize_t x; + urs_pkt_t* pkt; + xpio_t* xpio = &mux_xtn->ursd->xpios[index]; + + pkt = (urs_pkt_t*)xpio->req.buf; +printf ("XPIO READING TO PIPE %p\n", xpio); + x = qse_pio_read (xpio->pio, QSE_PIO_OUT, pkt->url, QSE_SIZEOF(xpio->req.buf) - QSE_SIZEOF(urs_hdr_t)); + + pkt->hdr.urllen = qse_hton16(x); +printf ("READ %d bytes from pipes [%s]\n", x, pkt->url); + sendto (mux_xtn->ursd->sck, pkt, QSE_SIZEOF(urs_hdr_t) + x, 0, &xpio->req.from, xpio->req.fromlen); + +/* TODO: error handling */ + + if (xpio->busy) + { + dechain_from_busy_list (mux_xtn->ursd, xpio); + chain_to_free_list (mux_xtn->ursd, xpio); + } + } +} + +static int init_ursd (ursd_t* ursd, int npios, const qse_char_t* cmdline, const qse_char_t* bindaddr) +{ + qse_size_t i; + qse_nwad_t bindnwad; + mux_xtn_t* mux_xtn; + + memset (ursd, 0, sizeof(*ursd)); + + ursd->mux = qse_mux_open (QSE_MMGR_GETDFL(), QSE_SIZEOF(mux_xtn_t), dispatch_mux_event, 100, QSE_NULL); + if (ursd->mux == QSE_NULL) + { + fprintf (stderr, "cannot create a multiplexer\n"); + goto oops; + } + + if (qse_strtonwad (bindaddr, &bindnwad) <= -1) + { + fprintf (stderr, "invalid binding address\n"); + goto oops; + } + + ursd->sck = open_server_socket (SOCK_SEQPACKET, IPPROTO_SCTP, &bindnwad); + + ursd->xpios = calloc (npios, QSE_SIZEOF(xpio_t)); + if (ursd->xpios == QSE_NULL) + { + fprintf (stderr, "cannot callocate pipes\n"); + goto oops; + } + + for (i = 0; i < npios; i++) + { + qse_pio_t* pio; + + pio = qse_pio_open ( + QSE_MMGR_GETDFL(), 0, cmdline, QSE_NULL, + QSE_PIO_WRITEIN | QSE_PIO_READOUT | QSE_PIO_ERRTONUL | + QSE_PIO_INNOBLOCK | QSE_PIO_OUTNOBLOCK + ); + if (pio == QSE_NULL) goto oops; + + ursd->xpios[i].pio = pio; + } + + if (insert_to_mux (ursd->mux, ursd->sck, TYPE_SOCKET, 0) <= -1) + { + fprintf (stderr, "cannot add socket to multiplexer\n"); + goto oops; + } + for (i = 0; i < npios; i++) + { + if (insert_to_mux (ursd->mux, qse_pio_gethandle(ursd->xpios[i].pio, QSE_PIO_OUT), TYPE_PIO, i) <= -1) + { + fprintf (stderr, "cannot add pipe to multiplexer\n"); + goto oops; + } + } + + for (i = 0; i < npios; i++) chain_to_free_list (ursd, &ursd->xpios[i]); + + ursd->total = npios; + + mux_xtn = qse_mux_getxtn (ursd->mux); + mux_xtn->ursd = ursd; + return 0; + +oops: + if (ursd->mux) qse_mux_close (ursd->mux); + + for (i = 0; i < npios; i++) + { + if (ursd->xpios[i].pio) qse_pio_close (ursd->xpios[i].pio); + } + if (ursd->xpios) free (ursd->xpios); + + return -1; +} + +static void fini_ursd (ursd_t* ursd) +{ + qse_size_t i; + + /* destroy the multiplex first. i don't want to delete handles explicitly */ + qse_mux_close (ursd->mux); + + for (i = 0; i < ursd->total; i++) + { + if (ursd->xpios[i].pio) qse_pio_close (ursd->xpios[i].pio); + } + if (ursd->xpios) free (ursd->xpios); + + qse_closesckhnd (ursd->sck); +} + +static int httpd_main (int argc, qse_char_t* argv[]) +{ + ursd_t ursd; + int ursd_inited = 0; + + if (init_ursd (&ursd, 10, QSE_T("/bin/cat"), QSE_T("[::]:97]")) <= -1) goto oops; + ursd_inited = 1; + + while (1) + { + qse_ntime_t tmout; + qse_cleartime (&tmout); + tmout.sec += 1; + qse_mux_poll (ursd.mux, &tmout); + } + + fini_ursd (&ursd); + return 0; + +oops: + if (ursd_inited) fini_ursd (&ursd); + return -1; +} + +int qse_main (int argc, qse_achar_t* argv[]) +{ + int ret; + +#if defined(_WIN32) + char locale[100]; + UINT codepage; + WSADATA wsadata; +#else + /* nothing */ +#endif + +#if defined(_WIN32) + + codepage = GetConsoleOutputCP(); + if (codepage == CP_UTF8) + { + /*SetConsoleOUtputCP (CP_UTF8);*/ + qse_setdflcmgrbyid (QSE_CMGR_UTF8); + } + else + { + /* .codepage */ + qse_fmtuintmaxtombs (locale, QSE_COUNTOF(locale), + codepage, 10, -1, QSE_MT('\0'), QSE_MT(".")); + setlocale (LC_ALL, locale); + qse_setdflcmgrbyid (QSE_CMGR_SLMB); + } + + if (WSAStartup (MAKEWORD(2,0), &wsadata) != 0) + { + qse_fprintf (QSE_STDERR, QSE_T("Failed to start up winsock\n")); + return -1; + } + +#else + setlocale (LC_ALL, ""); + qse_setdflcmgrbyid (QSE_CMGR_SLMB); +#endif + + + qse_openstdsios (); + ret = qse_runmain (argc, argv, httpd_main); + qse_closestdsios (); + + + +#if defined(_WIN32) + WSACleanup (); +#endif + + return ret; +}