added moo_dev_timedread()

This commit is contained in:
2019-01-17 08:09:19 +00:00
parent 2372fc535e
commit 115031d2ce
6 changed files with 129 additions and 32 deletions

View File

@ -502,7 +502,7 @@ static MIO_INLINE void handle_event (mio_dev_t* dev, int events, int rdhup)
* <= -1 - failure. kill the device.
* == 0 - ok. but don't invoke recv() or send().
* >= 1 - everything is ok. */
x = dev->dev_evcb->ready (dev, xevents);
x = dev->dev_evcb->ready(dev, xevents);
if (x <= -1)
{
mio_dev_halt (dev);
@ -515,7 +515,7 @@ static MIO_INLINE void handle_event (mio_dev_t* dev, int events, int rdhup)
{
/* urgent data */
/* TODO: implement urgent data handling */
/*x = dev->dev_mth->urgread (dev, mio->bugbuf, &len);*/
/*x = dev->dev_mth->urgread(dev, mio->bugbuf, &len);*/
}
if (dev && (events & MIO_DEV_EVENT_OUT))
@ -535,7 +535,7 @@ static MIO_INLINE void handle_event (mio_dev_t* dev, int events, int rdhup)
send_leftover:
ulen = urem;
x = dev->dev_mth->write (dev, uptr, &ulen, &q->dstaddr);
x = dev->dev_mth->write(dev, uptr, &ulen, &q->dstaddr);
if (x <= -1)
{
mio_dev_halt (dev);
@ -569,7 +569,7 @@ static MIO_INLINE void handle_event (mio_dev_t* dev, int events, int rdhup)
}
unlink_wq (mio, q);
y = dev->dev_evcb->on_write (dev, q->olen, q->ctx, &q->dstaddr);
y = dev->dev_evcb->on_write(dev, q->olen, q->ctx, &q->dstaddr);
MIO_MMGR_FREE (mio->mmgr, q);
if (y <= -1)
@ -624,7 +624,7 @@ static MIO_INLINE void handle_event (mio_dev_t* dev, int events, int rdhup)
while (1)
{
len = MIO_COUNTOF(mio->bigbuf);
x = dev->dev_mth->read (dev, mio->bigbuf, &len, &srcaddr);
x = dev->dev_mth->read(dev, mio->bigbuf, &len, &srcaddr);
if (x <= -1)
{
mio_dev_halt (dev);
@ -646,7 +646,7 @@ static MIO_INLINE void handle_event (mio_dev_t* dev, int events, int rdhup)
mio->renew_watch = 1;
/* call the on_read callback to report EOF */
if (dev->dev_evcb->on_read (dev, mio->bigbuf, len, &srcaddr) <= -1 ||
if (dev->dev_evcb->on_read(dev, mio->bigbuf, len, &srcaddr) <= -1 ||
(dev->dev_capa & MIO_DEV_CAPA_OUT_CLOSED))
{
/* 1. input ended and its reporting failed or
@ -665,7 +665,7 @@ static MIO_INLINE void handle_event (mio_dev_t* dev, int events, int rdhup)
* when x == 0 or <= -1. you can */
/* data available */
y = dev->dev_evcb->on_read (dev, mio->bigbuf, len, &srcaddr);
y = dev->dev_evcb->on_read(dev, mio->bigbuf, len, &srcaddr);
if (y <= -1)
{
mio_dev_halt (dev);
@ -717,7 +717,7 @@ static MIO_INLINE void handle_event (mio_dev_t* dev, int events, int rdhup)
}
skip_evcb:
if (dev && mio->renew_watch && mio_dev_watch (dev, MIO_DEV_WATCH_RENEW, 0) <= -1)
if (dev && mio->renew_watch && mio_dev_watch(dev, MIO_DEV_WATCH_RENEW, 0) <= -1)
{
mio_dev_halt (dev);
dev = MIO_NULL;
@ -926,6 +926,7 @@ mio_dev_t* mio_makedev (mio_t* mio, mio_oow_t dev_size, mio_dev_mth_t* dev_mth,
dev->dev_capa = MIO_DEV_CAPA_IN | MIO_DEV_CAPA_OUT;
dev->dev_mth = dev_mth;
dev->dev_evcb = dev_evcb;
dev->rtmridx = MIO_TMRIDX_INVALID;
MIO_WQ_INIT (&dev->wq);
dev->cw_count = 0;
@ -1080,9 +1081,18 @@ void mio_killdev (mio_t* mio, mio_dev_t* dev)
if (dev->dev_capa & MIO_DEV_CAPA_ZOMBIE)
{
MIO_ASSERT (MIO_WQ_ISEMPTY(&dev->wq));
MIO_ASSERT (dev->cw_count == 0);
MIO_ASSERT (dev->rtmridx == MIO_TMRIDX_INVALID);
goto kill_device;
}
if (dev->rtmridx != MIO_TMRIDX_INVALID)
{
printf ("REMOVING.... TIMER FOR DEV\n");
mio_deltmrjob (mio, dev->rtmridx);
dev->rtmridx = MIO_TMRIDX_INVALID;
}
/* clear completed write event queues - TODO: do i need to fire these? */
if (dev->cw_count > 0)
{
@ -1248,8 +1258,81 @@ int mio_dev_watch (mio_dev_t* dev, mio_dev_watch_cmd_t cmd, int events)
return 0;
}
static void on_read_timeout (mio_t* mio, const mio_ntime_t* now, mio_tmrjob_t* job)
{
mio_dev_t* dev;
int x;
dev = (mio_dev_t*)job->ctx;
dev->mio->errnum = MIO_ETMOUT;
x = dev->dev_evcb->on_read(dev, MIO_NULL, -1, MIO_NULL);
MIO_ASSERT (dev->rtmridx == MIO_TMRIDX_INVALID);
if (x <= -1) mio_dev_halt (dev);
}
static int __dev_read (mio_dev_t* dev, int enabled, const mio_ntime_t* tmout, void* rdctx)
{
if (dev->dev_capa & MIO_DEV_CAPA_IN_CLOSED)
{
dev->mio->errnum = MIO_ENOCAPA;
return -1;
}
if (enabled)
{
dev->dev_capa &= ~MIO_DEV_CAPA_IN_DISABLED;
if (!dev->mio->in_exec && (dev->dev_capa & MIO_DEV_CAPA_IN_WATCHED)) goto renew_watch_now;
}
else
{
dev->dev_capa |= MIO_DEV_CAPA_IN_DISABLED;
if (!dev->mio->in_exec && !(dev->dev_capa & MIO_DEV_CAPA_IN_WATCHED)) goto renew_watch_now;
}
dev->mio->renew_watch = 1;
goto update_timer;
renew_watch_now:
if (mio_dev_watch(dev, MIO_DEV_WATCH_RENEW, 0) <= -1) return -1;
goto update_timer;
update_timer:
if (dev->rtmridx != MIO_TMRIDX_INVALID)
{
/* read timeout already on the socket. remove it first */
mio_deltmrjob (dev->mio, dev->rtmridx);
dev->rtmridx = MIO_TMRIDX_INVALID;
}
if (tmout && mio_ispostime(tmout))
{
mio_tmrjob_t tmrjob;
MIO_MEMSET (&tmrjob, 0, MIO_SIZEOF(tmrjob));
tmrjob.ctx = dev;
mio_gettime (&tmrjob.when);
mio_addtime (&tmrjob.when, tmout, &tmrjob.when);
tmrjob.handler = on_read_timeout;
tmrjob.idxptr = &dev->rtmridx;
dev->rtmridx = mio_instmrjob(dev->mio, &tmrjob);
if (dev->rtmridx == MIO_TMRIDX_INVALID)
{
/* if timer registration fails, timeout will never be triggered */
return -1;
}
}
return 0;
}
int mio_dev_read (mio_dev_t* dev, int enabled)
{
return __dev_read(dev, enabled, MIO_NULL, MIO_NULL);
#if 0
if (dev->dev_capa & MIO_DEV_CAPA_IN_CLOSED)
{
dev->mio->errnum = MIO_ENOCAPA;
@ -1273,6 +1356,12 @@ int mio_dev_read (mio_dev_t* dev, int enabled)
renew_watch_now:
if (mio_dev_watch(dev, MIO_DEV_WATCH_RENEW, 0) <= -1) return -1;
return 0;
#endif
}
int mio_dev_timedread (mio_dev_t* dev, int enabled, const mio_ntime_t* tmout)
{
return __dev_read(dev, enabled, tmout, MIO_NULL);
}
static void on_write_timeout (mio_t* mio, const mio_ntime_t* now, mio_tmrjob_t* job)