fixed potential race condition issues when 'select' or 'poll' is used as a multiplexer backend

This commit is contained in:
hyunghwan.chung 2017-09-27 14:03:05 +00:00
parent 3b5f059569
commit e38a4f1f20

View File

@ -157,6 +157,18 @@
# endif # endif
#endif #endif
#if defined(USE_THREAD)
# define MUTEX_INIT(x) pthread_mutex_init((x), MOO_NULL)
# define MUTEX_DESTROY(x) pthread_mutex_destroy(x)
# define MUTEX_LOCK(x) pthread_mutex_lock(x)
# define MUTEX_UNLOCK(x) pthread_mutex_unlock(x)
#else
# define MUTEX_INIT(x)
# define MUTEX_DESTROY(x)
# define MUTEX_LOCK(x)
# define MUTEX_UNLOCK(x)
#endif
typedef struct bb_t bb_t; typedef struct bb_t bb_t;
struct bb_t struct bb_t
{ {
@ -186,11 +198,16 @@ struct xtn_t
HANDLE waitable_timer; HANDLE waitable_timer;
#else #else
#if defined(USE_DEVPOLL) || defined(USE_EPOLL) #if defined(USE_DEVPOLL)
int ep; /* /dev/poll or epoll */ int ep; /* /dev/poll */
#endif struct
{
#if defined(USE_DEVPOLL) || defined(USE_POLL) moo_oow_t capa;
moo_ooi_t* ptr;
} epd;
#elif defined(USE_EPOLL)
int ep; /* epoll */
#elif defined(USE_POLL)
struct struct
{ {
moo_oow_t capa; moo_oow_t capa;
@ -226,14 +243,21 @@ struct xtn_t
struct pollfd* ptr; struct pollfd* ptr;
moo_oow_t capa; moo_oow_t capa;
moo_oow_t len; moo_oow_t len;
#if defined(USE_THREAD)
pthread_mutex_t pmtx;
#endif
} reg; /* registrar */ } reg; /* registrar */
struct pollfd* buf; struct pollfd* buf;
#elif defined(USE_SELECT) #elif defined(USE_SELECT)
struct struct
{ {
fd_set rfds; fd_set rfds;
fd_set wfds; fd_set wfds;
int maxfd; int maxfd;
#if defined(USE_THREAD)
pthread_mutex_t smtx;
#endif
} reg; } reg;
struct select_fd_t buf[FD_SETSIZE]; struct select_fd_t buf[FD_SETSIZE];
@ -780,7 +804,7 @@ static void log_write (moo_t* moo, moo_oow_t mask, const moo_ooch_t* msg, moo_oo
/* ========================================================================= */ /* ========================================================================= */
#if defined(USE_DEVPOLL) || defined(USE_POLL) #if defined(USE_DEVPOLL) || defined(USE_POLL)
static int secure_poll_data_space (moo_t* moo, int fd) static MOO_INLINE int secure_poll_data_space (moo_t* moo, int fd)
{ {
xtn_t* xtn = (xtn_t*)moo_getxtn(moo); xtn_t* xtn = (xtn_t*)moo_getxtn(moo);
if (fd >= xtn->epd.capa) if (fd >= xtn->epd.capa)
@ -798,6 +822,16 @@ static int secure_poll_data_space (moo_t* moo, int fd)
return 0; return 0;
} }
static MOO_INLINE void destroy_poll_data_space (moo_t* moo)
{
xtn_t* xtn = (xtn_t*)moo_getxtn(moo);
if (xtn->epd.ptr)
{
moo_freemem (moo, xtn->epd.ptr);
xtn->epd.ptr = MOO_NULL;
xtn->epd.capa = 0;
}
}
#endif #endif
@ -849,6 +883,7 @@ static int _add_poll_fd (moo_t* moo, int fd, int event_mask, moo_oow_t event_dat
if (secure_poll_data_space (moo, fd) <= -1) return -1; if (secure_poll_data_space (moo, fd) <= -1) return -1;
MUTEX_LOCK (&xtn->ev.reg.pmtx);
if (xtn->ev.reg.len >= xtn->ev.reg.capa) if (xtn->ev.reg.len >= xtn->ev.reg.capa)
{ {
struct pollfd* tmp, * tmp2; struct pollfd* tmp, * tmp2;
@ -860,6 +895,7 @@ static int _add_poll_fd (moo_t* moo, int fd, int event_mask, moo_oow_t event_dat
if (!tmp || !tmp2) if (!tmp || !tmp2)
{ {
MOO_DEBUG2 (moo, "Cannot add file descriptor %d to poll - %hs\n", fd, strerror(errno)); MOO_DEBUG2 (moo, "Cannot add file descriptor %d to poll - %hs\n", fd, strerror(errno));
MUTEX_UNLOCK (&xtn->ev.reg.pmtx);
if (tmp) moo_freemem (moo, tmp); if (tmp) moo_freemem (moo, tmp);
return -1; return -1;
} }
@ -874,12 +910,15 @@ static int _add_poll_fd (moo_t* moo, int fd, int event_mask, moo_oow_t event_dat
xtn->ev.reg.ptr[xtn->ev.reg.len].events = event_mask; xtn->ev.reg.ptr[xtn->ev.reg.len].events = event_mask;
xtn->ev.reg.ptr[xtn->ev.reg.len].revents = 0; xtn->ev.reg.ptr[xtn->ev.reg.len].revents = 0;
xtn->ev.reg.len++; xtn->ev.reg.len++;
MUTEX_UNLOCK (&xtn->ev.reg.pmtx);
xtn->epd.ptr[fd] = event_data; xtn->epd.ptr[fd] = event_data;
return 0; return 0;
#elif defined(USE_SELECT) #elif defined(USE_SELECT)
xtn_t* xtn = (xtn_t*)moo_getxtn(moo); xtn_t* xtn = (xtn_t*)moo_getxtn(moo);
MUTEX_LOCK (&xtn->ev.reg.smtx);
if (event_mask & XPOLLIN) if (event_mask & XPOLLIN)
{ {
FD_SET (fd, &xtn->ev.reg.rfds); FD_SET (fd, &xtn->ev.reg.rfds);
@ -890,6 +929,7 @@ static int _add_poll_fd (moo_t* moo, int fd, int event_mask, moo_oow_t event_dat
FD_SET (fd, &xtn->ev.reg.wfds); FD_SET (fd, &xtn->ev.reg.wfds);
if (fd > xtn->ev.reg.maxfd) xtn->ev.reg.maxfd = fd; if (fd > xtn->ev.reg.maxfd) xtn->ev.reg.maxfd = fd;
} }
MUTEX_UNLOCK (&xtn->ev.reg.smtx);
xtn->epd.data[fd] = (void*)event_data; xtn->epd.data[fd] = (void*)event_data;
return 0; return 0;
@ -942,15 +982,18 @@ static int _del_poll_fd (moo_t* moo, int fd)
moo_oow_t i; moo_oow_t i;
/* TODO: performance boost. no linear search */ /* TODO: performance boost. no linear search */
MUTEX_LOCK (&xtn->ev.reg.pmtx);
for (i = 0; i < xtn->ev.reg.len; i++) for (i = 0; i < xtn->ev.reg.len; i++)
{ {
if (xtn->ev.reg.ptr[i].fd == fd) if (xtn->ev.reg.ptr[i].fd == fd)
{ {
xtn->ev.reg.len--; xtn->ev.reg.len--;
memmove (&xtn->ev.reg.ptr[i], &xtn->ev.reg.ptr[i+1], (xtn->ev.reg.len - i) * MOO_SIZEOF(*xtn->ev.reg.ptr)); memmove (&xtn->ev.reg.ptr[i], &xtn->ev.reg.ptr[i+1], (xtn->ev.reg.len - i) * MOO_SIZEOF(*xtn->ev.reg.ptr));
MUTEX_UNLOCK (&xtn->ev.reg.pmtx);
return 0; return 0;
} }
} }
MUTEX_UNLOCK (&xtn->ev.reg.pmtx);
MOO_DEBUG1 (moo, "Cannot remove file descriptor %d from poll - not found\n", fd); MOO_DEBUG1 (moo, "Cannot remove file descriptor %d from poll - not found\n", fd);
@ -960,19 +1003,22 @@ static int _del_poll_fd (moo_t* moo, int fd)
#elif defined(USE_SELECT) #elif defined(USE_SELECT)
xtn_t* xtn = (xtn_t*)moo_getxtn(moo); xtn_t* xtn = (xtn_t*)moo_getxtn(moo);
MUTEX_LOCK (&xtn->ev.reg.smtx);
FD_CLR (fd, &xtn->ev.reg.rfds); FD_CLR (fd, &xtn->ev.reg.rfds);
FD_CLR (fd, &xtn->ev.reg.wfds); FD_CLR (fd, &xtn->ev.reg.wfds);
if (fd == xtn->ev.reg.maxfd) if (fd >= xtn->ev.reg.maxfd)
{ {
int i; int i;
/* TODO: any way to make this search faster or to do without the search like this */ /* TODO: any way to make this search faster or to do without the search like this */
for (i = fd; i > 0;) for (i = fd - 1; i >= 0; i--)
{ {
i--;
if (FD_ISSET(i, &xtn->ev.reg.rfds) || FD_ISSET(i, &xtn->ev.reg.wfds)) break; if (FD_ISSET(i, &xtn->ev.reg.rfds) || FD_ISSET(i, &xtn->ev.reg.wfds)) break;
} }
xtn->ev.reg.maxfd = i; xtn->ev.reg.maxfd = i;
} }
MUTEX_UNLOCK (&xtn->ev.reg.smtx);
/* keep xtn->epd.data[fd] so that the data is still accessible after deletion */
return 0; return 0;
#else #else
@ -1019,6 +1065,7 @@ static int _mod_poll_fd (moo_t* moo, int fd, int event_mask, moo_oow_t event_dat
xtn_t* xtn = (xtn_t*)moo_getxtn(moo); xtn_t* xtn = (xtn_t*)moo_getxtn(moo);
moo_oow_t i; moo_oow_t i;
MUTEX_LOCK (&xtn->ev.reg.pmtx);
for (i = 0; i < xtn->ev.reg.len; i++) for (i = 0; i < xtn->ev.reg.len; i++)
{ {
if (xtn->ev.reg.ptr[i].fd == fd) if (xtn->ev.reg.ptr[i].fd == fd)
@ -1027,12 +1074,14 @@ static int _mod_poll_fd (moo_t* moo, int fd, int event_mask, moo_oow_t event_dat
xtn->ev.reg.ptr[i].fd = fd; xtn->ev.reg.ptr[i].fd = fd;
xtn->ev.reg.ptr[i].events = event_mask; xtn->ev.reg.ptr[i].events = event_mask;
xtn->ev.reg.ptr[i].revents = 0; xtn->ev.reg.ptr[i].revents = 0;
MUTEX_UNLOCK (&xtn->ev.reg.pmtx);
MOO_ASSERT (moo, fd < xtn->epd.capa); MOO_ASSERT (moo, fd < xtn->epd.capa);
xtn->epd.ptr[fd] = event_data; xtn->epd.ptr[fd] = event_data;
return 0; return 0;
} }
} }
MUTEX_UNLOCK (&xtn->ev.reg.pmtx);
MOO_DEBUG1 (moo, "Cannot modify file descriptor %d in poll - not found\n", fd); MOO_DEBUG1 (moo, "Cannot modify file descriptor %d in poll - not found\n", fd);
moo_seterrnum (moo, MOO_ENOENT); moo_seterrnum (moo, MOO_ENOENT);
@ -1042,8 +1091,8 @@ static int _mod_poll_fd (moo_t* moo, int fd, int event_mask, moo_oow_t event_dat
xtn_t* xtn = (xtn_t*)moo_getxtn(moo); xtn_t* xtn = (xtn_t*)moo_getxtn(moo);
MUTEX_LOCK (&xtn->ev.reg.smtx);
MOO_ASSERT (moo, fd <= xtn->ev.reg.maxfd); MOO_ASSERT (moo, fd <= xtn->ev.reg.maxfd);
MOO_ASSERT (moo, event_mask & (XPOLLIN | XPOLLOUT));
if (event_mask & XPOLLIN) if (event_mask & XPOLLIN)
FD_SET (fd, &xtn->ev.reg.rfds); FD_SET (fd, &xtn->ev.reg.rfds);
@ -1054,6 +1103,7 @@ static int _mod_poll_fd (moo_t* moo, int fd, int event_mask, moo_oow_t event_dat
FD_SET (fd, &xtn->ev.reg.wfds); FD_SET (fd, &xtn->ev.reg.wfds);
else else
FD_CLR (fd, &xtn->ev.reg.wfds); FD_CLR (fd, &xtn->ev.reg.wfds);
MUTEX_UNLOCK (&xtn->ev.reg.smtx);
xtn->epd.data[fd] = (void*)event_data; xtn->epd.data[fd] = (void*)event_data;
return 0; return 0;
@ -1066,114 +1116,6 @@ static int _mod_poll_fd (moo_t* moo, int fd, int event_mask, moo_oow_t event_dat
} }
#if defined(USE_THREAD)
static void* iothr_main (void* arg)
{
moo_t* moo = (moo_t*)arg;
xtn_t* xtn = (xtn_t*)moo_getxtn(moo);
/*while (!moo->abort_req)*/
while (!xtn->iothr_abort)
{
if (xtn->ev.len <= 0) /* TODO: no mutex needed for this check? */
{
int n;
#if defined(USE_DEVPOLL)
struct dvpoll dvp;
#elif defined(USE_SELECT)
struct timeval tv;
fd_set rfds;
fd_set wfds;
#endif
poll_for_event:
#if defined(USE_DEVPOLL)
dvp.dp_timeout = 10000; /* milliseconds */
dvp.dp_fds = xtn->ev.buf;
dvp.dp_nfds = MOO_COUNTOF(xtn->ev.buf);
n = ioctl (xtn->ep, DP_POLL, &dvp);
#elif defined(USE_EPOLL)
n = epoll_wait (xtn->ep, xtn->ev.buf, MOO_COUNTOF(xtn->ev.buf), 10000);
#elif defined(USE_POLL)
memcpy (xtn->ev.buf, xtn->ev.reg.ptr, xtn->ev.reg.len * MOO_SIZEOF(*xtn->ev.buf));
n = poll (xtn->ev.buf, xtn->ev.reg.len, 10000);
#elif defined(USE_SELECT)
tv.tv_sec = 10;
tv.tv_usec = 0;
MOO_MEMCPY (&rfds, &xtn->ev.reg.rfds, MOO_SIZEOF(rfds));
MOO_MEMCPY (&wfds, &xtn->ev.reg.wfds, MOO_SIZEOF(wfds));
n = select (xtn->ev.reg.maxfd + 1, &rfds, &wfds, NULL, &tv);
if (n > 0)
{
int fd, count = 0;
for (fd = 0; fd <= xtn->ev.reg.maxfd; fd++)
{
int events = 0;
if (FD_ISSET(fd, &rfds)) events |= XPOLLIN;
if (FD_ISSET(fd, &wfds)) events |= XPOLLOUT;
if (events)
{
xtn->ev.buf[count].fd = fd;
xtn->ev.buf[count].events = events;
count++;
}
}
n = count;
MOO_ASSERT (moo, n > 0);
}
#endif
pthread_mutex_lock (&xtn->ev.mtx);
if (n <= -1)
{
/* TODO: don't use MOO_DEBUG2. it's not thread safe... */
/* the following call has a race-condition issue when called in this separate thread */
/*MOO_DEBUG2 (moo, "Warning: epoll_wait failure - %d, %hs\n", errno, strerror(errno));*/
}
else if (n > 0)
{
xtn->ev.len = n;
}
pthread_cond_signal (&xtn->ev.cnd2);
pthread_mutex_unlock (&xtn->ev.mtx);
}
else
{
/* the event buffer has not been emptied yet */
struct timespec ts;
pthread_mutex_lock (&xtn->ev.mtx);
if (xtn->ev.len <= 0)
{
/* it got emptied between the if check and pthread_mutex_lock() above */
pthread_mutex_unlock (&xtn->ev.mtx);
goto poll_for_event;
}
#if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_REALTIME)
clock_gettime (CLOCK_REALTIME, &ts);
#else
{
struct timeval tv;
gettimeofday (&tv, MOO_NULL);
ts.tv_sec = tv.tv_sec;
ts.tv_nsec = MOO_USEC_TO_NSEC(tv.tv_usec);
}
#endif
ts.tv_sec += 10;
pthread_cond_timedwait (&xtn->ev.cnd, &xtn->ev.mtx, &ts);
pthread_mutex_unlock (&xtn->ev.mtx);
}
/*sched_yield ();*/
}
return MOO_NULL;
}
#endif
static int vm_startup (moo_t* moo) static int vm_startup (moo_t* moo)
{ {
#if defined(_WIN32) #if defined(_WIN32)
@ -1216,15 +1158,20 @@ static int vm_startup (moo_t* moo)
flag = fcntl (xtn->ep, F_GETFD); flag = fcntl (xtn->ep, F_GETFD);
if (flag >= 0) fcntl (xtn->ep, F_SETFD, flag | FD_CLOEXEC); if (flag >= 0) fcntl (xtn->ep, F_SETFD, flag | FD_CLOEXEC);
#endif #endif
#elif defined(USE_POLL)
MUTEX_INIT (&xtn->ev.reg.pmtx);
#elif defined(USE_SELECT) #elif defined(USE_SELECT)
FD_ZERO (&xtn->ev.reg.rfds); FD_ZERO (&xtn->ev.reg.rfds);
FD_ZERO (&xtn->ev.reg.wfds); FD_ZERO (&xtn->ev.reg.wfds);
xtn->ev.reg.maxfd = -1; xtn->ev.reg.maxfd = -1;
MUTEX_INIT (&xtn->ev.reg.smtx);
#endif /* USE_DEVPOLL */ #endif /* USE_DEVPOLL */
#if defined(USE_THREAD) #if defined(USE_THREAD)
if (pipe (xtn->p) == -1) if (pipe(xtn->p) == -1)
{ {
moo_syserrtoerrnum (errno); moo_syserrtoerrnum (errno);
MOO_DEBUG1 (moo, "Cannot create pipes - %hs\n", strerror(errno)); MOO_DEBUG1 (moo, "Cannot create pipes - %hs\n", strerror(errno));
@ -1246,7 +1193,7 @@ static int vm_startup (moo_t* moo)
if (flag >= 0) fcntl (xtn->p[1], F_SETFL, flag | O_NONBLOCK); if (flag >= 0) fcntl (xtn->p[1], F_SETFL, flag | O_NONBLOCK);
#endif #endif
if (_add_poll_fd (moo, xtn->p[0], XPOLLIN, MOO_TYPE_MAX(moo_oow_t)) <= -1) goto oops; if (_add_poll_fd(moo, xtn->p[0], XPOLLIN, MOO_TYPE_MAX(moo_oow_t)) <= -1) goto oops;
pthread_mutex_init (&xtn->ev.mtx, MOO_NULL); pthread_mutex_init (&xtn->ev.mtx, MOO_NULL);
pthread_cond_init (&xtn->ev.cnd, MOO_NULL); pthread_cond_init (&xtn->ev.cnd, MOO_NULL);
@ -1272,7 +1219,6 @@ oops:
} }
#endif #endif
#if defined(USE_DEVPOLL) || defined(USE_EPOLL) #if defined(USE_DEVPOLL) || defined(USE_EPOLL)
if (xtn->ep >= 0) if (xtn->ep >= 0)
{ {
@ -1317,8 +1263,15 @@ static void vm_cleanup (moo_t* moo)
close (xtn->p[0]); close (xtn->p[0]);
#endif /* USE_THREAD */ #endif /* USE_THREAD */
#if defined(USE_DEVPOLL) || defined(USE_EPOLL) #if defined(USE_DEVPOLL)
if (xtn->ep) if (xtn->ep >= 0)
{
close (xtn->ep);
xtn->ep = -1;
}
destroy_poll_data_space (moo);
#elif defined(USE_EPOLL)
if (xtn->ep >= 0)
{ {
close (xtn->ep); close (xtn->ep);
xtn->ep = -1; xtn->ep = -1;
@ -1336,19 +1289,13 @@ static void vm_cleanup (moo_t* moo)
moo_freemem (moo, xtn->ev.buf); moo_freemem (moo, xtn->ev.buf);
xtn->ev.buf = MOO_NULL; xtn->ev.buf = MOO_NULL;
} }
destroy_poll_data_space (moo);
MUTEX_DESTROY (&xtn->ev.reg.pmtx);
#elif defined(USE_SELECT) #elif defined(USE_SELECT)
FD_ZERO (&xtn->ev.reg.rfds); FD_ZERO (&xtn->ev.reg.rfds);
FD_ZERO (&xtn->ev.reg.wfds); FD_ZERO (&xtn->ev.reg.wfds);
xtn->ev.reg.maxfd = -1; xtn->ev.reg.maxfd = -1;
#endif MUTEX_DESTROY (&xtn->ev.reg.smtx);
#if defined(USE_DEVPOLL) || defined(USE_POLL)
if (xtn->epd.ptr)
{
moo_freemem (moo, xtn->epd.ptr);
xtn->epd.ptr = MOO_NULL;
xtn->epd.capa = 0;
}
#endif #endif
#endif #endif
@ -1467,6 +1414,139 @@ static int vm_muxdel (moo_t* moo, moo_oop_semaphore_t sem)
return _del_poll_fd (moo, MOO_OOP_TO_SMOOI(sem->io_handle)); return _del_poll_fd (moo, MOO_OOP_TO_SMOOI(sem->io_handle));
} }
#if defined(USE_THREAD)
static void* iothr_main (void* arg)
{
moo_t* moo = (moo_t*)arg;
xtn_t* xtn = (xtn_t*)moo_getxtn(moo);
/*while (!moo->abort_req)*/
while (!xtn->iothr_abort)
{
if (xtn->ev.len <= 0) /* TODO: no mutex needed for this check? */
{
int n;
#if defined(USE_DEVPOLL)
struct dvpoll dvp;
#elif defined(USE_POLL)
moo_oow_t nfds;
#elif defined(USE_SELECT)
struct timeval tv;
fd_set rfds;
fd_set wfds;
int maxfd;
#endif
poll_for_event:
#if defined(USE_DEVPOLL)
dvp.dp_timeout = 10000; /* milliseconds */
dvp.dp_fds = xtn->ev.buf;
dvp.dp_nfds = MOO_COUNTOF(xtn->ev.buf);
n = ioctl (xtn->ep, DP_POLL, &dvp);
#elif defined(USE_EPOLL)
n = epoll_wait (xtn->ep, xtn->ev.buf, MOO_COUNTOF(xtn->ev.buf), 10000);
#elif defined(USE_POLL)
MUTEX_LOCK (&xtn->ev.reg.pmtx);
memcpy (xtn->ev.buf, xtn->ev.reg.ptr, xtn->ev.reg.len * MOO_SIZEOF(*xtn->ev.buf));
nfds = xtn->ev.reg.len;
MUTEX_UNLOCK (&xtn->ev.reg.pmtx);
n = poll (xtn->ev.buf, nfds, 10000);
if (n > 0)
{
/* compact the return buffer as poll() doesn't */
moo_oow_t i, j;
for (i = 0, j = 0; i < nfds && j < n; i++)
{
if (xtn->ev.buf[i].revents)
{
if (j != i) xtn->ev.buf[j] = xtn->ev.buf[i];
j++;
}
}
n = j;
}
#elif defined(USE_SELECT)
tv.tv_sec = 10;
tv.tv_usec = 0;
MUTEX_LOCK (&xtn->ev.reg.smtx);
maxfd = xtn->ev.reg.maxfd;
MOO_MEMCPY (&rfds, &xtn->ev.reg.rfds, MOO_SIZEOF(rfds));
MOO_MEMCPY (&wfds, &xtn->ev.reg.wfds, MOO_SIZEOF(wfds));
MUTEX_UNLOCK (&xtn->ev.reg.smtx);
n = select (maxfd + 1, &rfds, &wfds, NULL, &tv);
if (n > 0)
{
int fd, count = 0;
for (fd = 0; fd <= maxfd; fd++)
{
int events = 0;
if (FD_ISSET(fd, &rfds)) events |= XPOLLIN;
if (FD_ISSET(fd, &wfds)) events |= XPOLLOUT;
if (events)
{
MOO_ASSERT (moo, count < MOO_COUNTOF(xtn->ev.buf));
xtn->ev.buf[count].fd = fd;
xtn->ev.buf[count].events = events;
count++;
}
}
n = count;
MOO_ASSERT (moo, n > 0);
}
#endif
pthread_mutex_lock (&xtn->ev.mtx);
if (n <= -1)
{
/* TODO: don't use MOO_DEBUG2. it's not thread safe... */
/* the following call has a race-condition issue when called in this separate thread */
/*MOO_DEBUG2 (moo, "Warning: multiplexer wait failure - %d, %hs\n", errno, strerror(errno));*/
}
else if (n > 0)
{
xtn->ev.len = n;
}
pthread_cond_signal (&xtn->ev.cnd2);
pthread_mutex_unlock (&xtn->ev.mtx);
}
else
{
/* the event buffer has not been emptied yet */
struct timespec ts;
pthread_mutex_lock (&xtn->ev.mtx);
if (xtn->ev.len <= 0)
{
/* it got emptied between the if check and pthread_mutex_lock() above */
pthread_mutex_unlock (&xtn->ev.mtx);
goto poll_for_event;
}
#if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_REALTIME)
clock_gettime (CLOCK_REALTIME, &ts);
#else
{
struct timeval tv;
gettimeofday (&tv, MOO_NULL);
ts.tv_sec = tv.tv_sec;
ts.tv_nsec = MOO_USEC_TO_NSEC(tv.tv_usec);
}
#endif
ts.tv_sec += 10;
pthread_cond_timedwait (&xtn->ev.cnd, &xtn->ev.mtx, &ts);
pthread_mutex_unlock (&xtn->ev.mtx);
}
/*sched_yield ();*/
}
return MOO_NULL;
}
#endif
static void vm_muxwait (moo_t* moo, const moo_ntime_t* dur, moo_vmprim_muxwait_cb_t muxwcb) static void vm_muxwait (moo_t* moo, const moo_ntime_t* dur, moo_vmprim_muxwait_cb_t muxwcb)
{ {
xtn_t* xtn = (xtn_t*)moo_getxtn(moo); xtn_t* xtn = (xtn_t*)moo_getxtn(moo);
@ -1526,10 +1606,12 @@ static void vm_muxwait (moo_t* moo, const moo_ntime_t* dur, moo_vmprim_muxwait_c
{ {
--n; --n;
#if defined(USE_DEVPOLL) || defined(USE_POLL) #if defined(USE_DEVPOLL)
if (xtn->ev.buf[n].fd == xtn->p[0]) if (xtn->ev.buf[n].fd == xtn->p[0])
#elif defined(USE_EPOLL) #elif defined(USE_EPOLL)
if (xtn->ev.buf[n].data.ptr == (void*)MOO_TYPE_MAX(moo_oow_t)) if (xtn->ev.buf[n].data.ptr == (void*)MOO_TYPE_MAX(moo_oow_t))
#elif defined(USE_POLL)
if (xtn->ev.buf[n].fd == xtn->p[0])
#elif defined(USE_SELECT) #elif defined(USE_SELECT)
if (xtn->ev.buf[n].fd == xtn->p[0]) if (xtn->ev.buf[n].fd == xtn->p[0])
#else #else
@ -1548,10 +1630,12 @@ static void vm_muxwait (moo_t* moo, const moo_ntime_t* dur, moo_vmprim_muxwait_c
int revents; int revents;
moo_ooi_t mask; moo_ooi_t mask;
#if defined(USE_DEVPOLL) || defined(USE_POLL) #if defined(USE_DEVPOLL)
revents = xtn->ev.buf[n].revents; revents = xtn->ev.buf[n].revents;
#elif defined(USE_EPOLL) #elif defined(USE_EPOLL)
revents = xtn->ev.buf[n].events; revents = xtn->ev.buf[n].events;
#elif defined(USE_POLL)
revents = xtn->ev.buf[n].revents;
#elif defined(USE_SELECT) #elif defined(USE_SELECT)
revents = xtn->ev.buf[n].events; revents = xtn->ev.buf[n].events;
#endif #endif
@ -1562,13 +1646,18 @@ static void vm_muxwait (moo_t* moo, const moo_ntime_t* dur, moo_vmprim_muxwait_c
if (revents & XPOLLERR) mask |= MOO_SEMAPHORE_IO_MASK_ERROR; if (revents & XPOLLERR) mask |= MOO_SEMAPHORE_IO_MASK_ERROR;
if (revents & XPOLLHUP) mask |= MOO_SEMAPHORE_IO_MASK_HANGUP; if (revents & XPOLLHUP) mask |= MOO_SEMAPHORE_IO_MASK_HANGUP;
#if defined(USE_DEVPOLL) || defined(USE_POLL) #if defined(USE_DEVPOLL)
MOO_ASSERT (moo, xtn->epd.capa > xtn->ev.buf[n].fd); MOO_ASSERT (moo, xtn->epd.capa > xtn->ev.buf[n].fd);
muxwcb (moo, mask, (void*)xtn->epd.ptr[xtn->ev.buf[n].fd]); muxwcb (moo, mask, (void*)xtn->epd.ptr[xtn->ev.buf[n].fd]);
#elif defined(USE_EPOLL) #elif defined(USE_EPOLL)
muxwcb (moo, mask, xtn->ev.buf[n].data.ptr); muxwcb (moo, mask, xtn->ev.buf[n].data.ptr);
#elif defined(USE_POLL)
MOO_ASSERT (moo, xtn->epd.capa > xtn->ev.buf[n].fd);
muxwcb (moo, mask, (void*)xtn->epd.ptr[xtn->ev.buf[n].fd]);
#elif defined(USE_SELECT) #elif defined(USE_SELECT)
muxwcb (moo, mask, (void*)xtn->epd.data[xtn->ev.buf[n].fd]); muxwcb (moo, mask, (void*)xtn->epd.data[xtn->ev.buf[n].fd]);
#else
# error UNSUPPORTED
#endif #endif
} }
} }
@ -1586,6 +1675,8 @@ static void vm_muxwait (moo_t* moo, const moo_ntime_t* dur, moo_vmprim_muxwait_c
struct dvpoll dvp; struct dvpoll dvp;
#elif defined(USE_SELECT) #elif defined(USE_SELECT)
struct timeval tv; struct timeval tv;
fd_set rfds, wfds;
int maxfd;
#endif #endif
if (dur) tmout = MOO_SECNSEC_TO_MSEC(dur->sec, dur->nsec); if (dur) tmout = MOO_SECNSEC_TO_MSEC(dur->sec, dur->nsec);
@ -1600,17 +1691,61 @@ static void vm_muxwait (moo_t* moo, const moo_ntime_t* dur, moo_vmprim_muxwait_c
#elif defined(USE_POLL) #elif defined(USE_POLL)
memcpy (xtn->ev.buf, xtn->ev.reg.ptr, xtn->ev.reg.len * MOO_SIZEOF(*xtn->ev.buf)); memcpy (xtn->ev.buf, xtn->ev.reg.ptr, xtn->ev.reg.len * MOO_SIZEOF(*xtn->ev.buf));
n = poll (xtn->ev.buf, xtn->ev.reg.len, tmout); n = poll (xtn->ev.buf, xtn->ev.reg.len, tmout);
if (n > 0)
{
/* compact the return buffer as poll() doesn't */
moo_oow_t i, j;
for (i = 0, j = 0; i < xtn->ev.reg.len && j < n; i++)
{
if (xtn->ev.buf[i].revents)
{
if (j != i) xtn->ev.buf[j] = xtn->ev.buf[i];
j++;
}
}
n = j;
}
#elif defined(USE_SELECT) #elif defined(USE_SELECT)
tv.tv_sec = dur->sec; if (dur)
tv.tv_usec = MOO_NSEC_TO_USEC(dur->nsec); {
xtn->ev.buf.rfds = xtn->ev.reg.rfds; tv.tv_sec = dur->sec;
xtn->ev.buf.wfds = xtn->ev.reg.wfds; tv.tv_usec = MOO_NSEC_TO_USEC(dur->nsec);
n = select (xtn->ev.reg.maxfd + 1, &xtn->ev.buf.rfds, &xtn->ev.buf.wfds, NULL, &tv); }
else
{
tv.tv_sec = 0;
tv.tv_usec = 0;
}
maxfd = xtn->ev.reg.maxfd;
MOO_MEMCPY (&rfds, &xtn->ev.reg.rfds, MOO_SIZEOF(rfds));
MOO_MEMCPY (&wfds, &xtn->ev.reg.wfds, MOO_SIZEOF(wfds));
n = select (maxfd + 1, &rfds, &wfds, NULL, &tv);
if (n > 0)
{
int fd, count = 0;
for (fd = 0; fd <= maxfd; fd++)
{
int events = 0;
if (FD_ISSET(fd, &rfds)) events |= XPOLLIN;
if (FD_ISSET(fd, &wfds)) events |= XPOLLOUT;
if (events)
{
MOO_ASSERT (moo, count < MOO_COUNTOF(xtn->ev.buf));
xtn->ev.buf[count].fd = fd;
xtn->ev.buf[count].events = events;
count++;
}
}
n = count;
MOO_ASSERT (moo, n > 0);
}
#endif #endif
if (n <= -1) if (n <= -1)
{ {
MOO_DEBUG2 (moo, "Warning: epoll_wait failure - %d, %hs\n", errno, strerror(errno)); MOO_DEBUG2 (moo, "Warning: multiplexer wait failure - %d, %hs\n", errno, strerror(errno));
} }
else else
{ {
@ -1627,12 +1762,14 @@ static void vm_muxwait (moo_t* moo, const moo_ntime_t* dur, moo_vmprim_muxwait_c
--n; --n;
#if defined(USE_DEVPOLL) || defined(USE_POLL) #if defined(USE_DEVPOLL)
revents = xtn->ev.buf[n].revents; revents = xtn->ev.buf[n].revents;
#elif defined(USE_EPOLL) #elif defined(USE_EPOLL)
revents = xtn->ev.buf[n].events; revents = xtn->ev.buf[n].events;
#elif defined(USE_POLL)
revents = xtn->ev.buf[n].revents;
#elif defined(USE_SELECT) #elif defined(USE_SELECT)
#error TODO: ... revents = xtn->ev.buf[n].events;
#else #else
revents = 0; /* TODO: fake. unsupported */ revents = 0; /* TODO: fake. unsupported */
#endif #endif
@ -1643,16 +1780,18 @@ static void vm_muxwait (moo_t* moo, const moo_ntime_t* dur, moo_vmprim_muxwait_c
if (revents & XPOLLERR) mask |= MOO_SEMAPHORE_IO_MASK_ERROR; if (revents & XPOLLERR) mask |= MOO_SEMAPHORE_IO_MASK_ERROR;
if (revents & XPOLLHUP) mask |= MOO_SEMAPHORE_IO_MASK_HANGUP; if (revents & XPOLLHUP) mask |= MOO_SEMAPHORE_IO_MASK_HANGUP;
#if defined(USE_DEVPOLL) || defined(USE_POLL) #if defined(USE_DEVPOLL)
MOO_ASSERT (moo, xtn->epd.capa > xtn->ev.buf[n].fd); MOO_ASSERT (moo, xtn->epd.capa > xtn->ev.buf[n].fd);
muxwcb (moo, mask, (void*)xtn->epd.ptr[xtn->ev.buf[n].fd]); muxwcb (moo, mask, (void*)xtn->epd.ptr[xtn->ev.buf[n].fd]);
#elif defined(USE_EPOLL) #elif defined(USE_EPOLL)
muxwcb (moo, mask, xtn->ev.buf[n].data.ptr); muxwcb (moo, mask, xtn->ev.buf[n].data.ptr);
#elif defined(USE_POLL)
MOO_ASSERT (moo, xtn->epd.capa > xtn->ev.buf[n].fd);
muxwcb (moo, mask, (void*)xtn->epd.ptr[xtn->ev.buf[n].fd]);
#elif defined(USE_SELECT) #elif defined(USE_SELECT)
#error TODO... muxwcb (moo, mask, (void*)xtn->epd.data[xtn->ev.buf[n].fd]);
muxwcb (moo, mask, xtn->ev.buf[n].data.ptr);
#else #else
/* TODO: fake. unsupported */ # error UNSUPPORTED
#endif #endif
} }