diff --git a/moo/lib/main.c b/moo/lib/main.c index 328d6a3..001bd6f 100644 --- a/moo/lib/main.c +++ b/moo/lib/main.c @@ -157,6 +157,18 @@ # endif #endif +#if defined(USE_THREAD) +# define MUTEX_INIT(x) pthread_mutex_init((x), MOO_NULL) +# define MUTEX_DESTROY(x) pthread_mutex_destroy(x) +# define MUTEX_LOCK(x) pthread_mutex_lock(x) +# define MUTEX_UNLOCK(x) pthread_mutex_unlock(x) +#else +# define MUTEX_INIT(x) +# define MUTEX_DESTROY(x) +# define MUTEX_LOCK(x) +# define MUTEX_UNLOCK(x) +#endif + typedef struct bb_t bb_t; struct bb_t { @@ -186,11 +198,16 @@ struct xtn_t HANDLE waitable_timer; #else - #if defined(USE_DEVPOLL) || defined(USE_EPOLL) - int ep; /* /dev/poll or epoll */ - #endif - - #if defined(USE_DEVPOLL) || defined(USE_POLL) + #if defined(USE_DEVPOLL) + int ep; /* /dev/poll */ + struct + { + moo_oow_t capa; + moo_ooi_t* ptr; + } epd; + #elif defined(USE_EPOLL) + int ep; /* epoll */ + #elif defined(USE_POLL) struct { moo_oow_t capa; @@ -199,7 +216,7 @@ struct xtn_t #elif defined(USE_SELECT) struct { - void* data[FD_SETSIZE]; + void* data[FD_SETSIZE]; } epd; #endif @@ -226,14 +243,21 @@ struct xtn_t struct pollfd* ptr; moo_oow_t capa; moo_oow_t len; + #if defined(USE_THREAD) + pthread_mutex_t pmtx; + #endif } reg; /* registrar */ struct pollfd* buf; + #elif defined(USE_SELECT) struct { fd_set rfds; fd_set wfds; int maxfd; + #if defined(USE_THREAD) + pthread_mutex_t smtx; + #endif } reg; struct select_fd_t buf[FD_SETSIZE]; @@ -780,7 +804,7 @@ static void log_write (moo_t* moo, moo_oow_t mask, const moo_ooch_t* msg, moo_oo /* ========================================================================= */ #if defined(USE_DEVPOLL) || defined(USE_POLL) -static int secure_poll_data_space (moo_t* moo, int fd) +static MOO_INLINE int secure_poll_data_space (moo_t* moo, int fd) { xtn_t* xtn = (xtn_t*)moo_getxtn(moo); if (fd >= xtn->epd.capa) @@ -798,6 +822,16 @@ static int secure_poll_data_space (moo_t* moo, int fd) return 0; } +static MOO_INLINE void destroy_poll_data_space (moo_t* moo) +{ + xtn_t* xtn = (xtn_t*)moo_getxtn(moo); + if (xtn->epd.ptr) + { + moo_freemem (moo, xtn->epd.ptr); + xtn->epd.ptr = MOO_NULL; + xtn->epd.capa = 0; + } +} #endif @@ -849,6 +883,7 @@ static int _add_poll_fd (moo_t* moo, int fd, int event_mask, moo_oow_t event_dat if (secure_poll_data_space (moo, fd) <= -1) return -1; + MUTEX_LOCK (&xtn->ev.reg.pmtx); if (xtn->ev.reg.len >= xtn->ev.reg.capa) { struct pollfd* tmp, * tmp2; @@ -860,6 +895,7 @@ static int _add_poll_fd (moo_t* moo, int fd, int event_mask, moo_oow_t event_dat if (!tmp || !tmp2) { MOO_DEBUG2 (moo, "Cannot add file descriptor %d to poll - %hs\n", fd, strerror(errno)); + MUTEX_UNLOCK (&xtn->ev.reg.pmtx); if (tmp) moo_freemem (moo, tmp); return -1; } @@ -874,12 +910,15 @@ static int _add_poll_fd (moo_t* moo, int fd, int event_mask, moo_oow_t event_dat xtn->ev.reg.ptr[xtn->ev.reg.len].events = event_mask; xtn->ev.reg.ptr[xtn->ev.reg.len].revents = 0; xtn->ev.reg.len++; + MUTEX_UNLOCK (&xtn->ev.reg.pmtx); xtn->epd.ptr[fd] = event_data; return 0; #elif defined(USE_SELECT) xtn_t* xtn = (xtn_t*)moo_getxtn(moo); + + MUTEX_LOCK (&xtn->ev.reg.smtx); if (event_mask & XPOLLIN) { FD_SET (fd, &xtn->ev.reg.rfds); @@ -890,6 +929,7 @@ static int _add_poll_fd (moo_t* moo, int fd, int event_mask, moo_oow_t event_dat FD_SET (fd, &xtn->ev.reg.wfds); if (fd > xtn->ev.reg.maxfd) xtn->ev.reg.maxfd = fd; } + MUTEX_UNLOCK (&xtn->ev.reg.smtx); xtn->epd.data[fd] = (void*)event_data; return 0; @@ -942,15 +982,18 @@ static int _del_poll_fd (moo_t* moo, int fd) moo_oow_t i; /* TODO: performance boost. no linear search */ + MUTEX_LOCK (&xtn->ev.reg.pmtx); for (i = 0; i < xtn->ev.reg.len; i++) { if (xtn->ev.reg.ptr[i].fd == fd) { xtn->ev.reg.len--; memmove (&xtn->ev.reg.ptr[i], &xtn->ev.reg.ptr[i+1], (xtn->ev.reg.len - i) * MOO_SIZEOF(*xtn->ev.reg.ptr)); + MUTEX_UNLOCK (&xtn->ev.reg.pmtx); return 0; } } + MUTEX_UNLOCK (&xtn->ev.reg.pmtx); MOO_DEBUG1 (moo, "Cannot remove file descriptor %d from poll - not found\n", fd); @@ -960,19 +1003,22 @@ static int _del_poll_fd (moo_t* moo, int fd) #elif defined(USE_SELECT) xtn_t* xtn = (xtn_t*)moo_getxtn(moo); + MUTEX_LOCK (&xtn->ev.reg.smtx); FD_CLR (fd, &xtn->ev.reg.rfds); FD_CLR (fd, &xtn->ev.reg.wfds); - if (fd == xtn->ev.reg.maxfd) + if (fd >= xtn->ev.reg.maxfd) { int i; /* TODO: any way to make this search faster or to do without the search like this */ - for (i = fd; i > 0;) + for (i = fd - 1; i >= 0; i--) { - i--; if (FD_ISSET(i, &xtn->ev.reg.rfds) || FD_ISSET(i, &xtn->ev.reg.wfds)) break; } xtn->ev.reg.maxfd = i; } + MUTEX_UNLOCK (&xtn->ev.reg.smtx); + + /* keep xtn->epd.data[fd] so that the data is still accessible after deletion */ return 0; #else @@ -1019,6 +1065,7 @@ static int _mod_poll_fd (moo_t* moo, int fd, int event_mask, moo_oow_t event_dat xtn_t* xtn = (xtn_t*)moo_getxtn(moo); moo_oow_t i; + MUTEX_LOCK (&xtn->ev.reg.pmtx); for (i = 0; i < xtn->ev.reg.len; i++) { if (xtn->ev.reg.ptr[i].fd == fd) @@ -1027,12 +1074,14 @@ static int _mod_poll_fd (moo_t* moo, int fd, int event_mask, moo_oow_t event_dat xtn->ev.reg.ptr[i].fd = fd; xtn->ev.reg.ptr[i].events = event_mask; xtn->ev.reg.ptr[i].revents = 0; + MUTEX_UNLOCK (&xtn->ev.reg.pmtx); - MOO_ASSERT (moo, fd < xtn->epd.capa); + MOO_ASSERT (moo, fd < xtn->epd.capa); xtn->epd.ptr[fd] = event_data; return 0; } } + MUTEX_UNLOCK (&xtn->ev.reg.pmtx); MOO_DEBUG1 (moo, "Cannot modify file descriptor %d in poll - not found\n", fd); moo_seterrnum (moo, MOO_ENOENT); @@ -1042,8 +1091,8 @@ static int _mod_poll_fd (moo_t* moo, int fd, int event_mask, moo_oow_t event_dat xtn_t* xtn = (xtn_t*)moo_getxtn(moo); + MUTEX_LOCK (&xtn->ev.reg.smtx); MOO_ASSERT (moo, fd <= xtn->ev.reg.maxfd); - MOO_ASSERT (moo, event_mask & (XPOLLIN | XPOLLOUT)); if (event_mask & XPOLLIN) FD_SET (fd, &xtn->ev.reg.rfds); @@ -1054,6 +1103,7 @@ static int _mod_poll_fd (moo_t* moo, int fd, int event_mask, moo_oow_t event_dat FD_SET (fd, &xtn->ev.reg.wfds); else FD_CLR (fd, &xtn->ev.reg.wfds); + MUTEX_UNLOCK (&xtn->ev.reg.smtx); xtn->epd.data[fd] = (void*)event_data; return 0; @@ -1066,114 +1116,6 @@ static int _mod_poll_fd (moo_t* moo, int fd, int event_mask, moo_oow_t event_dat } -#if defined(USE_THREAD) -static void* iothr_main (void* arg) -{ - moo_t* moo = (moo_t*)arg; - xtn_t* xtn = (xtn_t*)moo_getxtn(moo); - - /*while (!moo->abort_req)*/ - while (!xtn->iothr_abort) - { - if (xtn->ev.len <= 0) /* TODO: no mutex needed for this check? */ - { - int n; - #if defined(USE_DEVPOLL) - struct dvpoll dvp; - #elif defined(USE_SELECT) - struct timeval tv; - fd_set rfds; - fd_set wfds; - #endif - - poll_for_event: - - #if defined(USE_DEVPOLL) - dvp.dp_timeout = 10000; /* milliseconds */ - dvp.dp_fds = xtn->ev.buf; - dvp.dp_nfds = MOO_COUNTOF(xtn->ev.buf); - n = ioctl (xtn->ep, DP_POLL, &dvp); - #elif defined(USE_EPOLL) - n = epoll_wait (xtn->ep, xtn->ev.buf, MOO_COUNTOF(xtn->ev.buf), 10000); - #elif defined(USE_POLL) - memcpy (xtn->ev.buf, xtn->ev.reg.ptr, xtn->ev.reg.len * MOO_SIZEOF(*xtn->ev.buf)); - n = poll (xtn->ev.buf, xtn->ev.reg.len, 10000); - #elif defined(USE_SELECT) - tv.tv_sec = 10; - tv.tv_usec = 0; - MOO_MEMCPY (&rfds, &xtn->ev.reg.rfds, MOO_SIZEOF(rfds)); - MOO_MEMCPY (&wfds, &xtn->ev.reg.wfds, MOO_SIZEOF(wfds)); - n = select (xtn->ev.reg.maxfd + 1, &rfds, &wfds, NULL, &tv); - if (n > 0) - { - int fd, count = 0; - for (fd = 0; fd <= xtn->ev.reg.maxfd; fd++) - { - int events = 0; - if (FD_ISSET(fd, &rfds)) events |= XPOLLIN; - if (FD_ISSET(fd, &wfds)) events |= XPOLLOUT; - - if (events) - { - xtn->ev.buf[count].fd = fd; - xtn->ev.buf[count].events = events; - count++; - } - } - n = count; - MOO_ASSERT (moo, n > 0); - } - #endif - - pthread_mutex_lock (&xtn->ev.mtx); - if (n <= -1) - { - /* TODO: don't use MOO_DEBUG2. it's not thread safe... */ - /* the following call has a race-condition issue when called in this separate thread */ - /*MOO_DEBUG2 (moo, "Warning: epoll_wait failure - %d, %hs\n", errno, strerror(errno));*/ - } - else if (n > 0) - { - xtn->ev.len = n; - } - pthread_cond_signal (&xtn->ev.cnd2); - pthread_mutex_unlock (&xtn->ev.mtx); - } - else - { - /* the event buffer has not been emptied yet */ - struct timespec ts; - - pthread_mutex_lock (&xtn->ev.mtx); - if (xtn->ev.len <= 0) - { - /* it got emptied between the if check and pthread_mutex_lock() above */ - pthread_mutex_unlock (&xtn->ev.mtx); - goto poll_for_event; - } - - #if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_REALTIME) - clock_gettime (CLOCK_REALTIME, &ts); - #else - { - struct timeval tv; - gettimeofday (&tv, MOO_NULL); - ts.tv_sec = tv.tv_sec; - ts.tv_nsec = MOO_USEC_TO_NSEC(tv.tv_usec); - } - #endif - ts.tv_sec += 10; - pthread_cond_timedwait (&xtn->ev.cnd, &xtn->ev.mtx, &ts); - pthread_mutex_unlock (&xtn->ev.mtx); - } - - /*sched_yield ();*/ - } - - return MOO_NULL; -} -#endif - static int vm_startup (moo_t* moo) { #if defined(_WIN32) @@ -1216,15 +1158,20 @@ static int vm_startup (moo_t* moo) flag = fcntl (xtn->ep, F_GETFD); if (flag >= 0) fcntl (xtn->ep, F_SETFD, flag | FD_CLOEXEC); #endif + +#elif defined(USE_POLL) + + MUTEX_INIT (&xtn->ev.reg.pmtx); + #elif defined(USE_SELECT) FD_ZERO (&xtn->ev.reg.rfds); FD_ZERO (&xtn->ev.reg.wfds); xtn->ev.reg.maxfd = -1; + MUTEX_INIT (&xtn->ev.reg.smtx); #endif /* USE_DEVPOLL */ - #if defined(USE_THREAD) - if (pipe (xtn->p) == -1) + if (pipe(xtn->p) == -1) { moo_syserrtoerrnum (errno); MOO_DEBUG1 (moo, "Cannot create pipes - %hs\n", strerror(errno)); @@ -1246,7 +1193,7 @@ static int vm_startup (moo_t* moo) if (flag >= 0) fcntl (xtn->p[1], F_SETFL, flag | O_NONBLOCK); #endif - if (_add_poll_fd (moo, xtn->p[0], XPOLLIN, MOO_TYPE_MAX(moo_oow_t)) <= -1) goto oops; + if (_add_poll_fd(moo, xtn->p[0], XPOLLIN, MOO_TYPE_MAX(moo_oow_t)) <= -1) goto oops; pthread_mutex_init (&xtn->ev.mtx, MOO_NULL); pthread_cond_init (&xtn->ev.cnd, MOO_NULL); @@ -1272,7 +1219,6 @@ oops: } #endif - #if defined(USE_DEVPOLL) || defined(USE_EPOLL) if (xtn->ep >= 0) { @@ -1317,8 +1263,15 @@ static void vm_cleanup (moo_t* moo) close (xtn->p[0]); #endif /* USE_THREAD */ -#if defined(USE_DEVPOLL) || defined(USE_EPOLL) - if (xtn->ep) +#if defined(USE_DEVPOLL) + if (xtn->ep >= 0) + { + close (xtn->ep); + xtn->ep = -1; + } + destroy_poll_data_space (moo); +#elif defined(USE_EPOLL) + if (xtn->ep >= 0) { close (xtn->ep); xtn->ep = -1; @@ -1336,19 +1289,13 @@ static void vm_cleanup (moo_t* moo) moo_freemem (moo, xtn->ev.buf); xtn->ev.buf = MOO_NULL; } + destroy_poll_data_space (moo); + MUTEX_DESTROY (&xtn->ev.reg.pmtx); #elif defined(USE_SELECT) FD_ZERO (&xtn->ev.reg.rfds); FD_ZERO (&xtn->ev.reg.wfds); xtn->ev.reg.maxfd = -1; -#endif - -#if defined(USE_DEVPOLL) || defined(USE_POLL) - if (xtn->epd.ptr) - { - moo_freemem (moo, xtn->epd.ptr); - xtn->epd.ptr = MOO_NULL; - xtn->epd.capa = 0; - } + MUTEX_DESTROY (&xtn->ev.reg.smtx); #endif #endif @@ -1467,6 +1414,139 @@ static int vm_muxdel (moo_t* moo, moo_oop_semaphore_t sem) return _del_poll_fd (moo, MOO_OOP_TO_SMOOI(sem->io_handle)); } +#if defined(USE_THREAD) +static void* iothr_main (void* arg) +{ + moo_t* moo = (moo_t*)arg; + xtn_t* xtn = (xtn_t*)moo_getxtn(moo); + + /*while (!moo->abort_req)*/ + while (!xtn->iothr_abort) + { + if (xtn->ev.len <= 0) /* TODO: no mutex needed for this check? */ + { + int n; + #if defined(USE_DEVPOLL) + struct dvpoll dvp; + #elif defined(USE_POLL) + moo_oow_t nfds; + #elif defined(USE_SELECT) + struct timeval tv; + fd_set rfds; + fd_set wfds; + int maxfd; + #endif + + poll_for_event: + + #if defined(USE_DEVPOLL) + dvp.dp_timeout = 10000; /* milliseconds */ + dvp.dp_fds = xtn->ev.buf; + dvp.dp_nfds = MOO_COUNTOF(xtn->ev.buf); + n = ioctl (xtn->ep, DP_POLL, &dvp); + #elif defined(USE_EPOLL) + n = epoll_wait (xtn->ep, xtn->ev.buf, MOO_COUNTOF(xtn->ev.buf), 10000); + #elif defined(USE_POLL) + MUTEX_LOCK (&xtn->ev.reg.pmtx); + memcpy (xtn->ev.buf, xtn->ev.reg.ptr, xtn->ev.reg.len * MOO_SIZEOF(*xtn->ev.buf)); + nfds = xtn->ev.reg.len; + MUTEX_UNLOCK (&xtn->ev.reg.pmtx); + n = poll (xtn->ev.buf, nfds, 10000); + if (n > 0) + { + /* compact the return buffer as poll() doesn't */ + moo_oow_t i, j; + for (i = 0, j = 0; i < nfds && j < n; i++) + { + if (xtn->ev.buf[i].revents) + { + if (j != i) xtn->ev.buf[j] = xtn->ev.buf[i]; + j++; + } + } + n = j; + } + #elif defined(USE_SELECT) + tv.tv_sec = 10; + tv.tv_usec = 0; + MUTEX_LOCK (&xtn->ev.reg.smtx); + maxfd = xtn->ev.reg.maxfd; + MOO_MEMCPY (&rfds, &xtn->ev.reg.rfds, MOO_SIZEOF(rfds)); + MOO_MEMCPY (&wfds, &xtn->ev.reg.wfds, MOO_SIZEOF(wfds)); + MUTEX_UNLOCK (&xtn->ev.reg.smtx); + n = select (maxfd + 1, &rfds, &wfds, NULL, &tv); + if (n > 0) + { + int fd, count = 0; + for (fd = 0; fd <= maxfd; fd++) + { + int events = 0; + if (FD_ISSET(fd, &rfds)) events |= XPOLLIN; + if (FD_ISSET(fd, &wfds)) events |= XPOLLOUT; + + if (events) + { + MOO_ASSERT (moo, count < MOO_COUNTOF(xtn->ev.buf)); + xtn->ev.buf[count].fd = fd; + xtn->ev.buf[count].events = events; + count++; + } + } + + n = count; + MOO_ASSERT (moo, n > 0); + } + #endif + + pthread_mutex_lock (&xtn->ev.mtx); + if (n <= -1) + { + /* TODO: don't use MOO_DEBUG2. it's not thread safe... */ + /* the following call has a race-condition issue when called in this separate thread */ + /*MOO_DEBUG2 (moo, "Warning: multiplexer wait failure - %d, %hs\n", errno, strerror(errno));*/ + } + else if (n > 0) + { + xtn->ev.len = n; + } + pthread_cond_signal (&xtn->ev.cnd2); + pthread_mutex_unlock (&xtn->ev.mtx); + } + else + { + /* the event buffer has not been emptied yet */ + struct timespec ts; + + pthread_mutex_lock (&xtn->ev.mtx); + if (xtn->ev.len <= 0) + { + /* it got emptied between the if check and pthread_mutex_lock() above */ + pthread_mutex_unlock (&xtn->ev.mtx); + goto poll_for_event; + } + + #if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_REALTIME) + clock_gettime (CLOCK_REALTIME, &ts); + #else + { + struct timeval tv; + gettimeofday (&tv, MOO_NULL); + ts.tv_sec = tv.tv_sec; + ts.tv_nsec = MOO_USEC_TO_NSEC(tv.tv_usec); + } + #endif + ts.tv_sec += 10; + pthread_cond_timedwait (&xtn->ev.cnd, &xtn->ev.mtx, &ts); + pthread_mutex_unlock (&xtn->ev.mtx); + } + + /*sched_yield ();*/ + } + + return MOO_NULL; +} +#endif + static void vm_muxwait (moo_t* moo, const moo_ntime_t* dur, moo_vmprim_muxwait_cb_t muxwcb) { xtn_t* xtn = (xtn_t*)moo_getxtn(moo); @@ -1526,10 +1606,12 @@ static void vm_muxwait (moo_t* moo, const moo_ntime_t* dur, moo_vmprim_muxwait_c { --n; - #if defined(USE_DEVPOLL) || defined(USE_POLL) + #if defined(USE_DEVPOLL) if (xtn->ev.buf[n].fd == xtn->p[0]) #elif defined(USE_EPOLL) if (xtn->ev.buf[n].data.ptr == (void*)MOO_TYPE_MAX(moo_oow_t)) + #elif defined(USE_POLL) + if (xtn->ev.buf[n].fd == xtn->p[0]) #elif defined(USE_SELECT) if (xtn->ev.buf[n].fd == xtn->p[0]) #else @@ -1548,10 +1630,12 @@ static void vm_muxwait (moo_t* moo, const moo_ntime_t* dur, moo_vmprim_muxwait_c int revents; moo_ooi_t mask; - #if defined(USE_DEVPOLL) || defined(USE_POLL) + #if defined(USE_DEVPOLL) revents = xtn->ev.buf[n].revents; #elif defined(USE_EPOLL) revents = xtn->ev.buf[n].events; + #elif defined(USE_POLL) + revents = xtn->ev.buf[n].revents; #elif defined(USE_SELECT) revents = xtn->ev.buf[n].events; #endif @@ -1562,13 +1646,18 @@ static void vm_muxwait (moo_t* moo, const moo_ntime_t* dur, moo_vmprim_muxwait_c if (revents & XPOLLERR) mask |= MOO_SEMAPHORE_IO_MASK_ERROR; if (revents & XPOLLHUP) mask |= MOO_SEMAPHORE_IO_MASK_HANGUP; - #if defined(USE_DEVPOLL) || defined(USE_POLL) + #if defined(USE_DEVPOLL) MOO_ASSERT (moo, xtn->epd.capa > xtn->ev.buf[n].fd); muxwcb (moo, mask, (void*)xtn->epd.ptr[xtn->ev.buf[n].fd]); #elif defined(USE_EPOLL) muxwcb (moo, mask, xtn->ev.buf[n].data.ptr); + #elif defined(USE_POLL) + MOO_ASSERT (moo, xtn->epd.capa > xtn->ev.buf[n].fd); + muxwcb (moo, mask, (void*)xtn->epd.ptr[xtn->ev.buf[n].fd]); #elif defined(USE_SELECT) muxwcb (moo, mask, (void*)xtn->epd.data[xtn->ev.buf[n].fd]); + #else + # error UNSUPPORTED #endif } } @@ -1586,6 +1675,8 @@ static void vm_muxwait (moo_t* moo, const moo_ntime_t* dur, moo_vmprim_muxwait_c struct dvpoll dvp; #elif defined(USE_SELECT) struct timeval tv; + fd_set rfds, wfds; + int maxfd; #endif if (dur) tmout = MOO_SECNSEC_TO_MSEC(dur->sec, dur->nsec); @@ -1600,17 +1691,61 @@ static void vm_muxwait (moo_t* moo, const moo_ntime_t* dur, moo_vmprim_muxwait_c #elif defined(USE_POLL) memcpy (xtn->ev.buf, xtn->ev.reg.ptr, xtn->ev.reg.len * MOO_SIZEOF(*xtn->ev.buf)); n = poll (xtn->ev.buf, xtn->ev.reg.len, tmout); + if (n > 0) + { + /* compact the return buffer as poll() doesn't */ + moo_oow_t i, j; + for (i = 0, j = 0; i < xtn->ev.reg.len && j < n; i++) + { + if (xtn->ev.buf[i].revents) + { + if (j != i) xtn->ev.buf[j] = xtn->ev.buf[i]; + j++; + } + } + n = j; + } #elif defined(USE_SELECT) - tv.tv_sec = dur->sec; - tv.tv_usec = MOO_NSEC_TO_USEC(dur->nsec); - xtn->ev.buf.rfds = xtn->ev.reg.rfds; - xtn->ev.buf.wfds = xtn->ev.reg.wfds; - n = select (xtn->ev.reg.maxfd + 1, &xtn->ev.buf.rfds, &xtn->ev.buf.wfds, NULL, &tv); + if (dur) + { + tv.tv_sec = dur->sec; + tv.tv_usec = MOO_NSEC_TO_USEC(dur->nsec); + } + else + { + tv.tv_sec = 0; + tv.tv_usec = 0; + } + maxfd = xtn->ev.reg.maxfd; + MOO_MEMCPY (&rfds, &xtn->ev.reg.rfds, MOO_SIZEOF(rfds)); + MOO_MEMCPY (&wfds, &xtn->ev.reg.wfds, MOO_SIZEOF(wfds)); + n = select (maxfd + 1, &rfds, &wfds, NULL, &tv); + if (n > 0) + { + int fd, count = 0; + for (fd = 0; fd <= maxfd; fd++) + { + int events = 0; + if (FD_ISSET(fd, &rfds)) events |= XPOLLIN; + if (FD_ISSET(fd, &wfds)) events |= XPOLLOUT; + + if (events) + { + MOO_ASSERT (moo, count < MOO_COUNTOF(xtn->ev.buf)); + xtn->ev.buf[count].fd = fd; + xtn->ev.buf[count].events = events; + count++; + } + } + + n = count; + MOO_ASSERT (moo, n > 0); + } #endif if (n <= -1) { - MOO_DEBUG2 (moo, "Warning: epoll_wait failure - %d, %hs\n", errno, strerror(errno)); + MOO_DEBUG2 (moo, "Warning: multiplexer wait failure - %d, %hs\n", errno, strerror(errno)); } else { @@ -1627,12 +1762,14 @@ static void vm_muxwait (moo_t* moo, const moo_ntime_t* dur, moo_vmprim_muxwait_c --n; - #if defined(USE_DEVPOLL) || defined(USE_POLL) + #if defined(USE_DEVPOLL) revents = xtn->ev.buf[n].revents; #elif defined(USE_EPOLL) revents = xtn->ev.buf[n].events; + #elif defined(USE_POLL) + revents = xtn->ev.buf[n].revents; #elif defined(USE_SELECT) - #error TODO: ... + revents = xtn->ev.buf[n].events; #else revents = 0; /* TODO: fake. unsupported */ #endif @@ -1643,16 +1780,18 @@ static void vm_muxwait (moo_t* moo, const moo_ntime_t* dur, moo_vmprim_muxwait_c if (revents & XPOLLERR) mask |= MOO_SEMAPHORE_IO_MASK_ERROR; if (revents & XPOLLHUP) mask |= MOO_SEMAPHORE_IO_MASK_HANGUP; - #if defined(USE_DEVPOLL) || defined(USE_POLL) + #if defined(USE_DEVPOLL) MOO_ASSERT (moo, xtn->epd.capa > xtn->ev.buf[n].fd); muxwcb (moo, mask, (void*)xtn->epd.ptr[xtn->ev.buf[n].fd]); #elif defined(USE_EPOLL) muxwcb (moo, mask, xtn->ev.buf[n].data.ptr); + #elif defined(USE_POLL) + MOO_ASSERT (moo, xtn->epd.capa > xtn->ev.buf[n].fd); + muxwcb (moo, mask, (void*)xtn->epd.ptr[xtn->ev.buf[n].fd]); #elif defined(USE_SELECT) - #error TODO... - muxwcb (moo, mask, xtn->ev.buf[n].data.ptr); + muxwcb (moo, mask, (void*)xtn->epd.data[xtn->ev.buf[n].fd]); #else - /* TODO: fake. unsupported */ + # error UNSUPPORTED #endif }