diff --git a/mio/bin/t06.c b/mio/bin/t06.c new file mode 100644 index 0000000..83c4168 --- /dev/null +++ b/mio/bin/t06.c @@ -0,0 +1,574 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define NUM_THRS 8 +static int g_reuse_port = 0; +static mio_svc_htts_t* g_htts[NUM_THRS]; +static int g_htts_no = 0; +static pthread_mutex_t g_htts_mutex = PTHREAD_MUTEX_INITIALIZER; + +static int print_qparam (mio_bcs_t* key, mio_bcs_t* val, void* ctx) +{ + key->len = mio_perdec_http_bcs(key, key->ptr, MIO_NULL); + val->len = mio_perdec_http_bcs(val, val->ptr, MIO_NULL); + fprintf ((FILE*)ctx, "\t[%.*s] = [%.*s]\n", (int)key->len, key->ptr, (int)val->len, val->ptr); + return 0; +} + +static void on_htts_thr_request (mio_t* mio, mio_dev_thr_iopair_t* iop, mio_svc_htts_thr_func_info_t* tfi, void* ctx) +{ + FILE* fp; + int i; + + if (tfi->req_method != MIO_HTTP_GET) + { + write (iop->wfd, "Status: 405\r\n\r\n", 15); /* method not allowed */ + return; + } + + fp = fdopen(iop->wfd, "w"); + if (!fp) + { + write (iop->wfd, "Status: 500\r\n\r\n", 15); /* internal server error */ + return; + } + + fprintf (fp, "Status: 200\r\n"); + fprintf (fp, "Content-Type: text/html\r\n\r\n"); + + fprintf (fp, "request path = %s\n", tfi->req_path); + if (tfi->req_param) + { + fprintf (fp, "request params:\n"); + mio_scan_http_qparam (tfi->req_param, print_qparam, fp); + } + for (i = 0; i < 100; i++) fprintf (fp, "%d * %d => %d\n", i, i, i * i); + + /* invalid iop->wfd to mark that this function closed this file descriptor. + * no invalidation will lead to double closes on the same file descriptor. */ + iop->wfd = MIO_SYSHND_INVALID; + fclose (fp); +} + +static void on_htts_thr2_request (mio_t* mio, mio_dev_thr_iopair_t* iop, mio_svc_htts_thr_func_info_t* tfi, void* ctx) +{ + FILE* fp, * sf; + + if (tfi->req_method != MIO_HTTP_GET) + { + write (iop->wfd, "Status: 405\r\n\r\n", 15); /* method not allowed */ + return; + } + + fp = fdopen(iop->wfd, "w"); + if (!fp) + { + write (iop->wfd, "Status: 500\r\n\r\n", 15); /* internal server error */ + return; + } + + sf = fopen(&tfi->req_path[5], "r"); + if (!sf) + { + fprintf (fp, "Status: 404\r\n\r\n"); + } + else + { + char buf[4096]; + + fprintf (fp, "Status: 200\r\n"); + fprintf (fp, "Content-Type: text/html\r\n\r\n"); + + while (!feof(sf)) + { + size_t n; + n = fread(buf, 1, sizeof(buf), sf); + if (n > 0) fwrite (buf, 1, n, fp); + } + + fclose (sf); + } + + /* invalid iop->wfd to mark that this function closed this file descriptor. + * no invalidation will lead to double closes on the same file descriptor. */ + iop->wfd = MIO_SYSHND_INVALID; + fclose (fp); +} + +/* ========================================================================= */ +int process_http_request (mio_svc_htts_t* htts, mio_dev_sck_t* csck, mio_htre_t* req) +{ + mio_t* mio = mio_svc_htts_getmio(htts); +// mio_svc_htts_cli_t* cli = mio_dev_sck_getxtn(csck); + mio_http_method_t mth; + + /* percent-decode the query path to the original buffer + * since i'm not going to need it in the original form + * any more. once it's decoded in the peek mode, + * the decoded query path is made available in the + * non-peek mode as well */ + + MIO_DEBUG2 (mio, "[RAW-REQ] %s %s\n", mio_htre_getqmethodname(req), mio_htre_getqpath(req)); + + mio_htre_perdecqpath(req); + /* TODO: proper request logging */ + + MIO_DEBUG2 (mio, "[REQ] %s %s\n", mio_htre_getqmethodname(req), mio_htre_getqpath(req)); + +#if 0 +mio_printf (MIO_T("================================\n")); +mio_printf (MIO_T("[%lu] %hs REQUEST ==> [%hs] version[%d.%d %hs] method[%hs]\n"), + (unsigned long)time(NULL), + (peek? MIO_MT("PEEK"): MIO_MT("HANDLE")), + mio_htre_getqpath(req), + mio_htre_getmajorversion(req), + mio_htre_getminorversion(req), + mio_htre_getverstr(req), + mio_htre_getqmethodname(req) +); +if (mio_htre_getqparam(req)) + mio_printf (MIO_T("PARAMS ==> [%hs]\n"), mio_htre_getqparam(req)); + +mio_htb_walk (&req->hdrtab, walk, MIO_NULL); +if (mio_htre_getcontentlen(req) > 0) +{ + mio_printf (MIO_T("CONTENT [%.*S]\n"), (int)mio_htre_getcontentlen(req), mio_htre_getcontentptr(req)); +} +#endif + + mth = mio_htre_getqmethodtype(req); + /* determine what to do once the header fields are all received. + * i don't want to delay this until the contents are received. + * if you don't like this behavior, you must implement your own + * callback function for request handling. */ +#if 0 + /* TODO support X-HTTP-Method-Override */ + if (data.method == MIO_HTTP_POST) + { + tmp = mio_htre_getheaderval(req, MIO_MT("X-HTTP-Method-Override")); + if (tmp) + { + /*while (tmp->next) tmp = tmp->next;*/ /* get the last value */ + data.method = mio_mbstohttpmethod (tmp->ptr); + } + } +#endif + +#if 0 + if (mth == MIO_HTTP_CONNECT) + { + /* CONNECT method must not have content set. + * however, arrange to discard it if so. + * + * NOTE: CONNECT is implemented to ignore many headers like + * 'Expect: 100-continue' and 'Connection: keep-alive'. */ + mio_htre_discardcontent (req); + } + else + { +/* this part can be checked in actual mio_svc_htts_doXXX() functions. + * some doXXX handlers may not require length for POST. + * it may be able to simply accept till EOF? or treat as if CONTENT_LENGTH is 0*/ + if (mth == MIO_HTTP_POST && !(req->flags & (MIO_HTRE_ATTR_LENGTH | MIO_HTRE_ATTR_CHUNKED))) + { + /* POST without Content-Length nor not chunked */ + mio_htre_discardcontent (req); + /* 411 Length Required - can't keep alive. Force disconnect */ + req->flags &= ~MIO_HTRE_ATTR_KEEPALIVE; /* to cause sendstatus() to close */ + if (mio_svc_htts_sendstatus(htts, csck, req, 411, MIO_NULL) <= -1) goto oops; + } + else + + { +#endif + const mio_bch_t* qpath = mio_htre_getqpath(req); + int x; + if (mio_comp_bcstr_limited(qpath, "/thr/", 5, 1) == 0) + x = mio_svc_htts_dothr(htts, csck, req, on_htts_thr_request, MIO_NULL); + else if (mio_comp_bcstr_limited(qpath, "/thr2/", 6, 1) == 0) + x = mio_svc_htts_dothr(htts, csck, req, on_htts_thr2_request, MIO_NULL); + else if (mio_comp_bcstr_limited(qpath, "/txt/", 5, 1) == 0) + x = mio_svc_htts_dotxt(htts, csck, req, 200, "text/plain", qpath); + else if (mio_comp_bcstr_limited(qpath, "/cgi/", 5, 1) == 0) + x = mio_svc_htts_docgi(htts, csck, req, "", mio_htre_getqpath(req)); + else + x = mio_svc_htts_dofile(htts, csck, req, "", mio_htre_getqpath(req), "text/plain"); + if (x <= -1) goto oops; + + return 0; + +oops: + mio_dev_sck_halt (csck); + return -1; +} + +void* thr_func (void* arg) +{ + mio_t* mio = MIO_NULL; + mio_svc_htts_t* htts = MIO_NULL; + mio_dev_sck_bind_t htts_bind_info; + + mio = mio_open(MIO_NULL, 0, MIO_NULL, 512, MIO_NULL); + if (!mio) + { + printf ("Cannot open mio\n"); + goto oops; + } + + memset (&htts_bind_info, 0, MIO_SIZEOF(htts_bind_info)); + if (g_reuse_port) + { + mio_bcstrtoskad (mio, "0.0.0.0:9987", &htts_bind_info.localaddr); + htts_bind_info.options = MIO_DEV_SCK_BIND_REUSEADDR | MIO_DEV_SCK_BIND_REUSEPORT | MIO_DEV_SCK_BIND_IGNERR; + //htts_bind_info.options |= MIO_DEV_SCK_BIND_SSL; + htts_bind_info.ssl_certfile = "localhost.crt"; + htts_bind_info.ssl_keyfile = "localhost.key"; + } + + htts = mio_svc_htts_start(mio, &htts_bind_info, process_http_request); + if (!htts) + { + printf ("Unable to start htts\n"); + goto oops; + } + + pthread_mutex_lock (&g_htts_mutex); + g_htts[g_htts_no] = htts; +printf ("starting the loop for %d\n", g_htts_no); + g_htts_no = (g_htts_no + 1) % MIO_COUNTOF(g_htts); + pthread_mutex_unlock (&g_htts_mutex); + + mio_loop (mio); + +oops: + if (htts) mio_svc_htts_stop (htts); + if (mio) mio_close (mio); + + pthread_exit (MIO_NULL); + return MIO_NULL; +} + + +/* ========================================================================= */ + +static void tcp_sck_on_disconnect (mio_dev_sck_t* tcp) +{ + switch (MIO_DEV_SCK_GET_PROGRESS(tcp)) + { + case MIO_DEV_SCK_CONNECTING: + MIO_INFO1 (tcp->mio, "OUTGOING SESSION DISCONNECTED - FAILED TO CONNECT (%d) TO REMOTE SERVER\n", (int)tcp->hnd); + break; + + case MIO_DEV_SCK_CONNECTING_SSL: + MIO_INFO1 (tcp->mio, "OUTGOING SESSION DISCONNECTED - FAILED TO SSL-CONNECT (%d) TO REMOTE SERVER\n", (int)tcp->hnd); + break; + + case MIO_DEV_SCK_LISTENING: + MIO_INFO1 (tcp->mio, "SHUTTING DOWN THE SERVER SOCKET(%d)...\n", (int)tcp->hnd); + break; + + case MIO_DEV_SCK_CONNECTED: + MIO_INFO1 (tcp->mio, "OUTGOING CLIENT CONNECTION GOT TORN DOWN(%d).......\n", (int)tcp->hnd); + break; + + case MIO_DEV_SCK_ACCEPTING_SSL: + MIO_INFO1 (tcp->mio, "INCOMING SSL-ACCEPT GOT DISCONNECTED(%d) ....\n", (int)tcp->hnd); + break; + + case MIO_DEV_SCK_ACCEPTED: + MIO_INFO1 (tcp->mio, "INCOMING CLIENT BEING SERVED GOT DISCONNECTED(%d).......\n", (int)tcp->hnd); + break; + + default: + MIO_INFO2 (tcp->mio, "SOCKET DEVICE DISCONNECTED (%d - %x)\n", (int)tcp->hnd, (unsigned int)tcp->state); + break; + } +} + +static void tcp_sck_on_connect (mio_dev_sck_t* tcp) +{ + mio_bch_t buf1[128], buf2[128]; + + mio_skadtobcstr (tcp->mio, &tcp->localaddr, buf1, MIO_COUNTOF(buf1), MIO_SKAD_TO_BCSTR_ADDR | MIO_SKAD_TO_BCSTR_PORT); + mio_skadtobcstr (tcp->mio, &tcp->remoteaddr, buf2, MIO_COUNTOF(buf2), MIO_SKAD_TO_BCSTR_ADDR | MIO_SKAD_TO_BCSTR_PORT); + + if (tcp->state & MIO_DEV_SCK_CONNECTED) + { + MIO_INFO3 (tcp->mio, "DEVICE connected to a remote server... LOCAL %hs REMOTE %hs SCK: %d\n", buf1, buf2, tcp->hnd); + } + else if (tcp->state & MIO_DEV_SCK_ACCEPTED) + { + /* TODO: pass it to distributor??? */ +/* THIS PART WON"T BE CALLED FOR tcp_sck_on_raw_accept.. */ + } +} + + +static mio_tmridx_t xx_tmridx; +static int try_to_accept (mio_dev_sck_t* sck, mio_dev_sck_qxmsg_t* qxmsg, int in_mq); + +typedef struct xx_mq_t xx_mq_t; + +struct xx_mq_t +{ + xx_mq_t* q_next; + xx_mq_t* q_prev; + + mio_dev_sck_qxmsg_t msg; +}; + +#define XX_MQ_INIT(mq) ((mq)->q_next = (mq)->q_prev = (mq)) +#define XX_MQ_TAIL(mq) ((mq)->q_prev) +#define XX_MQ_HEAD(mq) ((mq)->q_next) +#define XX_MQ_IS_EMPTY(mq) (XX_MQ_HEAD(mq) == (mq)) +#define XX_MQ_IS_NODE(mq,x) ((mq) != (x)) +#define XX_MQ_IS_HEAD(mq,x) (XX_MQ_HEAD(mq) == (x)) +#define XX_MQ_IS_TAIL(mq,x) (XX_MQ_TAIL(mq) == (x)) +#define XX_MQ_NEXT(x) ((x)->q_next) +#define XX_MQ_PREV(x) ((x)->q_prev) +#define XX_MQ_LINK(p,x,n) MIO_Q_LINK((mio_q_t*)p,(mio_q_t*)x,(mio_q_t*)n) +#define XX_MQ_UNLINK(x) MIO_Q_UNLINK((mio_q_t*)x) +#define XX_MQ_REPL(o,n) MIO_Q_REPL(o,n); +#define XX_MQ_ENQ(mq,x) XX_MQ_LINK(XX_MQ_TAIL(mq), (mio_q_t*)x, mq) +#define XX_MQ_DEQ(mq) XX_MQ_UNLINK(XX_MQ_HEAD(mq)) + +static xx_mq_t xx_mq; + +static int schedule_timer_job_at (mio_dev_sck_t* dev, const mio_ntime_t* fire_at, mio_tmrjob_handler_t handler, mio_tmridx_t* tmridx) +{ + mio_tmrjob_t tmrjob; + + memset (&tmrjob, 0, MIO_SIZEOF(tmrjob)); + tmrjob.ctx = dev; + if (fire_at) tmrjob.when = *fire_at; + + tmrjob.handler = handler; + tmrjob.idxptr = tmridx; + + return mio_instmrjob(dev->mio, &tmrjob); +} + +static void enable_accept (mio_t* mio, const mio_ntime_t* now, mio_tmrjob_t* job) +{ + mio_dev_sck_t* rdev = (mio_dev_sck_t*)job->ctx; + + while (!XX_MQ_IS_EMPTY(&xx_mq)) + { + xx_mq_t* mq; + + mq = XX_MQ_HEAD(&xx_mq); + if (try_to_accept(rdev, &mq->msg, 1) == 0) return; /* EAGAIN situation */ + + XX_MQ_UNLINK (mq); + mio_freemem (mio, mq); + } + + assert (XX_MQ_IS_EMPTY(&xx_mq)); + if (mio_dev_sck_read(rdev, 1) <= -1) // it's a disaster if this fails. the acceptor will get stalled if it happens + { +printf ("DISASTER.... UNABLE TO ENABLE READ ON ACCEPTOR\n"); + } +} + +static int try_to_accept (mio_dev_sck_t* sck, mio_dev_sck_qxmsg_t* qxmsg, int in_mq) +{ + mio_t* mio = sck->mio; + mio_svc_htts_t* htts; + + pthread_mutex_lock (&g_htts_mutex); + htts = g_htts[g_htts_no]; + g_htts_no = (g_htts_no + 1) % MIO_COUNTOF(g_htts); + pthread_mutex_unlock (&g_htts_mutex); + + if (mio_svc_htts_writetosidechan(htts, qxmsg, MIO_SIZEOF(*qxmsg)) <= -1) + { + mio_bch_t buf[128]; + + if (errno == EAGAIN) + { +//printf ("sidechannel retrying %s\n", strerror(errno)); + + if (mio_dev_sck_read(sck, 0) <= -1) goto sidechan_write_error; + + if (!in_mq) + { + xx_mq_t* mq; + mq = mio_allocmem(mio, MIO_SIZEOF(*mq)); + if (MIO_UNLIKELY(!mq)) goto sidechan_write_error; + mq->msg = *qxmsg; + XX_MQ_ENQ (&xx_mq, mq); + } + + if (xx_tmridx == MIO_TMRIDX_INVALID) + schedule_timer_job_at (sck, MIO_NULL, enable_accept, &xx_tmridx); + + return 0; /* enqueued for later writing */ + } + else + { + sidechan_write_error: +printf ("sidechannel write error %s\n", strerror(errno)); + mio_skadtobcstr (mio, &qxmsg->remoteaddr, buf, MIO_COUNTOF(buf), MIO_SKAD_TO_BCSTR_ADDR | MIO_SKAD_TO_BCSTR_PORT); + MIO_INFO2 (mio, "unable to handle the accepted connection %ld from %hs\n", (long int)qxmsg->syshnd, buf); + + const char* msg = "HTTP/1.0 503 Service unavailable\r\nConnection: close\r\nContent-Length: 0\r\n\r\n"; + write (qxmsg->syshnd, msg, strlen(msg)); + printf ("close %d\n", qxmsg->syshnd); + close (qxmsg->syshnd); + + return -1; /* failed to accept */ + } + } + +/************************************ +{ +static int sc = 0; +printf ("sc => %d\n", sc++); +} +************************************/ + + return 1; /* full success */ +} + +static void tcp_sck_on_raw_accept (mio_dev_sck_t* sck, mio_syshnd_t syshnd, mio_skad_t* remoteaddr) +{ + /*mio_t* mio = sck->mio;*/ + + /* inform the worker of this accepted syshnd */ + mio_dev_sck_qxmsg_t qxmsg; + memset (&qxmsg, 0, MIO_SIZEOF(qxmsg)); + qxmsg.cmd = MIO_DEV_SCK_QXMSG_NEWCONN; + qxmsg.scktype = sck->type; + qxmsg.syshnd = syshnd; + qxmsg.remoteaddr = *remoteaddr; + +//printf ("A %d\n", qxmsg.syshnd); + try_to_accept (sck, &qxmsg, 0); +} + +static int tcp_sck_on_write (mio_dev_sck_t* tcp, mio_iolen_t wrlen, void* wrctx, const mio_skad_t* dstaddr) +{ + /* won't be invoked */ + return 0; +} + +static int tcp_sck_on_read (mio_dev_sck_t* tcp, const void* buf, mio_iolen_t len, const mio_skad_t* srcaddr) +{ + /* won't be invoked */ + return 0; +} + +static int add_listener (mio_t* mio, mio_bch_t* addrstr) +{ + mio_dev_sck_make_t mi; + mio_dev_sck_t* tcp; + mio_dev_sck_bind_t bi; + mio_dev_sck_listen_t li; + + memset (&bi, 0, MIO_SIZEOF(bi)); + if (mio_bcstrtoskad(mio, addrstr, &bi.localaddr) <= -1) + { + MIO_INFO1 (mio, "invalid listening address - %hs\n", addrstr); + return -1; + } + bi.options = MIO_DEV_SCK_BIND_REUSEADDR /*| MIO_DEV_SCK_BIND_REUSEPORT |*/; +#if defined(USE_SSL) + bi.options |= MIO_DEV_SCK_BIND_SSL; + bi.ssl_certfile = "localhost.crt"; + bi.ssl_keyfile = "localhost.key"; +#endif + + memset (&mi, 0, MIO_SIZEOF(mi)); + mi.type = (mio_skad_family(&bi.localaddr) == MIO_AF_INET? MIO_DEV_SCK_TCP4: MIO_DEV_SCK_TCP6); + mi.on_write = tcp_sck_on_write; + mi.on_read = tcp_sck_on_read; + mi.on_connect = tcp_sck_on_connect; /* this is invoked on a client accept as well */ + mi.on_disconnect = tcp_sck_on_disconnect; + mi.on_raw_accept = tcp_sck_on_raw_accept; + + tcp = mio_dev_sck_make(mio, 0, &mi); + if (!tcp) + { + MIO_INFO1 (mio, "Cannot make tcp - %js\n", mio_geterrmsg(mio)); + return -1; + } + + if (!g_reuse_port) + { + if (mio_dev_sck_bind(tcp, &bi) <= -1) + { + MIO_INFO1 (mio, "tcp mio_dev_sck_bind() failed - %js\n", mio_geterrmsg(mio)); + return -1; + } + } + + memset (&li, 0, MIO_SIZEOF(li)); + li.backlogs = 4096; + MIO_INIT_NTIME (&li.accept_tmout, 5, 1); + if (mio_dev_sck_listen(tcp, &li) <= -1) + { + MIO_INFO1 (mio, "tcp[2] mio_dev_sck_listen() failed - %js\n", mio_geterrmsg(mio)); + return -1; + } + + return 0; +} + + +int main (int argc, char* argv[]) +{ + mio_t* mio = MIO_NULL; + pthread_t t[NUM_THRS]; + mio_oow_t i; + struct sigaction sigact; + + if (argc >= 2 && strcmp(argv[1], "-r") == 0) + { + g_reuse_port = 1; + } + + memset (&sigact, 0, MIO_SIZEOF(sigact)); + sigact.sa_handler = SIG_IGN; + sigaction (SIGPIPE, &sigact, MIO_NULL); + + XX_MQ_INIT (&xx_mq); + xx_tmridx = MIO_TMRIDX_INVALID; + + mio = mio_open(MIO_NULL, 0, MIO_NULL, 512, MIO_NULL); + if (!mio) + { + printf ("Cannot open mio\n"); + goto oops; + } + + for (i = 0; i < MIO_COUNTOF(t); i++) + pthread_create (&t[i], MIO_NULL, thr_func, mio); + + sleep (1); /* TODO: use pthread_cond_wait()/pthread_cond_signal() or a varialble to see if all threads are up */ +/* TODO: wait until all threads are ready to serve... */ + + if (add_listener(mio, "[::]:9987") <= -1 || + add_listener(mio, "0.0.0.0:9987") <= -1) goto oops; + +printf ("starting the main loop\n"); + mio_loop (mio); + + /* close all threaded mios here */ +printf ("TERMINATING..NORMALLY \n"); + mio_close (mio); + return 0; + +oops: +printf ("TERMINATING..ABNORMALLY \n"); + if (mio) mio_close (mio); + return -1; +} + diff --git a/mio/lib/http-thr.c b/mio/lib/http-thr.c index 9c03980..74d76cd 100644 --- a/mio/lib/http-thr.c +++ b/mio/lib/http-thr.c @@ -81,10 +81,8 @@ struct thr_state_t mio_dev_sck_on_write_t client_org_on_write; mio_dev_sck_on_disconnect_t client_org_on_disconnect; mio_htrd_recbs_t client_htrd_org_recbs; - - - }; + typedef struct thr_state_t thr_state_t; struct thr_peer_xtn_t @@ -100,7 +98,6 @@ static void thr_state_halt_participating_devices (thr_state_t* thr_state) MIO_DEBUG4 (thr_state->client->htts->mio, "HTTS(%p) - Halting participating devices in thr state %p(client=%p,peer=%p)\n", thr_state->client->htts, thr_state, thr_state->client->sck, thr_state->peer); - mio_dev_sck_halt (thr_state->client->sck); /* check for peer as it may not have been started */ if (thr_state->peer) mio_dev_thr_halt (thr_state->peer); diff --git a/mio/lib/pipe.c b/mio/lib/pipe.c index 2e9208a..1e7a58a 100644 --- a/mio/lib/pipe.c +++ b/mio/lib/pipe.c @@ -58,8 +58,19 @@ static int dev_pipe_make_master (mio_dev_t* dev, void* ctx) /* TODO: support a named pipe. use mkfifo()? * support socketpair */ + +#if defined(HAVE_PIPE2) && defined(O_CLOEXEC) && defined(O_NONBLOCK) + if (pipe2(pfds, O_CLOEXEC | O_NONBLOCK) == -1) + { + if (errno != ENOSYS) goto pipe_error; + } + else goto pipe_done; +#endif if (pipe(pfds) == -1) { +#if defined(HAVE_PIPE2) && defined(O_CLOEXEC) && defined(O_NONBLOCK) + pipe_error: +#endif mio_seterrwithsyserr (mio, 0, errno); goto oops; } @@ -67,11 +78,17 @@ static int dev_pipe_make_master (mio_dev_t* dev, void* ctx) if (mio_makesyshndasync(mio, pfds[0]) <= -1 || mio_makesyshndasync(mio, pfds[1]) <= -1) goto oops; + if (mio_makesyshndcloexec(mio, pfds[0]) <= -1 || + mio_makesyshndcloexec(mio, pfds[1]) <= -1) goto oops; + +#if defined(HAVE_PIPE2) && defined(O_CLOEXEC) && defined(O_NONBLOCK) +pipe_done: +#endif si.mi = info; si.pfd = pfds[0]; si.dev_cap = MIO_DEV_CAP_IN | MIO_DEV_CAP_STREAM; si.id = MIO_DEV_PIPE_IN; - + pfds[0] = MIO_SYSHND_INVALID; rdev->slave[MIO_DEV_PIPE_IN] = make_slave(mio, &si); if (!rdev->slave[MIO_DEV_PIPE_IN]) goto oops; rdev->slave_count++; @@ -80,7 +97,7 @@ static int dev_pipe_make_master (mio_dev_t* dev, void* ctx) si.pfd = pfds[1]; si.dev_cap = MIO_DEV_CAP_OUT | MIO_DEV_CAP_STREAM; si.id = MIO_DEV_PIPE_OUT; - + pfds[1] = MIO_SYSHND_INVALID; rdev->slave[MIO_DEV_PIPE_OUT] = make_slave(mio, &si); if (!rdev->slave[MIO_DEV_PIPE_OUT]) goto oops; rdev->slave_count++; @@ -97,20 +114,21 @@ static int dev_pipe_make_master (mio_dev_t* dev, void* ctx) return 0; oops: - for (i = 0; i < MIO_COUNTOF(rdev->slave); i++) - { - if (rdev->slave[i]) - { - mio_dev_kill ((mio_dev_t*)rdev->slave[i]); - rdev->slave[i] = MIO_NULL; - } - else if (pfds[i] != MIO_SYSHND_INVALID) - { - close (pfds[i]); - } - } - rdev->slave_count = 0; + if (pfds[0] != MIO_SYSHND_INVALID) close (pfds[0]); + if (pfds[1] != MIO_SYSHND_INVALID) close (pfds[0]); + if (rdev->slave[0]) + { + mio_dev_kill ((mio_dev_t*)rdev->slave[0]); + rdev->slave[0] = MIO_NULL; + } + if (rdev->slave[1]) + { + mio_dev_kill ((mio_dev_t*)rdev->slave[1]); + rdev->slave[1] = MIO_NULL; + } + + rdev->slave_count = 0; return -1; } @@ -204,6 +222,12 @@ static int dev_pipe_kill_slave (mio_dev_t* dev, int force) return 0; } +static void dev_pipe_fail_before_make_slave (void* ctx) +{ + slave_info_t* si = (slave_info_t*)ctx; + close (si->pfd); +} + static int dev_pipe_read_slave (mio_dev_t* dev, void* buf, mio_iolen_t* len, mio_devaddr_t* srcaddr) { mio_dev_pipe_slave_t* pipe = (mio_dev_pipe_slave_t*)dev; @@ -242,7 +266,7 @@ static int dev_pipe_write_slave (mio_dev_t* dev, const void* data, mio_iolen_t* if (MIO_UNLIKELY(*len <= 0)) { /* this is an EOF indicator */ - //mio_dev_halt (dev); /* halt this slave device to indicate EOF on the lower-level handle */* + /*mio_dev_halt (dev);*/ /* halt this slave device to indicate EOF on the lower-level handle */ if (MIO_LIKELY(pipe->pfd != MIO_SYSHND_INVALID)) /* halt() doesn't close the pipe immediately. so close the underlying pipe */ { mio_dev_watch (dev, MIO_DEV_WATCH_STOP, 0); @@ -363,7 +387,7 @@ static mio_dev_mth_t dev_pipe_methods_slave = { dev_pipe_make_slave, dev_pipe_kill_slave, - MIO_NULL, + dev_pipe_fail_before_make_slave, dev_pipe_getsyshnd_slave, dev_pipe_read_slave, diff --git a/mio/lib/pro.c b/mio/lib/pro.c index fd7cce3..0add04f 100644 --- a/mio/lib/pro.c +++ b/mio/lib/pro.c @@ -346,10 +346,10 @@ static int dev_pro_make_master (mio_dev_t* dev, void* ctx) si.dev_cap = MIO_DEV_CAP_OUT | MIO_DEV_CAP_STREAM; si.id = MIO_DEV_PRO_IN; + pfds[1] = MIO_SYSHND_INVALID; rdev->slave[MIO_DEV_PRO_IN] = make_slave(mio, &si); if (!rdev->slave[MIO_DEV_PRO_IN]) goto oops; - pfds[1] = MIO_SYSHND_INVALID; rdev->slave_count++; } @@ -363,10 +363,10 @@ static int dev_pro_make_master (mio_dev_t* dev, void* ctx) si.dev_cap = MIO_DEV_CAP_IN | MIO_DEV_CAP_STREAM; si.id = MIO_DEV_PRO_OUT; + pfds[2] = MIO_SYSHND_INVALID; rdev->slave[MIO_DEV_PRO_OUT] = make_slave(mio, &si); if (!rdev->slave[MIO_DEV_PRO_OUT]) goto oops; - pfds[2] = MIO_SYSHND_INVALID; rdev->slave_count++; } @@ -380,10 +380,10 @@ static int dev_pro_make_master (mio_dev_t* dev, void* ctx) si.dev_cap = MIO_DEV_CAP_IN | MIO_DEV_CAP_STREAM; si.id = MIO_DEV_PRO_ERR; + pfds[4] = MIO_SYSHND_INVALID; rdev->slave[MIO_DEV_PRO_ERR] = make_slave(mio, &si); if (!rdev->slave[MIO_DEV_PRO_ERR]) goto oops; - pfds[4] = MIO_SYSHND_INVALID; rdev->slave_count++; } @@ -425,20 +425,6 @@ oops: return -1; } -static int dev_pro_make_slave (mio_dev_t* dev, void* ctx) -{ - mio_dev_pro_slave_t* rdev = (mio_dev_pro_slave_t*)dev; - slave_info_t* si = (slave_info_t*)ctx; - - rdev->dev_cap = si->dev_cap; - rdev->id = si->id; - rdev->pfd = si->pfd; - /* keep rdev->master to MIO_NULL. it's set to the right master - * device in dev_pro_make() */ - - return 0; -} - static int dev_pro_kill_master (mio_dev_t* dev, int force) { mio_t* mio = dev->mio; @@ -507,6 +493,20 @@ static int dev_pro_kill_master (mio_dev_t* dev, int force) return 0; } +static int dev_pro_make_slave (mio_dev_t* dev, void* ctx) +{ + mio_dev_pro_slave_t* rdev = (mio_dev_pro_slave_t*)dev; + slave_info_t* si = (slave_info_t*)ctx; + + rdev->dev_cap = si->dev_cap; + rdev->id = si->id; + rdev->pfd = si->pfd; + /* keep rdev->master to MIO_NULL. it's set to the right master + * device in dev_pro_make() */ + + return 0; +} + static int dev_pro_kill_slave (mio_dev_t* dev, int force) { mio_t* mio = dev->mio; @@ -554,6 +554,12 @@ static int dev_pro_kill_slave (mio_dev_t* dev, int force) return 0; } +static void dev_pro_fail_before_make_slave (void* ctx) +{ + slave_info_t* si = (slave_info_t*)ctx; + close (si->pfd); +} + static int dev_pro_read_slave (mio_dev_t* dev, void* buf, mio_iolen_t* len, mio_devaddr_t* srcaddr) { mio_dev_pro_slave_t* pro = (mio_dev_pro_slave_t*)dev; diff --git a/mio/lib/sck.c b/mio/lib/sck.c index 8fcf367..6299671 100644 --- a/mio/lib/sck.c +++ b/mio/lib/sck.c @@ -381,8 +381,14 @@ static int dev_sck_make (mio_dev_t* dev, void* ctx) return 0; oops: - if (hnd != MIO_SYSHND_INVALID) close (hnd); - if (side_chan != MIO_SYSHND_INVALID) close (side_chan); + if (hnd != MIO_SYSHND_INVALID) + { + close (hnd); + } + if (side_chan != MIO_SYSHND_INVALID) + { + close (side_chan); + } return -1; } diff --git a/mio/lib/thr.c b/mio/lib/thr.c index 14865c4..c53f466 100644 --- a/mio/lib/thr.c +++ b/mio/lib/thr.c @@ -32,6 +32,7 @@ #include #include +#include /* ========================================================================= */ struct mio_dev_thr_info_t @@ -64,13 +65,21 @@ static void free_thr_info_resources (mio_t* mio, mio_dev_thr_info_t* ti) { if (ti->thr_iop.rfd != MIO_SYSHND_INVALID) { - close (ti->thr_iop.rfd); - ti->thr_iop.rfd = MIO_SYSHND_INVALID; + /* this function is called at the end of run_thr_func() and + * close() can be a thread cancellation point. + * + * i must invalidate ti->thr_iop.rfd calling close() with it. + * if resetting is done after close() and close() becomes a cancellation point, + * the invalidation operation gets skipped. */ + mio_syshnd_t tmp = ti->thr_iop.rfd; + ti->thr_iop.rfd = MIO_SYSHND_INVALID; + close (tmp); } if (ti->thr_iop.wfd != MIO_SYSHND_INVALID) { - close (ti->thr_iop.wfd); + mio_syshnd_t tmp = ti->thr_iop.wfd; ti->thr_iop.wfd = MIO_SYSHND_INVALID; + close (tmp); } } @@ -119,9 +128,7 @@ static void* run_thr_func (void* ctx) ti->thr_func (ti->mio, &ti->thr_iop, ti->thr_ctx); - /* This part may get partially executed or not executed if the thread is cancelled */ - free_thr_info_resources (ti->mio, ti); /* TODO: check if the close() call inside this call completes when it becomes a cancellation point. if so, the code must get changed */ - /* ---------------------------------------------------------- */ + free_thr_info_resources (ti->mio, ti); pthread_cleanup_pop (1); pthread_exit (MIO_NULL); @@ -137,8 +144,20 @@ static int dev_thr_make_master (mio_dev_t* dev, void* ctx) slave_info_t si; int i; +#if defined(HAVE_PIPE2) && defined(O_CLOEXEC) && defined(O_NONBLOCK) + if (pipe2(&pfds[0], O_CLOEXEC | O_NONBLOCK) == -1 || + pipe2(&pfds[2], O_CLOEXEC | O_NONBLOCK) == -1) + { + if (errno != ENOSYS) goto pipe_error; + } + else goto pipe_done; +#endif + if (pipe(&pfds[0]) == -1 || pipe(&pfds[2]) == -1) { +#if defined(HAVE_PIPE2) && defined(O_CLOEXEC) && defined(O_NONBLOCK) + pipe_error: +#endif mio_seterrwithsyserr (mio, 0, errno); goto oops; } @@ -146,26 +165,36 @@ static int dev_thr_make_master (mio_dev_t* dev, void* ctx) if (mio_makesyshndasync(mio, pfds[1]) <= -1 || mio_makesyshndasync(mio, pfds[2]) <= -1) goto oops; + if (mio_makesyshndcloexec(mio, pfds[0]) <= -1 || + mio_makesyshndcloexec(mio, pfds[1]) <= -1 || + mio_makesyshndcloexec(mio, pfds[2]) <= -1 || + mio_makesyshndcloexec(mio, pfds[1]) <= -1) goto oops; + +#if defined(HAVE_PIPE2) && defined(O_CLOEXEC) && defined(O_NONBLOCK) +pipe_done: +#endif si.mi = info; si.pfd = pfds[1]; si.dev_cap = MIO_DEV_CAP_OUT | MIO_DEV_CAP_STREAM; si.id = MIO_DEV_THR_IN; + /* invalidate pfds[1] before calling make_slave() because when it fails, the + * fail_before_make(dev_thr_fail_before_make_slave) and kill(dev_thr_kill_slave) callbacks close si.pfd */ + pfds[1] = MIO_SYSHND_INVALID; + rdev->slave[MIO_DEV_THR_IN] = make_slave(mio, &si); if (!rdev->slave[MIO_DEV_THR_IN]) goto oops; - - pfds[1] = MIO_SYSHND_INVALID; rdev->slave_count++; si.mi = info; si.pfd = pfds[2]; si.dev_cap = MIO_DEV_CAP_IN | MIO_DEV_CAP_STREAM; si.id = MIO_DEV_THR_OUT; - + /* invalidate pfds[2] before calling make_slave() because when it fails, the + * fail_before_make(dev_thr_fail_before_make_slave) and kill(dev_thr_kill_slave) callbacks close si.pfd */ + pfds[2] = MIO_SYSHND_INVALID; rdev->slave[MIO_DEV_THR_OUT] = make_slave(mio, &si); if (!rdev->slave[MIO_DEV_THR_OUT]) goto oops; - - pfds[2] = MIO_SYSHND_INVALID; rdev->slave_count++; for (i = 0; i < MIO_COUNTOF(rdev->slave); i++) @@ -200,6 +229,10 @@ static int dev_thr_make_master (mio_dev_t* dev, void* ctx) mio_freemem (mio, ti); goto oops; } + + /* the thread function is in charge of these two file descriptors */ + pfds[0] = MIO_SYSHND_INVALID; + pfds[3] = MIO_SYSHND_INVALID; } /* ---------------------------------------------------------- */ @@ -209,7 +242,10 @@ static int dev_thr_make_master (mio_dev_t* dev, void* ctx) oops: for (i = 0; i < MIO_COUNTOF(pfds); i++) { - if (pfds[i] != MIO_SYSHND_INVALID) close (pfds[i]); + if (pfds[i] != MIO_SYSHND_INVALID) + { + close (pfds[i]); + } } for (i = MIO_COUNTOF(rdev->slave); i > 0; ) @@ -248,7 +284,10 @@ static int dev_thr_kill_master (mio_dev_t* dev, int force) int i; ti = rdev->thr_info; - pthread_cancel (ti->thr_hnd); + /* pthread_cancel() seems to create some dangling file descriptors not closed properly. + * i don't seem to get it working correctly as of now. proper cancellation point management + * is very difficult. without pthread_cancel() here, higher pressure on cfmb is expected */ + /*pthread_cancel (ti->thr_hnd); */ if (rdev->slave_count > 0) { @@ -269,17 +308,26 @@ static int dev_thr_kill_master (mio_dev_t* dev, int force) } } + rdev->thr_info = MIO_NULL; if (ti->thr_done) { - pthread_detach (ti->thr_hnd); /* pthread_join() may be blocking. */ + pthread_detach (ti->thr_hnd); /* pthread_join() may be blocking. detach the thread instead */ free_thr_info_resources (mio, ti); mio_freemem (mio, ti); } else { + #if 0 + /* since pthread_join can be blocking, i'd schedule a resource destroyer with mio_addcfmb(). + * see after #else */ + pthread_join (ti->thr_hnd, MIO_NULL); + free_thr_info_resources (mio, ti); + mio_freemem (mio, ti); + #else + /* schedule a resource destroyer */ mio_addcfmb (mio, ti, ready_to_free_thr_info); + #endif } - rdev->thr_info = MIO_NULL; if (rdev->on_close) rdev->on_close (rdev, MIO_DEV_THR_MASTER); return 0; @@ -332,6 +380,14 @@ static int dev_thr_kill_slave (mio_dev_t* dev, int force) return 0; } +static void dev_thr_fail_before_make_slave (void* ctx) +{ + slave_info_t* si = (slave_info_t*)ctx; + /* mio_dev_make() failed before it called the make() callback. + * i will close the pipe fd here instead of in the caller of mio_dev_make() */ + close (si->pfd); +} + static int dev_thr_read_slave (mio_dev_t* dev, void* buf, mio_iolen_t* len, mio_devaddr_t* srcaddr) { mio_dev_thr_slave_t* thr = (mio_dev_thr_slave_t*)dev; @@ -377,7 +433,7 @@ static int dev_thr_write_slave (mio_dev_t* dev, const void* data, mio_iolen_t* l if (MIO_UNLIKELY(*len <= 0)) { /* this is an EOF indicator */ - /* It isn't apthrpriate to call mio_dev_halt(thr) or mio_dev_thr_close(thr->master, MIO_DEV_THR_IN) + /* It isn't appropriate to call mio_dev_halt(thr) or mio_dev_thr_close(thr->master, MIO_DEV_THR_IN) * as those functions destroy the device itself */ if (MIO_LIKELY(thr->pfd != MIO_SYSHND_INVALID)) { @@ -523,7 +579,7 @@ static mio_dev_mth_t dev_thr_methods_slave = { dev_thr_make_slave, dev_thr_kill_slave, - MIO_NULL, + dev_thr_fail_before_make_slave, dev_thr_getsyshnd_slave, dev_thr_read_slave,