changed to push backward the write completion event into the main loop
This commit is contained in:
@ -382,6 +382,7 @@ int mio_init (mio_t* mio, mio_mmgr_t* mmgr, mio_oow_t tmrcapa)
|
||||
}
|
||||
mio->tmr.capa = tmrcapa;
|
||||
|
||||
MIO_CWQ_INIT (&mio->cwq);
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -394,6 +395,16 @@ void mio_fini (mio_t* mio)
|
||||
mio_dev_t* tail;
|
||||
} diehard;
|
||||
|
||||
/* clear completed write event queues - TODO: do i need to fire these? */
|
||||
while (!MIO_CWQ_ISEMPTY(&mio->cwq))
|
||||
{
|
||||
mio_cwq_t* cwq;
|
||||
cwq = MIO_CWQ_HEAD(&mio->cwq);
|
||||
MIO_CWQ_UNLINK (cwq);
|
||||
MIO_MMGR_FREE (mio->mmgr, cwq);
|
||||
}
|
||||
|
||||
|
||||
/* kill all registered devices */
|
||||
while (mio->actdev.head)
|
||||
{
|
||||
@ -723,11 +734,21 @@ static MIO_INLINE int __exec (mio_t* mio)
|
||||
|
||||
/*if (!mio->actdev.head) return 0;*/
|
||||
|
||||
/* execute callbacks for completed write operations */
|
||||
while (!MIO_CWQ_ISEMPTY(&mio->cwq))
|
||||
{
|
||||
mio_cwq_t* cwq;
|
||||
cwq = MIO_CWQ_HEAD(&mio->cwq);
|
||||
if (cwq->dev->dev_evcb->on_write(cwq->dev, cwq->olen, cwq->ctx, &cwq->dstaddr) <= -1) return -1;
|
||||
MIO_CWQ_UNLINK (cwq);
|
||||
MIO_MMGR_FREE (mio->mmgr, cwq);
|
||||
}
|
||||
|
||||
/* execute the scheduled jobs before checking devices with the
|
||||
* multiplexer. the scheduled jobs can safely destroy the devices */
|
||||
mio_firetmrjobs (mio, MIO_NULL, MIO_NULL);
|
||||
|
||||
if (mio_gettmrtmout (mio, MIO_NULL, &tmout) <= -1)
|
||||
if (mio_gettmrtmout(mio, MIO_NULL, &tmout) <= -1)
|
||||
{
|
||||
/* defaults to 1 second if timeout can't be acquired */
|
||||
tmout.sec = 1; /* TODO: make the default timeout configurable */
|
||||
@ -750,7 +771,7 @@ static MIO_INLINE int __exec (mio_t* mio)
|
||||
|
||||
mux = (mio_mux_t*)mio->mux;
|
||||
|
||||
nentries = poll (mux->pd.pfd, mux->pd.size, MIO_SECNSEC_TO_MSEC(tmout.sec, tmout.nsec));
|
||||
nentries = poll(mux->pd.pfd, mux->pd.size, MIO_SECNSEC_TO_MSEC(tmout.sec, tmout.nsec));
|
||||
if (nentries == -1)
|
||||
{
|
||||
if (errno == EINTR) return 0;
|
||||
@ -782,7 +803,7 @@ static MIO_INLINE int __exec (mio_t* mio)
|
||||
|
||||
mux = (mio_mux_t*)mio->mux;
|
||||
|
||||
nentries = epoll_wait (mux->hnd, mux->revs, MIO_COUNTOF(mux->revs), MIO_SECNSEC_TO_MSEC(tmout.sec, tmout.nsec));
|
||||
nentries = epoll_wait(mux->hnd, mux->revs, MIO_COUNTOF(mux->revs), MIO_SECNSEC_TO_MSEC(tmout.sec, tmout.nsec));
|
||||
if (nentries == -1)
|
||||
{
|
||||
if (errno == EINTR) return 0; /* it's actually ok */
|
||||
@ -887,7 +908,7 @@ mio_dev_t* mio_makedev (mio_t* mio, mio_oow_t dev_size, mio_dev_mth_t* dev_mth,
|
||||
dev->dev_capa = MIO_DEV_CAPA_IN | MIO_DEV_CAPA_OUT;
|
||||
dev->dev_mth = dev_mth;
|
||||
dev->dev_evcb = dev_evcb;
|
||||
MIO_WQ_INIT(&dev->wq);
|
||||
MIO_WQ_INIT (&dev->wq);
|
||||
|
||||
/* call the callback function first */
|
||||
mio->errnum = MIO_ENOERR;
|
||||
@ -909,7 +930,7 @@ mio_dev_t* mio_makedev (mio_t* mio, mio_oow_t dev_size, mio_dev_mth_t* dev_mth,
|
||||
if (!(dev->dev_capa & MIO_DEV_CAPA_OUT)) dev->dev_capa |= MIO_DEV_CAPA_OUT_CLOSED;
|
||||
|
||||
#if defined(_WIN32)
|
||||
if (CreateIoCompletionPort ((HANDLE)dev->dev_mth->getsyshnd(dev), mio->iocp, MIO_IOCP_KEY, 0) == NULL)
|
||||
if (CreateIoCompletionPort((HANDLE)dev->dev_mth->getsyshnd(dev), mio->iocp, MIO_IOCP_KEY, 0) == NULL)
|
||||
{
|
||||
/* TODO: set errnum from GetLastError()... */
|
||||
goto oops_after_make;
|
||||
@ -1043,6 +1064,17 @@ void mio_killdev (mio_t* mio, mio_dev_t* dev)
|
||||
goto kill_device;
|
||||
}
|
||||
|
||||
#if 0
|
||||
/* clear completed write event queues - TODO: do i need to fire these? */
|
||||
while (!MIO_CWQ_ISEMPTY(&dev->cwq))
|
||||
{
|
||||
mio_cwq_t* q;
|
||||
q = MIO_CWQ_HEAD(&dev->cwq);
|
||||
MIO_CWQ_UNLINK (q);
|
||||
MIO_MMGR_FREE (mio->mmgr, q);
|
||||
}
|
||||
#endif
|
||||
|
||||
/* clear pending send requests */
|
||||
while (!MIO_WQ_ISEMPTY(&dev->wq))
|
||||
{
|
||||
@ -1184,7 +1216,7 @@ int mio_dev_watch (mio_dev_t* dev, mio_dev_watch_cmd_t cmd, int events)
|
||||
}
|
||||
else
|
||||
{
|
||||
if (mux_control (dev, mux_cmd, dev->dev_mth->getsyshnd(dev), dev_capa) <= -1) return -1;
|
||||
if (mux_control(dev, mux_cmd, dev->dev_mth->getsyshnd(dev), dev_capa) <= -1) return -1;
|
||||
}
|
||||
|
||||
dev->dev_capa = dev_capa;
|
||||
@ -1214,7 +1246,7 @@ int mio_dev_read (mio_dev_t* dev, int enabled)
|
||||
return 0;
|
||||
|
||||
renew_watch_now:
|
||||
if (mio_dev_watch (dev, MIO_DEV_WATCH_RENEW, 0) <= -1) return -1;
|
||||
if (mio_dev_watch(dev, MIO_DEV_WATCH_RENEW, 0) <= -1) return -1;
|
||||
return 0;
|
||||
}
|
||||
|
||||
@ -1228,7 +1260,7 @@ static void on_write_timeout (mio_t* mio, const mio_ntime_t* now, mio_tmrjob_t*
|
||||
dev = q->dev;
|
||||
|
||||
dev->mio->errnum = MIO_ETMOUT;
|
||||
x = dev->dev_evcb->on_write (dev, -1, q->ctx, &q->dstaddr);
|
||||
x = dev->dev_evcb->on_write(dev, -1, q->ctx, &q->dstaddr);
|
||||
|
||||
MIO_ASSERT (q->tmridx == MIO_TMRIDX_INVALID);
|
||||
MIO_WQ_UNLINK(q);
|
||||
@ -1242,6 +1274,7 @@ static int __dev_write (mio_dev_t* dev, const void* data, mio_iolen_t len, const
|
||||
const mio_uint8_t* uptr;
|
||||
mio_iolen_t urem, ulen;
|
||||
mio_wq_t* q;
|
||||
mio_cwq_t* cwq;
|
||||
int x;
|
||||
|
||||
if (dev->dev_capa & MIO_DEV_CAPA_OUT_CLOSED)
|
||||
@ -1266,7 +1299,7 @@ static int __dev_write (mio_dev_t* dev, const void* data, mio_iolen_t len, const
|
||||
do
|
||||
{
|
||||
ulen = urem;
|
||||
x = dev->dev_mth->write (dev, data, &ulen, dstaddr);
|
||||
x = dev->dev_mth->write(dev, data, &ulen, dstaddr);
|
||||
if (x <= -1) return -1;
|
||||
else if (x == 0)
|
||||
{
|
||||
@ -1292,7 +1325,8 @@ static int __dev_write (mio_dev_t* dev, const void* data, mio_iolen_t len, const
|
||||
dev->dev_capa |= MIO_DEV_CAPA_OUT_CLOSED;
|
||||
}
|
||||
|
||||
if (dev->dev_evcb->on_write(dev, len, wrctx, dstaddr) <= -1) return -1;
|
||||
//if (dev->dev_evcb->on_write(dev, len, wrctx, dstaddr) <= -1) return -1;
|
||||
goto enqueue_completed_write;
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -1303,7 +1337,8 @@ static int __dev_write (mio_dev_t* dev, const void* data, mio_iolen_t len, const
|
||||
else if (x == 0) goto enqueue_data;
|
||||
|
||||
/* partial writing is still considered ok for a non-stream device */
|
||||
if (dev->dev_evcb->on_write (dev, ulen, wrctx, dstaddr) <= -1) return -1;
|
||||
//if (dev->dev_evcb->on_write(dev, ulen, wrctx, dstaddr) <= -1) return -1;
|
||||
goto enqueue_completed_write;
|
||||
}
|
||||
|
||||
return 1; /* written immediately and called on_write callback */
|
||||
@ -1317,7 +1352,7 @@ enqueue_data:
|
||||
}
|
||||
|
||||
/* queue the remaining data*/
|
||||
q = (mio_wq_t*)MIO_MMGR_ALLOC (dev->mio->mmgr, MIO_SIZEOF(*q) + (dstaddr? dstaddr->len: 0) + urem);
|
||||
q = (mio_wq_t*)MIO_MMGR_ALLOC(dev->mio->mmgr, MIO_SIZEOF(*q) + (dstaddr? dstaddr->len: 0) + urem);
|
||||
if (!q)
|
||||
{
|
||||
dev->mio->errnum = MIO_ENOMEM;
|
||||
@ -1380,6 +1415,33 @@ enqueue_data:
|
||||
}
|
||||
|
||||
return 0; /* request pused to a write queue. */
|
||||
|
||||
enqueue_completed_write:
|
||||
/* queue the remaining data*/
|
||||
cwq = (mio_cwq_t*)MIO_MMGR_ALLOC(dev->mio->mmgr, MIO_SIZEOF(*cwq) + (dstaddr? dstaddr->len: 0));
|
||||
if (!cwq)
|
||||
{
|
||||
dev->mio->errnum = MIO_ENOMEM;
|
||||
return -1;
|
||||
}
|
||||
|
||||
cwq->dev = dev;
|
||||
cwq->ctx = wrctx;
|
||||
if (dstaddr)
|
||||
{
|
||||
cwq->dstaddr.ptr = (mio_uint8_t*)(cwq + 1);
|
||||
cwq->dstaddr.len = dstaddr->len;
|
||||
MIO_MEMCPY (cwq->dstaddr.ptr, dstaddr->ptr, dstaddr->len);
|
||||
}
|
||||
else
|
||||
{
|
||||
cwq->dstaddr.len = 0;
|
||||
}
|
||||
|
||||
cwq->olen = len;
|
||||
|
||||
MIO_CWQ_ENQ (&dev->mio->cwq, cwq);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int mio_dev_write (mio_dev_t* dev, const void* data, mio_iolen_t len, void* wrctx, const mio_devaddr_t* dstaddr)
|
||||
|
Reference in New Issue
Block a user