From 90f46cc1fe940add1acd9360bcd0681901f4db3c Mon Sep 17 00:00:00 2001 From: "hyunghwan.chung" Date: Wed, 22 Feb 2017 09:48:58 +0000 Subject: [PATCH] simplified the event handling thread implementation by letting only 1 thread waiting for events --- moo/lib/exec.c | 12 +- moo/lib/main.c | 395 ++++++++++++++++++++++--------------------------- moo/lib/moo.h | 13 +- 3 files changed, 186 insertions(+), 234 deletions(-) diff --git a/moo/lib/exec.c b/moo/lib/exec.c index 93a732d..47cadd0 100644 --- a/moo/lib/exec.c +++ b/moo/lib/exec.c @@ -139,9 +139,9 @@ static MOO_INLINE void vm_sleep (moo_t* moo, const moo_ntime_t* dur) moo->vmprim.vm_sleep (moo, dur); } -static MOO_INLINE void vm_mux_wait (moo_t* moo, const moo_ntime_t* dur) +static MOO_INLINE void vm_vm_muxwait (moo_t* moo, const moo_ntime_t* dur) { - moo->vmprim.mux_wait (moo, dur, signal_io_semaphore); + moo->vmprim.vm_muxwait (moo, dur, signal_io_semaphore); } /* ------------------------------------------------------------------------- */ @@ -779,7 +779,7 @@ static int add_to_sem_io (moo_t* moo, moo_oop_semaphore_t sem) moo->sem_io_count++; moo_pushtmp (moo, (moo_oop_t*)&sem); - n = moo->vmprim.mux_add (moo, sem); + n = moo->vmprim.vm_muxadd (moo, sem); moo_poptmp (moo); if (n <= -1) { @@ -797,7 +797,7 @@ static void delete_from_sem_io (moo_t* moo, moo_ooi_t index) sem = moo->sem_io[index]; moo_pushtmp (moo, (moo_oop_t*)&sem); - moo->vmprim.mux_del (moo, sem); + moo->vmprim.vm_muxdel (moo, sem); moo_poptmp (moo); sem->io_index = MOO_SMOOI_TO_OOP(-1); @@ -3163,7 +3163,7 @@ static MOO_INLINE int switch_process_if_needed (moo_t* moo) if (moo->sem_io_wait_count > 0) { MOO_DEBUG0 (moo, "ABOUT TO CALL VM_MUX_WAIT()\n"); - vm_mux_wait (moo, &ft); + vm_vm_muxwait (moo, &ft); } else { @@ -3184,7 +3184,7 @@ MOO_DEBUG0 (moo, "ABOUT TO CALL VM_MUX_SLEEP()\n"); if (moo->sem_io_wait_count > 0) { MOO_DEBUG0 (moo, "ABOUT TO CALL VM_MUX_WAIT 222()\n"); - vm_mux_wait (moo, MOO_NULL); + vm_vm_muxwait (moo, MOO_NULL); } if (moo->processor->active == moo->nil_process) diff --git a/moo/lib/main.c b/moo/lib/main.c index d5bc284..4aeb96a 100644 --- a/moo/lib/main.c +++ b/moo/lib/main.c @@ -134,16 +134,14 @@ struct xtn_t #if defined(USE_THREAD) int p[2]; /* pipe for signaling */ pthread_t iothr; + int iothr_abort; struct { struct epoll_event buf[32]; /*TODO: make it a dynamically allocated memory block depending on the file descriptor added. */ - struct epoll_event buf2[32]; /*TODO: make it a dynamically allocated memory block depending on the file descriptor added. */ moo_oow_t len; pthread_mutex_t mtx; pthread_cond_t cnd; pthread_cond_t cnd2; - int waiting; - int need_cnd2; } ev; #else struct @@ -687,22 +685,15 @@ static void* iothr_main (void* arg) moo_t* moo = (moo_t*)arg; xtn_t* xtn = (xtn_t*)moo_getxtn(moo); - while (!moo->abort_req) + /*while (!moo->abort_req)*/ + while (!xtn->iothr_abort) { if (xtn->ev.len <= 0) /* TODO: no mutex needed for this check? */ { int n; - pthread_mutex_lock (&xtn->ev.mtx); - if (xtn->ev.waiting) - { - pthread_mutex_unlock (&xtn->ev.mtx); - goto cond_wait; - } - xtn->ev.waiting = 1; - pthread_mutex_unlock (&xtn->ev.mtx); - - n = epoll_wait (xtn->ep, xtn->ev.buf2, MOO_COUNTOF(xtn->ev.buf2), 10000); + poll_for_event: + n = epoll_wait (xtn->ep, xtn->ev.buf, MOO_COUNTOF(xtn->ev.buf), 10000); pthread_mutex_lock (&xtn->ev.mtx); if (n <= -1) @@ -712,11 +703,9 @@ static void* iothr_main (void* arg) } else if (n > 0) { - memcpy (xtn->ev.buf, xtn->ev.buf2, MOO_SIZEOF(xtn->ev.buf2)); xtn->ev.len = n; } - xtn->ev.waiting = 0; - if (xtn->ev.need_cnd2) pthread_cond_signal (&xtn->ev.cnd2); + pthread_cond_signal (&xtn->ev.cnd2); pthread_mutex_unlock (&xtn->ev.mtx); } else @@ -724,16 +713,21 @@ static void* iothr_main (void* arg) /* the event buffer has not been emptied yet */ struct timespec ts; - cond_wait: + pthread_mutex_lock (&xtn->ev.mtx); + if (xtn->ev.len <= 0) + { + /* it got emptied between the if check and pthread_mutex_lock() above */ + pthread_mutex_unlock (&xtn->ev.mtx); + goto poll_for_event; + } + clock_gettime (CLOCK_REALTIME, &ts); ts.tv_sec += 10; - - pthread_mutex_lock (&xtn->ev.mtx); pthread_cond_timedwait (&xtn->ev.cnd, &xtn->ev.mtx, &ts); pthread_mutex_unlock (&xtn->ev.mtx); } - sched_yield (); + /*sched_yield ();*/ } return MOO_NULL; @@ -804,6 +798,8 @@ static int vm_startup (moo_t* moo) pthread_mutex_init (&xtn->ev.mtx, MOO_NULL); pthread_cond_init (&xtn->ev.cnd, MOO_NULL); pthread_cond_init (&xtn->ev.cnd2, MOO_NULL); + + xtn->iothr_abort = 0; pthread_create (&xtn->iothr, MOO_NULL, iothr_main, moo); #endif @@ -924,6 +920,153 @@ static void vm_gettime (moo_t* moo, moo_ntime_t* now) # endif #endif +static int vm_muxadd (moo_t* moo, moo_oop_semaphore_t sem) +{ + xtn_t* xtn = (xtn_t*)moo_getxtn(moo); + struct epoll_event ev; + moo_ooi_t mask; + + MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->io_index)); + MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->io_handle)); + MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->io_mask)); + + mask = MOO_OOP_TO_SMOOI(sem->io_mask); + ev.events = 0; /*EPOLLET; *//* edge trigger */ + if (mask & MOO_SEMAPHORE_IO_MASK_INPUT) ev.events |= EPOLLIN; /*TODO: define io mask constants... */ + if (mask & MOO_SEMAPHORE_IO_MASK_OUTPUT) ev.events |= EPOLLOUT; + /* don't check MOO_SEMAPHORE_IO_MASK_ERROR and MOO_SEMAPHORE_IO_MASK_HANGUP as it's implicitly enabled by epoll() */ + ev.data.ptr = (void*)MOO_OOP_TO_SMOOI(sem->io_index); + + if (ev.events == 0) + { + MOO_DEBUG2 (moo, " Invalid semaphore mask %zd on handle %zd\n", mask, MOO_OOP_TO_SMOOI(sem->io_handle)); + moo_seterrnum (moo, MOO_EINVAL); + return -1; + } + + if (epoll_ctl (xtn->ep, EPOLL_CTL_ADD, MOO_OOP_TO_SMOOI(sem->io_handle), &ev) == -1) + { + moo_seterrnum (moo, moo_syserrtoerrnum (errno)); + MOO_DEBUG2 (moo, " epoll_ctl failure on handle %zd - %hs\n", MOO_OOP_TO_SMOOI(sem->io_handle), strerror(errno)); + return -1; + } + + return 0; +} + +static void vm_muxdel (moo_t* moo, moo_oop_semaphore_t sem) +{ + xtn_t* xtn = (xtn_t*)moo_getxtn(moo); + struct epoll_event ev; + + MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->io_index)); + MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->io_handle)); + MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->io_mask)); + + epoll_ctl (xtn->ep, EPOLL_CTL_DEL, MOO_OOP_TO_SMOOI(sem->io_handle), &ev); +} + +static void vm_muxwait (moo_t* moo, const moo_ntime_t* dur, moo_vmprim_muxwait_cb_t muxwcb) +{ + xtn_t* xtn = (xtn_t*)moo_getxtn(moo); + +#if defined(USE_THREAD) + int n; + + if (xtn->ev.len <= 0) + { + struct timespec ts; + moo_ntime_t ns; + + if (!dur) return; /* immediate check is requested. and there is no event */ + + clock_gettime (CLOCK_REALTIME, &ts); + ns.sec = ts.tv_sec; + ns.nsec = ts.tv_nsec; + MOO_ADDNTIME (&ns, &ns, dur); + ts.tv_sec = ns.sec; + ts.tv_nsec = ns.nsec; + + pthread_mutex_lock (&xtn->ev.mtx); + if (xtn->ev.len <= 0) + { + /* the event buffer is still empty */ + pthread_cond_wait (&xtn->ev.cnd2, &xtn->ev.mtx); + } + pthread_mutex_unlock (&xtn->ev.mtx); + } + + n = xtn->ev.len; + + if (n > 0) + { + do + { + --n; + + if (xtn->ev.buf[n].data.ptr == (void*)MOO_TYPE_MAX(moo_oow_t)) + { + moo_uint8_t u8; + while (read (xtn->p[0], &u8, MOO_SIZEOF(u8)) > 0) + { + /* consume as much as possible */; + if (u8 == 'Q') xtn->iothr_abort = 1; + } + } + else if (muxwcb) + { + int mask = 0; + if (xtn->ev.buf[n].events & EPOLLIN) mask |= MOO_SEMAPHORE_IO_MASK_INPUT; + if (xtn->ev.buf[n].events & EPOLLOUT) mask |= MOO_SEMAPHORE_IO_MASK_OUTPUT; + if (xtn->ev.buf[n].events & EPOLLERR) mask |= MOO_SEMAPHORE_IO_MASK_ERROR; + if (xtn->ev.buf[n].events & EPOLLHUP) mask |= MOO_SEMAPHORE_IO_MASK_HANGUP; + + muxwcb (moo, mask, xtn->ev.buf[n].data.ptr); + } + } + while (n > 0); + + + pthread_mutex_lock (&xtn->ev.mtx); + xtn->ev.len = 0; + pthread_cond_signal (&xtn->ev.cnd); + pthread_mutex_unlock (&xtn->ev.mtx); + } + +#else + int tmout = 0, n; + + if (dur) tmout = MOO_SECNSEC_TO_MSEC(dur->sec, dur->nsec); + + n = epoll_wait (xtn->ep, xtn->ev.buf, MOO_COUNTOF(xtn->ev.buf), tmout); + + if (n <= -1) + { + MOO_DEBUG2 (moo, "Warning: epoll_wait failure - %d, %hs\n", errno, strerror(errno)); + } + else + { + xtn->ev.len = n; + } + + while (n > 0) + { + int mask; + + --n; + + mask = 0; + if (xtn->ev.buf[n].events & EPOLLIN) mask |= MOO_SEMAPHORE_IO_MASK_INPUT; /* TODO define constants for IO Mask */ + if (xtn->ev.buf[n].events & EPOLLOUT) mask |= MOO_SEMAPHORE_IO_MASK_OUTPUT; + if (xtn->ev.buf[n].events & EPOLLERR) mask |= MOO_SEMAPHORE_IO_MASK_ERROR; + if (xtn->ev.buf[n].events & EPOLLHUP) mask |= MOO_SEMAPHORE_IO_MASK_HANGUP; + muxwcb (moo, mask, xtn->ev.buf[n].data.ptr); + } + + xtn->ev.len = 0; +#endif +} + static void vm_sleep (moo_t* moo, const moo_ntime_t* dur) { #if defined(_WIN32) @@ -978,206 +1121,18 @@ static void vm_sleep (moo_t* moo, const moo_ntime_t* dur) } #else - + + #if defined(USE_THREAD) + /* the sleep callback is called only if there is no IO semaphore + * waiting. so i can safely use vm_muxwait when USE_THREAD is true */ + vm_muxwait (moo, dur, MOO_NULL); + #else struct timespec ts; ts.tv_sec = dur->sec; ts.tv_nsec = dur->nsec; nanosleep (&ts, MOO_NULL); -#endif -} + #endif -static int mux_add (moo_t* moo, moo_oop_semaphore_t sem) -{ - xtn_t* xtn = (xtn_t*)moo_getxtn(moo); - struct epoll_event ev; - moo_ooi_t mask; - - MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->io_index)); - MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->io_handle)); - MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->io_mask)); - - mask = MOO_OOP_TO_SMOOI(sem->io_mask); - ev.events = 0; /*EPOLLET; *//* edge trigger */ - if (mask & MOO_SEMAPHORE_IO_MASK_INPUT) ev.events |= EPOLLIN; /*TODO: define io mask constants... */ - if (mask & MOO_SEMAPHORE_IO_MASK_OUTPUT) ev.events |= EPOLLOUT; - /* don't check MOO_SEMAPHORE_IO_MASK_ERROR and MOO_SEMAPHORE_IO_MASK_HANGUP as it's implicitly enabled by epoll() */ - ev.data.ptr = (void*)MOO_OOP_TO_SMOOI(sem->io_index); - - if (ev.events == 0) - { - MOO_DEBUG2 (moo, " Invalid semaphore mask %zd on handle %zd\n", mask, MOO_OOP_TO_SMOOI(sem->io_handle)); - moo_seterrnum (moo, MOO_EINVAL); - return -1; - } - - if (epoll_ctl (xtn->ep, EPOLL_CTL_ADD, MOO_OOP_TO_SMOOI(sem->io_handle), &ev) == -1) - { - moo_seterrnum (moo, moo_syserrtoerrnum (errno)); - MOO_DEBUG2 (moo, " epoll_ctl failure on handle %zd - %hs\n", MOO_OOP_TO_SMOOI(sem->io_handle), strerror(errno)); - return -1; - } - - return 0; -} - -static void mux_del (moo_t* moo, moo_oop_semaphore_t sem) -{ - xtn_t* xtn = (xtn_t*)moo_getxtn(moo); - struct epoll_event ev; - - MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->io_index)); - MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->io_handle)); - MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->io_mask)); - - epoll_ctl (xtn->ep, EPOLL_CTL_DEL, MOO_OOP_TO_SMOOI(sem->io_handle), &ev); -} - -static void mux_wait (moo_t* moo, const moo_ntime_t* dur, moo_vmprim_muxwait_cb_t muxwcb) -{ - xtn_t* xtn = (xtn_t*)moo_getxtn(moo); - int tmout = 0, n; - -#if defined(USE_THREAD) - int take_over = 0; - - if (xtn->ev.len <= 0) - { - if (!dur) return; /* immediate check is requested. and there is no event */ - - tmout = MOO_SECNSEC_TO_MSEC(dur->sec, dur->nsec); - - pthread_mutex_lock (&xtn->ev.mtx); - if (xtn->ev.waiting) - { - take_over_attempt: - write (xtn->p[1], "X", 1); - xtn->ev.need_cnd2 = 1; - - /* wait until the peer thread sets xtn->ev.waiting to 0 after epoll_wait(). - * mutex is unlocked and locked back by pthread_cond_wait() */ - while (xtn->ev.waiting) pthread_cond_wait (&xtn->ev.cnd2, &xtn->ev.mtx); - xtn->ev.need_cnd2 = 0; - - if (xtn->ev.waiting) - { - /* the peer thread got the control again. but can this happen? - * anyway, try again. */ - goto take_over_attempt; - } - else - { - /* fake. but to prevent the peer thread from epoll_waiting */ - xtn->ev.waiting = 1; - - /* the peer thread must have finished setting xtn->ev.len and - * can't change it again because xtn->ev.mtx is currently locked. - * i can safely fetch it and put it to n wihtout mutex. */ - n = xtn->ev.len; - - /* indicate that this thread is taking over epoll_wait() operation */ - take_over = 1; - - pthread_mutex_unlock (&xtn->ev.mtx); - goto handle_event; - } - pthread_mutex_unlock (&xtn->ev.mtx); - } - xtn->ev.waiting = 1; - pthread_mutex_unlock (&xtn->ev.mtx); - - epoll_start: - n = epoll_wait (xtn->ep, xtn->ev.buf, MOO_COUNTOF(xtn->ev.buf), tmout); - - pthread_mutex_lock (&xtn->ev.mtx); - if (n <= -1) - { - MOO_DEBUG2 (moo, "Warning: epoll_wait failure - %d, %hs\n", errno, strerror(errno)); - } - else - { - xtn->ev.len = n; - } - xtn->ev.waiting = 0; - pthread_mutex_unlock (&xtn->ev.mtx); - } - else n = xtn->ev.len; - -handle_event: - if (n > 0) - { - do - { - int mask; - - --n; - - if (xtn->ev.buf[n].data.ptr == (void*)MOO_TYPE_MAX(moo_oow_t)) - { - moo_uint8_t u8[16]; - while (read (xtn->p[0], u8, MOO_COUNTOF(u8)) > 0) /* consume as much as possible */; - } - else - { - mask = 0; - if (xtn->ev.buf[n].events & EPOLLIN) mask |= MOO_SEMAPHORE_IO_MASK_INPUT; /* TODO define constants for IO Mask */ - if (xtn->ev.buf[n].events & EPOLLOUT) mask |= MOO_SEMAPHORE_IO_MASK_OUTPUT; - if (xtn->ev.buf[n].events & EPOLLERR) mask |= MOO_SEMAPHORE_IO_MASK_ERROR; - if (xtn->ev.buf[n].events & EPOLLHUP) mask |= MOO_SEMAPHORE_IO_MASK_HANGUP; - - muxwcb (moo, mask, xtn->ev.buf[n].data.ptr); - } - } - while (n > 0); - - if (take_over) - { - take_over = 0; - xtn->ev.len = 0; - goto epoll_start; - } - - pthread_mutex_lock (&xtn->ev.mtx); - xtn->ev.len = 0; - pthread_cond_signal (&xtn->ev.cnd); - pthread_mutex_unlock (&xtn->ev.mtx); - } - else - { - if (take_over) - { - take_over = 0; - goto epoll_start; - } - } -#else - if (dur) tmout = MOO_SECNSEC_TO_MSEC(dur->sec, dur->nsec); - - n = epoll_wait (xtn->ep, xtn->ev.buf, MOO_COUNTOF(xtn->ev.buf), tmout); - - if (n <= -1) - { - MOO_DEBUG2 (moo, "Warning: epoll_wait failure - %d, %hs\n", errno, strerror(errno)); - } - else - { - xtn->ev.len = n; - } - - while (n > 0) - { - int mask; - - --n; - - mask = 0; - if (xtn->ev.buf[n].events & EPOLLIN) mask |= MOO_SEMAPHORE_IO_MASK_INPUT; /* TODO define constants for IO Mask */ - if (xtn->ev.buf[n].events & EPOLLOUT) mask |= MOO_SEMAPHORE_IO_MASK_OUTPUT; - if (xtn->ev.buf[n].events & EPOLLERR) mask |= MOO_SEMAPHORE_IO_MASK_ERROR; - if (xtn->ev.buf[n].events & EPOLLHUP) mask |= MOO_SEMAPHORE_IO_MASK_HANGUP; - muxwcb (moo, mask, xtn->ev.buf[n].data.ptr); - } - - xtn->ev.len = 0; #endif } /* ========================================================================= */ @@ -1304,6 +1259,8 @@ static void handle_term (int sig) { if (g_moo) { + xtn_t* xtn = moo_getxtn(g_moo); + write (xtn->p[1], "Q", 1); moo_abort (g_moo); } } @@ -1346,10 +1303,10 @@ int main (int argc, char* argv[]) vmprim.vm_startup = vm_startup; vmprim.vm_cleanup = vm_cleanup; vmprim.vm_gettime = vm_gettime; + vmprim.vm_muxadd = vm_muxadd; + vmprim.vm_muxdel = vm_muxdel; + vmprim.vm_muxwait = vm_muxwait; vmprim.vm_sleep = vm_sleep; - vmprim.mux_add = mux_add; - vmprim.mux_del = mux_del; - vmprim.mux_wait = mux_wait; #if defined(USE_LTDL) lt_dlinit (); diff --git a/moo/lib/moo.h b/moo/lib/moo.h index ff2882b..f94362d 100644 --- a/moo/lib/moo.h +++ b/moo/lib/moo.h @@ -763,8 +763,6 @@ typedef void (*moo_log_write_t) (moo_t* moo, moo_oow_t mask, const moo_ooch_t* m typedef int (*moo_vmprim_startup_t) (moo_t* moo); typedef void (*moo_vmprim_cleanup_t) (moo_t* moo); typedef void (*moo_vmprim_gettime_t) (moo_t* moo, moo_ntime_t* now); -typedef void (*moo_vmprim_sleep_t) (moo_t* moo, const moo_ntime_t* duration); - typedef int (*moo_vmprim_muxadd_t) (moo_t* moo, moo_oop_semaphore_t sem); typedef void (*moo_vmprim_muxdel_t) (moo_t* moo, moo_oop_semaphore_t sem); @@ -772,6 +770,7 @@ typedef void (*moo_vmprim_muxdel_t) (moo_t* moo, moo_oop_semaphore_t sem); typedef void (*moo_vmprim_muxwait_cb_t) (moo_t* moo, int mask, void* ctx); typedef void (*moo_vmprim_muxwait_t) (moo_t* moo, const moo_ntime_t* duration, moo_vmprim_muxwait_cb_t muxwcb); +typedef void (*moo_vmprim_sleep_t) (moo_t* moo, const moo_ntime_t* duration); struct moo_vmprim_t { moo_vmprim_dlopen_t dl_open; @@ -779,17 +778,13 @@ struct moo_vmprim_t moo_vmprim_dlsym_t dl_getsym; moo_log_write_t log_write; - moo_vmprim_startup_t vm_startup; moo_vmprim_cleanup_t vm_cleanup; moo_vmprim_gettime_t vm_gettime; + moo_vmprim_muxadd_t vm_muxadd; + moo_vmprim_muxdel_t vm_muxdel; + moo_vmprim_muxwait_t vm_muxwait; moo_vmprim_sleep_t vm_sleep; - - moo_vmprim_muxadd_t mux_add; - moo_vmprim_muxdel_t mux_del; - moo_vmprim_muxwait_t mux_wait; - - }; typedef struct moo_vmprim_t moo_vmprim_t;