From 3b5f059569b73b9c909927768f51ea985356ce6f Mon Sep 17 00:00:00 2001 From: "hyunghwan.chung" Date: Wed, 27 Sep 2017 06:48:01 +0000 Subject: [PATCH] added more code to use 'select' as a multiplexer --- moo/lib/main.c | 110 ++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 94 insertions(+), 16 deletions(-) diff --git a/moo/lib/main.c b/moo/lib/main.c index 2e4406a..328d6a3 100644 --- a/moo/lib/main.c +++ b/moo/lib/main.c @@ -168,6 +168,14 @@ struct bb_t moo_bch_t* fn; }; +#if defined(USE_SELECT) +struct select_fd_t +{ + int fd; + int events; +}; +#endif + typedef struct xtn_t xtn_t; struct xtn_t { @@ -188,6 +196,11 @@ struct xtn_t moo_oow_t capa; moo_ooi_t* ptr; } epd; + #elif defined(USE_SELECT) + struct + { + void* data[FD_SETSIZE]; + } epd; #endif #if defined(USE_THREAD) @@ -222,9 +235,12 @@ struct xtn_t fd_set wfds; int maxfd; } reg; + + struct select_fd_t buf[FD_SETSIZE]; #endif moo_oow_t len; + #if defined(USE_THREAD) pthread_mutex_t mtx; pthread_cond_t cnd; @@ -874,6 +890,10 @@ static int _add_poll_fd (moo_t* moo, int fd, int event_mask, moo_oow_t event_dat FD_SET (fd, &xtn->ev.reg.wfds); if (fd > xtn->ev.reg.maxfd) xtn->ev.reg.maxfd = fd; } + + xtn->epd.data[fd] = (void*)event_data; + return 0; + #else MOO_DEBUG1 (moo, "Cannot add file descriptor %d to poll - not implemented\n", fd); @@ -953,6 +973,7 @@ static int _del_poll_fd (moo_t* moo, int fd) } xtn->ev.reg.maxfd = i; } + return 0; #else @@ -992,6 +1013,7 @@ static int _mod_poll_fd (moo_t* moo, int fd, int event_mask, moo_oow_t event_dat } return 0; + #elif defined(USE_POLL) xtn_t* xtn = (xtn_t*)moo_getxtn(moo); @@ -1005,6 +1027,9 @@ static int _mod_poll_fd (moo_t* moo, int fd, int event_mask, moo_oow_t event_dat xtn->ev.reg.ptr[i].fd = fd; xtn->ev.reg.ptr[i].events = event_mask; xtn->ev.reg.ptr[i].revents = 0; + + MOO_ASSERT (moo, fd < xtn->epd.capa); + xtn->epd.ptr[fd] = event_data; return 0; } } @@ -1030,6 +1055,9 @@ static int _mod_poll_fd (moo_t* moo, int fd, int event_mask, moo_oow_t event_dat else FD_CLR (fd, &xtn->ev.reg.wfds); + xtn->epd.data[fd] = (void*)event_data; + return 0; + #else MOO_DEBUG1 (moo, "Cannot modify file descriptor %d in poll - not implemented\n", fd); moo_seterrnum (moo, MOO_ENOIMPL); @@ -1054,6 +1082,8 @@ static void* iothr_main (void* arg) struct dvpoll dvp; #elif defined(USE_SELECT) struct timeval tv; + fd_set rfds; + fd_set wfds; #endif poll_for_event: @@ -1071,7 +1101,28 @@ static void* iothr_main (void* arg) #elif defined(USE_SELECT) tv.tv_sec = 10; tv.tv_usec = 0; - n = select (xtn->ev.reg.maxfd + 1, &xtn->ev.reg.rfds, &xtn->ev.reg.wfds, NULL, &tv); + MOO_MEMCPY (&rfds, &xtn->ev.reg.rfds, MOO_SIZEOF(rfds)); + MOO_MEMCPY (&wfds, &xtn->ev.reg.wfds, MOO_SIZEOF(wfds)); + n = select (xtn->ev.reg.maxfd + 1, &rfds, &wfds, NULL, &tv); + if (n > 0) + { + int fd, count = 0; + for (fd = 0; fd <= xtn->ev.reg.maxfd; fd++) + { + int events = 0; + if (FD_ISSET(fd, &rfds)) events |= XPOLLIN; + if (FD_ISSET(fd, &wfds)) events |= XPOLLOUT; + + if (events) + { + xtn->ev.buf[count].fd = fd; + xtn->ev.buf[count].events = events; + count++; + } + } + n = count; + MOO_ASSERT (moo, n > 0); + } #endif pthread_mutex_lock (&xtn->ev.mtx); @@ -1101,7 +1152,16 @@ static void* iothr_main (void* arg) goto poll_for_event; } + #if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_REALTIME) clock_gettime (CLOCK_REALTIME, &ts); + #else + { + struct timeval tv; + gettimeofday (&tv, MOO_NULL); + ts.tv_sec = tv.tv_sec; + ts.tv_nsec = MOO_USEC_TO_NSEC(tv.tv_usec); + } + #endif ts.tv_sec += 10; pthread_cond_timedwait (&xtn->ev.cnd, &xtn->ev.mtx, &ts); pthread_mutex_unlock (&xtn->ev.mtx); @@ -1157,8 +1217,8 @@ static int vm_startup (moo_t* moo) if (flag >= 0) fcntl (xtn->ep, F_SETFD, flag | FD_CLOEXEC); #endif #elif defined(USE_SELECT) - FD_ZERO (xtn->ev.reg.rfds); - FD_ZERO (xtn->ev.reg.wfds); + FD_ZERO (&xtn->ev.reg.rfds); + FD_ZERO (&xtn->ev.reg.wfds); xtn->ev.reg.maxfd = -1; #endif /* USE_DEVPOLL */ @@ -1277,8 +1337,8 @@ static void vm_cleanup (moo_t* moo) xtn->ev.buf = MOO_NULL; } #elif defined(USE_SELECT) - FD_ZERO (xtn->ev.reg.rfds); - FD_ZERO (xtn->ev.reg.wfds); + FD_ZERO (&xtn->ev.reg.rfds); + FD_ZERO (&xtn->ev.reg.wfds); xtn->ev.reg.maxfd = -1; #endif @@ -1433,9 +1493,18 @@ static void vm_muxwait (moo_t* moo, const moo_ntime_t* dur, moo_vmprim_muxwait_c if (!dur) return; /* immediate check is requested. and there is no event */ + #if defined(HAVE_CLOCK_GETTIME) && defined(CLOCK_REALTIME) clock_gettime (CLOCK_REALTIME, &ts); ns.sec = ts.tv_sec; ns.nsec = ts.tv_nsec; + #else + { + struct timeval tv; + gettimeofday (&tv, MOO_NULL); + ns.sec = tv.tv_sec; + ns.nsec = MOO_USEC_TO_NSEC(tv.tv_usec); + } + #endif MOO_ADDNTIME (&ns, &ns, dur); ts.tv_sec = ns.sec; ts.tv_nsec = ns.nsec; @@ -1461,6 +1530,8 @@ static void vm_muxwait (moo_t* moo, const moo_ntime_t* dur, moo_vmprim_muxwait_c if (xtn->ev.buf[n].fd == xtn->p[0]) #elif defined(USE_EPOLL) if (xtn->ev.buf[n].data.ptr == (void*)MOO_TYPE_MAX(moo_oow_t)) + #elif defined(USE_SELECT) + if (xtn->ev.buf[n].fd == xtn->p[0]) #else # error UNSUPPORTED #endif @@ -1477,13 +1548,13 @@ static void vm_muxwait (moo_t* moo, const moo_ntime_t* dur, moo_vmprim_muxwait_c int revents; moo_ooi_t mask; - #if defined(USE_DEVPOLL) || defined(USE_POLL) + #if defined(USE_DEVPOLL) || defined(USE_POLL) revents = xtn->ev.buf[n].revents; - #elif defined(USE_EPOLL) + #elif defined(USE_EPOLL) revents = xtn->ev.buf[n].events; - #else - # error UNSUPPORTED - #endif + #elif defined(USE_SELECT) + revents = xtn->ev.buf[n].events; + #endif mask = 0; if (revents & XPOLLIN) mask |= MOO_SEMAPHORE_IO_MASK_INPUT; @@ -1491,14 +1562,14 @@ static void vm_muxwait (moo_t* moo, const moo_ntime_t* dur, moo_vmprim_muxwait_c if (revents & XPOLLERR) mask |= MOO_SEMAPHORE_IO_MASK_ERROR; if (revents & XPOLLHUP) mask |= MOO_SEMAPHORE_IO_MASK_HANGUP; - #if defined(USE_DEVPOLL) || defined(USE_POLL) + #if defined(USE_DEVPOLL) || defined(USE_POLL) MOO_ASSERT (moo, xtn->epd.capa > xtn->ev.buf[n].fd); muxwcb (moo, mask, (void*)xtn->epd.ptr[xtn->ev.buf[n].fd]); - #elif defined(USE_EPOLL) + #elif defined(USE_EPOLL) muxwcb (moo, mask, xtn->ev.buf[n].data.ptr); - #else - # error UNSUPPORTED - #endif + #elif defined(USE_SELECT) + muxwcb (moo, mask, (void*)xtn->epd.data[xtn->ev.buf[n].fd]); + #endif } } while (n > 0); @@ -1532,7 +1603,9 @@ static void vm_muxwait (moo_t* moo, const moo_ntime_t* dur, moo_vmprim_muxwait_c #elif defined(USE_SELECT) tv.tv_sec = dur->sec; tv.tv_usec = MOO_NSEC_TO_USEC(dur->nsec); - n = select (xtn->ev.reg.maxfd + 1, &xtn->ev.reg.rfds, &xtn->ev.reg.wfds, NULL, &tv); + xtn->ev.buf.rfds = xtn->ev.reg.rfds; + xtn->ev.buf.wfds = xtn->ev.reg.wfds; + n = select (xtn->ev.reg.maxfd + 1, &xtn->ev.buf.rfds, &xtn->ev.buf.wfds, NULL, &tv); #endif if (n <= -1) @@ -1558,6 +1631,8 @@ static void vm_muxwait (moo_t* moo, const moo_ntime_t* dur, moo_vmprim_muxwait_c revents = xtn->ev.buf[n].revents; #elif defined(USE_EPOLL) revents = xtn->ev.buf[n].events; + #elif defined(USE_SELECT) + #error TODO: ... #else revents = 0; /* TODO: fake. unsupported */ #endif @@ -1573,6 +1648,9 @@ static void vm_muxwait (moo_t* moo, const moo_ntime_t* dur, moo_vmprim_muxwait_c muxwcb (moo, mask, (void*)xtn->epd.ptr[xtn->ev.buf[n].fd]); #elif defined(USE_EPOLL) muxwcb (moo, mask, xtn->ev.buf[n].data.ptr); + #elif defined(USE_SELECT) + #error TODO... + muxwcb (moo, mask, xtn->ev.buf[n].data.ptr); #else /* TODO: fake. unsupported */ #endif