added more code to support poll()

This commit is contained in:
hyunghwan.chung 2017-05-07 12:43:52 +00:00
parent 0a804402cd
commit f4a9a9abd8

View File

@ -163,7 +163,7 @@ struct xtn_t
int ep; /* /dev/poll or epoll */ int ep; /* /dev/poll or epoll */
#endif #endif
#if defined(USE_DEVPOLL) #if defined(USE_DEVPOLL) || defined(USE_POLL)
struct struct
{ {
moo_oow_t capa; moo_oow_t capa;
@ -176,6 +176,8 @@ struct xtn_t
pthread_t iothr; pthread_t iothr;
int iothr_up; int iothr_up;
int iothr_abort; int iothr_abort;
#endif
struct struct
{ {
#if defined(USE_DEVPOLL) #if defined(USE_DEVPOLL)
@ -193,25 +195,17 @@ struct xtn_t
moo_oow_t capa; moo_oow_t capa;
moo_oow_t len; moo_oow_t len;
} reg; /* registrar */ } reg; /* registrar */
struct pollfd* buf; struct pollfd* buf;
#endif #endif
moo_oow_t len; moo_oow_t len;
#if defined(USE_THREAD)
pthread_mutex_t mtx; pthread_mutex_t mtx;
pthread_cond_t cnd; pthread_cond_t cnd;
pthread_cond_t cnd2; pthread_cond_t cnd2;
} ev;
#else
struct
{
#if defined(USE_DEVPOLL)
struct pollfd buf[32];
#else
struct epoll_event buf[32]; /*TODO: make it a dynamically allocated memory block depending on the file descriptor added. */
#endif #endif
moo_oow_t len;
} ev; } ev;
#endif
#endif #endif
}; };
@ -745,12 +739,37 @@ if (mask & MOO_LOG_GC) return; /* don't show gc logs */
/* ========================================================================= */ /* ========================================================================= */
static int _add_poll_fd (moo_t* moo, int fd, int event_mask) #if defined(USE_DEVPOLL) || defined(USE_POLL)
static int secure_poll_data_space (moo_t* moo, int fd)
{
xtn_t* xtn = (xtn_t*)moo_getxtn(moo);
if (fd >= xtn->epd.capa)
{
moo_oow_t newcapa;
moo_ooi_t* tmp;
newcapa = MOO_ALIGN_POW2 (fd + 1, 256);
tmp = moo_reallocmem (moo, xtn->epd.ptr, newcapa * MOO_SIZEOF(*tmp));
if (!tmp) return -1;
xtn->epd.capa = newcapa;
xtn->epd.ptr = tmp;
}
return 0;
}
#endif
static int _add_poll_fd (moo_t* moo, int fd, int event_mask, moo_oow_t event_data)
{ {
xtn_t* xtn = (xtn_t*)moo_getxtn(moo); xtn_t* xtn = (xtn_t*)moo_getxtn(moo);
#if defined(USE_DEVPOLL) #if defined(USE_DEVPOLL)
struct pollfd ev; struct pollfd ev;
if (secure_poll_data_space (moo, fd) <= -1) return -1;
MOO_ASSERT (moo, xtn->ep >= 0); MOO_ASSERT (moo, xtn->ep >= 0);
ev.fd = fd; ev.fd = fd;
ev.events = event_mask; ev.events = event_mask;
@ -763,9 +782,10 @@ static int _add_poll_fd (moo_t* moo, int fd, int event_mask)
} }
#elif defined(USE_EPOLL) #elif defined(USE_EPOLL)
struct epoll_event ev; struct epoll_event ev;
MOO_ASSERT (moo, xtn->ep >= 0); MOO_ASSERT (moo, xtn->ep >= 0);
ev.events = event_mask; ev.events = event_mask;
ev.data.ptr = (void*)MOO_TYPE_MAX(moo_oow_t); ev.data.ptr = (void*)event_data;
if (epoll_ctl (xtn->ep, EPOLL_CTL_ADD, fd, &ev) == -1) if (epoll_ctl (xtn->ep, EPOLL_CTL_ADD, fd, &ev) == -1)
{ {
moo_syserrtoerrnum (errno); moo_syserrtoerrnum (errno);
@ -773,6 +793,9 @@ static int _add_poll_fd (moo_t* moo, int fd, int event_mask)
return -1; return -1;
} }
#elif defined(USE_POLL) #elif defined(USE_POLL)
if (secure_poll_data_space (moo, fd) <= -1) return -1;
if (xtn->ev.reg.len >= xtn->ev.reg.capa) if (xtn->ev.reg.len >= xtn->ev.reg.capa)
{ {
struct pollfd* tmp; struct pollfd* tmp;
@ -782,7 +805,7 @@ static int _add_poll_fd (moo_t* moo, int fd, int event_mask)
tmp = (struct pollfd*)moo_reallocmem (moo, xtn->ev.reg.ptr, newcapa * MOO_SIZEOF(*tmp)); tmp = (struct pollfd*)moo_reallocmem (moo, xtn->ev.reg.ptr, newcapa * MOO_SIZEOF(*tmp));
if (!tmp) if (!tmp)
{ {
MOO_DEBUG2 (moo, "Cannot add file descriptor %d to epoll - %hs\n", fd, strerror(errno)); MOO_DEBUG2 (moo, "Cannot add file descriptor %d to poll - %hs\n", fd, strerror(errno));
return -1; return -1;
} }
@ -849,6 +872,61 @@ static int _del_poll_fd (moo_t* moo, int fd)
#endif #endif
} }
static int _mod_poll_fd (moo_t* moo, int fd, int event_mask, moo_oow_t event_data)
{
#if defined(USE_DEVPOLL)
if (_del_poll_fd (moo, fd) <= -1) return -1;
if (_add_poll_fd (moo, fd, event_mask, event_data) <= -1)
{
/* TODO: any good way to rollback successful deletion? */
return -1;
}
return 0;
#elif defined(USE_EPOLL)
xtn_t* xtn = (xtn_t*)moo_getxtn(moo);
struct epoll_event ev;
MOO_ASSERT (moo, xtn->ep >= 0);
ev.events = event_mask;
ev.data.ptr = (void*)event_data;
if (epoll_ctl (xtn->ep, EPOLL_CTL_MOD, fd, &ev) == -1)
{
moo_syserrtoerrnum (errno);
MOO_DEBUG2 (moo, "Cannot modify file descriptor %d in epoll - %hs\n", fd, strerror(errno));
return -1;
}
#elif defined(USE_POLL)
xtn_t* xtn = (xtn_t*)moo_getxtn(moo);
moo_oow_t i;
for (i = 0; i < xtn->ev.reg.len; i++)
{
if (xtn->ev.reg.ptr[i].fd == fd)
{
memmove (&xtn->ev.reg.ptr[i], &xtn->ev.reg.ptr[i+1], (xtn->ev.reg.len - i - 1) * MOO_SIZEOF(*xtn->ev.reg.ptr));
xtn->ev.reg.ptr[i].fd = fd;
xtn->ev.reg.ptr[i].events = event_mask;
xtn->ev.reg.ptr[i].revents = 0;
return 0;
}
}
MOO_DEBUG2 (moo, "Cannot modify file descriptor %d in poll - not found\n", fd);
moo_seterrnum (moo, MOO_ENOENT);
return -1;
#endif
return 0;
}
#if defined(USE_THREAD) #if defined(USE_THREAD)
static void* iothr_main (void* arg) static void* iothr_main (void* arg)
{ {
@ -985,7 +1063,7 @@ static int vm_startup (moo_t* moo)
if (flag >= 0) fcntl (xtn->p[1], F_SETFL, flag | O_NONBLOCK); if (flag >= 0) fcntl (xtn->p[1], F_SETFL, flag | O_NONBLOCK);
#endif #endif
if (_add_poll_fd (moo, xtn->p[0], XPOLLIN) <= -1) goto oops; if (_add_poll_fd (moo, xtn->p[0], XPOLLIN, MOO_TYPE_MAX(moo_oow_t)) <= -1) goto oops;
pthread_mutex_init (&xtn->ev.mtx, MOO_NULL); pthread_mutex_init (&xtn->ev.mtx, MOO_NULL);
pthread_cond_init (&xtn->ev.cnd, MOO_NULL); pthread_cond_init (&xtn->ev.cnd, MOO_NULL);
@ -1131,35 +1209,9 @@ static void vm_gettime (moo_t* moo, moo_ntime_t* now)
# endif # endif
#endif #endif
#if defined(USE_DEVPOLL)
static int secure_devpoll_data_space (moo_t* moo, int fd) static int vm_muxadd (moo_t* moo, moo_oop_semaphore_t sem)
{ {
xtn_t* xtn = (xtn_t*)moo_getxtn(moo);
if (fd >= xtn->epd.capa)
{
moo_oow_t newcapa;
moo_ooi_t* tmp;
newcapa = MOO_ALIGN_POW2 (fd + 1, 256);
tmp = moo_reallocmem (moo, xtn->epd.ptr, newcapa * MOO_SIZEOF(*tmp));
if (!tmp) return -1;
xtn->epd.capa = newcapa;
xtn->epd.ptr = tmp;
}
return 0;
}
#endif
static int _mux_add_or_mod (moo_t* moo, moo_oop_semaphore_t sem, int cmd)
{
xtn_t* xtn = (xtn_t*)moo_getxtn(moo);
#if defined(USE_DEVPOLL)
struct pollfd ev;
#elif defined(USE_EPOLL)
struct epoll_event ev;
#endif
moo_ooi_t mask; moo_ooi_t mask;
int event_mask; int event_mask;
@ -1179,109 +1231,36 @@ static int _mux_add_or_mod (moo_t* moo, moo_oop_semaphore_t sem, int cmd)
return -1; return -1;
} }
#if defined(USE_DEVPOLL) return _add_poll_fd (moo, MOO_OOP_TO_SMOOI(sem->io_handle), event_mask, MOO_OOP_TO_SMOOI(sem->io_index));
ev.fd = MOO_OOP_TO_SMOOI(sem->io_handle);
if (secure_devpoll_data_space (moo, ev.fd) <= -1)
{
MOO_DEBUG2 (moo, "<vm_muxadd> devpoll data set failure on handle %zd - %hs\n", MOO_OOP_TO_SMOOI(sem->io_handle), strerror(errno));
return -1;
}
if (cmd)
{
ev.events = POLLREMOVE;
ev.revents = 0;
if (write (xtn->ep, &ev, MOO_SIZEOF(ev)) != MOO_SIZEOF(ev))
{
moo_seterrnum (moo, moo_syserrtoerrnum (errno));
MOO_DEBUG2 (moo, "<vm_muxadd> devpoll failure on handle %zd - %hs\n", MOO_OOP_TO_SMOOI(sem->io_handle), strerror(errno));
return -1;
}
}
ev.events = event_mask;
ev.revents = 0;
if (write (xtn->ep, &ev, MOO_SIZEOF(ev)) != MOO_SIZEOF(ev))
{
moo_seterrnum (moo, moo_syserrtoerrnum (errno));
MOO_DEBUG2 (moo, "<vm_muxadd> devpoll failure on handle %zd - %hs\n", MOO_OOP_TO_SMOOI(sem->io_handle), strerror(errno));
return -1;
}
MOO_ASSERT (moo, ev.fd == MOO_OOP_TO_SMOOI(sem->io_handle));
MOO_ASSERT (moo, xtn->epd.capa > ev.fd);
xtn->epd.ptr[ev.fd] = MOO_OOP_TO_SMOOI(sem->io_index);
#elif defined(USE_EPOLL)
/* don't check MOO_SEMAPHORE_IO_MASK_ERROR and MOO_SEMAPHORE_IO_MASK_HANGUP
* as it's implicitly enabled by epoll() */
ev.events = event_mask;
ev.data.ptr = (void*)MOO_OOP_TO_SMOOI(sem->io_index);
if (epoll_ctl (xtn->ep, cmd, MOO_OOP_TO_SMOOI(sem->io_handle), &ev) == -1)
{
moo_seterrnum (moo, moo_syserrtoerrnum (errno));
MOO_DEBUG2 (moo, "<vm_muxadd> epoll_ctl failure on handle %zd - %hs\n", MOO_OOP_TO_SMOOI(sem->io_handle), strerror(errno));
return -1;
}
#elif defined(USE_POLL)
#endif
return 0;
}
static int vm_muxadd (moo_t* moo, moo_oop_semaphore_t sem)
{
#if defined(USE_DEVPOLL)
return _mux_add_or_mod (moo, sem, 0);
#elif defined(USE_EPOLL)
return _mux_add_or_mod (moo, sem, EPOLL_CTL_ADD);
#endif
} }
static int vm_muxmod (moo_t* moo, moo_oop_semaphore_t sem) static int vm_muxmod (moo_t* moo, moo_oop_semaphore_t sem)
{ {
#if defined(USE_DEVPOLL) moo_ooi_t mask;
return _mux_add_or_mod (moo, sem, 1); int event_mask;
#elif defined(USE_EPOLL)
return _mux_add_or_mod (moo, sem, EPOLL_CTL_MOD);
#endif
}
static int vm_muxdel (moo_t* moo, moo_oop_semaphore_t sem)
{
xtn_t* xtn = (xtn_t*)moo_getxtn(moo);
#if defined(USE_DEVPOLL)
struct pollfd ev;
#else
struct epoll_event ev;
#endif
MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->io_index)); MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->io_index));
MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->io_handle)); MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->io_handle));
MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->io_mask)); MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->io_mask));
#if defined(USE_DEVPOLL) mask = MOO_OOP_TO_SMOOI(sem->io_mask);
ev.fd = MOO_OOP_TO_SMOOI(sem->io_handle); event_mask = 0; /*EPOLLET; */ /* TODO: use edge trigger(EPOLLLET)? */
ev.events = POLLREMOVE; if (mask & MOO_SEMAPHORE_IO_MASK_INPUT) event_mask |= XPOLLIN;
ev.revents = 0; if (mask & MOO_SEMAPHORE_IO_MASK_OUTPUT) event_mask |= XPOLLOUT;
if (write (xtn->ep, &ev, MOO_SIZEOF(ev)) != MOO_SIZEOF(ev))
{
moo_seterrnum (moo, moo_syserrtoerrnum (errno));
MOO_DEBUG2 (moo, "<vm_muxdel> devpoll failure on handle %zd - %hs\n", MOO_OOP_TO_SMOOI(sem->io_handle), strerror(errno));
return -1;
}
#elif defined(USE_EPOLL)
if (epoll_ctl (xtn->ep, EPOLL_CTL_DEL, MOO_OOP_TO_SMOOI(sem->io_handle), &ev) == -1)
{
moo_seterrnum (moo, moo_syserrtoerrnum (errno));
MOO_DEBUG2 (moo, "<vm_muxdel> epoll_ctl failure on handle %zd - %hs\n", MOO_OOP_TO_SMOOI(sem->io_handle), strerror(errno));
return -1;
}
#endif
return 0; if (event_mask == 0)
{
MOO_DEBUG2 (moo, "<vm_muxadd> Invalid semaphore mask %zd on handle %zd\n", mask, MOO_OOP_TO_SMOOI(sem->io_handle));
moo_seterrnum (moo, MOO_EINVAL);
return -1;
}
return _mod_poll_fd (moo, MOO_OOP_TO_SMOOI(sem->io_handle), event_mask, MOO_OOP_TO_SMOOI(sem->io_index));
}
static int vm_muxdel (moo_t* moo, moo_oop_semaphore_t sem)
{
return _del_poll_fd (moo, MOO_OOP_TO_SMOOI(sem->io_handle));
} }
static void vm_muxwait (moo_t* moo, const moo_ntime_t* dur, moo_vmprim_muxwait_cb_t muxwcb) static void vm_muxwait (moo_t* moo, const moo_ntime_t* dur, moo_vmprim_muxwait_cb_t muxwcb)
@ -1334,10 +1313,12 @@ static void vm_muxwait (moo_t* moo, const moo_ntime_t* dur, moo_vmprim_muxwait_c
{ {
--n; --n;
#if defined(USE_DEVPOLL) #if defined(USE_DEVPOLL) || defined(USE_POLL)
if (xtn->ev.buf[n].fd == xtn->p[0]) if (xtn->ev.buf[n].fd == xtn->p[0])
#else #elif defined(USE_EPOLL)
if (xtn->ev.buf[n].data.ptr == (void*)MOO_TYPE_MAX(moo_oow_t)) if (xtn->ev.buf[n].data.ptr == (void*)MOO_TYPE_MAX(moo_oow_t))
#else
# error UNSUPPORTED
#endif #endif
{ {
moo_uint8_t u8; moo_uint8_t u8;
@ -1352,10 +1333,12 @@ static void vm_muxwait (moo_t* moo, const moo_ntime_t* dur, moo_vmprim_muxwait_c
int revents; int revents;
moo_ooi_t mask; moo_ooi_t mask;
#if defined(USE_DEVPOLL) #if defined(USE_DEVPOLL) || defined(USE_POLL)
revents = xtn->ev.buf[n].revents; revents = xtn->ev.buf[n].revents;
#else #elif defined(USE_EPOLL)
revents = xtn->ev.buf[n].events; revents = xtn->ev.buf[n].events;
#else
# error UNSUPPORTED
#endif #endif
mask = 0; mask = 0;
@ -1364,11 +1347,13 @@ static void vm_muxwait (moo_t* moo, const moo_ntime_t* dur, moo_vmprim_muxwait_c
if (revents & XPOLLERR) mask |= MOO_SEMAPHORE_IO_MASK_ERROR; if (revents & XPOLLERR) mask |= MOO_SEMAPHORE_IO_MASK_ERROR;
if (revents & XPOLLHUP) mask |= MOO_SEMAPHORE_IO_MASK_HANGUP; if (revents & XPOLLHUP) mask |= MOO_SEMAPHORE_IO_MASK_HANGUP;
#if defined(USE_DEVPOLL) #if defined(USE_DEVPOLL) || defined(USE_POLL)
MOO_ASSERT (moo, xtn->epd.capa > xtn->ev.buf[n].fd); MOO_ASSERT (moo, xtn->epd.capa > xtn->ev.buf[n].fd);
muxwcb (moo, mask, (void*)xtn->epd.ptr[xtn->ev.buf[n].fd]); muxwcb (moo, mask, (void*)xtn->epd.ptr[xtn->ev.buf[n].fd]);
#else #elif defined(USE_EPOLL)
muxwcb (moo, mask, xtn->ev.buf[n].data.ptr); muxwcb (moo, mask, xtn->ev.buf[n].data.ptr);
#else
# error UNSUPPORTED
#endif #endif
} }
} }
@ -1406,10 +1391,12 @@ static void vm_muxwait (moo_t* moo, const moo_ntime_t* dur, moo_vmprim_muxwait_c
--n; --n;
#if defined(USE_DEVPOLL) #if defined(USE_DEVPOLL) || defined(USE_POLL)
revents = xtn->ev.buf[n].revents; revents = xtn->ev.buf[n].revents;
#elif defined(USE_EPOLL)
revents = xtn->ev.buf[n].events;
#else #else
revetns = xtn->ev.buf[n].events; # error UNSUPPORTED
#endif #endif
mask = 0; mask = 0;
@ -1418,11 +1405,13 @@ static void vm_muxwait (moo_t* moo, const moo_ntime_t* dur, moo_vmprim_muxwait_c
if (revents & XPOLLERR) mask |= MOO_SEMAPHORE_IO_MASK_ERROR; if (revents & XPOLLERR) mask |= MOO_SEMAPHORE_IO_MASK_ERROR;
if (revents & XPOLLHUP) mask |= MOO_SEMAPHORE_IO_MASK_HANGUP; if (revents & XPOLLHUP) mask |= MOO_SEMAPHORE_IO_MASK_HANGUP;
#if defined(USE_DEVPOLL) #if defined(USE_DEVPOLL) || defined(USE_POLL)
MOO_ASSERT (moo, xtn->epd.capa > xtn->ev.buf[n].fd); MOO_ASSERT (moo, xtn->epd.capa > xtn->ev.buf[n].fd);
muxwcb (moo, mask, (void*)xtn->epd.ptr[xtn->ev.buf[n].fd]); muxwcb (moo, mask, (void*)xtn->epd.ptr[xtn->ev.buf[n].fd]);
#else #elif defined(USE_EPOLL)
muxwcb (moo, mask, xtn->ev.buf[n].data.ptr); muxwcb (moo, mask, xtn->ev.buf[n].data.ptr);
#else
# error UNSUPPORTED
#endif #endif
} }
@ -1623,8 +1612,10 @@ static void handle_term (int sig)
{ {
if (g_moo) if (g_moo)
{ {
#if defined(USE_THREAD)
xtn_t* xtn = moo_getxtn(g_moo); xtn_t* xtn = moo_getxtn(g_moo);
write (xtn->p[1], "Q", 1); write (xtn->p[1], "Q", 1);
#endif
moo_abort (g_moo); moo_abort (g_moo);
} }
} }