cleaned up code related to IO semaphores and multiplexing

This commit is contained in:
hyunghwan.chung 2017-12-26 15:55:06 +00:00
parent ded869708d
commit 3c6b73b2b5
5 changed files with 93 additions and 101 deletions

View File

@ -46,7 +46,7 @@ class MyObject(Object)
[ sg wait. ] fork.
[ sg wait. ] fork.
##System sleepForSecs: 1.
System sleepForSecs: 1.
sg wait.
sg removeSemaphore: s1.
'********** END OF TESTER *************' dump.

View File

@ -135,7 +135,7 @@ static MOO_INLINE int vm_startup (moo_t* moo)
MOO_DEBUG0 (moo, "VM started up\n");
for (i = 0; i < MOO_COUNTOF(moo->sem_io_map); i++)
for (i = 0; i < moo->sem_io_map_capa; i++)
{
moo->sem_io_map[i] = -1;
}
@ -153,7 +153,7 @@ static MOO_INLINE void vm_cleanup (moo_t* moo)
/* TODO: clean up semaphores being waited on
MOO_ASSERT (moo, moo->sem_io_wait_count == 0); */
for (i = 0; i < MOO_COUNTOF(moo->sem_io_map);)
for (i = 0; i < moo->sem_io_map_capa;)
{
moo_ooi_t sem_io_index;
if ((sem_io_index = moo->sem_io_map[i]) >= 0)
@ -1114,13 +1114,36 @@ static int add_to_sem_io (moo_t* moo, moo_oop_semaphore_t sem, moo_ooi_t io_hand
MOO_ASSERT (moo, sem->io_handle == (moo_oop_t)moo->_nil);
MOO_ASSERT (moo, sem->io_type == (moo_oop_t)moo->_nil);
if (io_handle < 0 || io_handle >= MOO_COUNTOF(moo->sem_io_map)) /* TODO: change the coindition when sem_io_map changes to a dynamic structure */
if (io_handle < 0)
{
moo_seterrbfmt (moo, MOO_EINVAL, "handle %zd out of supported range", io_handle);
moo_seterrbfmt (moo, MOO_EINVAL, "handle %zd ouit of supported range", io_handle);
return -1;
}
index = moo->sem_io_map[io_handle]; /* TODO: make it dynamic */
if (io_handle >= moo->sem_io_map_capa)
{
moo_oow_t new_capa, i;
moo_ooi_t* tmp;
/* TODO: specify the maximum io_handle supported and check it here? */
new_capa = MOO_ALIGN_POW2 (io_handle + 1, 1024);
tmp = moo_reallocmem (moo, moo->sem_io_map, MOO_SIZEOF(*tmp) * new_capa);
if (!tmp)
{
moo_copyoocstr (moo->errmsg.buf2, MOO_COUNTOF(moo->errmsg.buf2), moo->errmsg.buf);
moo_seterrbfmt (moo, MOO_EINVAL, "handle %zd out of supported range - %js", moo->errmsg.buf2);
return -1;
}
for (i = moo->sem_io_map_capa; i < new_capa; i++) tmp[i] = -1;
moo->sem_io_map = tmp;
moo->sem_io_map_capa = new_capa;
}
index = moo->sem_io_map[io_handle];
if (index <= -1)
{
/* this handle is not in any tuples. add it to a new tuple */
@ -1162,7 +1185,7 @@ static int add_to_sem_io (moo_t* moo, moo_oop_semaphore_t sem, moo_ooi_t io_hand
new_mask = ((moo_ooi_t)1 << io_type);
moo_pushtmp (moo, (moo_oop_t*)&sem);
n = moo->vmprim.vm_muxadd(moo, io_handle, new_mask, (void*)index);
n = moo->vmprim.vm_muxadd(moo, io_handle, new_mask);
moo_poptmp (moo);
}
else
@ -1177,7 +1200,7 @@ static int add_to_sem_io (moo_t* moo, moo_oop_semaphore_t sem, moo_ooi_t io_hand
new_mask |= ((moo_ooi_t)1 << io_type);
moo_pushtmp (moo, (moo_oop_t*)&sem);
n = moo->vmprim.vm_muxmod(moo, io_handle, new_mask, (void*)index);
n = moo->vmprim.vm_muxmod(moo, io_handle, new_mask);
moo_poptmp (moo);
}
@ -1230,7 +1253,7 @@ static int delete_from_sem_io (moo_t* moo, moo_oop_semaphore_t sem)
MOO_ASSERT (moo, index >= 0 && index < moo->sem_io_tuple_count);
io_handle = MOO_OOP_TO_SMOOI(sem->io_handle);
if (io_handle < 0 || io_handle >= MOO_COUNTOF(moo->sem_io_map)) /* TODO: change the condition when sem_io_map changes to a dynamic structure */
if (io_handle < 0 || io_handle >= moo->sem_io_map_capa)
{
moo_seterrbfmt (moo, MOO_EINVAL, "handle %zd out of supported range", io_handle);
return -1;
@ -1243,7 +1266,7 @@ static int delete_from_sem_io (moo_t* moo, moo_oop_semaphore_t sem)
new_mask &= ~((moo_ooi_t)1 << io_type); /* this is the new mask after deletion */
moo_pushtmp (moo, (moo_oop_t*)&sem);
x = new_mask? moo->vmprim.vm_muxmod(moo, io_handle, new_mask, (void*)index):
x = new_mask? moo->vmprim.vm_muxmod(moo, io_handle, new_mask):
moo->vmprim.vm_muxdel(moo, io_handle);
moo_poptmp (moo);
if (x <= -1)
@ -1322,7 +1345,7 @@ static void _signal_io_semaphore (moo_t* moo, moo_oop_semaphore_t sem)
static void signal_io_semaphore (moo_t* moo, moo_ooi_t io_handle, moo_ooi_t mask)
{
if (io_handle >= 0 && io_handle < MOO_COUNTOF(moo->sem_io_map) && moo->sem_io_map[io_handle] >= 0)
if (io_handle >= 0 && io_handle < moo->sem_io_map_capa && moo->sem_io_map[io_handle] >= 0)
{
moo_oop_semaphore_t sem;
moo_ooi_t sem_io_index;

View File

@ -204,24 +204,12 @@ struct xtn_t
#if defined(USE_DEVPOLL)
int ep; /* /dev/poll */
struct
{
moo_oow_t capa;
moo_ooi_t* ptr;
} epd;
#elif defined(USE_EPOLL)
int ep; /* epoll */
#elif defined(USE_POLL)
struct
{
moo_oow_t capa;
moo_ooi_t* ptr;
} epd;
/* nothing */
#elif defined(USE_SELECT)
struct
{
void* data[FD_SETSIZE];
} epd;
/* nothing */
#endif
#if defined(USE_THREAD)
@ -823,46 +811,12 @@ static void log_write (moo_t* moo, moo_oow_t mask, const moo_ooch_t* msg, moo_oo
}
/* ========================================================================= */
#if defined(USE_DEVPOLL) || defined(USE_POLL)
static MOO_INLINE int secure_poll_data_space (moo_t* moo, int fd)
{
xtn_t* xtn = (xtn_t*)moo_getxtn(moo);
if (fd >= xtn->epd.capa)
{
moo_oow_t newcapa;
moo_ooi_t* tmp;
newcapa = MOO_ALIGN_POW2 (fd + 1, 256);
tmp = moo_reallocmem (moo, xtn->epd.ptr, newcapa * MOO_SIZEOF(*tmp));
if (!tmp) return -1;
xtn->epd.capa = newcapa;
xtn->epd.ptr = tmp;
}
return 0;
}
static MOO_INLINE void destroy_poll_data_space (moo_t* moo)
{
xtn_t* xtn = (xtn_t*)moo_getxtn(moo);
if (xtn->epd.ptr)
{
moo_freemem (moo, xtn->epd.ptr);
xtn->epd.ptr = MOO_NULL;
xtn->epd.capa = 0;
}
}
#endif
static int _add_poll_fd (moo_t* moo, int fd, int event_mask, void* event_data)
static int _add_poll_fd (moo_t* moo, int fd, int event_mask)
{
#if defined(USE_DEVPOLL)
xtn_t* xtn = (xtn_t*)moo_getxtn(moo);
struct pollfd ev;
if (secure_poll_data_space (moo, fd) <= -1) return -1;
MOO_ASSERT (moo, xtn->ep >= 0);
ev.fd = fd;
ev.events = event_mask;
@ -874,7 +828,6 @@ static int _add_poll_fd (moo_t* moo, int fd, int event_mask, void* event_data)
return -1;
}
xtn->epd.ptr[fd] = event_data;
return 0;
#elif defined(USE_EPOLL)
@ -894,7 +847,6 @@ static int _add_poll_fd (moo_t* moo, int fd, int event_mask, void* event_data)
ev.data.fd = fd;
if (epoll_ctl(xtn->ep, EPOLL_CTL_ADD, fd, &ev) == -1)
{
moo_seterrwithsyserr (moo, errno);
MOO_DEBUG2 (moo, "Cannot add file descriptor %d to epoll - %hs\n", fd, strerror(errno));
return -1;
@ -904,8 +856,6 @@ static int _add_poll_fd (moo_t* moo, int fd, int event_mask, void* event_data)
#elif defined(USE_POLL)
xtn_t* xtn = (xtn_t*)moo_getxtn(moo);
if (secure_poll_data_space (moo, fd) <= -1) return -1;
MUTEX_LOCK (&xtn->ev.reg.pmtx);
if (xtn->ev.reg.len >= xtn->ev.reg.capa)
{
@ -935,7 +885,6 @@ static int _add_poll_fd (moo_t* moo, int fd, int event_mask, void* event_data)
xtn->ev.reg.len++;
MUTEX_UNLOCK (&xtn->ev.reg.pmtx);
xtn->epd.ptr[fd] = event_data;
return 0;
#elif defined(USE_SELECT)
@ -954,7 +903,6 @@ static int _add_poll_fd (moo_t* moo, int fd, int event_mask, void* event_data)
}
MUTEX_UNLOCK (&xtn->ev.reg.smtx);
xtn->epd.data[fd] = (void*)event_data;
return 0;
#else
@ -1041,7 +989,6 @@ static int _del_poll_fd (moo_t* moo, int fd)
}
MUTEX_UNLOCK (&xtn->ev.reg.smtx);
/* keep xtn->epd.data[fd] so that the data is still accessible after deletion */
return 0;
#else
@ -1053,7 +1000,7 @@ static int _del_poll_fd (moo_t* moo, int fd)
}
static int _mod_poll_fd (moo_t* moo, int fd, int event_mask, void* event_data)
static int _mod_poll_fd (moo_t* moo, int fd, int event_mask)
{
#if defined(USE_DEVPOLL)
@ -1074,7 +1021,6 @@ static int _mod_poll_fd (moo_t* moo, int fd, int event_mask, void* event_data)
MOO_ASSERT (moo, xtn->ep >= 0);
memset (&ev, 0, MOO_SIZEOF(ev));
ev.events = event_mask;
/*ev.data.ptr = (void*)event_data;*/
ev.data.fd = fd;
if (epoll_ctl (xtn->ep, EPOLL_CTL_MOD, fd, &ev) == -1)
{
@ -1101,8 +1047,6 @@ static int _mod_poll_fd (moo_t* moo, int fd, int event_mask, void* event_data)
xtn->ev.reg.ptr[i].revents = 0;
MUTEX_UNLOCK (&xtn->ev.reg.pmtx);
MOO_ASSERT (moo, fd < xtn->epd.capa);
xtn->epd.ptr[fd] = event_data;
return 0;
}
}
@ -1130,7 +1074,6 @@ static int _mod_poll_fd (moo_t* moo, int fd, int event_mask, void* event_data)
FD_CLR (fd, &xtn->ev.reg.wfds);
MUTEX_UNLOCK (&xtn->ev.reg.smtx);
xtn->epd.data[fd] = (void*)event_data;
return 0;
#else
@ -1218,7 +1161,7 @@ static int vm_startup (moo_t* moo)
if (flag >= 0) fcntl (xtn->p[1], F_SETFL, flag | O_NONBLOCK);
#endif
if (_add_poll_fd(moo, xtn->p[0], XPOLLIN, (void*)MOO_TYPE_MAX(moo_oow_t)) <= -1) goto oops;
if (_add_poll_fd(moo, xtn->p[0], XPOLLIN) <= -1) goto oops;
pthread_mutex_init (&xtn->ev.mtx, MOO_NULL);
pthread_cond_init (&xtn->ev.cnd, MOO_NULL);
@ -1259,7 +1202,7 @@ oops:
static void vm_cleanup (moo_t* moo)
{
#if defined(_WIN32)
xtn_t* xtn = (xtn_t*)moo_getxtn(moo);
xtn_t* xtn = (xatn_t*)moo_getxtn(moo);
if (xtn->waitable_timer)
{
CloseHandle (xtn->waitable_timer);
@ -1294,7 +1237,7 @@ static void vm_cleanup (moo_t* moo)
close (xtn->ep);
xtn->ep = -1;
}
destroy_poll_data_space (moo);
/*destroy_poll_data_space (moo);*/
#elif defined(USE_EPOLL)
if (xtn->ep >= 0)
{
@ -1314,7 +1257,7 @@ static void vm_cleanup (moo_t* moo)
moo_freemem (moo, xtn->ev.buf);
xtn->ev.buf = MOO_NULL;
}
destroy_poll_data_space (moo);
/*destroy_poll_data_space (moo);*/
MUTEX_DESTROY (&xtn->ev.reg.pmtx);
#elif defined(USE_SELECT)
FD_ZERO (&xtn->ev.reg.rfds);
@ -1386,7 +1329,7 @@ static void vm_gettime (moo_t* moo, moo_ntime_t* now)
#endif
static int vm_muxadd (moo_t* moo, moo_ooi_t io_handle, moo_ooi_t mask, void* ctx)
static int vm_muxadd (moo_t* moo, moo_ooi_t io_handle, moo_ooi_t mask)
{
int event_mask;
@ -1401,10 +1344,10 @@ static int vm_muxadd (moo_t* moo, moo_ooi_t io_handle, moo_ooi_t mask, void* ctx
return -1;
}
return _add_poll_fd (moo, io_handle, event_mask, ctx);
return _add_poll_fd (moo, io_handle, event_mask);
}
static int vm_muxmod (moo_t* moo, moo_ooi_t io_handle, moo_ooi_t mask, void* ctx)
static int vm_muxmod (moo_t* moo, moo_ooi_t io_handle, moo_ooi_t mask)
{
int event_mask;
@ -1419,7 +1362,7 @@ static int vm_muxmod (moo_t* moo, moo_ooi_t io_handle, moo_ooi_t mask, void* ctx
return -1;
}
return _mod_poll_fd (moo, io_handle, event_mask, ctx);
return _mod_poll_fd (moo, io_handle, event_mask);
}
static int vm_muxdel (moo_t* moo, moo_ooi_t io_handle)
@ -1661,16 +1604,13 @@ static void vm_muxwait (moo_t* moo, const moo_ntime_t* dur, moo_vmprim_muxwait_c
if (revents & XPOLLHUP) mask |= MOO_SEMAPHORE_IO_MASK_HANGUP;
#if defined(USE_DEVPOLL)
MOO_ASSERT (moo, xtn->epd.capa > xtn->ev.buf[n].fd);
muxwcb (moo, mask, (void*)xtn->epd.ptr[xtn->ev.buf[n].fd]);
muxwcb (moo, xtn->ev.buf[n].fd, mask);
#elif defined(USE_EPOLL)
/*muxwcb (moo, mask, xtn->ev.buf[n].data.ptr);*/
muxwcb (moo, xtn->ev.buf[n].data.fd, mask);
#elif defined(USE_POLL)
MOO_ASSERT (moo, xtn->epd.capa > xtn->ev.buf[n].fd);
muxwcb (moo, mask, (void*)xtn->epd.ptr[xtn->ev.buf[n].fd]);
muxwcb (moo, xtn->ev.buf[n].fd, mask);
#elif defined(USE_SELECT)
muxwcb (moo, mask, (void*)xtn->epd.data[xtn->ev.buf[n].fd]);
muxwcb (moo, xtn->ev.buf[n].fd, mask);
#else
# error UNSUPPORTED
#endif
@ -1738,7 +1678,7 @@ static void vm_muxwait (moo_t* moo, const moo_ntime_t* dur, moo_vmprim_muxwait_c
if (n > 0)
{
int fd, count = 0;
for (fd = 0; fd <= maxfd; fd++)
for (fd = 0; fd <= maxfd; fd++)
{
int events = 0;
if (FD_ISSET(fd, &rfds)) events |= XPOLLIN;
@ -1796,16 +1736,13 @@ static void vm_muxwait (moo_t* moo, const moo_ntime_t* dur, moo_vmprim_muxwait_c
if (revents & XPOLLHUP) mask |= MOO_SEMAPHORE_IO_MASK_HANGUP;
#if defined(USE_DEVPOLL)
MOO_ASSERT (moo, xtn->epd.capa > xtn->ev.buf[n].fd);
muxwcb (moo, mask, (void*)xtn->epd.ptr[xtn->ev.buf[n].fd]);
muxwcb (moo, xtn->ev.buf[n].fd, mask);
#elif defined(USE_EPOLL)
/*muxwcb (moo, mask, xtn->ev.buf[n].data.ptr);*/
muxwcb (moo, xtn->ev.buf[n].data.fd, mask);
#elif defined(USE_POLL)
MOO_ASSERT (moo, xtn->epd.capa > xtn->ev.buf[n].fd);
muxwcb (moo, mask, (void*)xtn->epd.ptr[xtn->ev.buf[n].fd]);
muxwcb (moo, xtn->ev.buf[n].fd, mask);
#elif defined(USE_SELECT)
muxwcb (moo, mask, (void*)xtn->epd.data[xtn->ev.buf[n].fd]);
muxwcb (moo, xtn->ev.buf[n].fd, mask);
#endif
}

View File

@ -183,6 +183,12 @@ void moo_fini (moo_t* moo)
moo->sem_io_tuple_count = 0;
}
if (moo->sem_io_map)
{
moo_freemem (moo, moo->sem_io_map);
moo->sem_io_map_capa = 0;
}
if (moo->proc_map)
{
moo_freemem (moo, moo->proc_map);

View File

@ -937,14 +937,40 @@ typedef int (*moo_vmprim_startup_t) (moo_t* moo);
typedef void (*moo_vmprim_cleanup_t) (moo_t* moo);
typedef void (*moo_vmprim_gettime_t) (moo_t* moo, moo_ntime_t* now);
typedef int (*moo_vmprim_muxadd_t) (moo_t* moo, moo_ooi_t io_handle, moo_ooi_t mask, void* ctx);
typedef int (*moo_vmprim_muxmod_t) (moo_t* moo, moo_ooi_t io_handle, moo_ooi_t mask, void* ctx);
typedef int (*moo_vmprim_muxdel_t) (moo_t* moo, moo_ooi_t sem);
typedef int (*moo_vmprim_muxadd_t) (
moo_t* moo,
moo_ooi_t io_handle,
moo_ooi_t masks
);
typedef void (*moo_vmprim_muxwait_cb_t) (moo_t* moo, moo_ooi_t io_handle, moo_ooi_t mask);
typedef void (*moo_vmprim_muxwait_t) (moo_t* moo, const moo_ntime_t* duration, moo_vmprim_muxwait_cb_t muxwcb);
typedef int (*moo_vmprim_muxmod_t) (
moo_t* moo,
moo_ooi_t io_handle,
moo_ooi_t masks
);
typedef int (*moo_vmprim_muxdel_t) (
moo_t* moo,
moo_ooi_t io_handle
);
typedef void (*moo_vmprim_muxwait_cb_t) (
moo_t* moo,
moo_ooi_t io_handle,
moo_ooi_t masks
);
typedef void (*moo_vmprim_muxwait_t) (
moo_t* moo,
const moo_ntime_t* duration,
moo_vmprim_muxwait_cb_t muxwcb
);
typedef void (*moo_vmprim_sleep_t) (
moo_t* moo,
const moo_ntime_t* duration
);
typedef void (*moo_vmprim_sleep_t) (moo_t* moo, const moo_ntime_t* duration);
struct moo_vmprim_t
{
moo_vmprim_dlopen_t dl_open;
@ -1271,8 +1297,8 @@ struct moo_t
moo_oow_t sem_io_count;
moo_oow_t sem_io_wait_count;
moo_ooi_t sem_io_map[10240]; /* TODO: make it dynamic */
moo_ooi_t* sem_io_map;
moo_oow_t sem_io_map_capa;
/* semaphore to notify finalizable objects */
moo_oop_semaphore_t sem_gcfin;