simplified the event handling thread implementation by letting only 1 thread waiting for events

This commit is contained in:
hyunghwan.chung 2017-02-22 09:48:58 +00:00
parent 513ed296a0
commit 90f46cc1fe
3 changed files with 186 additions and 234 deletions

View File

@ -139,9 +139,9 @@ static MOO_INLINE void vm_sleep (moo_t* moo, const moo_ntime_t* dur)
moo->vmprim.vm_sleep (moo, dur);
}
static MOO_INLINE void vm_mux_wait (moo_t* moo, const moo_ntime_t* dur)
static MOO_INLINE void vm_vm_muxwait (moo_t* moo, const moo_ntime_t* dur)
{
moo->vmprim.mux_wait (moo, dur, signal_io_semaphore);
moo->vmprim.vm_muxwait (moo, dur, signal_io_semaphore);
}
/* ------------------------------------------------------------------------- */
@ -779,7 +779,7 @@ static int add_to_sem_io (moo_t* moo, moo_oop_semaphore_t sem)
moo->sem_io_count++;
moo_pushtmp (moo, (moo_oop_t*)&sem);
n = moo->vmprim.mux_add (moo, sem);
n = moo->vmprim.vm_muxadd (moo, sem);
moo_poptmp (moo);
if (n <= -1)
{
@ -797,7 +797,7 @@ static void delete_from_sem_io (moo_t* moo, moo_ooi_t index)
sem = moo->sem_io[index];
moo_pushtmp (moo, (moo_oop_t*)&sem);
moo->vmprim.mux_del (moo, sem);
moo->vmprim.vm_muxdel (moo, sem);
moo_poptmp (moo);
sem->io_index = MOO_SMOOI_TO_OOP(-1);
@ -3163,7 +3163,7 @@ static MOO_INLINE int switch_process_if_needed (moo_t* moo)
if (moo->sem_io_wait_count > 0)
{
MOO_DEBUG0 (moo, "ABOUT TO CALL VM_MUX_WAIT()\n");
vm_mux_wait (moo, &ft);
vm_vm_muxwait (moo, &ft);
}
else
{
@ -3184,7 +3184,7 @@ MOO_DEBUG0 (moo, "ABOUT TO CALL VM_MUX_SLEEP()\n");
if (moo->sem_io_wait_count > 0)
{
MOO_DEBUG0 (moo, "ABOUT TO CALL VM_MUX_WAIT 222()\n");
vm_mux_wait (moo, MOO_NULL);
vm_vm_muxwait (moo, MOO_NULL);
}
if (moo->processor->active == moo->nil_process)

View File

@ -134,16 +134,14 @@ struct xtn_t
#if defined(USE_THREAD)
int p[2]; /* pipe for signaling */
pthread_t iothr;
int iothr_abort;
struct
{
struct epoll_event buf[32]; /*TODO: make it a dynamically allocated memory block depending on the file descriptor added. */
struct epoll_event buf2[32]; /*TODO: make it a dynamically allocated memory block depending on the file descriptor added. */
moo_oow_t len;
pthread_mutex_t mtx;
pthread_cond_t cnd;
pthread_cond_t cnd2;
int waiting;
int need_cnd2;
} ev;
#else
struct
@ -687,22 +685,15 @@ static void* iothr_main (void* arg)
moo_t* moo = (moo_t*)arg;
xtn_t* xtn = (xtn_t*)moo_getxtn(moo);
while (!moo->abort_req)
/*while (!moo->abort_req)*/
while (!xtn->iothr_abort)
{
if (xtn->ev.len <= 0) /* TODO: no mutex needed for this check? */
{
int n;
pthread_mutex_lock (&xtn->ev.mtx);
if (xtn->ev.waiting)
{
pthread_mutex_unlock (&xtn->ev.mtx);
goto cond_wait;
}
xtn->ev.waiting = 1;
pthread_mutex_unlock (&xtn->ev.mtx);
n = epoll_wait (xtn->ep, xtn->ev.buf2, MOO_COUNTOF(xtn->ev.buf2), 10000);
poll_for_event:
n = epoll_wait (xtn->ep, xtn->ev.buf, MOO_COUNTOF(xtn->ev.buf), 10000);
pthread_mutex_lock (&xtn->ev.mtx);
if (n <= -1)
@ -712,11 +703,9 @@ static void* iothr_main (void* arg)
}
else if (n > 0)
{
memcpy (xtn->ev.buf, xtn->ev.buf2, MOO_SIZEOF(xtn->ev.buf2));
xtn->ev.len = n;
}
xtn->ev.waiting = 0;
if (xtn->ev.need_cnd2) pthread_cond_signal (&xtn->ev.cnd2);
pthread_cond_signal (&xtn->ev.cnd2);
pthread_mutex_unlock (&xtn->ev.mtx);
}
else
@ -724,16 +713,21 @@ static void* iothr_main (void* arg)
/* the event buffer has not been emptied yet */
struct timespec ts;
cond_wait:
pthread_mutex_lock (&xtn->ev.mtx);
if (xtn->ev.len <= 0)
{
/* it got emptied between the if check and pthread_mutex_lock() above */
pthread_mutex_unlock (&xtn->ev.mtx);
goto poll_for_event;
}
clock_gettime (CLOCK_REALTIME, &ts);
ts.tv_sec += 10;
pthread_mutex_lock (&xtn->ev.mtx);
pthread_cond_timedwait (&xtn->ev.cnd, &xtn->ev.mtx, &ts);
pthread_mutex_unlock (&xtn->ev.mtx);
}
sched_yield ();
/*sched_yield ();*/
}
return MOO_NULL;
@ -804,6 +798,8 @@ static int vm_startup (moo_t* moo)
pthread_mutex_init (&xtn->ev.mtx, MOO_NULL);
pthread_cond_init (&xtn->ev.cnd, MOO_NULL);
pthread_cond_init (&xtn->ev.cnd2, MOO_NULL);
xtn->iothr_abort = 0;
pthread_create (&xtn->iothr, MOO_NULL, iothr_main, moo);
#endif
@ -924,6 +920,153 @@ static void vm_gettime (moo_t* moo, moo_ntime_t* now)
# endif
#endif
static int vm_muxadd (moo_t* moo, moo_oop_semaphore_t sem)
{
xtn_t* xtn = (xtn_t*)moo_getxtn(moo);
struct epoll_event ev;
moo_ooi_t mask;
MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->io_index));
MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->io_handle));
MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->io_mask));
mask = MOO_OOP_TO_SMOOI(sem->io_mask);
ev.events = 0; /*EPOLLET; *//* edge trigger */
if (mask & MOO_SEMAPHORE_IO_MASK_INPUT) ev.events |= EPOLLIN; /*TODO: define io mask constants... */
if (mask & MOO_SEMAPHORE_IO_MASK_OUTPUT) ev.events |= EPOLLOUT;
/* don't check MOO_SEMAPHORE_IO_MASK_ERROR and MOO_SEMAPHORE_IO_MASK_HANGUP as it's implicitly enabled by epoll() */
ev.data.ptr = (void*)MOO_OOP_TO_SMOOI(sem->io_index);
if (ev.events == 0)
{
MOO_DEBUG2 (moo, "<vm_muxadd> Invalid semaphore mask %zd on handle %zd\n", mask, MOO_OOP_TO_SMOOI(sem->io_handle));
moo_seterrnum (moo, MOO_EINVAL);
return -1;
}
if (epoll_ctl (xtn->ep, EPOLL_CTL_ADD, MOO_OOP_TO_SMOOI(sem->io_handle), &ev) == -1)
{
moo_seterrnum (moo, moo_syserrtoerrnum (errno));
MOO_DEBUG2 (moo, "<vm_muxadd> epoll_ctl failure on handle %zd - %hs\n", MOO_OOP_TO_SMOOI(sem->io_handle), strerror(errno));
return -1;
}
return 0;
}
static void vm_muxdel (moo_t* moo, moo_oop_semaphore_t sem)
{
xtn_t* xtn = (xtn_t*)moo_getxtn(moo);
struct epoll_event ev;
MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->io_index));
MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->io_handle));
MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->io_mask));
epoll_ctl (xtn->ep, EPOLL_CTL_DEL, MOO_OOP_TO_SMOOI(sem->io_handle), &ev);
}
static void vm_muxwait (moo_t* moo, const moo_ntime_t* dur, moo_vmprim_muxwait_cb_t muxwcb)
{
xtn_t* xtn = (xtn_t*)moo_getxtn(moo);
#if defined(USE_THREAD)
int n;
if (xtn->ev.len <= 0)
{
struct timespec ts;
moo_ntime_t ns;
if (!dur) return; /* immediate check is requested. and there is no event */
clock_gettime (CLOCK_REALTIME, &ts);
ns.sec = ts.tv_sec;
ns.nsec = ts.tv_nsec;
MOO_ADDNTIME (&ns, &ns, dur);
ts.tv_sec = ns.sec;
ts.tv_nsec = ns.nsec;
pthread_mutex_lock (&xtn->ev.mtx);
if (xtn->ev.len <= 0)
{
/* the event buffer is still empty */
pthread_cond_wait (&xtn->ev.cnd2, &xtn->ev.mtx);
}
pthread_mutex_unlock (&xtn->ev.mtx);
}
n = xtn->ev.len;
if (n > 0)
{
do
{
--n;
if (xtn->ev.buf[n].data.ptr == (void*)MOO_TYPE_MAX(moo_oow_t))
{
moo_uint8_t u8;
while (read (xtn->p[0], &u8, MOO_SIZEOF(u8)) > 0)
{
/* consume as much as possible */;
if (u8 == 'Q') xtn->iothr_abort = 1;
}
}
else if (muxwcb)
{
int mask = 0;
if (xtn->ev.buf[n].events & EPOLLIN) mask |= MOO_SEMAPHORE_IO_MASK_INPUT;
if (xtn->ev.buf[n].events & EPOLLOUT) mask |= MOO_SEMAPHORE_IO_MASK_OUTPUT;
if (xtn->ev.buf[n].events & EPOLLERR) mask |= MOO_SEMAPHORE_IO_MASK_ERROR;
if (xtn->ev.buf[n].events & EPOLLHUP) mask |= MOO_SEMAPHORE_IO_MASK_HANGUP;
muxwcb (moo, mask, xtn->ev.buf[n].data.ptr);
}
}
while (n > 0);
pthread_mutex_lock (&xtn->ev.mtx);
xtn->ev.len = 0;
pthread_cond_signal (&xtn->ev.cnd);
pthread_mutex_unlock (&xtn->ev.mtx);
}
#else
int tmout = 0, n;
if (dur) tmout = MOO_SECNSEC_TO_MSEC(dur->sec, dur->nsec);
n = epoll_wait (xtn->ep, xtn->ev.buf, MOO_COUNTOF(xtn->ev.buf), tmout);
if (n <= -1)
{
MOO_DEBUG2 (moo, "Warning: epoll_wait failure - %d, %hs\n", errno, strerror(errno));
}
else
{
xtn->ev.len = n;
}
while (n > 0)
{
int mask;
--n;
mask = 0;
if (xtn->ev.buf[n].events & EPOLLIN) mask |= MOO_SEMAPHORE_IO_MASK_INPUT; /* TODO define constants for IO Mask */
if (xtn->ev.buf[n].events & EPOLLOUT) mask |= MOO_SEMAPHORE_IO_MASK_OUTPUT;
if (xtn->ev.buf[n].events & EPOLLERR) mask |= MOO_SEMAPHORE_IO_MASK_ERROR;
if (xtn->ev.buf[n].events & EPOLLHUP) mask |= MOO_SEMAPHORE_IO_MASK_HANGUP;
muxwcb (moo, mask, xtn->ev.buf[n].data.ptr);
}
xtn->ev.len = 0;
#endif
}
static void vm_sleep (moo_t* moo, const moo_ntime_t* dur)
{
#if defined(_WIN32)
@ -978,206 +1121,18 @@ static void vm_sleep (moo_t* moo, const moo_ntime_t* dur)
}
#else
#if defined(USE_THREAD)
/* the sleep callback is called only if there is no IO semaphore
* waiting. so i can safely use vm_muxwait when USE_THREAD is true */
vm_muxwait (moo, dur, MOO_NULL);
#else
struct timespec ts;
ts.tv_sec = dur->sec;
ts.tv_nsec = dur->nsec;
nanosleep (&ts, MOO_NULL);
#endif
}
#endif
static int mux_add (moo_t* moo, moo_oop_semaphore_t sem)
{
xtn_t* xtn = (xtn_t*)moo_getxtn(moo);
struct epoll_event ev;
moo_ooi_t mask;
MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->io_index));
MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->io_handle));
MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->io_mask));
mask = MOO_OOP_TO_SMOOI(sem->io_mask);
ev.events = 0; /*EPOLLET; *//* edge trigger */
if (mask & MOO_SEMAPHORE_IO_MASK_INPUT) ev.events |= EPOLLIN; /*TODO: define io mask constants... */
if (mask & MOO_SEMAPHORE_IO_MASK_OUTPUT) ev.events |= EPOLLOUT;
/* don't check MOO_SEMAPHORE_IO_MASK_ERROR and MOO_SEMAPHORE_IO_MASK_HANGUP as it's implicitly enabled by epoll() */
ev.data.ptr = (void*)MOO_OOP_TO_SMOOI(sem->io_index);
if (ev.events == 0)
{
MOO_DEBUG2 (moo, "<mux_add> Invalid semaphore mask %zd on handle %zd\n", mask, MOO_OOP_TO_SMOOI(sem->io_handle));
moo_seterrnum (moo, MOO_EINVAL);
return -1;
}
if (epoll_ctl (xtn->ep, EPOLL_CTL_ADD, MOO_OOP_TO_SMOOI(sem->io_handle), &ev) == -1)
{
moo_seterrnum (moo, moo_syserrtoerrnum (errno));
MOO_DEBUG2 (moo, "<mux_add> epoll_ctl failure on handle %zd - %hs\n", MOO_OOP_TO_SMOOI(sem->io_handle), strerror(errno));
return -1;
}
return 0;
}
static void mux_del (moo_t* moo, moo_oop_semaphore_t sem)
{
xtn_t* xtn = (xtn_t*)moo_getxtn(moo);
struct epoll_event ev;
MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->io_index));
MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->io_handle));
MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->io_mask));
epoll_ctl (xtn->ep, EPOLL_CTL_DEL, MOO_OOP_TO_SMOOI(sem->io_handle), &ev);
}
static void mux_wait (moo_t* moo, const moo_ntime_t* dur, moo_vmprim_muxwait_cb_t muxwcb)
{
xtn_t* xtn = (xtn_t*)moo_getxtn(moo);
int tmout = 0, n;
#if defined(USE_THREAD)
int take_over = 0;
if (xtn->ev.len <= 0)
{
if (!dur) return; /* immediate check is requested. and there is no event */
tmout = MOO_SECNSEC_TO_MSEC(dur->sec, dur->nsec);
pthread_mutex_lock (&xtn->ev.mtx);
if (xtn->ev.waiting)
{
take_over_attempt:
write (xtn->p[1], "X", 1);
xtn->ev.need_cnd2 = 1;
/* wait until the peer thread sets xtn->ev.waiting to 0 after epoll_wait().
* mutex is unlocked and locked back by pthread_cond_wait() */
while (xtn->ev.waiting) pthread_cond_wait (&xtn->ev.cnd2, &xtn->ev.mtx);
xtn->ev.need_cnd2 = 0;
if (xtn->ev.waiting)
{
/* the peer thread got the control again. but can this happen?
* anyway, try again. */
goto take_over_attempt;
}
else
{
/* fake. but to prevent the peer thread from epoll_waiting */
xtn->ev.waiting = 1;
/* the peer thread must have finished setting xtn->ev.len and
* can't change it again because xtn->ev.mtx is currently locked.
* i can safely fetch it and put it to n wihtout mutex. */
n = xtn->ev.len;
/* indicate that this thread is taking over epoll_wait() operation */
take_over = 1;
pthread_mutex_unlock (&xtn->ev.mtx);
goto handle_event;
}
pthread_mutex_unlock (&xtn->ev.mtx);
}
xtn->ev.waiting = 1;
pthread_mutex_unlock (&xtn->ev.mtx);
epoll_start:
n = epoll_wait (xtn->ep, xtn->ev.buf, MOO_COUNTOF(xtn->ev.buf), tmout);
pthread_mutex_lock (&xtn->ev.mtx);
if (n <= -1)
{
MOO_DEBUG2 (moo, "Warning: epoll_wait failure - %d, %hs\n", errno, strerror(errno));
}
else
{
xtn->ev.len = n;
}
xtn->ev.waiting = 0;
pthread_mutex_unlock (&xtn->ev.mtx);
}
else n = xtn->ev.len;
handle_event:
if (n > 0)
{
do
{
int mask;
--n;
if (xtn->ev.buf[n].data.ptr == (void*)MOO_TYPE_MAX(moo_oow_t))
{
moo_uint8_t u8[16];
while (read (xtn->p[0], u8, MOO_COUNTOF(u8)) > 0) /* consume as much as possible */;
}
else
{
mask = 0;
if (xtn->ev.buf[n].events & EPOLLIN) mask |= MOO_SEMAPHORE_IO_MASK_INPUT; /* TODO define constants for IO Mask */
if (xtn->ev.buf[n].events & EPOLLOUT) mask |= MOO_SEMAPHORE_IO_MASK_OUTPUT;
if (xtn->ev.buf[n].events & EPOLLERR) mask |= MOO_SEMAPHORE_IO_MASK_ERROR;
if (xtn->ev.buf[n].events & EPOLLHUP) mask |= MOO_SEMAPHORE_IO_MASK_HANGUP;
muxwcb (moo, mask, xtn->ev.buf[n].data.ptr);
}
}
while (n > 0);
if (take_over)
{
take_over = 0;
xtn->ev.len = 0;
goto epoll_start;
}
pthread_mutex_lock (&xtn->ev.mtx);
xtn->ev.len = 0;
pthread_cond_signal (&xtn->ev.cnd);
pthread_mutex_unlock (&xtn->ev.mtx);
}
else
{
if (take_over)
{
take_over = 0;
goto epoll_start;
}
}
#else
if (dur) tmout = MOO_SECNSEC_TO_MSEC(dur->sec, dur->nsec);
n = epoll_wait (xtn->ep, xtn->ev.buf, MOO_COUNTOF(xtn->ev.buf), tmout);
if (n <= -1)
{
MOO_DEBUG2 (moo, "Warning: epoll_wait failure - %d, %hs\n", errno, strerror(errno));
}
else
{
xtn->ev.len = n;
}
while (n > 0)
{
int mask;
--n;
mask = 0;
if (xtn->ev.buf[n].events & EPOLLIN) mask |= MOO_SEMAPHORE_IO_MASK_INPUT; /* TODO define constants for IO Mask */
if (xtn->ev.buf[n].events & EPOLLOUT) mask |= MOO_SEMAPHORE_IO_MASK_OUTPUT;
if (xtn->ev.buf[n].events & EPOLLERR) mask |= MOO_SEMAPHORE_IO_MASK_ERROR;
if (xtn->ev.buf[n].events & EPOLLHUP) mask |= MOO_SEMAPHORE_IO_MASK_HANGUP;
muxwcb (moo, mask, xtn->ev.buf[n].data.ptr);
}
xtn->ev.len = 0;
#endif
}
/* ========================================================================= */
@ -1304,6 +1259,8 @@ static void handle_term (int sig)
{
if (g_moo)
{
xtn_t* xtn = moo_getxtn(g_moo);
write (xtn->p[1], "Q", 1);
moo_abort (g_moo);
}
}
@ -1346,10 +1303,10 @@ int main (int argc, char* argv[])
vmprim.vm_startup = vm_startup;
vmprim.vm_cleanup = vm_cleanup;
vmprim.vm_gettime = vm_gettime;
vmprim.vm_muxadd = vm_muxadd;
vmprim.vm_muxdel = vm_muxdel;
vmprim.vm_muxwait = vm_muxwait;
vmprim.vm_sleep = vm_sleep;
vmprim.mux_add = mux_add;
vmprim.mux_del = mux_del;
vmprim.mux_wait = mux_wait;
#if defined(USE_LTDL)
lt_dlinit ();

View File

@ -763,8 +763,6 @@ typedef void (*moo_log_write_t) (moo_t* moo, moo_oow_t mask, const moo_ooch_t* m
typedef int (*moo_vmprim_startup_t) (moo_t* moo);
typedef void (*moo_vmprim_cleanup_t) (moo_t* moo);
typedef void (*moo_vmprim_gettime_t) (moo_t* moo, moo_ntime_t* now);
typedef void (*moo_vmprim_sleep_t) (moo_t* moo, const moo_ntime_t* duration);
typedef int (*moo_vmprim_muxadd_t) (moo_t* moo, moo_oop_semaphore_t sem);
typedef void (*moo_vmprim_muxdel_t) (moo_t* moo, moo_oop_semaphore_t sem);
@ -772,6 +770,7 @@ typedef void (*moo_vmprim_muxdel_t) (moo_t* moo, moo_oop_semaphore_t sem);
typedef void (*moo_vmprim_muxwait_cb_t) (moo_t* moo, int mask, void* ctx);
typedef void (*moo_vmprim_muxwait_t) (moo_t* moo, const moo_ntime_t* duration, moo_vmprim_muxwait_cb_t muxwcb);
typedef void (*moo_vmprim_sleep_t) (moo_t* moo, const moo_ntime_t* duration);
struct moo_vmprim_t
{
moo_vmprim_dlopen_t dl_open;
@ -779,17 +778,13 @@ struct moo_vmprim_t
moo_vmprim_dlsym_t dl_getsym;
moo_log_write_t log_write;
moo_vmprim_startup_t vm_startup;
moo_vmprim_cleanup_t vm_cleanup;
moo_vmprim_gettime_t vm_gettime;
moo_vmprim_muxadd_t vm_muxadd;
moo_vmprim_muxdel_t vm_muxdel;
moo_vmprim_muxwait_t vm_muxwait;
moo_vmprim_sleep_t vm_sleep;
moo_vmprim_muxadd_t mux_add;
moo_vmprim_muxdel_t mux_del;
moo_vmprim_muxwait_t mux_wait;
};
typedef struct moo_vmprim_t moo_vmprim_t;