unfinished refactoring

2019-01-27 02:09:22 +00:00
parent 276d15877d
commit 446809d7f4
23 changed files with 1070 additions and 695 deletions


@@ -26,20 +26,6 @@
#include "mio-prv.h"
#if defined(HAVE_SYS_EPOLL_H)
# include <sys/epoll.h>
# define USE_EPOLL
#elif defined(HAVE_SYS_POLL_H)
# include <sys/poll.h>
# define USE_POLL
#else
# error NO SUPPORTED MULTIPLEXER
#endif
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
#define DEV_CAPA_ALL_WATCHED (MIO_DEV_CAPA_IN_WATCHED | MIO_DEV_CAPA_OUT_WATCHED | MIO_DEV_CAPA_PRI_WATCHED)
static int schedule_kill_zombie_job (mio_dev_t* dev);
@@ -65,294 +51,28 @@ static void on_read_timeout (mio_t* mio, const mio_ntime_t* now, mio_tmrjob_t* job)
static void on_write_timeout (mio_t* mio, const mio_ntime_t* now, mio_tmrjob_t* job);
/* ========================================================================= */
#if defined(USE_POLL)
#define MUX_CMD_INSERT 1
#define MUX_CMD_UPDATE 2
#define MUX_CMD_DELETE 3
#define MUX_INDEX_INVALID MIO_TYPE_MAX(mio_oow_t)
struct mio_mux_t
{
struct
{
mio_oow_t* ptr;
mio_oow_t size;
mio_oow_t capa;
} map; /* handle to index */
struct
{
struct pollfd* pfd;
mio_dev_t** dptr;
mio_oow_t size;
mio_oow_t capa;
} pd; /* poll data */
};
static int mux_open (mio_t* mio)
{
mio_mux_t* mux;
mux = MIO_MMGR_ALLOC (mio->mmgr, MIO_SIZEOF(*mux));
if (!mux)
{
mio->errnum = MIO_ESYSMEM;
return -1;
}
MIO_MEMSET (mux, 0, MIO_SIZEOF(*mux));
mio->mux = mux;
return 0;
}
static void mux_close (mio_t* mio)
{
if (mio->mux)
{
MIO_MMGR_FREE (mio->mmgr, mio->mux);
mio->mux = MIO_NULL;
}
}
static int mux_control (mio_dev_t* dev, int cmd, mio_syshnd_t hnd, int dev_capa)
{
mio_t* mio;
mio_mux_t* mux;
mio_oow_t idx;
mio = dev->mio;
mux = (mio_mux_t*)mio->mux;
if (hnd >= mux->map.capa)
{
mio_oow_t new_capa;
mio_oow_t* tmp;
if (cmd != MUX_CMD_INSERT)
{
mio->errnum = MIO_ENOENT;
return -1;
}
new_capa = MIO_ALIGN_POW2((hnd + 1), 256);
tmp = MIO_MMGR_REALLOC(mio->mmgr, mux->map.ptr, new_capa * MIO_SIZEOF(*tmp));
if (!tmp)
{
mio->errnum = MIO_ESYSMEM;
return -1;
}
for (idx = mux->map.capa; idx < new_capa; idx++)
tmp[idx] = MUX_INDEX_INVALID;
mux->map.ptr = tmp;
mux->map.capa = new_capa;
}
idx = mux->map.ptr[hnd];
if (idx != MUX_INDEX_INVALID)
{
if (cmd == MUX_CMD_INSERT)
{
mio->errnum = MIO_EEXIST;
return -1;
}
}
else
{
if (cmd != MUX_CMD_INSERT)
{
mio->errnum = MIO_ENOENT;
return -1;
}
}
switch (cmd)
{
case MUX_CMD_INSERT:
if (mux->pd.size >= mux->pd.capa)
{
mio_oow_t new_capa;
struct pollfd* tmp1;
mio_dev_t** tmp2;
new_capa = MIO_ALIGN_POW2(mux->pd.size + 1, 256);
tmp1 = MIO_MMGR_REALLOC(mio->mmgr, mux->pd.pfd, new_capa * MIO_SIZEOF(*tmp1));
if (!tmp1)
{
mio->errnum = MIO_ESYSMEM;
return -1;
}
tmp2 = MIO_MMGR_REALLOC (mio->mmgr, mux->pd.dptr, new_capa * MIO_SIZEOF(*tmp2));
if (!tmp2)
{
MIO_MMGR_FREE (mio->mmgr, tmp1);
mio->errnum = MIO_ESYSMEM;
return -1;
}
mux->pd.pfd = tmp1;
mux->pd.dptr = tmp2;
mux->pd.capa = new_capa;
}
idx = mux->pd.size++;
mux->pd.pfd[idx].fd = hnd;
mux->pd.pfd[idx].events = 0;
if (dev_capa & MIO_DEV_CAPA_IN_WATCHED) mux->pd.pfd[idx].events |= POLLIN;
if (dev_capa & MIO_DEV_CAPA_OUT_WATCHED) mux->pd.pfd[idx].events |= POLLOUT;
mux->pd.pfd[idx].revents = 0;
mux->pd.dptr[idx] = dev;
mux->map.ptr[hnd] = idx;
return 0;
case MUX_CMD_UPDATE:
MIO_ASSERT (mux->pd.dptr[idx] == dev);
mux->pd.pfd[idx].events = 0;
if (dev_capa & MIO_DEV_CAPA_IN_WATCHED) mux->pd.pfd[idx].events |= POLLIN;
if (dev_capa & MIO_DEV_CAPA_OUT_WATCHED) mux->pd.pfd[idx].events |= POLLOUT;
return 0;
case MUX_CMD_DELETE:
MIO_ASSERT (mux->pd.dptr[idx] == dev);
mux->map.ptr[hnd] = MUX_INDEX_INVALID;
/* TODO: speed up deletion. allow a hole in the array.
* delay array compaction if there is a hole.
* set fd for the hole to -1 such that poll()
* ignores it. compact the array if another deletion
* is requested when there is an existing hole.
* (a sketch of this scheme follows the #endif below) */
idx++;
while (idx < mux->pd.size)
{
int fd;
mux->pd.pfd[idx - 1] = mux->pd.pfd[idx];
mux->pd.dptr[idx - 1] = mux->pd.dptr[idx];
fd = mux->pd.pfd[idx].fd;
mux->map.ptr[fd] = idx - 1;
idx++;
}
mux->pd.size--;
return 0;
default:
mio->errnum = MIO_EINVAL;
return -1;
}
}
#elif defined(USE_EPOLL)
#define MUX_CMD_INSERT EPOLL_CTL_ADD
#define MUX_CMD_UPDATE EPOLL_CTL_MOD
#define MUX_CMD_DELETE EPOLL_CTL_DEL
struct mio_mux_t
{
int hnd;
struct epoll_event revs[100]; /* TODO: is it a good size? */
};
static int mux_open (mio_t* mio)
{
mio_mux_t* mux;
mux = MIO_MMGR_ALLOC (mio->mmgr, MIO_SIZEOF(*mux));
if (!mux)
{
mio->errnum = MIO_ESYSMEM;
return -1;
}
MIO_MEMSET (mux, 0, MIO_SIZEOF(*mux));
mux->hnd = epoll_create (1000);
if (mux->hnd == -1)
{
mio->errnum = mio_syserrtoerrnum(errno);
MIO_MMGR_FREE (mio->mmgr, mux);
return -1;
}
mio->mux = mux;
return 0;
}
static void mux_close (mio_t* mio)
{
if (mio->mux)
{
close (mio->mux->hnd);
MIO_MMGR_FREE (mio->mmgr, mio->mux);
mio->mux = MIO_NULL;
}
}
static MIO_INLINE int mux_control (mio_dev_t* dev, int cmd, mio_syshnd_t hnd, int dev_capa)
{
struct epoll_event ev;
ev.data.ptr = dev;
ev.events = EPOLLHUP | EPOLLERR /*| EPOLLET*/;
if (dev_capa & MIO_DEV_CAPA_IN_WATCHED)
{
ev.events |= EPOLLIN;
#if defined(EPOLLRDHUP)
ev.events |= EPOLLRDHUP;
#endif
if (dev_capa & MIO_DEV_CAPA_PRI_WATCHED) ev.events |= EPOLLPRI;
}
if (dev_capa & MIO_DEV_CAPA_OUT_WATCHED) ev.events |= EPOLLOUT;
if (epoll_ctl (dev->mio->mux->hnd, cmd, hnd, &ev) == -1)
{
dev->mio->errnum = mio_syserrtoerrnum(errno);
return -1;
}
return 0;
}
#endif
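
The TODO in the poll backend's MUX_CMD_DELETE case above describes a lazy-deletion scheme: mark a deleted slot with an fd of -1 so poll() skips it, and compact only when a second deletion finds an existing hole. Below is a minimal sketch of that scheme with hypothetical names; it is not part of this commit and omits the map.ptr/pd.dptr bookkeeping the real code would need.

#include <poll.h>
#include <stddef.h>

/* hypothetical poll set with lazy deletion; not the mio types */
struct pollset
{
	struct pollfd* pfd;
	size_t size;
	int has_hole; /* 1 while one slot carries fd == -1 */
};

static void ps_delete (struct pollset* ps, size_t idx)
{
	if (!ps->has_hole)
	{
		/* first deletion: poll() ignores negative fds, so just mark the slot */
		ps->pfd[idx].fd = -1;
		ps->has_hole = 1;
		return;
	}

	/* a hole already exists: mark this slot too, then compact the array */
	ps->pfd[idx].fd = -1;
	size_t w = 0;
	for (size_t r = 0; r < ps->size; r++)
	{
		if (ps->pfd[r].fd >= 0) ps->pfd[w++] = ps->pfd[r];
	}
	ps->size = w;
	ps->has_hole = 0;
}

In mio itself the compaction pass would also have to refresh map.ptr and pd.dptr for every entry it moves, exactly as the eager loop in MUX_CMD_DELETE does now.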
/* ========================================================================= */
mio_t* mio_open (mio_mmgr_t* mmgr, mio_oow_t xtnsize, mio_oow_t tmrcapa, mio_errnum_t* errnum)
mio_t* mio_open (mio_mmgr_t* mmgr, mio_oow_t xtnsize, mio_cmgr_t* cmgr, mio_oow_t tmrcapa, mio_errinf_t* errinfo)
{
mio_t* mio;
mio = MIO_MMGR_ALLOC (mmgr, MIO_SIZEOF(mio_t) + xtnsize);
if (!cmgr) cmgr = mio_get_utf8_cmgr();
mio = (mio_t*)MIO_MMGR_ALLOC(mmgr, MIO_SIZEOF(mio_t) + xtnsize);
if (mio)
{
if (mio_init (mio, mmgr, tmrcapa) <= -1)
if (mio_init(mio, mmgr, cmgr, tmrcapa) <= -1)
{
if (errnum) *errnum = mio->errnum;
if (errinfo) mio_geterrinf (mio, errinfo);
MIO_MMGR_FREE (mmgr, mio);
mio = MIO_NULL;
}
else MIO_MEMSET (mio + 1, 0, xtnsize);
}
else
else if (errinfo)
{
if (errnum) *errnum = MIO_ESYSMEM;
errinfo->num = MIO_ESYSMEM;
mio_copy_oocstr (errinfo->msg, MIO_COUNTOF(errinfo->msg), mio_errnum_to_errstr(MIO_ESYSMEM));
}
return mio;
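
For orientation, a hedged caller-side sketch of the widened mio_open() signature; the mmgr argument and the 512-slot timer capacity are assumptions, and passing MIO_NULL for cmgr falls back to mio_get_utf8_cmgr() as shown above.

#include <stdio.h>

static mio_t* open_event_loop (mio_mmgr_t* mmgr)
{
	mio_errinf_t errinfo;
	mio_t* mio;

	mio = mio_open(mmgr, 0, MIO_NULL, 512, &errinfo);
	if (!mio)
	{
		/* on failure, errinfo.num and errinfo.msg have been filled in */
		fprintf (stderr, "mio_open failed - error %d\n", (int)errinfo.num);
		return MIO_NULL;
	}
	return mio;
}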
@@ -364,28 +84,53 @@ void mio_close (mio_t* mio)
MIO_MMGR_FREE (mio->mmgr, mio);
}
int mio_init (mio_t* mio, mio_mmgr_t* mmgr, mio_oow_t tmrcapa)
int mio_init (mio_t* mio, mio_mmgr_t* mmgr, mio_cmgr_t* cmgr, mio_oow_t tmrcapa)
{
int sys_log_inited = 0;
int sys_mux_inited = 0;
MIO_MEMSET (mio, 0, MIO_SIZEOF(*mio));
mio->mmgr = mmgr;
mio->cmgr = cmgr;
/* initialize data for logging support */
mio->option.log_mask = MIO_LOG_ALL_LEVELS | MIO_LOG_ALL_TYPES;
mio->log.capa = MIO_ALIGN_POW2(1, MIO_LOG_CAPA_ALIGN); /* TODO: is this a good initial size? */
/* allocate the log buffer in advance even though it may get reallocated
* in put_oocs and put_ooch in fmtout.c. this lets the logging routine
* keep functioning despite side effects when reallocation fails */
/* +1 required for consistency with put_oocs and put_ooch in fmtout.c */
mio->log.ptr = mio_allocmem(mio, (mio->log.capa + 1) * MIO_SIZEOF(*mio->log.ptr));
if (!mio->log.ptr) goto oops;
/* initialize the system-side logging */
mio_sys_initlog (mio);
sys_log_inited = 1;
/* initialize the multiplexer object */
if (mux_open (mio) <= -1) return -1;
if (mio_sys_initmux(mio) <= -1) goto oops;
sys_mux_inited = 1; /* must be 1 so the oops path can unwind the mux */
/* initialize the timer object */
if (tmrcapa <= 0) tmrcapa = 1;
mio->tmr.jobs = MIO_MMGR_ALLOC (mio->mmgr, tmrcapa * MIO_SIZEOF(mio_tmrjob_t));
if (!mio->tmr.jobs)
{
mio->errnum = MIO_ESYSMEM;
mux_close (mio);
return -1;
}
mio->tmr.jobs = mio_allocmem(mio, tmrcapa * MIO_SIZEOF(mio_tmrjob_t));
if (!mio->tmr.jobs) goto oops;
mio->tmr.capa = tmrcapa;
MIO_CWQ_INIT (&mio->cwq);
return 0;
oops:
if (mio->tmr.jobs) mio_freemem (mio, mio->tmr.jobs);
if (sys_mux_inited) mio_sys_finimux (mio);
if (sys_log_inited) mio_sys_finilog (mio);
if (mio->log.ptr) mio_freemem (mio, mio->log.ptr);
mio->log.capa = 0;
return -1;
}
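
mio_init() above relies on per-stage flags so that the oops: label unwinds only what was actually initialized, which is also why sys_mux_inited must become 1 right after mio_sys_initmux() succeeds. The idiom in isolation, with illustrative names and stub stages:

struct obj; /* opaque for the sketch */
static int init_log (struct obj* o) { (void)o; return 0; } /* stub stages */
static int init_mux (struct obj* o) { (void)o; return 0; }
static void fini_log (struct obj* o) { (void)o; }
static void fini_mux (struct obj* o) { (void)o; }

static int obj_init (struct obj* o)
{
	int log_inited = 0, mux_inited = 0;

	if (init_log(o) <= -1) goto oops;
	log_inited = 1; /* record success immediately after each stage */

	if (init_mux(o) <= -1) goto oops;
	mux_inited = 1;

	return 0;

oops:
	/* unwind in reverse order, touching only the completed stages */
	if (mux_inited) fini_mux (o);
	if (log_inited) fini_log (o);
	return -1;
}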
void mio_fini (mio_t* mio)
@@ -449,7 +194,7 @@ void mio_fini (mio_t* mio)
* because the device is freed regardless of the failure when 2
* is given to kill_and_free_device(). */
dev = diehard.head;
MIO_ASSERT (!(dev->dev_capa & (MIO_DEV_CAPA_ACTIVE | MIO_DEV_CAPA_HALTED | MIO_DEV_CAPA_ZOMBIE)));
MIO_ASSERT (mio, !(dev->dev_capa & (MIO_DEV_CAPA_ACTIVE | MIO_DEV_CAPA_HALTED | MIO_DEV_CAPA_ZOMBIE)));
UNLINK_DEVICE_FROM_LIST (&diehard, dev);
kill_and_free_device (dev, 2);
}
@@ -458,10 +203,52 @@ void mio_fini (mio_t* mio)
mio_cleartmrjobs (mio);
MIO_MMGR_FREE (mio->mmgr, mio->tmr.jobs);
/* close the multiplexer */
mux_close (mio);
mio_sys_finimux (mio); /* close the multiplexer */
mio_sys_finilog (mio); /* close the system logger */
}
int mio_setoption (mio_t* mio, mio_option_t id, const void* value)
{
switch (id)
{
case MIO_TRAIT:
mio->option.trait = *(mio_bitmask_t*)value;
return 0;
case MIO_LOG_MASK:
mio->option.log_mask = *(mio_bitmask_t*)value;
return 0;
case MIO_LOG_MAXCAPA:
mio->option.log_maxcapa = *(mio_oow_t*)value;
return 0;
}
mio_seterrnum (mio, MIO_EINVAL);
return -1;
}
int mio_getoption (mio_t* mio, mio_option_t id, void* value)
{
switch (id)
{
case MIO_TRAIT:
*(mio_bitmask_t*)value = mio->option.trait;
return 0;
case MIO_LOG_MASK:
*(mio_bitmask_t*)value = mio->option.log_mask;
return 0;
case MIO_LOG_MAXCAPA:
*(mio_oow_t*)value = mio->option.log_maxcapa;
return 0;
}
mio_seterrnum (mio, MIO_EINVAL);
return -1;
}
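
A brief usage sketch of the option accessors above, doing a read-modify-write of the log mask; only names already present in this diff are used, so treat it as illustrative rather than canonical.

static int widen_log_mask (mio_t* mio)
{
	mio_bitmask_t mask;

	if (mio_getoption(mio, MIO_LOG_MASK, &mask) <= -1) return -1;
	mask |= MIO_LOG_ALL_LEVELS | MIO_LOG_ALL_TYPES;
	return mio_setoption(mio, MIO_LOG_MASK, &mask);
}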
int mio_prologue (mio_t* mio)
{
@@ -479,7 +266,7 @@ static MIO_INLINE void unlink_wq (mio_t* mio, mio_wq_t* q)
if (q->tmridx != MIO_TMRIDX_INVALID)
{
mio_deltmrjob (mio, q->tmridx);
MIO_ASSERT (q->tmridx == MIO_TMRIDX_INVALID);
MIO_ASSERT (mio, q->tmridx == MIO_TMRIDX_INVALID);
}
MIO_WQ_UNLINK (q);
}
@@ -491,7 +278,7 @@ static MIO_INLINE void handle_event (mio_dev_t* dev, int events, int rdhup)
mio = dev->mio;
mio->renew_watch = 0;
MIO_ASSERT (mio == dev->mio);
MIO_ASSERT (mio, mio == dev->mio);
if (dev->dev_evcb->ready)
{
@@ -642,8 +429,8 @@ static MIO_INLINE void handle_event (mio_dev_t* dev, int events, int rdhup)
MIO_MEMSET (&tmrjob, 0, MIO_SIZEOF(tmrjob));
tmrjob.ctx = dev;
mio_gettime (&tmrjob.when);
mio_addtime (&tmrjob.when, &dev->rtmout, &tmrjob.when);
mio_sys_gettime (&tmrjob.when);
MIO_ADD_NTIME (&tmrjob.when, &tmrjob.when, &dev->rtmout);
tmrjob.handler = on_read_timeout;
tmrjob.idxptr = &dev->rtmridx;
@@ -752,12 +539,8 @@ static MIO_INLINE int __exec (mio_t* mio)
{
mio_ntime_t tmout;
#if defined(_WIN32)
ULONG nentries, i;
#else
int nentries, i;
mio_mux_t* mux;
#endif
mio_sys_mux_t* mux;
/*if (!mio->actdev.head) return 0;*/
@@ -811,13 +594,13 @@ static MIO_INLINE int __exec (mio_t* mio)
*/
#elif defined(USE_POLL)
mux = (mio_mux_t*)mio->mux;
mux = (mio_sys_mux_t*)mio->sys.mux;
nentries = poll(mux->pd.pfd, mux->pd.size, MIO_SECNSEC_TO_MSEC(tmout.sec, tmout.nsec));
if (nentries == -1)
{
if (errno == EINTR) return 0;
mio->errnum = mio_syserrtoerrnum(errno);
mio_seterrwithsyserr (mio, 0, errno);
return -1;
}
@@ -830,7 +613,7 @@ static MIO_INLINE int __exec (mio_t* mio)
dev = mux->pd.dptr[i];
MIO_ASSERT (!(mux->pd.pfd[i].revents & POLLNVAL));
MIO_ASSERT (mio, !(mux->pd.pfd[i].revents & POLLNVAL));
if (mux->pd.pfd[i].revents & POLLIN) events |= MIO_DEV_EVENT_IN;
if (mux->pd.pfd[i].revents & POLLOUT) events |= MIO_DEV_EVENT_OUT;
if (mux->pd.pfd[i].revents & POLLPRI) events |= MIO_DEV_EVENT_PRI;
@@ -843,14 +626,14 @@ static MIO_INLINE int __exec (mio_t* mio)
#elif defined(USE_EPOLL)
mux = (mio_mux_t*)mio->mux;
mux = (mio_sys_mux_t*)mio->sys.mux;
nentries = epoll_wait(mux->hnd, mux->revs, MIO_COUNTOF(mux->revs), MIO_SECNSEC_TO_MSEC(tmout.sec, tmout.nsec));
if (nentries == -1)
{
if (errno == EINTR) return 0; /* it's actually ok */
/* other errors are critical - EBADF, EFAULT, EINVAL */
mio->errnum = mio_syserrtoerrnum(errno);
mio_seterrwithsyserr (mio, 0, errno);
return -1;
}
@@ -882,10 +665,10 @@ static MIO_INLINE int __exec (mio_t* mio)
/* kill all halted devices */
while (mio->hltdev.head)
{
printf (">>>>>>>>>>>>>> KILLING HALTED DEVICE %p\n", mio->hltdev.head);
MIO_DEBUG1 (mio, "Killing HALTED device %p\n", mio->hltdev.head);
mio_killdev (mio, mio->hltdev.head);
}
MIO_ASSERT (mio->hltdev.tail == MIO_NULL);
MIO_ASSERT (mio, mio->hltdev.tail == MIO_NULL);
return 0;
}
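
The poll branch above follows the canonical dispatch pattern: a millisecond timeout, EINTR treated as a benign wakeup, and revents translated into portable event bits. The same skeleton reduced to its essentials, independent of the mio types (handle_io is a placeholder):

#include <poll.h>
#include <errno.h>

static int poll_once (struct pollfd* pfd, nfds_t n, int tmout_msec,
                      void (*handle_io)(int fd, short revents))
{
	int nentries = poll(pfd, n, tmout_msec);
	if (nentries == -1)
	{
		if (errno == EINTR) return 0; /* interrupted - retry next round */
		return -1; /* EBADF, EFAULT, EINVAL are genuine errors */
	}

	for (nfds_t i = 0; i < n && nentries > 0; i++)
	{
		if (pfd[i].revents)
		{
			nentries--;
			handle_io (pfd[i].fd, pfd[i].revents);
		}
	}
	return 0;
}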
@@ -950,7 +733,7 @@ mio_dev_t* mio_makedev (mio_t* mio, mio_oow_t dev_size, mio_dev_mth_t* dev_mth,
dev->dev_capa = MIO_DEV_CAPA_IN | MIO_DEV_CAPA_OUT;
dev->dev_mth = dev_mth;
dev->dev_evcb = dev_evcb;
mio_inittime (&dev->rtmout, 0, 0);
MIO_INIT_NTIME (&dev->rtmout, 0, 0);
dev->rtmridx = MIO_TMRIDX_INVALID;
MIO_WQ_INIT (&dev->wq);
dev->cw_count = 0;
@@ -964,10 +747,10 @@
}
/* the make callback must not change these fields */
MIO_ASSERT (dev->dev_mth == dev_mth);
MIO_ASSERT (dev->dev_evcb == dev_evcb);
MIO_ASSERT (dev->dev_prev == MIO_NULL);
MIO_ASSERT (dev->dev_next == MIO_NULL);
MIO_ASSERT (mio, dev->dev_mth == dev_mth);
MIO_ASSERT (mio, dev->dev_evcb == dev_evcb);
MIO_ASSERT (mio, dev->dev_prev == MIO_NULL);
MIO_ASSERT (mio, dev->dev_next == MIO_NULL);
/* set some internal capability bits according to the capabilities
* removed by the device making callback for convenience sake. */
@@ -1026,8 +809,8 @@ static int kill_and_free_device (mio_dev_t* dev, int force)
{
mio_t* mio;
MIO_ASSERT (!(dev->dev_capa & MIO_DEV_CAPA_ACTIVE));
MIO_ASSERT (!(dev->dev_capa & MIO_DEV_CAPA_HALTED));
MIO_ASSERT (mio, !(dev->dev_capa & MIO_DEV_CAPA_ACTIVE));
MIO_ASSERT (mio, !(dev->dev_capa & MIO_DEV_CAPA_HALTED));
mio = dev->mio;
@@ -1060,7 +843,7 @@ static void kill_zombie_job_handler (mio_t* mio, const mio_ntime_t* now, mio_tmrjob_t* job)
{
mio_dev_t* dev = (mio_dev_t*)job->ctx;
MIO_ASSERT (dev->dev_capa & MIO_DEV_CAPA_ZOMBIE);
MIO_ASSERT (mio, dev->dev_capa & MIO_DEV_CAPA_ZOMBIE);
if (kill_and_free_device(dev, 0) <= -1)
{
@@ -1087,12 +870,12 @@ static int schedule_kill_zombie_job (mio_dev_t* dev)
mio_tmrjob_t kill_zombie_job;
mio_ntime_t tmout;
mio_inittime (&tmout, 3, 0); /* TODO: take it from configuration */
MIO_INIT_NTIME (&tmout, 3, 0); /* TODO: take it from configuration */
MIO_MEMSET (&kill_zombie_job, 0, MIO_SIZEOF(kill_zombie_job));
kill_zombie_job.ctx = dev;
mio_gettime (&kill_zombie_job.when);
mio_addtime (&kill_zombie_job.when, &tmout, &kill_zombie_job.when);
mio_sys_gettime (&kill_zombie_job.when);
MIO_ADD_NTIME (&kill_zombie_job.when, &kill_zombie_job.when, &tmout);
kill_zombie_job.handler = kill_zombie_job_handler;
/*kill_zombie_job.idxptr = &rdev->tmridx_kill_zombie;*/
@@ -1101,13 +884,13 @@ static int schedule_kill_zombie_job (mio_dev_t* dev)
void mio_killdev (mio_t* mio, mio_dev_t* dev)
{
MIO_ASSERT (mio == dev->mio);
MIO_ASSERT (mio, mio == dev->mio);
if (dev->dev_capa & MIO_DEV_CAPA_ZOMBIE)
{
MIO_ASSERT (MIO_WQ_ISEMPTY(&dev->wq));
MIO_ASSERT (dev->cw_count == 0);
MIO_ASSERT (dev->rtmridx == MIO_TMRIDX_INVALID);
MIO_ASSERT (mio, MIO_WQ_ISEMPTY(&dev->wq));
MIO_ASSERT (mio, dev->cw_count == 0);
MIO_ASSERT (mio, dev->rtmridx == MIO_TMRIDX_INVALID);
goto kill_device;
}
@@ -1152,7 +935,7 @@ void mio_killdev (mio_t* mio, mio_dev_t* dev)
}
else
{
MIO_ASSERT (dev->dev_capa & MIO_DEV_CAPA_ACTIVE);
MIO_ASSERT (mio, dev->dev_capa & MIO_DEV_CAPA_ACTIVE);
UNLINK_DEVICE_FROM_LIST (&mio->actdev, dev);
dev->dev_capa &= ~MIO_DEV_CAPA_ACTIVE;
}
@@ -1162,7 +945,7 @@ void mio_killdev (mio_t* mio, mio_dev_t* dev)
kill_device:
if (kill_and_free_device(dev, 0) <= -1)
{
MIO_ASSERT (dev->dev_capa & MIO_DEV_CAPA_ZOMBIE);
MIO_ASSERT (mio, dev->dev_capa & MIO_DEV_CAPA_ZOMBIE);
if (schedule_kill_zombie_job (dev) <= -1)
{
/* I have no choice but to free up the device by force */
@@ -1223,7 +1006,7 @@ int mio_dev_watch (mio_dev_t* dev, mio_dev_watch_cmd_t cmd, int events)
case MIO_DEV_WATCH_START:
/* upon start, only input watching is requested */
events = MIO_DEV_EVENT_IN;
mux_cmd = MUX_CMD_INSERT;
mux_cmd = MIO_SYS_MUX_CMD_INSERT;
break;
case MIO_DEV_WATCH_RENEW:
@@ -1235,12 +1018,12 @@ int mio_dev_watch (mio_dev_t* dev, mio_dev_watch_cmd_t cmd, int events)
/* fall through */
case MIO_DEV_WATCH_UPDATE:
/* honor event watching requests as given by the caller */
mux_cmd = MUX_CMD_UPDATE;
mux_cmd = MIO_SYS_MUX_CMD_UPDATE;
break;
case MIO_DEV_WATCH_STOP:
events = 0; /* override events */
mux_cmd = MUX_CMD_DELETE;
mux_cmd = MIO_SYS_MUX_CMD_DELETE;
break;
default:
@@ -1269,13 +1052,13 @@ int mio_dev_watch (mio_dev_t* dev, mio_dev_watch_cmd_t cmd, int events)
if (dev->dev_capa & MIO_DEV_CAPA_OUT) dev_capa |= MIO_DEV_CAPA_OUT_WATCHED;
}
if (mux_cmd == MUX_CMD_UPDATE && (dev_capa & DEV_CAPA_ALL_WATCHED) == (dev->dev_capa & DEV_CAPA_ALL_WATCHED))
if (mux_cmd == MIO_SYS_MUX_CMD_UPDATE && (dev_capa & DEV_CAPA_ALL_WATCHED) == (dev->dev_capa & DEV_CAPA_ALL_WATCHED))
{
/* no change in the watched device capabilities. skip the mux control call */
}
else
{
if (mux_control(dev, mux_cmd, dev->dev_mth->getsyshnd(dev), dev_capa) <= -1) return -1;
if (mio_sys_ctrlmux(dev->mio, mux_cmd, dev, dev_capa) <= -1) return -1;
}
dev->dev_capa = dev_capa;
@@ -1292,7 +1075,7 @@ static void on_read_timeout (mio_t* mio, const mio_ntime_t* now, mio_tmrjob_t* job)
dev->mio->errnum = MIO_ETMOUT;
x = dev->dev_evcb->on_read(dev, MIO_NULL, -1, MIO_NULL);
MIO_ASSERT (dev->rtmridx == MIO_TMRIDX_INVALID);
MIO_ASSERT (mio, dev->rtmridx == MIO_TMRIDX_INVALID);
if (x <= -1) mio_dev_halt (dev);
}
@@ -1331,14 +1114,14 @@ update_timer:
dev->rtmridx = MIO_TMRIDX_INVALID;
}
if (tmout && mio_ispostime(tmout))
if (tmout && MIO_IS_POS_NTIME(tmout))
{
mio_tmrjob_t tmrjob;
MIO_MEMSET (&tmrjob, 0, MIO_SIZEOF(tmrjob));
tmrjob.ctx = dev;
mio_gettime (&tmrjob.when);
mio_addtime (&tmrjob.when, tmout, &tmrjob.when);
mio_sys_gettime (&tmrjob.when);
MIO_ADD_NTIME (&tmrjob.when, &tmrjob.when, tmout);
tmrjob.handler = on_read_timeout;
tmrjob.idxptr = &dev->rtmridx;
@@ -1401,7 +1184,7 @@ static void on_write_timeout (mio_t* mio, const mio_ntime_t* now, mio_tmrjob_t* job)
dev->mio->errnum = MIO_ETMOUT;
x = dev->dev_evcb->on_write(dev, -1, q->ctx, &q->dstaddr);
MIO_ASSERT (q->tmridx == MIO_TMRIDX_INVALID);
MIO_ASSERT (mio, q->tmridx == MIO_TMRIDX_INVALID);
MIO_WQ_UNLINK(q);
MIO_MMGR_FREE (mio->mmgr, q);
@@ -1526,14 +1309,14 @@ enqueue_data:
q->olen = len;
MIO_MEMCPY (q->ptr, uptr, urem);
if (tmout && mio_ispostime(tmout))
if (tmout && MIO_IS_POS_NTIME(tmout))
{
mio_tmrjob_t tmrjob;
MIO_MEMSET (&tmrjob, 0, MIO_SIZEOF(tmrjob));
tmrjob.ctx = q;
mio_gettime (&tmrjob.when);
mio_addtime (&tmrjob.when, tmout, &tmrjob.when);
mio_sys_gettime (&tmrjob.when);
MIO_ADD_NTIME (&tmrjob.when, &tmrjob.when, tmout);
tmrjob.handler = on_write_timeout;
tmrjob.idxptr = &q->tmridx;
@@ -1624,74 +1407,17 @@ int mio_makesyshndasync (mio_t* mio, mio_syshnd_t hnd)
if ((flags = fcntl(hnd, F_GETFL)) <= -1 ||
(flags = fcntl(hnd, F_SETFL, flags | O_NONBLOCK)) <= -1)
{
mio->errnum = mio_syserrtoerrnum (errno);
mio_seterrwithsyserr (mio, 0, errno);
return -1;
}
return 0;
#else
mio->errnum = MIO_ENOSUP;
mio->errnum = MIO_ENOIMPL;
return -1;
#endif
}
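
A hedged caller-side sketch for mio_makesyshndasync(): putting both ends of a freshly created pipe into non-blocking mode before the multiplexer watches them. The pipe setup is an assumption for illustration, not code from this commit.

#include <unistd.h>

static int make_async_pipe (mio_t* mio, int fds[2])
{
	if (pipe(fds) <= -1) return -1;

	/* both ends must be non-blocking before being watched */
	if (mio_makesyshndasync(mio, fds[0]) <= -1 ||
	    mio_makesyshndasync(mio, fds[1]) <= -1)
	{
		close (fds[0]);
		close (fds[1]);
		return -1;
	}
	return 0;
}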
mio_errnum_t mio_syserrtoerrnum (int no)
{
switch (no)
{
case ENOMEM:
return MIO_ESYSMEM;
case EINVAL:
return MIO_EINVAL;
case EEXIST:
return MIO_EEXIST;
case ENOENT:
return MIO_ENOENT;
case EMFILE:
return MIO_EMFILE;
#if defined(ENFILE)
case ENFILE:
return MIO_ENFILE;
#endif
#if defined(EWOULDBLOCK) && defined(EAGAIN) && (EWOULDBLOCK != EAGAIN)
case EAGAIN:
case EWOULDBLOCK:
return MIO_EAGAIN;
#elif defined(EAGAIN)
case EAGAIN:
return MIO_EAGAIN;
#elif defined(EWOULDBLOCK)
case EWOULDBLOCK:
return MIO_EAGAIN;
#endif
#if defined(ECONNREFUSED)
case ECONNREFUSED:
return MIO_ECONRF;
#endif
#if defined(ECONNRESET)
case ECONNRESET:
return MIO_ECONRS;
#endif
#if defined(EPERM)
case EPERM:
return MIO_EPERM;
#endif
default:
return MIO_ESYSERR;
}
}
/* -------------------------------------------------------------------------- */
void* mio_allocmem (mio_t* mio, mio_oow_t size)