added more code to use 'select' as a multiplexer

This commit is contained in:
hyunghwan.chung 2017-09-27 06:48:01 +00:00
parent 64e1cee44a
commit 3b5f059569

View File

@ -168,6 +168,14 @@ struct bb_t
moo_bch_t* fn;
};
#if defined(USE_SELECT)
struct select_fd_t
{
int fd;
int events;
};
#endif
typedef struct xtn_t xtn_t;
struct xtn_t
{
@ -188,6 +196,11 @@ struct xtn_t
moo_oow_t capa;
moo_ooi_t* ptr;
} epd;
#elif defined(USE_SELECT)
struct
{
void* data[FD_SETSIZE];
} epd;
#endif
#if defined(USE_THREAD)
@ -222,9 +235,12 @@ struct xtn_t
fd_set wfds;
int maxfd;
} reg;
struct select_fd_t buf[FD_SETSIZE];
#endif
moo_oow_t len;
#if defined(USE_THREAD)
pthread_mutex_t mtx;
pthread_cond_t cnd;
@ -874,6 +890,10 @@ static int _add_poll_fd (moo_t* moo, int fd, int event_mask, moo_oow_t event_dat
FD_SET (fd, &xtn->ev.reg.wfds);
if (fd > xtn->ev.reg.maxfd) xtn->ev.reg.maxfd = fd;
}
xtn->epd.data[fd] = (void*)event_data;
return 0;
#else
MOO_DEBUG1 (moo, "Cannot add file descriptor %d to poll - not implemented\n", fd);
@ -953,6 +973,7 @@ static int _del_poll_fd (moo_t* moo, int fd)
}
xtn->ev.reg.maxfd = i;
}
return 0;
#else
@ -992,6 +1013,7 @@ static int _mod_poll_fd (moo_t* moo, int fd, int event_mask, moo_oow_t event_dat
}
return 0;
#elif defined(USE_POLL)
xtn_t* xtn = (xtn_t*)moo_getxtn(moo);
@ -1005,6 +1027,9 @@ static int _mod_poll_fd (moo_t* moo, int fd, int event_mask, moo_oow_t event_dat
xtn->ev.reg.ptr[i].fd = fd;
xtn->ev.reg.ptr[i].events = event_mask;
xtn->ev.reg.ptr[i].revents = 0;
MOO_ASSERT (moo, fd < xtn->epd.capa);
xtn->epd.ptr[fd] = event_data;
return 0;
}
}
@ -1030,6 +1055,9 @@ static int _mod_poll_fd (moo_t* moo, int fd, int event_mask, moo_oow_t event_dat
else
FD_CLR (fd, &xtn->ev.reg.wfds);
xtn->epd.data[fd] = (void*)event_data;
return 0;
#else
MOO_DEBUG1 (moo, "Cannot modify file descriptor %d in poll - not implemented\n", fd);
moo_seterrnum (moo, MOO_ENOIMPL);
@ -1054,6 +1082,8 @@ static void* iothr_main (void* arg)
struct dvpoll dvp;
#elif defined(USE_SELECT)
struct timeval tv;
fd_set rfds;
fd_set wfds;
#endif
poll_for_event:
@ -1071,7 +1101,28 @@ static void* iothr_main (void* arg)
#elif defined(USE_SELECT)
tv.tv_sec = 10;
tv.tv_usec = 0;
n = select (xtn->ev.reg.maxfd + 1, &xtn->ev.reg.rfds, &xtn->ev.reg.wfds, NULL, &tv);
MOO_MEMCPY (&rfds, &xtn->ev.reg.rfds, MOO_SIZEOF(rfds));
MOO_MEMCPY (&wfds, &xtn->ev.reg.wfds, MOO_SIZEOF(wfds));
n = select (xtn->ev.reg.maxfd + 1, &rfds, &wfds, NULL, &tv);
if (n > 0)
{
int fd, count = 0;
for (fd = 0; fd <= xtn->ev.reg.maxfd; fd++)
{
int events = 0;
if (FD_ISSET(fd, &rfds)) events |= XPOLLIN;
if (FD_ISSET(fd, &wfds)) events |= XPOLLOUT;
if (events)
{
xtn->ev.buf[count].fd = fd;
xtn->ev.buf[count].events = events;
count++;
}
}
n = count;
MOO_ASSERT (moo, n > 0);
}
#endif
pthread_mutex_lock (&xtn->ev.mtx);
@ -1101,7 +1152,16 @@ static void* iothr_main (void* arg)
goto poll_for_event;
}
#if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_REALTIME)
clock_gettime (CLOCK_REALTIME, &ts);
#else
{
struct timeval tv;
gettimeofday (&tv, MOO_NULL);
ts.tv_sec = tv.tv_sec;
ts.tv_nsec = MOO_USEC_TO_NSEC(tv.tv_usec);
}
#endif
ts.tv_sec += 10;
pthread_cond_timedwait (&xtn->ev.cnd, &xtn->ev.mtx, &ts);
pthread_mutex_unlock (&xtn->ev.mtx);
@ -1157,8 +1217,8 @@ static int vm_startup (moo_t* moo)
if (flag >= 0) fcntl (xtn->ep, F_SETFD, flag | FD_CLOEXEC);
#endif
#elif defined(USE_SELECT)
FD_ZERO (xtn->ev.reg.rfds);
FD_ZERO (xtn->ev.reg.wfds);
FD_ZERO (&xtn->ev.reg.rfds);
FD_ZERO (&xtn->ev.reg.wfds);
xtn->ev.reg.maxfd = -1;
#endif /* USE_DEVPOLL */
@ -1277,8 +1337,8 @@ static void vm_cleanup (moo_t* moo)
xtn->ev.buf = MOO_NULL;
}
#elif defined(USE_SELECT)
FD_ZERO (xtn->ev.reg.rfds);
FD_ZERO (xtn->ev.reg.wfds);
FD_ZERO (&xtn->ev.reg.rfds);
FD_ZERO (&xtn->ev.reg.wfds);
xtn->ev.reg.maxfd = -1;
#endif
@ -1433,9 +1493,18 @@ static void vm_muxwait (moo_t* moo, const moo_ntime_t* dur, moo_vmprim_muxwait_c
if (!dur) return; /* immediate check is requested. and there is no event */
#if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_REALTIME)
clock_gettime (CLOCK_REALTIME, &ts);
ns.sec = ts.tv_sec;
ns.nsec = ts.tv_nsec;
#else
{
struct timeval tv;
gettimeofday (&tv, MOO_NULL);
ns.sec = tv.tv_sec;
ns.nsec = MOO_USEC_TO_NSEC(tv.tv_usec);
}
#endif
MOO_ADDNTIME (&ns, &ns, dur);
ts.tv_sec = ns.sec;
ts.tv_nsec = ns.nsec;
@ -1461,6 +1530,8 @@ static void vm_muxwait (moo_t* moo, const moo_ntime_t* dur, moo_vmprim_muxwait_c
if (xtn->ev.buf[n].fd == xtn->p[0])
#elif defined(USE_EPOLL)
if (xtn->ev.buf[n].data.ptr == (void*)MOO_TYPE_MAX(moo_oow_t))
#elif defined(USE_SELECT)
if (xtn->ev.buf[n].fd == xtn->p[0])
#else
# error UNSUPPORTED
#endif
@ -1477,13 +1548,13 @@ static void vm_muxwait (moo_t* moo, const moo_ntime_t* dur, moo_vmprim_muxwait_c
int revents;
moo_ooi_t mask;
#if defined(USE_DEVPOLL) || defined(USE_POLL)
#if defined(USE_DEVPOLL) || defined(USE_POLL)
revents = xtn->ev.buf[n].revents;
#elif defined(USE_EPOLL)
#elif defined(USE_EPOLL)
revents = xtn->ev.buf[n].events;
#else
# error UNSUPPORTED
#endif
#elif defined(USE_SELECT)
revents = xtn->ev.buf[n].events;
#endif
mask = 0;
if (revents & XPOLLIN) mask |= MOO_SEMAPHORE_IO_MASK_INPUT;
@ -1491,14 +1562,14 @@ static void vm_muxwait (moo_t* moo, const moo_ntime_t* dur, moo_vmprim_muxwait_c
if (revents & XPOLLERR) mask |= MOO_SEMAPHORE_IO_MASK_ERROR;
if (revents & XPOLLHUP) mask |= MOO_SEMAPHORE_IO_MASK_HANGUP;
#if defined(USE_DEVPOLL) || defined(USE_POLL)
#if defined(USE_DEVPOLL) || defined(USE_POLL)
MOO_ASSERT (moo, xtn->epd.capa > xtn->ev.buf[n].fd);
muxwcb (moo, mask, (void*)xtn->epd.ptr[xtn->ev.buf[n].fd]);
#elif defined(USE_EPOLL)
#elif defined(USE_EPOLL)
muxwcb (moo, mask, xtn->ev.buf[n].data.ptr);
#else
# error UNSUPPORTED
#endif
#elif defined(USE_SELECT)
muxwcb (moo, mask, (void*)xtn->epd.data[xtn->ev.buf[n].fd]);
#endif
}
}
while (n > 0);
@ -1532,7 +1603,9 @@ static void vm_muxwait (moo_t* moo, const moo_ntime_t* dur, moo_vmprim_muxwait_c
#elif defined(USE_SELECT)
tv.tv_sec = dur->sec;
tv.tv_usec = MOO_NSEC_TO_USEC(dur->nsec);
n = select (xtn->ev.reg.maxfd + 1, &xtn->ev.reg.rfds, &xtn->ev.reg.wfds, NULL, &tv);
xtn->ev.buf.rfds = xtn->ev.reg.rfds;
xtn->ev.buf.wfds = xtn->ev.reg.wfds;
n = select (xtn->ev.reg.maxfd + 1, &xtn->ev.buf.rfds, &xtn->ev.buf.wfds, NULL, &tv);
#endif
if (n <= -1)
@ -1558,6 +1631,8 @@ static void vm_muxwait (moo_t* moo, const moo_ntime_t* dur, moo_vmprim_muxwait_c
revents = xtn->ev.buf[n].revents;
#elif defined(USE_EPOLL)
revents = xtn->ev.buf[n].events;
#elif defined(USE_SELECT)
#error TODO: ...
#else
revents = 0; /* TODO: fake. unsupported */
#endif
@ -1573,6 +1648,9 @@ static void vm_muxwait (moo_t* moo, const moo_ntime_t* dur, moo_vmprim_muxwait_c
muxwcb (moo, mask, (void*)xtn->epd.ptr[xtn->ev.buf[n].fd]);
#elif defined(USE_EPOLL)
muxwcb (moo, mask, xtn->ev.buf[n].data.ptr);
#elif defined(USE_SELECT)
#error TODO...
muxwcb (moo, mask, xtn->ev.buf[n].data.ptr);
#else
/* TODO: fake. unsupported */
#endif