From ef4477ea7cebfc7096fecd15e304fa4bd32ca95e Mon Sep 17 00:00:00 2001 From: hyung-hwan Date: Sun, 24 Apr 2016 17:30:43 +0000 Subject: [PATCH] enhanced the mutiplexer code to support the ordinary poll() --- stio/configure | 43 ++++- stio/configure.ac | 4 +- stio/lib/main.c | 14 +- stio/lib/stio-cfg.h.in | 3 + stio/lib/stio-cmn.h | 18 ++ stio/lib/stio-prv.h | 7 +- stio/lib/stio-sck.c | 41 ++-- stio/lib/stio.c | 428 +++++++++++++++++++++++++++++++++++------ stio/lib/stio.h | 2 +- 9 files changed, 484 insertions(+), 76 deletions(-) diff --git a/stio/configure b/stio/configure index db2e495..775271d 100755 --- a/stio/configure +++ b/stio/configure @@ -17100,7 +17100,7 @@ fi done -for ac_header in sys/sendfile.h sys/epoll.h sys/event.h +for ac_header in sys/sendfile.h sys/epoll.h sys/event.h sys/poll.h do : as_ac_Header=`$as_echo "ac_cv_header_$ac_header" | $as_tr_sh` ac_fn_c_check_header_mongrel "$LINENO" "$ac_header" "$as_ac_Header" "$ac_includes_default" @@ -17391,6 +17391,47 @@ done LIBS="$OLDLIBS" +{ $as_echo "$as_me:${as_lineno-$LINENO}: checking for clock_gettime in -lrt" >&5 +$as_echo_n "checking for clock_gettime in -lrt... " >&6; } +if ${ac_cv_lib_rt_clock_gettime+:} false; then : + $as_echo_n "(cached) " >&6 +else + ac_check_lib_save_LIBS=$LIBS +LIBS="-lrt $LIBS" +cat confdefs.h - <<_ACEOF >conftest.$ac_ext +/* end confdefs.h. */ + +/* Override any GCC internal prototype to avoid an error. + Use char because int might match the return type of a GCC + builtin and then its argument prototype would still apply. */ +#ifdef __cplusplus +extern "C" +#endif +char clock_gettime (); +int +main () +{ +return clock_gettime (); + ; + return 0; +} +_ACEOF +if ac_fn_c_try_link "$LINENO"; then : + ac_cv_lib_rt_clock_gettime=yes +else + ac_cv_lib_rt_clock_gettime=no +fi +rm -f core conftest.err conftest.$ac_objext \ + conftest$ac_exeext conftest.$ac_ext +LIBS=$ac_check_lib_save_LIBS +fi +{ $as_echo "$as_me:${as_lineno-$LINENO}: result: $ac_cv_lib_rt_clock_gettime" >&5 +$as_echo "$ac_cv_lib_rt_clock_gettime" >&6; } +if test "x$ac_cv_lib_rt_clock_gettime" = xyes; then : + LIBS="$LIBS -lrt" +fi + + for ac_func in connect gethostbyname do : as_ac_var=`$as_echo "ac_cv_func_$ac_func" | $as_tr_sh` diff --git a/stio/configure.ac b/stio/configure.ac index 80d5bc2..e746c6e 100644 --- a/stio/configure.ac +++ b/stio/configure.ac @@ -130,7 +130,7 @@ AC_HEADER_STDC AC_CHECK_HEADERS([stddef.h wchar.h wctype.h errno.h signal.h fcntl.h dirent.h]) AC_CHECK_HEADERS([time.h sys/time.h utime.h spawn.h execinfo.h ucontext.h]) AC_CHECK_HEADERS([sys/resource.h sys/wait.h sys/syscall.h sys/ioctl.h]) -AC_CHECK_HEADERS([sys/sendfile.h sys/epoll.h sys/event.h]) +AC_CHECK_HEADERS([sys/sendfile.h sys/epoll.h sys/event.h sys/poll.h]) AC_CHECK_HEADERS([sys/sysctl.h sys/socket.h sys/sockio.h sys/un.h]) AC_CHECK_HEADERS([ifaddrs.h tiuser.h linux/netfilter_ipv4.h netinet/sctp.h]) AC_CHECK_HEADERS([net/if.h net/if_dl.h], [], [], [ @@ -168,6 +168,8 @@ AC_CHECK_FUNCS([powl fmodl sinl cosl tanl sinhl coshl tanhl asinl acosl atanl at AC_CHECK_FUNCS([pow fmod sin cos tan sinh cosh tanh asin acos atan atan2 log log10 exp sqrt ceil floor round]) AC_CHECK_FUNCS([powf fmodf sinf cosf tanf sinhf coshf tanhf asinf acosf atanf atan2f logf log10f expf sqrtf ceilf floorf roundf]) LIBS="$OLDLIBS" + +AC_CHECK_LIB([rt], [clock_gettime], [LIBS="$LIBS -lrt"]) dnl OLDLIBS="$LIBS" dnl AC_SEARCH_LIBS([connect], [socket]) diff --git a/stio/lib/main.c b/stio/lib/main.c index 1555cdb..227074a 100644 --- a/stio/lib/main.c +++ b/stio/lib/main.c @@ -64,13 +64,15 @@ static mmgr_stat_t mmgr_stat; static void* mmgr_alloc (stio_mmgr_t* mmgr, stio_size_t size) { + void* x; + if (((mmgr_stat_t*)mmgr->ctx)->total_count > 100) { printf ("CRITICAL ERROR ---> too many heap chunks...\n"); return STIO_NULL; } - void* x = malloc (size); + x = malloc (size); if (x) ((mmgr_stat_t*)mmgr->ctx)->total_count++; return x; } @@ -207,6 +209,8 @@ printf ("ENABLING READING..............................\n"); static int tcp_sck_on_read (stio_dev_sck_t* tcp, const void* buf, stio_iolen_t len, const stio_sckaddr_t* srcaddr) { + int n; + if (len <= 0) { printf ("STREAM DEVICE: EOF RECEIVED...\n"); @@ -217,17 +221,21 @@ static int tcp_sck_on_read (stio_dev_sck_t* tcp, const void* buf, stio_iolen_t l printf ("on read %d\n", (int)len); +{ stio_ntime_t tmout; -int n; + static char a ='A'; char* xxx = malloc (1000000); memset (xxx, a++ ,1000000); + //return stio_dev_sck_write (tcp, "HELLO", 5, STIO_NULL); - stio_inittime (&tmout, 1, 0); + stio_inittime (&tmout, 5, 0); n = stio_dev_sck_timedwrite (tcp, xxx, 1000000, &tmout, STIO_NULL, STIO_NULL); free (xxx); + if (n <= -1) return -1; +} /* post the write finisher */ n = stio_dev_sck_write (tcp, STIO_NULL, 0, STIO_NULL, STIO_NULL); diff --git a/stio/lib/stio-cfg.h.in b/stio/lib/stio-cfg.h.in index 0c09dbe..917230a 100644 --- a/stio/lib/stio-cfg.h.in +++ b/stio/lib/stio-cfg.h.in @@ -555,6 +555,9 @@ */ #undef HAVE_SYS_NDIR_H +/* Define to 1 if you have the header file. */ +#undef HAVE_SYS_POLL_H + /* Define to 1 if you have the header file. */ #undef HAVE_SYS_PRCTL_H diff --git a/stio/lib/stio-cmn.h b/stio/lib/stio-cmn.h index d16b3c3..66182ae 100644 --- a/stio/lib/stio-cmn.h +++ b/stio/lib/stio-cmn.h @@ -304,6 +304,24 @@ typedef int stio_mcint_t; #define STIO_ALIGNOF(type) STIO_OFFSETOF(struct { stio_uint8_t d1; type d2; }, d2) /*(sizeof(struct { stio_uint8_t d1; type d2; }) - sizeof(type))*/ +/** + * Round up a positive integer to the nearest multiple of 'align' + */ +#define STIO_ALIGNTO(num,align) ((((num) + (align) - 1) / (align)) * (align)) + +#if 0 +/** + * Round up a number, both positive and negative, to the nearest multiple of 'align' + */ +#define STIO_ALIGNTO(num,align) ((((num) + (num >= 0? 1: -1) * (align) - 1) / (align)) * (align)) +#endif + +/** + * Round up a positive integer to to the nearest multiple of 'align' which + * should be a multiple of a power of 2 + */ +#define STIO_ALIGNTO_POW2(num,align) ((((num) + (align) - 1)) & ~((align) - 1)) + #if defined(__cplusplus) # if (__cplusplus >= 201103L) /* C++11 */ # define STIO_NULL nullptr diff --git a/stio/lib/stio-prv.h b/stio/lib/stio-prv.h index 4f181d7..66557e4 100644 --- a/stio/lib/stio-prv.h +++ b/stio/lib/stio-prv.h @@ -29,8 +29,6 @@ #include "stio.h" -#include - /*TODO: redefine and remove these */ #include #include @@ -53,6 +51,8 @@ struct stio_tmrjob_t #define STIO_TMRIDX_INVALID ((stio_tmridx_t)-1) +typedef struct stio_mux_t stio_mux_t; + struct stio_t { stio_mmgr_t* mmgr; @@ -93,8 +93,7 @@ struct stio_t #if defined(_WIN32) HANDLE iocp; #else - int mux; - struct epoll_event revs[100]; + stio_mux_t* mux; #endif }; diff --git a/stio/lib/stio-sck.c b/stio/lib/stio-sck.c index 1f5b478..c65f1c3 100644 --- a/stio/lib/stio-sck.c +++ b/stio/lib/stio-sck.c @@ -533,6 +533,8 @@ static int dev_sck_write_stateful (stio_dev_t* dev, const void* data, stio_iolen { int err = SSL_get_error ((SSL*)rdev->ssl, x); if (err == SSL_ERROR_WANT_READ || err == SSL_ERROR_WANT_WRITE) return 0; + /*else if (err == SSL_ERROR_SYSCALL) + rdev->stio->errnum = stio_syserrtoerrnum(errno); */ rdev->stio->errnum = STIO_ESYSERR; return -1; } @@ -663,6 +665,7 @@ static int do_ssl (stio_dev_sck_t* dev, int (*ssl_func)(SSL*)) stio_stop (dev->stio, STIO_STOPREQ_WATCHER_ERROR); ret = -1; } + return ret; } @@ -794,6 +797,10 @@ static int dev_sck_ioctl (stio_dev_t* dev, int cmd, void* arg) } SSL_CTX_set_read_ahead (ssl_ctx, 0); + SSL_CTX_set_mode (ssl_ctx, SSL_CTX_get_mode(ssl_ctx) | + /*SSL_MODE_ENABLE_PARTIAL_WRITE |*/ + SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER); + SSL_CTX_set_options(ssl_ctx, SSL_OP_NO_SSLv2); /* no outdated SSLv2 by default */ rdev->tmout = bnd->accept_tmout; @@ -879,6 +886,9 @@ static int dev_sck_ioctl (stio_dev_t* dev, int cmd, void* arg) } SSL_CTX_set_read_ahead (ssl_ctx, 0); + SSL_CTX_set_mode (ssl_ctx, SSL_CTX_get_mode(ssl_ctx) | + /* SSL_MODE_ENABLE_PARTIAL_WRITE | */ + SSL_MODE_ACCEPT_MOVING_WRITE_BUFFER); } #endif /*{ @@ -915,6 +925,7 @@ fcntl (rdev->sck, F_SETFL, flags | O_NONBLOCK); else { /* update rdev->tmout to the deadline of the connect timeout job */ + STIO_ASSERT (rdev->tmrjob_index != STIO_TMRIDX_INVALID); stio_gettmrjobdeadline (rdev->stio, rdev->tmrjob_index, &rdev->tmout); } } @@ -972,23 +983,31 @@ fcntl (rdev->sck, F_SETFL, flags | O_NONBLOCK); if (x == 0) { STIO_ASSERT (rdev->tmrjob_index == STIO_TMRIDX_INVALID); + stio_inittime (&rdev->tmout, 0, 0); /* just in case */ /* it's ok to use conn->connect_tmout for ssl-connect as * the underlying socket connection has been established immediately */ - if (stio_ispostime(&conn->connect_tmout) && - schedule_timer_job_after (rdev, &conn->connect_tmout, ssl_connect_timedout) <= -1) + if (stio_ispostime(&conn->connect_tmout)) { - /* no device halting in spite of failure. - * let the caller handle this after having - * checked the return code as it is an IOCTL call. */ - SSL_CTX_free (rdev->ssl_ctx); - rdev->ssl_ctx = STIO_NULL; + if (schedule_timer_job_after (rdev, &conn->connect_tmout, ssl_connect_timedout) <= -1) + { + /* no device halting in spite of failure. + * let the caller handle this after having + * checked the return code as it is an IOCTL call. */ + SSL_CTX_free (rdev->ssl_ctx); + rdev->ssl_ctx = STIO_NULL; - STIO_ASSERT (rdev->ssl == STIO_NULL); - return -1; + STIO_ASSERT (rdev->ssl == STIO_NULL); + return -1; + } + else + { + /* update rdev->tmout to the deadline of the connect timeout job */ + STIO_ASSERT (rdev->tmrjob_index != STIO_TMRIDX_INVALID); + stio_gettmrjobdeadline (rdev->stio, rdev->tmrjob_index, &rdev->tmout); + } } - rdev->tmout = conn->connect_tmout; STIO_DEV_SCK_SET_PROGRESS (rdev, STIO_DEV_SCK_CONNECTING_SSL); } else @@ -1132,7 +1151,7 @@ static int harvest_outgoing_connection (stio_dev_sck_t* rdev) /* rdev->tmout has been set to the deadline of the connect task * when the CONNECT IOCTL command has been executed. use the - * same dead line here */ + * same deadline here */ if (stio_ispostime(&rdev->tmout) && schedule_timer_job_at (rdev, &rdev->tmout, ssl_connect_timedout) <= -1) { diff --git a/stio/lib/stio.c b/stio/lib/stio.c index b3be147..f853281 100644 --- a/stio/lib/stio.c +++ b/stio/lib/stio.c @@ -25,15 +25,23 @@ */ #include "stio-prv.h" + +#if defined(HAVE_SYS_EPOLL_H) +# include +# define USE_EPOLL +#elif defined(HAVE_SYS_POLL_H) +# include +# define USE_POLL +#else +# error NO SUPPORTED MULTIPLEXER +#endif -#include #include #include #include #define DEV_CAPA_ALL_WATCHED (STIO_DEV_CAPA_IN_WATCHED | STIO_DEV_CAPA_OUT_WATCHED | STIO_DEV_CAPA_PRI_WATCHED) - static int schedule_kill_zombie_job (stio_dev_t* dev); static int kill_and_free_device (stio_dev_t* dev, int force); @@ -52,6 +60,279 @@ static int kill_and_free_device (stio_dev_t* dev, int force); else (list)->tail = (dev)->dev_prev; \ } while (0) + + +/* ========================================================================= */ +#if defined(USE_POLL) + +#define MUX_CMD_INSERT 1 +#define MUX_CMD_UPDATE 2 +#define MUX_CMD_DELETE 3 + +#define MUX_INDEX_INVALID STIO_TYPE_MAX(stio_size_t) + +struct stio_mux_t +{ + struct + { + stio_size_t* ptr; + stio_size_t size; + stio_size_t capa; + } map; /* handle to index */ + + struct + { + struct pollfd* pfd; + stio_dev_t** dptr; + stio_size_t size; + stio_size_t capa; + } pd; /* poll data */ +}; + + +static int mux_open (stio_t* stio) +{ + stio_mux_t* mux; + + mux = STIO_MMGR_ALLOC (stio->mmgr, STIO_SIZEOF(*mux)); + if (!mux) + { + stio->errnum = STIO_ENOMEM; + return -1; + } + + STIO_MEMSET (mux, 0, STIO_SIZEOF(*mux)); + + stio->mux = mux; + return 0; +} + +static void mux_close (stio_t* stio) +{ + if (stio->mux) + { + STIO_MMGR_FREE (stio->mmgr, stio->mux); + stio->mux = STIO_NULL; + } +} + +static int mux_control (stio_dev_t* dev, int cmd, stio_syshnd_t hnd, int dev_capa) +{ + stio_t* stio; + stio_mux_t* mux; + stio_size_t idx; + + stio = dev->stio; + mux = (stio_mux_t*)stio->mux; + + if (hnd >= mux->map.capa) + { + stio_size_t new_capa; + stio_size_t* tmp; + + if (cmd != MUX_CMD_INSERT) + { + stio->errnum = STIO_ENOENT; + return -1; + } + + new_capa = STIO_ALIGNTO_POW2((hnd + 1), 256); + + tmp = STIO_MMGR_REALLOC (stio->mmgr, mux->map.ptr, new_capa * STIO_SIZEOF(*tmp)); + if (!tmp) + { + stio->errnum = STIO_ENOMEM; + return -1; + } + + for (idx = mux->map.capa; idx < new_capa; idx++) + tmp[idx] = MUX_INDEX_INVALID; + + mux->map.ptr = tmp; + mux->map.capa = new_capa; + } + + idx = mux->map.ptr[hnd]; + if (idx != MUX_INDEX_INVALID) + { + if (cmd == MUX_CMD_INSERT) + { + stio->errnum = STIO_EEXIST; + return -1; + } + } + else + { + if (cmd != MUX_CMD_INSERT) + { + stio->errnum = STIO_ENOENT; + return -1; + } + } + + switch (cmd) + { + case MUX_CMD_INSERT: + + if (mux->pd.size >= mux->pd.capa) + { + stio_size_t new_capa; + struct pollfd* tmp1; + stio_dev_t** tmp2; + + new_capa = STIO_ALIGNTO_POW2(mux->pd.size + 1, 256); + + tmp1 = STIO_MMGR_REALLOC (stio->mmgr, mux->pd.pfd, new_capa * STIO_SIZEOF(*tmp1)); + if (!tmp1) + { + stio->errnum = STIO_ENOMEM; + return -1; + } + + tmp2 = STIO_MMGR_REALLOC (stio->mmgr, mux->pd.dptr, new_capa * STIO_SIZEOF(*tmp2)); + if (!tmp2) + { + STIO_MMGR_FREE (stio->mmgr, tmp1); + stio->errnum = STIO_ENOMEM; + return -1; + } + + mux->pd.pfd = tmp1; + mux->pd.dptr = tmp2; + mux->pd.capa = new_capa; + } + + idx = mux->pd.size++; + + mux->pd.pfd[idx].fd = hnd; + mux->pd.pfd[idx].events = 0; + if (dev_capa & STIO_DEV_CAPA_IN_WATCHED) mux->pd.pfd[idx].events |= POLLIN; + if (dev_capa & STIO_DEV_CAPA_OUT_WATCHED) mux->pd.pfd[idx].events |= POLLOUT; + mux->pd.pfd[idx].revents = 0; + mux->pd.dptr[idx] = dev; + + mux->map.ptr[hnd] = idx; + + return 0; + + case MUX_CMD_UPDATE: + STIO_ASSERT (mux->pd.dptr[idx] == dev); + mux->pd.pfd[idx].events = 0; + if (dev_capa & STIO_DEV_CAPA_IN_WATCHED) mux->pd.pfd[idx].events |= POLLIN; + if (dev_capa & STIO_DEV_CAPA_OUT_WATCHED) mux->pd.pfd[idx].events |= POLLOUT; + return 0; + + case MUX_CMD_DELETE: + STIO_ASSERT (mux->pd.dptr[idx] == dev); + mux->map.ptr[hnd] = MUX_INDEX_INVALID; + + /* TODO: speed up deletion. allow a hole in the array. + * delay array compaction if there is a hole. + * set fd for the hole to -1 such that poll() + * ignores it. compact the array if another deletion + * is requested when there is an existing hole. */ + idx++; + while (idx < mux->pd.size) + { + int fd; + + mux->pd.pfd[idx - 1] = mux->pd.pfd[idx]; + mux->pd.dptr[idx - 1] = mux->pd.dptr[idx]; + + fd = mux->pd.pfd[idx].fd; + mux->map.ptr[fd] = idx - 1; + + idx++; + } + + mux->pd.size--; + + return 0; + + default: + stio->errnum = STIO_EINVAL; + return -1; + } +} + +#elif defined(USE_EPOLL) + +#define MUX_CMD_INSERT EPOLL_CTL_ADD +#define MUX_CMD_UPDATE EPOLL_CTL_MOD +#define MUX_CMD_DELETE EPOLL_CTL_DEL + +struct stio_mux_t +{ + int hnd; + struct epoll_event revs[100]; +}; + +static int mux_open (stio_t* stio) +{ + stio_mux_t* mux; + + mux = STIO_MMGR_ALLOC (stio->mmgr, STIO_SIZEOF(*mux)); + if (!mux) + { + stio->errnum = STIO_ENOMEM; + return -1; + } + + STIO_MEMSET (mux, 0, STIO_SIZEOF(*mux)); + + mux->hnd = epoll_create (1000); + if (mux->hnd == -1) + { + stio->errnum = stio_syserrtoerrnum(errno); + STIO_MMGR_FREE (stio->mmgr, mux); + return -1; + } + + stio->mux = mux; + return 0; +} + +static void mux_close (stio_t* stio) +{ + if (stio->mux) + { + close (stio->mux->hnd); + STIO_MMGR_FREE (stio->mmgr, stio->mux); + stio->mux = STIO_NULL; + } +} + + +static STIO_INLINE int mux_control (stio_dev_t* dev, int cmd, stio_syshnd_t hnd, int dev_capa) +{ + struct epoll_event ev; + + ev.data.ptr = dev; + ev.events = EPOLLHUP | EPOLLERR /*| EPOLLET*/; + + if (dev_capa & STIO_DEV_CAPA_IN_WATCHED) + { + ev.events |= EPOLLIN; + #if defined(EPOLLRDHUP) + ev.events |= EPOLLRDHUP; + #endif + if (dev_capa & STIO_DEV_CAPA_PRI_WATCHED) ev.events |= EPOLLPRI; + } + + if (dev_capa & STIO_DEV_CAPA_OUT_WATCHED) ev.events |= EPOLLOUT; + + if (epoll_ctl (dev->stio->mux->hnd, cmd, hnd, &ev) == -1) + { + dev->stio->errnum = stio_syserrtoerrnum(errno); + return -1; + } + + return 0; +} +#endif + +/* ========================================================================= */ + stio_t* stio_open (stio_mmgr_t* mmgr, stio_size_t xtnsize, stio_size_t tmrcapa, stio_errnum_t* errnum) { stio_t* stio; @@ -87,12 +368,8 @@ int stio_init (stio_t* stio, stio_mmgr_t* mmgr, stio_size_t tmrcapa) stio->mmgr = mmgr; /* intialize the multiplexer object */ - stio->mux = epoll_create (1000); - if (stio->mux == -1) - { - stio->errnum = stio_syserrtoerrnum(errno); - return -1; - } + + if (mux_open (stio) <= -1) return -1; /* initialize the timer object */ if (tmrcapa <= 0) tmrcapa = 1; @@ -100,7 +377,7 @@ int stio_init (stio_t* stio, stio_mmgr_t* mmgr, stio_size_t tmrcapa) if (!stio->tmr.jobs) { stio->errnum = STIO_ENOMEM; - close (stio->mux); + mux_close (stio); return -1; } stio->tmr.capa = tmrcapa; @@ -166,7 +443,7 @@ void stio_fini (stio_t* stio) STIO_MMGR_FREE (stio->mmgr, stio->tmr.jobs); /* close the multiplexer */ - close (stio->mux); + mux_close (stio); } @@ -191,33 +468,27 @@ static STIO_INLINE void unlink_wq (stio_t* stio, stio_wq_t* q) STIO_WQ_UNLINK (q); } -static STIO_INLINE void handle_event (stio_t* stio, stio_size_t i) +static STIO_INLINE void handle_event (stio_dev_t* dev, int events, int rdhup) { - stio_dev_t* dev; + stio_t* stio; + stio = dev->stio; stio->renew_watch = 0; - dev = stio->revs[i].data.ptr; STIO_ASSERT (stio == dev->stio); if (dev->dev_evcb->ready) { - int x, events = 0; + int x, xevents; - if (stio->revs[i].events & EPOLLIN) events |= STIO_DEV_EVENT_IN; - if (stio->revs[i].events & EPOLLOUT) events |= STIO_DEV_EVENT_OUT; - if (stio->revs[i].events & EPOLLPRI) events |= STIO_DEV_EVENT_PRI; - if (stio->revs[i].events & EPOLLERR) events |= STIO_DEV_EVENT_ERR; - if (stio->revs[i].events & EPOLLHUP) events |= STIO_DEV_EVENT_HUP; - #if defined(EPOLLRDHUP) - else if (stio->revs[i].events & EPOLLRDHUP) events |= STIO_DEV_EVENT_HUP; - #endif + xevents = events; + if (rdhup) xevents |= STIO_DEV_EVENT_HUP; /* return value of ready() * <= -1 - failure. kill the device. * == 0 - ok. but don't invoke recv() or send(). * >= 1 - everything is ok. */ - x = dev->dev_evcb->ready (dev, events); + x = dev->dev_evcb->ready (dev, xevents); if (x <= -1) { stio_dev_halt (dev); @@ -226,7 +497,7 @@ static STIO_INLINE void handle_event (stio_t* stio, stio_size_t i) else if (x == 0) goto skip_evcb; } - if (dev && stio->revs[i].events & EPOLLPRI) + if (dev && (events & STIO_DEV_EVENT_PRI)) { /* urgent data */ /* TODO: urgent data.... */ @@ -234,7 +505,7 @@ static STIO_INLINE void handle_event (stio_t* stio, stio_size_t i) printf ("has urgent data...\n"); } - if (dev && stio->revs[i].events & EPOLLOUT) + if (dev && (events & STIO_DEV_EVENT_OUT)) { /* write pending requests */ while (!STIO_WQ_ISEMPTY(&dev->wq)) @@ -251,7 +522,6 @@ printf ("has urgent data...\n"); send_leftover: ulen = urem; -printf ("&&&&&&&&&&&&&&&&&&&&&&&&&&&&7WRITING..........................\n"); x = dev->dev_mth->write (dev, uptr, &ulen, &q->dstaddr); if (x <= -1) { @@ -329,7 +599,7 @@ printf ("&&&&&&&&&&&&&&&&&&&&&&&&&&&&7WRITING..........................\n"); } } - if (dev && stio->revs[i].events & EPOLLIN) + if (dev && (events & STIO_DEV_EVENT_IN)) { stio_devaddr_t srcaddr; stio_iolen_t len; @@ -402,7 +672,7 @@ printf ("&&&&&&&&&&&&&&&&&&&&&&&&&&&&7WRITING..........................\n"); if (dev) { - if (stio->revs[i].events & (EPOLLERR | EPOLLHUP)) + if (events & (STIO_DEV_EVENT_ERR | STIO_DEV_EVENT_HUP)) { /* if error or hangup has been reported on the device, * halt the device. this check is performed after @@ -411,10 +681,9 @@ printf ("&&&&&&&&&&&&&&&&&&&&&&&&&&&&7WRITING..........................\n"); dev->dev_capa |= STIO_DEV_CAPA_IN_CLOSED | STIO_DEV_CAPA_OUT_CLOSED; stio->renew_watch = 1; } - #if defined(EPOLLRDHUP) - else if (dev && stio->revs[i].events & EPOLLRDHUP) + else if (dev && rdhup) { - if (stio->revs[i].events & (EPOLLIN | EPOLLOUT | EPOLLPRI)) + if (events & (STIO_DEV_EVENT_IN | STIO_DEV_EVENT_OUT | STIO_DEV_EVENT_PRI)) { /* it may be a half-open state. don't do anything here * to let the next read detect EOF */ @@ -425,7 +694,6 @@ printf ("&&&&&&&&&&&&&&&&&&&&&&&&&&&&7WRITING..........................\n"); stio->renew_watch = 1; } } - #endif if ((dev->dev_capa & STIO_DEV_CAPA_IN_CLOSED) && (dev->dev_capa & STIO_DEV_CAPA_OUT_CLOSED)) @@ -451,6 +719,7 @@ static STIO_INLINE int __exec (stio_t* stio) ULONG nentries, i; #else int nentries, i; + stio_mux_t* mux; #endif /*if (!stio->actdev.head) return 0;*/ @@ -478,10 +747,44 @@ static STIO_INLINE int __exec (stio_t* stio) { } */ +#elif defined(USE_POLL) -#else - nentries = epoll_wait (stio->mux, stio->revs, STIO_COUNTOF(stio->revs), STIO_SECNSEC_TO_MSEC(tmout.sec, tmout.nsec)); - if (nentries <= -1) + mux = (stio_mux_t*)stio->mux; + + nentries = poll (mux->pd.pfd, mux->pd.size, STIO_SECNSEC_TO_MSEC(tmout.sec, tmout.nsec)); + if (nentries == -1) + { + if (errno == EINTR) return 0; + stio->errnum = stio_syserrtoerrnum(errno); + return -1; + } + + for (i = 0; i < mux->pd.size; i++) + { + if (mux->pd.pfd[i].fd >= 0 && mux->pd.pfd[i].revents) + { + int events = 0; + stio_dev_t* dev; + + dev = mux->pd.dptr[i]; + + STIO_ASSERT (!(mux->pd.pfd[i].revents & POLLNVAL)); + if (mux->pd.pfd[i].revents & POLLIN) events |= STIO_DEV_EVENT_IN; + if (mux->pd.pfd[i].revents & POLLOUT) events |= STIO_DEV_EVENT_OUT; + if (mux->pd.pfd[i].revents & POLLPRI) events |= STIO_DEV_EVENT_PRI; + if (mux->pd.pfd[i].revents & POLLERR) events |= STIO_DEV_EVENT_ERR; + if (mux->pd.pfd[i].revents & POLLHUP) events |= STIO_DEV_EVENT_HUP; + + handle_event (dev, events, 0); + } + } + +#elif defined(USE_EPOLL) + + mux = (stio_mux_t*)stio->mux; + + nentries = epoll_wait (mux->hnd, mux->revs, STIO_COUNTOF(mux->revs), STIO_SECNSEC_TO_MSEC(tmout.sec, tmout.nsec)); + if (nentries == -1) { if (errno == EINTR) return 0; /* it's actually ok */ /* other errors are critical - EBADF, EFAULT, EINVAL */ @@ -493,9 +796,27 @@ static STIO_INLINE int __exec (stio_t* stio) for (i = 0; i < nentries; i++) { - handle_event (stio, i); + int events = 0, rdhup = 0; + stio_dev_t* dev; + + dev = mux->revs[i].data.ptr; + + if (mux->revs[i].events & EPOLLIN) events |= STIO_DEV_EVENT_IN; + if (mux->revs[i].events & EPOLLOUT) events |= STIO_DEV_EVENT_OUT; + if (mux->revs[i].events & EPOLLPRI) events |= STIO_DEV_EVENT_PRI; + if (mux->revs[i].events & EPOLLERR) events |= STIO_DEV_EVENT_ERR; + if (mux->revs[i].events & EPOLLHUP) events |= STIO_DEV_EVENT_HUP; + #if defined(EPOLLRDHUP) + else if (mux->revs[i].events & EPOLLRDHUP) rdhup = 1; + #endif + handle_event (dev, events, rdhup); } +#else + +# error NO SUPPORTED MULTIPLEXER +#endif + /* kill all halted devices */ while (stio->hltdev.head) { @@ -504,8 +825,6 @@ printf (">>>>>>>>>>>>>> KILLING HALTED DEVICE %p\n", stio->hltdev.head); } STIO_ASSERT (stio->hltdev.tail == STIO_NULL); -#endif - return 0; } @@ -799,8 +1118,8 @@ int stio_dev_ioctl (stio_dev_t* dev, int cmd, void* arg) int stio_dev_watch (stio_dev_t* dev, stio_dev_watch_cmd_t cmd, int events) { - struct epoll_event ev; - int epoll_op; + /*struct epoll_event ev;*/ + int mux_cmd; int dev_capa; /* the virtual device doesn't perform actual I/O. @@ -809,13 +1128,13 @@ int stio_dev_watch (stio_dev_t* dev, stio_dev_watch_cmd_t cmd, int events) * of the system multiplexer for hangup and error. */ if (dev->dev_capa & STIO_DEV_CAPA_VIRTUAL) return 0; - ev.data.ptr = dev; + /*ev.data.ptr = dev;*/ switch (cmd) { case STIO_DEV_WATCH_START: /* upon start, only input watching is requested */ events = STIO_DEV_EVENT_IN; - epoll_op = EPOLL_CTL_ADD; + mux_cmd = MUX_CMD_INSERT; break; case STIO_DEV_WATCH_RENEW: @@ -827,12 +1146,12 @@ int stio_dev_watch (stio_dev_t* dev, stio_dev_watch_cmd_t cmd, int events) /* fall through */ case STIO_DEV_WATCH_UPDATE: /* honor event watching requests as given by the caller */ - epoll_op = EPOLL_CTL_MOD; + mux_cmd = MUX_CMD_UPDATE; break; case STIO_DEV_WATCH_STOP: events = 0; /* override events */ - epoll_op = EPOLL_CTL_DEL; + mux_cmd = MUX_CMD_DELETE; break; default: @@ -846,19 +1165,19 @@ int stio_dev_watch (stio_dev_t* dev, stio_dev_watch_cmd_t cmd, int events) /* this function honors STIO_DEV_EVENT_IN and STIO_DEV_EVENT_OUT only * as valid input event bits. it intends to provide simple abstraction * by reducing the variety of event bits that the caller has to handle. */ - ev.events = EPOLLHUP | EPOLLERR /*| EPOLLET*/; + /*ev.events = EPOLLHUP | EPOLLERR; */ if ((events & STIO_DEV_EVENT_IN) && !(dev->dev_capa & (STIO_DEV_CAPA_IN_CLOSED | STIO_DEV_CAPA_IN_DISABLED))) { if (dev->dev_capa & STIO_DEV_CAPA_IN) { - ev.events |= EPOLLIN; + /*ev.events |= EPOLLIN; #if defined(EPOLLRDHUP) ev.events |= EPOLLRDHUP; - #endif + #endif*/ if (dev->dev_capa & STIO_DEV_CAPA_PRI) { - ev.events |= EPOLLPRI; + /*ev.events |= EPOLLPRI;*/ dev_capa |= STIO_DEV_CAPA_PRI_WATCHED; } @@ -870,22 +1189,18 @@ int stio_dev_watch (stio_dev_t* dev, stio_dev_watch_cmd_t cmd, int events) { if (dev->dev_capa & STIO_DEV_CAPA_OUT) { - ev.events |= EPOLLOUT; + /*ev.events |= EPOLLOUT;*/ dev_capa |= STIO_DEV_CAPA_OUT_WATCHED; } } - if (epoll_op == EPOLL_CTL_MOD && (dev_capa & DEV_CAPA_ALL_WATCHED) == (dev->dev_capa & DEV_CAPA_ALL_WATCHED)) + if (mux_cmd == MUX_CMD_UPDATE && (dev_capa & DEV_CAPA_ALL_WATCHED) == (dev->dev_capa & DEV_CAPA_ALL_WATCHED)) { /* no change in the device capacity. skip calling epoll_ctl */ } else { - if (epoll_ctl (dev->stio->mux, epoll_op, dev->dev_mth->getsyshnd(dev), &ev) == -1) - { - dev->stio->errnum = stio_syserrtoerrnum(errno); - return -1; - } + if (mux_control (dev, mux_cmd, dev->dev_mth->getsyshnd(dev), dev_capa) <= -1) return -1; } dev->dev_capa = dev_capa; @@ -1122,6 +1437,9 @@ stio_errnum_t stio_syserrtoerrnum (int no) case EINVAL: return STIO_EINVAL; + case EEXIST: + return STIO_EEXIST; + case ENOENT: return STIO_ENOENT; diff --git a/stio/lib/stio.h b/stio/lib/stio.h index 5b53297..b1e2354 100644 --- a/stio/lib/stio.h +++ b/stio/lib/stio.h @@ -98,6 +98,7 @@ enum stio_errnum_t STIO_ENOMEM, STIO_EINVAL, + STIO_EEXIST, STIO_ENOENT, STIO_ENOSUP, /* not supported */ STIO_EMFILE, /* too many open files */ @@ -403,7 +404,6 @@ typedef struct stio_etharp_pkt_t stio_etharp_pkt_t; /* ========================================================================= */ - #ifdef __cplusplus extern "C" { #endif