From 513ed296a0ab4734152e0f87c0a4fa7498ad3dfe Mon Sep 17 00:00:00 2001 From: "hyunghwan.chung" Date: Tue, 21 Feb 2017 19:51:10 +0000 Subject: [PATCH] enhanced the event handling code further --- moo/lib/main.c | 92 +++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 80 insertions(+), 12 deletions(-) diff --git a/moo/lib/main.c b/moo/lib/main.c index d90bdb3..d5bc284 100644 --- a/moo/lib/main.c +++ b/moo/lib/main.c @@ -32,6 +32,7 @@ #include #include +#define USE_THREAD #if defined(_WIN32) # include @@ -77,10 +78,13 @@ # endif # include -# include # include -# include -# include +# include + +# if defined(USE_THREAD) +# include +# include +# endif #endif #if !defined(MOO_DEFAULT_PFMODPREFIX) @@ -126,6 +130,8 @@ struct xtn_t HANDLE waitable_timer; #else int ep; /* epoll */ + +#if defined(USE_THREAD) int p[2]; /* pipe for signaling */ pthread_t iothr; struct @@ -139,6 +145,14 @@ struct xtn_t int waiting; int need_cnd2; } ev; +#else + struct + { + struct epoll_event buf[32]; /*TODO: make it a dynamically allocated memory block depending on the file descriptor added. */ + moo_oow_t len; + } ev; +#endif + #endif }; @@ -667,6 +681,7 @@ if (mask & MOO_LOG_GC) return; /* don't show gc logs */ /* ========================================================================= */ +#if defined(USE_THREAD) static void* iothr_main (void* arg) { moo_t* moo = (moo_t*)arg; @@ -723,6 +738,7 @@ static void* iothr_main (void* arg) return MOO_NULL; } +#endif static int vm_startup (moo_t* moo) { @@ -735,7 +751,11 @@ static int vm_startup (moo_t* moo) struct epoll_event ev; int pcount = 0, flag; +#if defined(EPOLL_CLOEXEC) xtn->ep = epoll_create1 (EPOLL_CLOEXEC); +#else + xtn->ep = epoll_create (1024); +#endif if (xtn->ep == -1) { moo_syserrtoerrnum (errno); @@ -743,6 +763,14 @@ static int vm_startup (moo_t* moo) goto oops; } +#if defined(EPOLL_CLOEXEC) + /* do nothing */ +#else + flag = fcntl (xtn->ep, F_GETFD); + if (flag >= 0) fcntl (xtn->ep, F_SETFD, flag | FD_CLOEXEC); +#endif + +#if defined(USE_THREAD) if (pipe (xtn->p) == -1) { moo_syserrtoerrnum (errno); @@ -777,15 +805,19 @@ static int vm_startup (moo_t* moo) pthread_cond_init (&xtn->ev.cnd, MOO_NULL); pthread_cond_init (&xtn->ev.cnd2, MOO_NULL); pthread_create (&xtn->iothr, MOO_NULL, iothr_main, moo); +#endif return 0; oops: + +#if defined(USE_THREAD) if (pcount) { close (xtn->p[0]); close (xtn->p[1]); } +#endif if (xtn->ep >= 0) { close (xtn->ep); @@ -808,20 +840,24 @@ static void vm_cleanup (moo_t* moo) #else xtn_t* xtn = (xtn_t*)moo_getxtn(moo); +#if defined(USE_THREAD) write (xtn->p[1], "Q", 1); pthread_cond_signal (&xtn->ev.cnd); pthread_join (xtn->iothr, MOO_NULL); pthread_cond_destroy (&xtn->ev.cnd); pthread_cond_destroy (&xtn->ev.cnd2); pthread_mutex_destroy (&xtn->ev.mtx); +#endif if (xtn->ep) { +#if defined(USE_THREAD) struct epoll_event ev; epoll_ctl (xtn->ep, EPOLL_CTL_DEL, xtn->p[1], &ev); close (xtn->p[1]); close (xtn->p[0]); +#endif close (xtn->ep); xtn->ep = -1; @@ -999,31 +1035,33 @@ static void mux_del (moo_t* moo, moo_oop_semaphore_t sem) static void mux_wait (moo_t* moo, const moo_ntime_t* dur, moo_vmprim_muxwait_cb_t muxwcb) { xtn_t* xtn = (xtn_t*)moo_getxtn(moo); - int tmout = 0, n, take_over = 0; + int tmout = 0, n; - if (dur) tmout = MOO_SECNSEC_TO_MSEC(dur->sec, dur->nsec); +#if defined(USE_THREAD) + int take_over = 0; if (xtn->ev.len <= 0) { + if (!dur) return; /* immediate check is requested. and there is no event */ + + tmout = MOO_SECNSEC_TO_MSEC(dur->sec, dur->nsec); + pthread_mutex_lock (&xtn->ev.mtx); if (xtn->ev.waiting) { take_over_attempt: write (xtn->p[1], "X", 1); xtn->ev.need_cnd2 = 1; - pthread_mutex_unlock (&xtn->ev.mtx); - /* wait until the peer thread sets xtn->ev.waiting to 0 after epoll_wait() */ - pthread_mutex_lock (&xtn->ev.mtx); + /* wait until the peer thread sets xtn->ev.waiting to 0 after epoll_wait(). + * mutex is unlocked and locked back by pthread_cond_wait() */ while (xtn->ev.waiting) pthread_cond_wait (&xtn->ev.cnd2, &xtn->ev.mtx); xtn->ev.need_cnd2 = 0; if (xtn->ev.waiting) { - /* the peer thread got the control again. this may happen if - * epoll_wait() timed out at the same time as write() above. - * it won't time out again. if i retry as the timeout value - * of epoll_wait() in the peer thread is relatively long. */ + /* the peer thread got the control again. but can this happen? + * anyway, try again. */ goto take_over_attempt; } else @@ -1111,6 +1149,36 @@ handle_event: goto epoll_start; } } +#else + if (dur) tmout = MOO_SECNSEC_TO_MSEC(dur->sec, dur->nsec); + + n = epoll_wait (xtn->ep, xtn->ev.buf, MOO_COUNTOF(xtn->ev.buf), tmout); + + if (n <= -1) + { + MOO_DEBUG2 (moo, "Warning: epoll_wait failure - %d, %hs\n", errno, strerror(errno)); + } + else + { + xtn->ev.len = n; + } + + while (n > 0) + { + int mask; + + --n; + + mask = 0; + if (xtn->ev.buf[n].events & EPOLLIN) mask |= MOO_SEMAPHORE_IO_MASK_INPUT; /* TODO define constants for IO Mask */ + if (xtn->ev.buf[n].events & EPOLLOUT) mask |= MOO_SEMAPHORE_IO_MASK_OUTPUT; + if (xtn->ev.buf[n].events & EPOLLERR) mask |= MOO_SEMAPHORE_IO_MASK_ERROR; + if (xtn->ev.buf[n].events & EPOLLHUP) mask |= MOO_SEMAPHORE_IO_MASK_HANGUP; + muxwcb (moo, mask, xtn->ev.buf[n].data.ptr); + } + + xtn->ev.len = 0; +#endif } /* ========================================================================= */