changed to clear the on_write callbacks for completed write requestrs after having triggered timer callbacks collectively and also before triggering on_read() for each device
cleaned up debugging messages in dns-cli.c experimenting with http server implementation
This commit is contained in:
147
mio/lib/mio.c
147
mio/lib/mio.c
@ -298,6 +298,93 @@ static MIO_INLINE void unlink_wq (mio_t* mio, mio_wq_t* q)
|
||||
MIO_WQ_UNLINK (q);
|
||||
}
|
||||
|
||||
static void fire_cwq_handlers (mio_t* mio)
|
||||
{
|
||||
/* execute callbacks for completed write operations */
|
||||
while (!MIO_CWQ_IS_EMPTY(&mio->cwq))
|
||||
{
|
||||
mio_cwq_t* cwq;
|
||||
mio_oow_t cwqfl_index;
|
||||
mio_dev_t* dev_to_halt;
|
||||
|
||||
cwq = MIO_CWQ_HEAD(&mio->cwq);
|
||||
if (cwq->dev->dev_evcb->on_write(cwq->dev, cwq->olen, cwq->ctx, &cwq->dstaddr) <= -1)
|
||||
{
|
||||
MIO_DEBUG1 (mio, "MIO - Error returned by on_write() of device %p in cwq\n", cwq->dev);
|
||||
dev_to_halt = cwq->dev;
|
||||
}
|
||||
else
|
||||
{
|
||||
dev_to_halt = MIO_NULL;
|
||||
}
|
||||
cwq->dev->cw_count--;
|
||||
MIO_CWQ_UNLINK (cwq);
|
||||
|
||||
cwqfl_index = MIO_ALIGN_POW2(cwq->dstaddr.len, MIO_CWQFL_ALIGN) / MIO_CWQFL_SIZE;
|
||||
if (cwqfl_index < MIO_COUNTOF(mio->cwqfl))
|
||||
{
|
||||
/* reuse the cwq object if dstaddr is 0 in size. chain it to the free list */
|
||||
cwq->q_next = mio->cwqfl[cwqfl_index];
|
||||
mio->cwqfl[cwqfl_index] = cwq;
|
||||
}
|
||||
else
|
||||
{
|
||||
/* TODO: more reuse of objects of different size? */
|
||||
mio_freemem (mio, cwq);
|
||||
}
|
||||
|
||||
if (dev_to_halt) mio_dev_halt (dev_to_halt);
|
||||
}
|
||||
}
|
||||
|
||||
static void fire_cwq_handlers_for_dev (mio_t* mio, mio_dev_t* dev)
|
||||
{
|
||||
mio_cwq_t* cwq, * next;
|
||||
|
||||
MIO_ASSERT (mio, dev->cw_count > 0); /* Ensure to check dev->cw_count before calling this function */
|
||||
|
||||
cwq = MIO_CWQ_HEAD(&mio->cwq);
|
||||
while (cwq != &mio->cwq)
|
||||
{
|
||||
next = MIO_CWQ_NEXT(cwq);
|
||||
if (cwq->dev == dev) /* TODO: THIS LOOP TOO INEFFICIENT??? MAINTAIN PER-DEVICE LIST OF CWQ? */
|
||||
{
|
||||
mio_dev_t* dev_to_halt;
|
||||
mio_oow_t cwqfl_index;
|
||||
|
||||
if (cwq->dev->dev_evcb->on_write(cwq->dev, cwq->olen, cwq->ctx, &cwq->dstaddr) <= -1)
|
||||
{
|
||||
MIO_DEBUG1 (mio, "MIO - Error returned by on_write() of device %p in cwq\n", cwq->dev);
|
||||
dev_to_halt = cwq->dev;
|
||||
}
|
||||
else
|
||||
{
|
||||
dev_to_halt = MIO_NULL;
|
||||
}
|
||||
|
||||
cwq->dev->cw_count--;
|
||||
MIO_CWQ_UNLINK (cwq);
|
||||
|
||||
cwqfl_index = MIO_ALIGN_POW2(cwq->dstaddr.len, MIO_CWQFL_ALIGN) / MIO_CWQFL_SIZE;
|
||||
if (cwqfl_index < MIO_COUNTOF(mio->cwqfl))
|
||||
{
|
||||
/* reuse the cwq object if dstaddr is 0 in size. chain it to the free list */
|
||||
cwq->q_next = mio->cwqfl[cwqfl_index];
|
||||
mio->cwqfl[cwqfl_index] = cwq;
|
||||
}
|
||||
else
|
||||
{
|
||||
/* TODO: more reuse of objects of different size? */
|
||||
mio_freemem (mio, cwq);
|
||||
}
|
||||
|
||||
if (dev_to_halt) mio_dev_halt (dev_to_halt);
|
||||
}
|
||||
cwq = next;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
static MIO_INLINE void handle_event (mio_t* mio, mio_dev_t* dev, int events, int rdhup)
|
||||
{
|
||||
MIO_ASSERT (mio, mio == dev->mio);
|
||||
@ -389,7 +476,7 @@ static MIO_INLINE void handle_event (mio_t* mio, mio_dev_t* dev, int events, int
|
||||
|
||||
if (y <= -1)
|
||||
{
|
||||
MIO_DEBUG1 (mio, "Error returned by on_write() of device %p\n", dev);
|
||||
MIO_DEBUG1 (mio, "MIO - Error returned by on_write() of device %p\n", dev);
|
||||
mio_dev_halt (dev);
|
||||
dev = MIO_NULL;
|
||||
break;
|
||||
@ -474,6 +561,19 @@ static MIO_INLINE void handle_event (mio_t* mio, mio_dev_t* dev, int events, int
|
||||
}
|
||||
else /*if (x >= 1) */
|
||||
{
|
||||
/* call on_write() callbacks enqueued fro the device before calling on_read().
|
||||
* if on_write() callback is delayed, there can be out-of-order execution
|
||||
* between on_read() and on_write() callbacks. for instance, if a write request
|
||||
* is started from within on_read() callback, and the input data is available
|
||||
* in the next iteration of this loop, the on_read() callback is triggered
|
||||
* before the on_write() callbacks scheduled before that on_read() callback. */
|
||||
if (dev->cw_count > 0)
|
||||
{
|
||||
fire_cwq_handlers_for_dev (mio, dev);
|
||||
/* it will still invoke the on_read() callbak below even if
|
||||
* the device gets halted inside fire_cwq_handlers_for_dev() */
|
||||
}
|
||||
|
||||
if (len <= 0 && (dev->dev_cap & MIO_DEV_CAP_STREAM))
|
||||
{
|
||||
/* EOF received. for a stream device, a zero-length
|
||||
@ -565,45 +665,15 @@ int mio_exec (mio_t* mio)
|
||||
int ret = 0;
|
||||
|
||||
/* execute callbacks for completed write operations */
|
||||
while (!MIO_CWQ_IS_EMPTY(&mio->cwq))
|
||||
{
|
||||
mio_cwq_t* cwq;
|
||||
mio_oow_t cwqfl_index;
|
||||
mio_dev_t* dev_to_halt;
|
||||
|
||||
cwq = MIO_CWQ_HEAD(&mio->cwq);
|
||||
if (cwq->dev->dev_evcb->on_write(cwq->dev, cwq->olen, cwq->ctx, &cwq->dstaddr) <= -1)
|
||||
{
|
||||
MIO_DEBUG1 (mio, "Error returned by on_write() of device %p in cwq\n", cwq->dev);
|
||||
dev_to_halt = cwq->dev;
|
||||
}
|
||||
else
|
||||
{
|
||||
dev_to_halt = MIO_NULL;
|
||||
}
|
||||
cwq->dev->cw_count--;
|
||||
MIO_CWQ_UNLINK (cwq);
|
||||
|
||||
cwqfl_index = MIO_ALIGN_POW2(cwq->dstaddr.len, MIO_CWQFL_ALIGN) / MIO_CWQFL_SIZE;
|
||||
if (cwqfl_index < MIO_COUNTOF(mio->cwqfl))
|
||||
{
|
||||
/* reuse the cwq object if dstaddr is 0 in size. chain it to the free list */
|
||||
cwq->q_next = mio->cwqfl[cwqfl_index];
|
||||
mio->cwqfl[cwqfl_index] = cwq;
|
||||
}
|
||||
else
|
||||
{
|
||||
/* TODO: more reuse of objects of different size? */
|
||||
mio_freemem (mio, cwq);
|
||||
}
|
||||
|
||||
if (dev_to_halt) mio_dev_halt (dev_to_halt);
|
||||
}
|
||||
fire_cwq_handlers (mio);
|
||||
|
||||
/* execute the scheduled jobs before checking devices with the
|
||||
* multiplexer. the scheduled jobs can safely destroy the devices */
|
||||
mio_firetmrjobs (mio, MIO_NULL, MIO_NULL);
|
||||
|
||||
/* execute callbacks for completed write operations again in case there were some jobs initiaated in the timer jobs */
|
||||
//fire_cwq_handlers (mio);
|
||||
|
||||
if (!MIO_DEVL_IS_EMPTY(&mio->actdev))
|
||||
{
|
||||
/* wait on the multiplexer only if there is at least 1 active device */
|
||||
@ -618,7 +688,7 @@ int mio_exec (mio_t* mio)
|
||||
|
||||
if (mio_sys_waitmux(mio, &tmout, handle_event) <= -1)
|
||||
{
|
||||
MIO_DEBUG0 (mio, "WARNING - Failed to wait on mutiplexer\n");
|
||||
MIO_DEBUG0 (mio, "MIO - WARNING - Failed to wait on mutiplexer\n");
|
||||
ret = -1;
|
||||
}
|
||||
}
|
||||
@ -627,7 +697,7 @@ int mio_exec (mio_t* mio)
|
||||
while (!MIO_DEVL_IS_EMPTY(&mio->hltdev))
|
||||
{
|
||||
mio_dev_t* dev = MIO_DEVL_FIRST_DEV(&mio->hltdev);
|
||||
MIO_DEBUG1 (mio, "Killing HALTED device %p\n", dev);
|
||||
MIO_DEBUG1 (mio, "MIO - Killing HALTED device %p\n", dev);
|
||||
mio_dev_kill (dev);
|
||||
}
|
||||
|
||||
@ -846,6 +916,7 @@ void mio_dev_kill (mio_dev_t* dev)
|
||||
next = MIO_CWQ_NEXT(cwq);
|
||||
if (cwq->dev == dev)
|
||||
{
|
||||
cwq->dev->cw_count--;
|
||||
MIO_CWQ_UNLINK (cwq);
|
||||
mio_freemem (mio, cwq);
|
||||
}
|
||||
@ -906,7 +977,7 @@ void mio_dev_halt (mio_dev_t* dev)
|
||||
|
||||
if (dev->dev_cap & MIO_DEV_CAP_ACTIVE)
|
||||
{
|
||||
MIO_DEBUG1 (mio, "HALTING DEVICE %p\n", dev);
|
||||
MIO_DEBUG1 (mio, "MIO - HALTING DEVICE %p\n", dev);
|
||||
|
||||
/* delink the device object from the active device list */
|
||||
MIO_DEVL_UNLINK_DEV (dev);
|
||||
|
Reference in New Issue
Block a user