diff --git a/moo/lib/Makefile.am b/moo/lib/Makefile.am index ab68688..d82d640 100644 --- a/moo/lib/Makefile.am +++ b/moo/lib/Makefile.am @@ -80,7 +80,7 @@ bin_PROGRAMS = moo moo_SOURCES = main.c moo_CPPFLAGS = $(CPPFLAGS_LIB_COMMON) moo_LDFLAGS = $(LDFLAGS_LIB_COMMON) -moo_LDADD = $(LIBADD_LIB_COMMON) -lmoo +moo_LDADD = $(LIBADD_LIB_COMMON) -lmoo -lpthread install-data-hook: diff --git a/moo/lib/Makefile.in b/moo/lib/Makefile.in index d9cdd7d..89353b4 100644 --- a/moo/lib/Makefile.in +++ b/moo/lib/Makefile.in @@ -454,7 +454,7 @@ libmoo_la_LIBADD = $(LIBADD_LIB_COMMON) $(am__append_5) moo_SOURCES = main.c moo_CPPFLAGS = $(CPPFLAGS_LIB_COMMON) moo_LDFLAGS = $(LDFLAGS_LIB_COMMON) -moo_LDADD = $(LIBADD_LIB_COMMON) -lmoo +moo_LDADD = $(LIBADD_LIB_COMMON) -lmoo -lpthread all: moo-cfg.h $(MAKE) $(AM_MAKEFLAGS) all-am diff --git a/moo/lib/main.c b/moo/lib/main.c index 87ea862..d90bdb3 100644 --- a/moo/lib/main.c +++ b/moo/lib/main.c @@ -57,7 +57,7 @@ # include # include #else -# include + # if defined(MOO_ENABLE_LIBLTDL) # include # define USE_LTDL @@ -76,7 +76,11 @@ # include # endif +# include +# include # include +# include +# include #endif #if !defined(MOO_DEFAULT_PFMODPREFIX) @@ -122,6 +126,19 @@ struct xtn_t HANDLE waitable_timer; #else int ep; /* epoll */ + int p[2]; /* pipe for signaling */ + pthread_t iothr; + struct + { + struct epoll_event buf[32]; /*TODO: make it a dynamically allocated memory block depending on the file descriptor added. */ + struct epoll_event buf2[32]; /*TODO: make it a dynamically allocated memory block depending on the file descriptor added. */ + moo_oow_t len; + pthread_mutex_t mtx; + pthread_cond_t cnd; + pthread_cond_t cnd2; + int waiting; + int need_cnd2; + } ev; #endif }; @@ -650,6 +667,63 @@ if (mask & MOO_LOG_GC) return; /* don't show gc logs */ /* ========================================================================= */ +static void* iothr_main (void* arg) +{ + moo_t* moo = (moo_t*)arg; + xtn_t* xtn = (xtn_t*)moo_getxtn(moo); + + while (!moo->abort_req) + { + if (xtn->ev.len <= 0) /* TODO: no mutex needed for this check? */ + { + int n; + + pthread_mutex_lock (&xtn->ev.mtx); + if (xtn->ev.waiting) + { + pthread_mutex_unlock (&xtn->ev.mtx); + goto cond_wait; + } + xtn->ev.waiting = 1; + pthread_mutex_unlock (&xtn->ev.mtx); + + n = epoll_wait (xtn->ep, xtn->ev.buf2, MOO_COUNTOF(xtn->ev.buf2), 10000); + + pthread_mutex_lock (&xtn->ev.mtx); + if (n <= -1) + { + /* TODO: don't use MOO_DEBUG2. it's not thread safe... */ + MOO_DEBUG2 (moo, "Warning: epoll_wait failure - %d, %hs\n", errno, strerror(errno)); + } + else if (n > 0) + { + memcpy (xtn->ev.buf, xtn->ev.buf2, MOO_SIZEOF(xtn->ev.buf2)); + xtn->ev.len = n; + } + xtn->ev.waiting = 0; + if (xtn->ev.need_cnd2) pthread_cond_signal (&xtn->ev.cnd2); + pthread_mutex_unlock (&xtn->ev.mtx); + } + else + { + /* the event buffer has not been emptied yet */ + struct timespec ts; + + cond_wait: + clock_gettime (CLOCK_REALTIME, &ts); + ts.tv_sec += 10; + + pthread_mutex_lock (&xtn->ev.mtx); + pthread_cond_timedwait (&xtn->ev.cnd, &xtn->ev.mtx, &ts); + pthread_mutex_unlock (&xtn->ev.mtx); + } + + sched_yield (); + } + + return MOO_NULL; +} + static int vm_startup (moo_t* moo) { #if defined(_WIN32) @@ -658,18 +732,60 @@ static int vm_startup (moo_t* moo) #else xtn_t* xtn = (xtn_t*)moo_getxtn(moo); + struct epoll_event ev; + int pcount = 0, flag; xtn->ep = epoll_create1 (EPOLL_CLOEXEC); if (xtn->ep == -1) { - MOO_DEBUG0 (moo, "Cannot create epoll\n"); moo_syserrtoerrnum (errno); + MOO_DEBUG1 (moo, "Cannot create epoll - %hs\n", strerror(errno)); goto oops; } + if (pipe (xtn->p) == -1) + { + moo_syserrtoerrnum (errno); + MOO_DEBUG1 (moo, "Cannot create pipes - %hs\n", strerror(errno)); + goto oops; + } + pcount = 2; + +#if defined(O_CLOEXEC) + flag = fcntl (xtn->p[0], F_GETFD); + if (flag >= 0) fcntl (xtn->p[0], F_SETFD, flag | FD_CLOEXEC); + flag = fcntl (xtn->p[1], F_GETFD); + if (flag >= 0) fcntl (xtn->p[1], F_SETFD, flag | FD_CLOEXEC); +#endif +#if defined(O_NONBLOCK) + flag = fcntl (xtn->p[0], F_GETFL); + if (flag >= 0) fcntl (xtn->p[0], F_SETFL, flag | O_NONBLOCK); + flag = fcntl (xtn->p[1], F_GETFL); + if (flag >= 0) fcntl (xtn->p[1], F_SETFL, flag | O_NONBLOCK); +#endif + + ev.events = EPOLLIN; + ev.data.ptr = (void*)MOO_TYPE_MAX(moo_oow_t); + if (epoll_ctl (xtn->ep, EPOLL_CTL_ADD, xtn->p[0], &ev) == -1) + { + moo_syserrtoerrnum (errno); + MOO_DEBUG1 (moo, "Cannot add a pipe to epoll - %hs\n", strerror(errno)); + goto oops; + } + + pthread_mutex_init (&xtn->ev.mtx, MOO_NULL); + pthread_cond_init (&xtn->ev.cnd, MOO_NULL); + pthread_cond_init (&xtn->ev.cnd2, MOO_NULL); + pthread_create (&xtn->iothr, MOO_NULL, iothr_main, moo); + return 0; oops: + if (pcount) + { + close (xtn->p[0]); + close (xtn->p[1]); + } if (xtn->ep >= 0) { close (xtn->ep); @@ -692,8 +808,21 @@ static void vm_cleanup (moo_t* moo) #else xtn_t* xtn = (xtn_t*)moo_getxtn(moo); + write (xtn->p[1], "Q", 1); + pthread_cond_signal (&xtn->ev.cnd); + pthread_join (xtn->iothr, MOO_NULL); + pthread_cond_destroy (&xtn->ev.cnd); + pthread_cond_destroy (&xtn->ev.cnd2); + pthread_mutex_destroy (&xtn->ev.mtx); + if (xtn->ep) { + struct epoll_event ev; + epoll_ctl (xtn->ep, EPOLL_CTL_DEL, xtn->p[1], &ev); + + close (xtn->p[1]); + close (xtn->p[0]); + close (xtn->ep); xtn->ep = -1; } @@ -832,7 +961,7 @@ static int mux_add (moo_t* moo, moo_oop_semaphore_t sem) MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->io_mask)); mask = MOO_OOP_TO_SMOOI(sem->io_mask); - ev.events = EPOLLET; /* edge trigger */ + ev.events = 0; /*EPOLLET; *//* edge trigger */ if (mask & MOO_SEMAPHORE_IO_MASK_INPUT) ev.events |= EPOLLIN; /*TODO: define io mask constants... */ if (mask & MOO_SEMAPHORE_IO_MASK_OUTPUT) ev.events |= EPOLLOUT; /* don't check MOO_SEMAPHORE_IO_MASK_ERROR and MOO_SEMAPHORE_IO_MASK_HANGUP as it's implicitly enabled by epoll() */ @@ -870,25 +999,117 @@ static void mux_del (moo_t* moo, moo_oop_semaphore_t sem) static void mux_wait (moo_t* moo, const moo_ntime_t* dur, moo_vmprim_muxwait_cb_t muxwcb) { xtn_t* xtn = (xtn_t*)moo_getxtn(moo); - int tmout = 0, n; - struct epoll_event ev[32]; /*TODO: make it into xtn->evt_ptr or somewhere else as a dynamically allocated memory block */ + int tmout = 0, n, take_over = 0; if (dur) tmout = MOO_SECNSEC_TO_MSEC(dur->sec, dur->nsec); - n = epoll_wait (xtn->ep, ev, MOO_COUNTOF(ev), tmout); - while (n > 0) + if (xtn->ev.len <= 0) { - int mask; + pthread_mutex_lock (&xtn->ev.mtx); + if (xtn->ev.waiting) + { + take_over_attempt: + write (xtn->p[1], "X", 1); + xtn->ev.need_cnd2 = 1; + pthread_mutex_unlock (&xtn->ev.mtx); - --n; + /* wait until the peer thread sets xtn->ev.waiting to 0 after epoll_wait() */ + pthread_mutex_lock (&xtn->ev.mtx); + while (xtn->ev.waiting) pthread_cond_wait (&xtn->ev.cnd2, &xtn->ev.mtx); + xtn->ev.need_cnd2 = 0; - mask = 0; - if (ev[n].events & EPOLLIN) mask |= MOO_SEMAPHORE_IO_MASK_INPUT; /* TODO define constants for IO Mask */ - if (ev[n].events & EPOLLOUT) mask |= MOO_SEMAPHORE_IO_MASK_OUTPUT; - if (ev[n].events & EPOLLERR) mask |= MOO_SEMAPHORE_IO_MASK_ERROR; - if (ev[n].events & EPOLLHUP) mask |= MOO_SEMAPHORE_IO_MASK_HANGUP; + if (xtn->ev.waiting) + { + /* the peer thread got the control again. this may happen if + * epoll_wait() timed out at the same time as write() above. + * it won't time out again. if i retry as the timeout value + * of epoll_wait() in the peer thread is relatively long. */ + goto take_over_attempt; + } + else + { + /* fake. but to prevent the peer thread from epoll_waiting */ + xtn->ev.waiting = 1; - muxwcb (moo, mask, ev[n].data.ptr); + /* the peer thread must have finished setting xtn->ev.len and + * can't change it again because xtn->ev.mtx is currently locked. + * i can safely fetch it and put it to n wihtout mutex. */ + n = xtn->ev.len; + + /* indicate that this thread is taking over epoll_wait() operation */ + take_over = 1; + + pthread_mutex_unlock (&xtn->ev.mtx); + goto handle_event; + } + pthread_mutex_unlock (&xtn->ev.mtx); + } + xtn->ev.waiting = 1; + pthread_mutex_unlock (&xtn->ev.mtx); + + epoll_start: + n = epoll_wait (xtn->ep, xtn->ev.buf, MOO_COUNTOF(xtn->ev.buf), tmout); + + pthread_mutex_lock (&xtn->ev.mtx); + if (n <= -1) + { + MOO_DEBUG2 (moo, "Warning: epoll_wait failure - %d, %hs\n", errno, strerror(errno)); + } + else + { + xtn->ev.len = n; + } + xtn->ev.waiting = 0; + pthread_mutex_unlock (&xtn->ev.mtx); + } + else n = xtn->ev.len; + +handle_event: + if (n > 0) + { + do + { + int mask; + + --n; + + if (xtn->ev.buf[n].data.ptr == (void*)MOO_TYPE_MAX(moo_oow_t)) + { + moo_uint8_t u8[16]; + while (read (xtn->p[0], u8, MOO_COUNTOF(u8)) > 0) /* consume as much as possible */; + } + else + { + mask = 0; + if (xtn->ev.buf[n].events & EPOLLIN) mask |= MOO_SEMAPHORE_IO_MASK_INPUT; /* TODO define constants for IO Mask */ + if (xtn->ev.buf[n].events & EPOLLOUT) mask |= MOO_SEMAPHORE_IO_MASK_OUTPUT; + if (xtn->ev.buf[n].events & EPOLLERR) mask |= MOO_SEMAPHORE_IO_MASK_ERROR; + if (xtn->ev.buf[n].events & EPOLLHUP) mask |= MOO_SEMAPHORE_IO_MASK_HANGUP; + + muxwcb (moo, mask, xtn->ev.buf[n].data.ptr); + } + } + while (n > 0); + + if (take_over) + { + take_over = 0; + xtn->ev.len = 0; + goto epoll_start; + } + + pthread_mutex_lock (&xtn->ev.mtx); + xtn->ev.len = 0; + pthread_cond_signal (&xtn->ev.cnd); + pthread_mutex_unlock (&xtn->ev.mtx); + } + else + { + if (take_over) + { + take_over = 0; + goto epoll_start; + } } } /* ========================================================================= */ @@ -1013,12 +1234,16 @@ static void cancel_tick (void) static void handle_term (int sig) { - if (g_moo) moo_abort (g_moo); + if (g_moo) + { + moo_abort (g_moo); + } } static void setup_term (void) { struct sigaction sa; + memset (&sa, 0, MOO_SIZEOF(sa)); sa.sa_handler = handle_term; sigaction (SIGINT, &sa, MOO_NULL); } diff --git a/moo/lib/moo.h b/moo/lib/moo.h index cde493e..ff2882b 100644 --- a/moo/lib/moo.h +++ b/moo/lib/moo.h @@ -765,6 +765,7 @@ typedef void (*moo_vmprim_cleanup_t) (moo_t* moo); typedef void (*moo_vmprim_gettime_t) (moo_t* moo, moo_ntime_t* now); typedef void (*moo_vmprim_sleep_t) (moo_t* moo, const moo_ntime_t* duration); + typedef int (*moo_vmprim_muxadd_t) (moo_t* moo, moo_oop_semaphore_t sem); typedef void (*moo_vmprim_muxdel_t) (moo_t* moo, moo_oop_semaphore_t sem); @@ -778,6 +779,7 @@ struct moo_vmprim_t moo_vmprim_dlsym_t dl_getsym; moo_log_write_t log_write; + moo_vmprim_startup_t vm_startup; moo_vmprim_cleanup_t vm_cleanup; moo_vmprim_gettime_t vm_gettime; @@ -786,6 +788,8 @@ struct moo_vmprim_t moo_vmprim_muxadd_t mux_add; moo_vmprim_muxdel_t mux_del; moo_vmprim_muxwait_t mux_wait; + + }; typedef struct moo_vmprim_t moo_vmprim_t;