qse/qse/lib/si/aio.c
2016-04-30 04:20:10 +00:00

1469 lines
34 KiB
C

/*
* $Id$
*
Copyright (c) 2006-2016 Chung, Hyung-Hwan. All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
1. Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
THIS SOFTWARE IS PROVIDED BY THE AUTHOR "AS IS" AND ANY EXPRESS OR
IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,
INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT
NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#include "aio-prv.h"
#if defined(HAVE_SYS_EPOLL_H)
# include <sys/epoll.h>
# define USE_EPOLL
#elif defined(HAVE_SYS_POLL_H)
# include <sys/poll.h>
# define USE_POLL
#else
# error NO SUPPORTED MULTIPLEXER
#endif
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>
/* mask of every multiplexer-watch bit that qse_aio_dev_watch() recomputes */
#define DEV_CAPA_ALL_WATCHED (QSE_AIO_DEV_CAPA_IN_WATCHED | QSE_AIO_DEV_CAPA_OUT_WATCHED | QSE_AIO_DEV_CAPA_PRI_WATCHED)
/* forward declarations for the zombie-device handling defined further below */
static int schedule_kill_zombie_job (qse_aio_dev_t* dev);
static int kill_and_free_device (qse_aio_dev_t* dev, int force);
/* append dev at the tail of a doubly-linked device list.
 * list must point to a structure with head and tail members.
 * dev's dev_prev/dev_next links are overwritten.
 * both arguments are evaluated more than once - pass side-effect-free
 * expressions only. */
#define APPEND_DEVICE_TO_LIST(list,dev) do { \
if ((list)->tail) (list)->tail->dev_next = (dev); \
else (list)->head = (dev); \
(dev)->dev_prev = (list)->tail; \
(dev)->dev_next = QSE_NULL; \
(list)->tail = (dev); \
} while(0)
/* remove dev from a doubly-linked device list, fixing up head/tail when
 * dev sits at either end. dev's own dev_prev/dev_next links are left as
 * they are. arguments are evaluated more than once. */
#define UNLINK_DEVICE_FROM_LIST(list,dev) do { \
if ((dev)->dev_prev) (dev)->dev_prev->dev_next = (dev)->dev_next; \
else (list)->head = (dev)->dev_next; \
if ((dev)->dev_next) (dev)->dev_next->dev_prev = (dev)->dev_prev; \
else (list)->tail = (dev)->dev_prev; \
} while (0)
/* ========================================================================= */
#if defined(USE_POLL)
/* command codes understood by the poll()-based mux_control() */
#define MUX_CMD_INSERT 1
#define MUX_CMD_UPDATE 2
#define MUX_CMD_DELETE 3
/* value stored in a map slot whose handle has no pollfd entry */
#define MUX_INDEX_INVALID QSE_TYPE_MAX(qse_size_t)
/* poll()-based multiplexer state.
 * map - indexed by system handle. each slot holds the index of the
 *       handle's entry in the pd arrays or MUX_INDEX_INVALID.
 *       (map.size is not referenced by the code in this file.)
 * pd  - parallel arrays: pfd is handed to poll() as is and dptr[i] is
 *       the device owning pfd[i]. size entries are in use out of capa. */
struct qse_aio_mux_t
{
struct
{
qse_size_t* ptr;
qse_size_t size;
qse_size_t capa;
} map; /* handle to index */
struct
{
struct pollfd* pfd;
qse_aio_dev_t** dptr;
qse_size_t size;
qse_size_t capa;
} pd; /* poll data */
};
/* allocate an empty poll()-based multiplexer and attach it to aio.
 * the handle map and the pollfd arrays start out empty and are grown
 * on demand by mux_control().
 * returns 0 on success, -1 with aio->errnum set to QSE_AIO_ENOMEM. */
static int mux_open (qse_aio_t* aio)
{
	qse_aio_mux_t* newmux = QSE_MMGR_ALLOC (aio->mmgr, QSE_SIZEOF(*newmux));

	if (newmux == QSE_NULL)
	{
		aio->errnum = QSE_AIO_ENOMEM;
		return -1;
	}

	QSE_MEMSET (newmux, 0, QSE_SIZEOF(*newmux));
	aio->mux = newmux;
	return 0;
}
/* release the poll()-based multiplexer attached to aio, including the
 * handle map and pollfd/device arrays that mux_control() grew.
 * calling this when no multiplexer is attached is a no-op. */
static void mux_close (qse_aio_t* aio)
{
	qse_aio_mux_t* mux = (qse_aio_mux_t*)aio->mux;

	if (!mux) return;

	/* these arrays are (re)allocated by mux_control() and would leak
	 * if only the multiplexer object itself were freed */
	if (mux->map.ptr) QSE_MMGR_FREE (aio->mmgr, mux->map.ptr);
	if (mux->pd.pfd) QSE_MMGR_FREE (aio->mmgr, mux->pd.pfd);
	if (mux->pd.dptr) QSE_MMGR_FREE (aio->mmgr, mux->pd.dptr);

	QSE_MMGR_FREE (aio->mmgr, mux);
	aio->mux = QSE_NULL;
}
/* poll() backend of the multiplexer control operation.
 * cmd     - MUX_CMD_INSERT, MUX_CMD_UPDATE or MUX_CMD_DELETE.
 * hnd     - system handle of dev; used as the index into map.ptr.
 * dev_capa- only the *_WATCHED bits are consulted: IN -> POLLIN,
 *           OUT -> POLLOUT.
 * returns 0 on success, -1 with aio->errnum set on failure
 * (EINVAL, ENOENT, EEXIST, ENOMEM). */
static int mux_control (qse_aio_dev_t* dev, int cmd, qse_aio_syshnd_t hnd, int dev_capa)
{
	qse_aio_t* aio;
	qse_aio_mux_t* mux;
	qse_size_t idx;

	aio = dev->aio;
	mux = (qse_aio_mux_t*)aio->mux;

	if (hnd < 0)
	{
		/* a negative handle can neither index the map nor be polled */
		aio->errnum = QSE_AIO_EINVAL;
		return -1;
	}

	if (hnd >= mux->map.capa)
	{
		qse_size_t new_capa;
		qse_size_t* tmp;

		if (cmd != MUX_CMD_INSERT)
		{
			aio->errnum = QSE_AIO_ENOENT;
			return -1;
		}

		new_capa = QSE_ALIGNTO_POW2((hnd + 1), 256);
		tmp = QSE_MMGR_REALLOC (aio->mmgr, mux->map.ptr, new_capa * QSE_SIZEOF(*tmp));
		if (!tmp)
		{
			aio->errnum = QSE_AIO_ENOMEM;
			return -1;
		}

		/* mark the newly added slots as unused */
		for (idx = mux->map.capa; idx < new_capa; idx++) tmp[idx] = MUX_INDEX_INVALID;

		mux->map.ptr = tmp;
		mux->map.capa = new_capa;
	}

	idx = mux->map.ptr[hnd];
	if (idx != MUX_INDEX_INVALID)
	{
		if (cmd == MUX_CMD_INSERT)
		{
			aio->errnum = QSE_AIO_EEXIST;
			return -1;
		}
	}
	else
	{
		if (cmd != MUX_CMD_INSERT)
		{
			aio->errnum = QSE_AIO_ENOENT;
			return -1;
		}
	}

	switch (cmd)
	{
		case MUX_CMD_INSERT:
			if (mux->pd.size >= mux->pd.capa)
			{
				qse_size_t new_capa;
				struct pollfd* tmp1;
				qse_aio_dev_t** tmp2;

				new_capa = QSE_ALIGNTO_POW2(mux->pd.size + 1, 256);

				tmp1 = QSE_MMGR_REALLOC (aio->mmgr, mux->pd.pfd, new_capa * QSE_SIZEOF(*tmp1));
				if (!tmp1)
				{
					aio->errnum = QSE_AIO_ENOMEM;
					return -1;
				}
				/* commit the reallocated block immediately. the old pointer
				 * is no longer valid once realloc succeeds, and the larger
				 * block stays usable even if the second reallocation below
				 * fails. pd.capa is raised only after both arrays grew. */
				mux->pd.pfd = tmp1;

				tmp2 = QSE_MMGR_REALLOC (aio->mmgr, mux->pd.dptr, new_capa * QSE_SIZEOF(*tmp2));
				if (!tmp2)
				{
					aio->errnum = QSE_AIO_ENOMEM;
					return -1;
				}
				mux->pd.dptr = tmp2;
				mux->pd.capa = new_capa;
			}

			idx = mux->pd.size++;

			mux->pd.pfd[idx].fd = hnd;
			mux->pd.pfd[idx].events = 0;
			if (dev_capa & QSE_AIO_DEV_CAPA_IN_WATCHED) mux->pd.pfd[idx].events |= POLLIN;
			if (dev_capa & QSE_AIO_DEV_CAPA_OUT_WATCHED) mux->pd.pfd[idx].events |= POLLOUT;
			mux->pd.pfd[idx].revents = 0;

			mux->pd.dptr[idx] = dev;
			mux->map.ptr[hnd] = idx;

			return 0;

		case MUX_CMD_UPDATE:
			QSE_ASSERT (mux->pd.dptr[idx] == dev);
			mux->pd.pfd[idx].events = 0;
			if (dev_capa & QSE_AIO_DEV_CAPA_IN_WATCHED) mux->pd.pfd[idx].events |= POLLIN;
			if (dev_capa & QSE_AIO_DEV_CAPA_OUT_WATCHED) mux->pd.pfd[idx].events |= POLLOUT;
			return 0;

		case MUX_CMD_DELETE:
			QSE_ASSERT (mux->pd.dptr[idx] == dev);
			mux->map.ptr[hnd] = MUX_INDEX_INVALID;

			/* TODO: speed up deletion. allow a hole in the array.
			 *       delay array compaction if there is a hole.
			 *       set fd for the hole to -1 such that poll()
			 *       ignores it. compact the array if another deletion
			 *       is requested when there is an existing hole. */
			/* shift the following entries down by one and update the
			 * map so that it keeps pointing at the moved entries */
			idx++;
			while (idx < mux->pd.size)
			{
				int fd;

				mux->pd.pfd[idx - 1] = mux->pd.pfd[idx];
				mux->pd.dptr[idx - 1] = mux->pd.dptr[idx];
				fd = mux->pd.pfd[idx].fd;
				mux->map.ptr[fd] = idx - 1;
				idx++;
			}

			mux->pd.size--;
			return 0;

		default:
			aio->errnum = QSE_AIO_EINVAL;
			return -1;
	}
}
#elif defined(USE_EPOLL)
/* with epoll, the mux commands are the epoll_ctl() operations themselves */
#define MUX_CMD_INSERT EPOLL_CTL_ADD
#define MUX_CMD_UPDATE EPOLL_CTL_MOD
#define MUX_CMD_DELETE EPOLL_CTL_DEL
/* epoll-based multiplexer state.
 * hnd  - the epoll instance descriptor.
 * revs - output buffer for epoll_wait(); __exec() collects at most
 *        QSE_COUNTOF(revs) events per iteration. */
struct qse_aio_mux_t
{
int hnd;
struct epoll_event revs[100]; /* TODO: is it a good size? */
};
/* allocate the multiplexer object and create the epoll instance backing
 * it (size hint 1000). on success the object is attached to aio and 0 is
 * returned; on failure -1 is returned with aio->errnum set and nothing
 * is left allocated. */
static int mux_open (qse_aio_t* aio)
{
	qse_aio_mux_t* m;

	m = QSE_MMGR_ALLOC (aio->mmgr, QSE_SIZEOF(*m));
	if (m == QSE_NULL) goto oops_nomem;

	QSE_MEMSET (m, 0, QSE_SIZEOF(*m));
	m->hnd = epoll_create (1000);
	if (m->hnd == -1) goto oops_syserr;

	aio->mux = m;
	return 0;

oops_syserr:
	/* translate errno before freeing, which may clobber it */
	aio->errnum = qse_aio_syserrtoerrnum(errno);
	QSE_MMGR_FREE (aio->mmgr, m);
	return -1;

oops_nomem:
	aio->errnum = QSE_AIO_ENOMEM;
	return -1;
}
/* close the epoll descriptor and release the multiplexer object.
 * a no-op when no multiplexer is attached. */
static void mux_close (qse_aio_t* aio)
{
	if (aio->mux == QSE_NULL) return;

	close (aio->mux->hnd);
	QSE_MMGR_FREE (aio->mmgr, aio->mux);
	aio->mux = QSE_NULL;
}
/* epoll backend of the multiplexer control operation.
 * translates the *_WATCHED bits in dev_capa to an epoll event mask and
 * applies cmd (EPOLL_CTL_ADD/MOD/DEL) to hnd with dev as the user data.
 * hangup and error are always requested; EPOLLRDHUP (when available)
 * and EPOLLPRI are requested only together with input.
 * returns 0 on success, -1 with aio->errnum set on failure. */
static QSE_INLINE int mux_control (qse_aio_dev_t* dev, int cmd, qse_aio_syshnd_t hnd, int dev_capa)
{
	struct epoll_event ev;

	ev.data.ptr = dev;
	ev.events = EPOLLHUP | EPOLLERR /*| EPOLLET*/;

	if (dev_capa & QSE_AIO_DEV_CAPA_OUT_WATCHED) ev.events |= EPOLLOUT;

	if (dev_capa & QSE_AIO_DEV_CAPA_IN_WATCHED)
	{
#if defined(EPOLLRDHUP)
		ev.events |= EPOLLIN | EPOLLRDHUP;
#else
		ev.events |= EPOLLIN;
#endif
		if (dev_capa & QSE_AIO_DEV_CAPA_PRI_WATCHED) ev.events |= EPOLLPRI;
	}

	if (epoll_ctl (dev->aio->mux->hnd, cmd, hnd, &ev) != -1) return 0;

	dev->aio->errnum = qse_aio_syserrtoerrnum(errno);
	return -1;
}
#endif
/* ========================================================================= */
/* allocate and initialize an aio object followed by xtnsize bytes of
 * zero-filled extension area.
 * on failure QSE_NULL is returned and, if errnum is not QSE_NULL,
 * *errnum receives the error code. */
qse_aio_t* qse_aio_open (qse_mmgr_t* mmgr, qse_size_t xtnsize, qse_size_t tmrcapa, qse_aio_errnum_t* errnum)
{
	qse_aio_t* aio = QSE_MMGR_ALLOC (mmgr, QSE_SIZEOF(qse_aio_t) + xtnsize);

	if (aio == QSE_NULL)
	{
		if (errnum) *errnum = QSE_AIO_ENOMEM;
		return QSE_NULL;
	}

	if (qse_aio_init (aio, mmgr, tmrcapa) <= -1)
	{
		if (errnum) *errnum = aio->errnum;
		QSE_MMGR_FREE (mmgr, aio);
		return QSE_NULL;
	}

	/* the extension area sits right after the object */
	QSE_MEMSET (aio + 1, 0, xtnsize);
	return aio;
}
/* counterpart of qse_aio_open(): finalize the object, then release it
 * (extension area included) through the memory manager it holds. */
void qse_aio_close (qse_aio_t* aio)
{
	qse_mmgr_t* mmgr;

	qse_aio_fini (aio);
	mmgr = aio->mmgr;
	QSE_MMGR_FREE (mmgr, aio);
}
/* initialize an aio object in place: clear it, open the multiplexer and
 * allocate the timer job array (a zero capacity is raised to 1).
 * returns 0 on success, -1 with aio->errnum set on failure. nothing
 * stays allocated on failure. */
int qse_aio_init (qse_aio_t* aio, qse_mmgr_t* mmgr, qse_size_t tmrcapa)
{
	qse_size_t capa = (tmrcapa <= 0)? 1: tmrcapa;

	QSE_MEMSET (aio, 0, QSE_SIZEOF(*aio));
	aio->mmgr = mmgr;

	/* the multiplexer first, the timer job array second */
	if (mux_open (aio) <= -1) return -1;

	aio->tmr.jobs = QSE_MMGR_ALLOC (aio->mmgr, capa * QSE_SIZEOF(qse_aio_tmrjob_t));
	if (!aio->tmr.jobs) goto oops_nomem;

	aio->tmr.capa = capa;
	return 0;

oops_nomem:
	aio->errnum = QSE_AIO_ENOMEM;
	mux_close (aio);
	return -1;
}
/* finalize an aio object: kill every active and halted device, destroy
 * all zombie devices (by force if their kill method keeps failing),
 * purge the timer jobs and close the multiplexer. the object memory
 * itself is not released. */
void qse_aio_fini (qse_aio_t* aio)
{
	qse_aio_dev_t* dev;
	struct
	{
		qse_aio_dev_t* head;
		qse_aio_dev_t* tail;
	} diehard;

	/* kill all registered devices. a device whose kill method fails
	 * ends up on the zombie device list */
	while (aio->actdev.head) qse_aio_killdev (aio, aio->actdev.head);
	while (aio->hltdev.head) qse_aio_killdev (aio, aio->hltdev.head);

	/* clean up all zombie devices. each attempt targets the current head.
	 * when kill_and_free_device() succeeds, it has already unlinked and
	 * freed the device, so dev must not be used again - not even compared
	 * against the list head, as the pointer value is indeterminate after
	 * the free. when it fails, the device is moved to a private list so
	 * that the loop always makes progress. */
	QSE_MEMSET (&diehard, 0, QSE_SIZEOF(diehard));
	while (aio->zmbdev.head)
	{
		dev = aio->zmbdev.head;
		if (kill_and_free_device (dev, 1) <= -1)
		{
			/* remove the device from the zombie device list */
			UNLINK_DEVICE_FROM_LIST (&aio->zmbdev, dev);
			dev->dev_capa &= ~QSE_AIO_DEV_CAPA_ZOMBIE;

			/* put it to a private list for aborting */
			APPEND_DEVICE_TO_LIST (&diehard, dev);
		}
	}

	while (diehard.head)
	{
		/* if the kill method returns failure, it can leak some resource
		 * because the device is freed regardless of the failure when 2
		 * is given to kill_and_free_device(). */
		dev = diehard.head;
		QSE_ASSERT (!(dev->dev_capa & (QSE_AIO_DEV_CAPA_ACTIVE | QSE_AIO_DEV_CAPA_HALTED | QSE_AIO_DEV_CAPA_ZOMBIE)));
		UNLINK_DEVICE_FROM_LIST (&diehard, dev);
		kill_and_free_device (dev, 2);
	}

	/* purge scheduled timer jobs and kill the timer.
	 * NOTE(review): kill-zombie jobs may still carry pointers to devices
	 * freed above; this relies on qse_aio_cleartmrjobs() not invoking the
	 * job handlers - confirm. */
	qse_aio_cleartmrjobs (aio);
	QSE_MMGR_FREE (aio->mmgr, aio->tmr.jobs);

	/* close the multiplexer */
	mux_close (aio);
}
/* hook invoked by qse_aio_loop() before entering the event loop.
 * there is nothing to prepare yet, so it always succeeds. TODO: */
int qse_aio_prologue (qse_aio_t* aio)
{
	(void)aio;
	return 0;
}
/* hook invoked by qse_aio_loop() after leaving the event loop.
 * currently does nothing. TODO: */
void qse_aio_epilogue (qse_aio_t* aio)
{
	(void)aio;
}
/* detach a write request from its queue. a timeout job attached to the
 * request is cancelled first; deleting the job is expected to reset
 * q->tmridx through the job's idxptr, which the assertion checks. the
 * request memory itself is not freed. */
static QSE_INLINE void unlink_wq (qse_aio_t* aio, qse_aio_wq_t* q)
{
	if (q->tmridx == QSE_AIO_TMRIDX_INVALID)
	{
		/* no timeout job attached. just detach the request */
		QSE_AIO_WQ_UNLINK (q);
		return;
	}

	qse_aio_deltmrjob (aio, q->tmridx);
	QSE_ASSERT (q->tmridx == QSE_AIO_TMRIDX_INVALID);
	QSE_AIO_WQ_UNLINK (q);
}
/* dispatch the events reported by the multiplexer for one device.
 * events - QSE_AIO_DEV_EVENT_* bits built by __exec().
 * rdhup  - set by __exec() from EPOLLRDHUP; always 0 for the poll() backend.
 * processing order: the optional ready() callback, urgent data (not
 * implemented), output (flush the write queue), input (read loop), then
 * error/hangup handling, and finally a watch renewal if aio->renew_watch
 * got set. once the device is halted, dev is set to QSE_NULL so that the
 * remaining stages skip it; halted devices are killed by __exec() after
 * all events have been dispatched. */
static QSE_INLINE void handle_event (qse_aio_dev_t* dev, int events, int rdhup)
{
qse_aio_t* aio;
aio = dev->aio;
/* renew_watch is per-device state for this call only */
aio->renew_watch = 0;
QSE_ASSERT (aio == dev->aio);
if (dev->dev_evcb->ready)
{
int x, xevents;
xevents = events;
/* ready() sees a read hangup as a plain hangup */
if (rdhup) xevents |= QSE_AIO_DEV_EVENT_HUP;
/* return value of ready()
* <= -1 - failure. halt the device. it gets killed at the end of __exec().
* == 0 - ok. but don't invoke recv() or send().
* >= 1 - everything is ok. */
x = dev->dev_evcb->ready (dev, xevents);
if (x <= -1)
{
qse_aio_dev_halt (dev);
return;
}
else if (x == 0) goto skip_evcb;
}
if (dev && (events & QSE_AIO_DEV_EVENT_PRI))
{
/* urgent data */
/* TODO: implement urgent data handling */
/*x = dev->dev_mth->urgread (dev, aio->bugbuf, &len);*/
}
if (dev && (events & QSE_AIO_DEV_EVENT_OUT))
{
/* write pending requests */
while (!QSE_AIO_WQ_ISEMPTY(&dev->wq))
{
qse_aio_wq_t* q;
const qse_uint8_t* uptr;
qse_aio_iolen_t urem, ulen;
int x;
q = QSE_AIO_WQ_HEAD(&dev->wq);
uptr = q->ptr;
urem = q->len;
send_leftover:
ulen = urem;
x = dev->dev_mth->write (dev, uptr, &ulen, &q->dstaddr);
if (x <= -1)
{
qse_aio_dev_halt (dev);
dev = QSE_NULL;
break;
}
else if (x == 0)
{
/* keep the left-over. move the unwritten tail to the front of
 * the request buffer so that the next attempt starts from it */
QSE_MEMMOVE (q->ptr, uptr, urem);
q->len = urem;
break;
}
else
{
uptr += ulen;
urem -= ulen;
if (urem <= 0)
{
/* finished writing a single write request */
int y, out_closed = 0;
if (q->len <= 0 && (dev->dev_capa & QSE_AIO_DEV_CAPA_STREAM))
{
/* it was a zero-length write request.
* for a stream, it is to close the output. */
dev->dev_capa |= QSE_AIO_DEV_CAPA_OUT_CLOSED;
aio->renew_watch = 1;
out_closed = 1;
}
/* detach the request before the callback, then free it */
unlink_wq (aio, q);
y = dev->dev_evcb->on_write (dev, q->olen, q->ctx, &q->dstaddr);
QSE_MMGR_FREE (aio->mmgr, q);
if (y <= -1)
{
qse_aio_dev_halt (dev);
dev = QSE_NULL;
break;
}
if (out_closed)
{
/* drain all pending requests.
* callbacks are skipped for drained requests */
while (!QSE_AIO_WQ_ISEMPTY(&dev->wq))
{
q = QSE_AIO_WQ_HEAD(&dev->wq);
unlink_wq (aio, q);
QSE_MMGR_FREE (dev->aio->mmgr, q);
}
break;
}
}
else goto send_leftover;
}
}
if (dev && QSE_AIO_WQ_ISEMPTY(&dev->wq))
{
/* no pending request to write */
if ((dev->dev_capa & QSE_AIO_DEV_CAPA_IN_CLOSED) &&
(dev->dev_capa & QSE_AIO_DEV_CAPA_OUT_CLOSED))
{
qse_aio_dev_halt (dev);
dev = QSE_NULL;
}
else
{
/* output watching is no longer needed */
aio->renew_watch = 1;
}
}
}
if (dev && (events & QSE_AIO_DEV_EVENT_IN))
{
qse_aio_devaddr_t srcaddr;
qse_aio_iolen_t len;
int x;
/* the devices are all non-blocking. read as much as possible
* if on_read callback returns 1 or greater. read only once
* if the on_read callback returns 0. */
while (1)
{
len = QSE_COUNTOF(aio->bigbuf);
x = dev->dev_mth->read (dev, aio->bigbuf, &len, &srcaddr);
if (x <= -1)
{
qse_aio_dev_halt (dev);
dev = QSE_NULL;
break;
}
else if (x == 0)
{
/* no data is available - EWOULDBLOCK or something similar */
break;
}
else if (x >= 1)
{
if (len <= 0 && (dev->dev_capa & QSE_AIO_DEV_CAPA_STREAM))
{
/* EOF received. for a stream device, a zero-length
* read is interpreted as EOF. */
dev->dev_capa |= QSE_AIO_DEV_CAPA_IN_CLOSED;
aio->renew_watch = 1;
/* call the on_read callback to report EOF */
if (dev->dev_evcb->on_read (dev, aio->bigbuf, len, &srcaddr) <= -1 ||
(dev->dev_capa & QSE_AIO_DEV_CAPA_OUT_CLOSED))
{
/* 1. input ended and its reporting failed or
* 2. input ended and no writing is possible */
qse_aio_dev_halt (dev);
dev = QSE_NULL;
}
/* since EOF is received, reading can't be greedy */
break;
}
else
{
int y;
/* TODO: for a stream device, merge received data if bigbuf isn't full and fire the on_read callback
* when x == 0 or <= -1. you can */
/* data available */
y = dev->dev_evcb->on_read (dev, aio->bigbuf, len, &srcaddr);
if (y <= -1)
{
qse_aio_dev_halt (dev);
dev = QSE_NULL;
break;
}
else if (y == 0)
{
/* don't be greedy. read only once
* for this loop iteration */
break;
}
}
}
}
}
if (dev)
{
if (events & (QSE_AIO_DEV_EVENT_ERR | QSE_AIO_DEV_EVENT_HUP))
{
/* if error or hangup has been reported on the device,
* halt the device. this check is performed after
* EPOLLIN or EPOLLOUT check because EPOLLERR or EPOLLHUP
* can be set together with EPOLLIN or EPOLLOUT. */
dev->dev_capa |= QSE_AIO_DEV_CAPA_IN_CLOSED | QSE_AIO_DEV_CAPA_OUT_CLOSED;
aio->renew_watch = 1;
}
else if (dev && rdhup)
{
if (events & (QSE_AIO_DEV_EVENT_IN | QSE_AIO_DEV_EVENT_OUT | QSE_AIO_DEV_EVENT_PRI))
{
/* it may be a half-open state. don't do anything here
* to let the next read detect EOF */
}
else
{
dev->dev_capa |= QSE_AIO_DEV_CAPA_IN_CLOSED | QSE_AIO_DEV_CAPA_OUT_CLOSED;
aio->renew_watch = 1;
}
}
/* with both directions closed there's nothing left to do */
if ((dev->dev_capa & QSE_AIO_DEV_CAPA_IN_CLOSED) &&
(dev->dev_capa & QSE_AIO_DEV_CAPA_OUT_CLOSED))
{
qse_aio_dev_halt (dev);
dev = QSE_NULL;
}
}
skip_evcb:
/* apply any watch change requested above or by the callbacks */
if (dev && aio->renew_watch && qse_aio_dev_watch (dev, QSE_AIO_DEV_WATCH_RENEW, 0) <= -1)
{
qse_aio_dev_halt (dev);
dev = QSE_NULL;
}
}
/* run one iteration of the event loop:
 *  1. fire the due timer jobs (they may safely destroy devices),
 *  2. wait on the multiplexer with the timeout obtained from
 *     qse_aio_gettmrtmout() (1 second if it can't be obtained),
 *  3. dispatch the reported events to handle_event(),
 *  4. kill every device halted during this iteration.
 * returns 0 on success (an interrupted wait included) and -1 with
 * aio->errnum set when the multiplexer fails. */
static QSE_INLINE int __exec (qse_aio_t* aio)
{
	qse_ntime_t tmout;

#if defined(_WIN32)
	ULONG nentries, i;
#else
	int nentries, i;
	qse_aio_mux_t* mux;
#endif

	/*if (!aio->actdev.head) return 0;*/

	/* execute the scheduled jobs before checking devices with the
	 * multiplexer. the scheduled jobs can safely destroy the devices */
	qse_aio_firetmrjobs (aio, QSE_NULL, QSE_NULL);

	if (qse_aio_gettmrtmout (aio, QSE_NULL, &tmout) <= -1)
	{
		/* defaults to 1 second if timeout can't be acquired */
		tmout.sec = 1; /* TODO: make the default timeout configurable */
		tmout.nsec = 0;
	}

#if defined(_WIN32)
/*
	if (GetQueuedCompletionStatusEx (aio->iocp, aio->ovls, QSE_COUNTOF(aio->ovls), &nentries, timeout, FALSE) == FALSE)
	{
		// TODO: set errnum 
		return -1;
	}

	for (i = 0; i < nentries; i++)
	{
	}
*/
#elif defined(USE_POLL)
	mux = (qse_aio_mux_t*)aio->mux;

	nentries = poll (mux->pd.pfd, mux->pd.size, QSE_SECNSEC_TO_MSEC(tmout.sec, tmout.nsec));
	if (nentries == -1)
	{
		if (errno == EINTR) return 0;
		aio->errnum = qse_aio_syserrtoerrnum(errno);
		return -1;
	}

	for (i = 0; i < mux->pd.size; i++)
	{
		if (mux->pd.pfd[i].fd >= 0 && mux->pd.pfd[i].revents)
		{
			int events = 0;
			qse_aio_dev_t* dev;

			dev = mux->pd.dptr[i];

			QSE_ASSERT (!(mux->pd.pfd[i].revents & POLLNVAL));
			if (mux->pd.pfd[i].revents & POLLIN) events |= QSE_AIO_DEV_EVENT_IN;
			if (mux->pd.pfd[i].revents & POLLOUT) events |= QSE_AIO_DEV_EVENT_OUT;
			if (mux->pd.pfd[i].revents & POLLPRI) events |= QSE_AIO_DEV_EVENT_PRI;
			if (mux->pd.pfd[i].revents & POLLERR) events |= QSE_AIO_DEV_EVENT_ERR;
			if (mux->pd.pfd[i].revents & POLLHUP) events |= QSE_AIO_DEV_EVENT_HUP;

			handle_event (dev, events, 0);
		}
	}

#elif defined(USE_EPOLL)
	mux = (qse_aio_mux_t*)aio->mux;

	nentries = epoll_wait (mux->hnd, mux->revs, QSE_COUNTOF(mux->revs), QSE_SECNSEC_TO_MSEC(tmout.sec, tmout.nsec));
	if (nentries == -1)
	{
		if (errno == EINTR) return 0; /* it's actually ok */
		/* other errors are critical - EBADF, EFAULT, EINVAL */
		aio->errnum = qse_aio_syserrtoerrnum(errno);
		return -1;
	}

	/* TODO: merge events??? for the same descriptor */
	for (i = 0; i < nentries; i++)
	{
		int events = 0, rdhup = 0;
		qse_aio_dev_t* dev;

		dev = mux->revs[i].data.ptr;

		if (mux->revs[i].events & EPOLLIN) events |= QSE_AIO_DEV_EVENT_IN;
		if (mux->revs[i].events & EPOLLOUT) events |= QSE_AIO_DEV_EVENT_OUT;
		if (mux->revs[i].events & EPOLLPRI) events |= QSE_AIO_DEV_EVENT_PRI;
		if (mux->revs[i].events & EPOLLERR) events |= QSE_AIO_DEV_EVENT_ERR;
		if (mux->revs[i].events & EPOLLHUP) events |= QSE_AIO_DEV_EVENT_HUP;
	#if defined(EPOLLRDHUP)
		/* a full hangup supersedes a read hangup */
		else if (mux->revs[i].events & EPOLLRDHUP) rdhup = 1;
	#endif
		handle_event (dev, events, rdhup);
	}

#else

#	error NO SUPPORTED MULTIPLEXER
#endif

	/* kill all halted devices. (the debugging printf() that used to
	 * be here wrote to stdout from library code and was removed) */
	while (aio->hltdev.head)
	{
		qse_aio_killdev (aio, aio->hltdev.head);
	}
	QSE_ASSERT (aio->hltdev.tail == QSE_NULL);

	return 0;
}
/* run a single event loop iteration with aio->in_exec raised for its
 * duration; qse_aio_dev_read() and write requests consult that flag to
 * decide whether a watch renewal is deferred. returns what __exec()
 * returns. */
int qse_aio_exec (qse_aio_t* aio)
{
	int ret;

	aio->in_exec = 1;
	ret = __exec (aio);
	aio->in_exec = 0;

	return ret;
}
/* record a stop request. qse_aio_loop() checks it before every
 * iteration, and the forced device-destruction retry loops give up
 * retrying once it is set. */
void qse_aio_stop (qse_aio_t* aio, qse_aio_stopreq_t stopreq)
{
	aio->stopreq = stopreq;
}
/* run the event loop until a stop is requested with qse_aio_stop() or
 * no active device is left.
 * returns 0 on a normal exit (including when there is no active device
 * to begin with) and -1 when the prologue or an iteration fails, with
 * aio->errnum set. previously an iteration failure was reported as 0,
 * leaving the caller unable to tell a clean exit from an error. */
int qse_aio_loop (qse_aio_t* aio)
{
	int ret = 0;

	if (!aio->actdev.head) return 0;

	aio->stopreq = QSE_AIO_STOPREQ_NONE;
	aio->renew_watch = 0;

	if (qse_aio_prologue (aio) <= -1) return -1;

	while (aio->stopreq == QSE_AIO_STOPREQ_NONE && aio->actdev.head)
	{
		if (qse_aio_exec (aio) <= -1)
		{
			/* aio->errnum has been set by the failing iteration */
			ret = -1;
			break;
		}
		/* you can do other things here */
	}

	qse_aio_epilogue (aio);
	return ret;
}
/* create a device object of dev_size bytes (at least sizeof(qse_aio_dev_t)),
 * let dev_mth->make() initialize it with make_ctx, start watching it for
 * input and append it to the active device list.
 * returns the new device, or QSE_NULL with aio->errnum set on failure. */
qse_aio_dev_t* qse_aio_makedev (qse_aio_t* aio, qse_size_t dev_size, qse_aio_dev_mth_t* dev_mth, qse_aio_dev_evcb_t* dev_evcb, void* make_ctx)
{
	qse_aio_dev_t* dev;
	qse_aio_errnum_t errnum;

	if (dev_size < QSE_SIZEOF(qse_aio_dev_t))
	{
		aio->errnum = QSE_AIO_EINVAL;
		return QSE_NULL;
	}

	dev = QSE_MMGR_ALLOC (aio->mmgr, dev_size);
	if (!dev)
	{
		aio->errnum = QSE_AIO_ENOMEM;
		return QSE_NULL;
	}

	QSE_MEMSET (dev, 0, dev_size);
	dev->aio = aio;
	dev->dev_size = dev_size;
	/* default capability. dev->dev_mth->make() can change this.
	 * qse_aio_dev_watch() is affected by the capability change. */
	dev->dev_capa = QSE_AIO_DEV_CAPA_IN | QSE_AIO_DEV_CAPA_OUT;
	dev->dev_mth = dev_mth;
	dev->dev_evcb = dev_evcb;
	QSE_AIO_WQ_INIT(&dev->wq);

	/* call the callback function first */
	aio->errnum = QSE_AIO_ENOERR;
	if (dev->dev_mth->make (dev, make_ctx) <= -1)
	{
		if (aio->errnum == QSE_AIO_ENOERR) aio->errnum = QSE_AIO_EDEVMAKE;
		goto oops;
	}

	/* the make callback must not change these fields */
	QSE_ASSERT (dev->dev_mth == dev_mth);
	QSE_ASSERT (dev->dev_evcb == dev_evcb);
	QSE_ASSERT (dev->dev_prev == QSE_NULL);
	QSE_ASSERT (dev->dev_next == QSE_NULL);

	/* set some internal capability bits according to the capabilities 
	 * removed by the device making callback for convenience sake. */
	if (!(dev->dev_capa & QSE_AIO_DEV_CAPA_IN)) dev->dev_capa |= QSE_AIO_DEV_CAPA_IN_CLOSED;
	if (!(dev->dev_capa & QSE_AIO_DEV_CAPA_OUT)) dev->dev_capa |= QSE_AIO_DEV_CAPA_OUT_CLOSED;

#if defined(_WIN32)
	if (CreateIoCompletionPort ((HANDLE)dev->dev_mth->getsyshnd(dev), aio->iocp, QSE_AIO_IOCP_KEY, 0) == NULL)
	{
		/* TODO: set errnum from GetLastError()... */
		goto oops_after_make;
	}
#else
	if (qse_aio_dev_watch (dev, QSE_AIO_DEV_WATCH_START, 0) <= -1) goto oops_after_make;
#endif

	/* and place the new device object at the back of the active device list */
	APPEND_DEVICE_TO_LIST (&aio->actdev, dev);
	dev->dev_capa |= QSE_AIO_DEV_CAPA_ACTIVE;

	return dev;

oops_after_make:
	/* keep the error that caused the failure. the cleanup below runs the
	 * kill method and may touch the timer, either of which could
	 * overwrite aio->errnum */
	errnum = aio->errnum;

	if (kill_and_free_device (dev, 0) <= -1)
	{
		/* schedule a timer job that reattempts to destroy the device */
		if (schedule_kill_zombie_job (dev) <= -1) 
		{
			/* job scheduling failed. i have no choice but to
			 * destroy the device now.
			 * 
			 * NOTE: this while loop can block the process
			 *       if the kill method keep returning failure */
			while (kill_and_free_device (dev, 1) <= -1)
			{
				if (aio->stopreq != QSE_AIO_STOPREQ_NONE) 
				{
					/* i can't wait until destruction attempt gets
					 * fully successful. there is a chance that some
					 * resources can leak inside the device */
					kill_and_free_device (dev, 2);
					break;
				}
			}
		}
	}

	/* at this point the device has either been freed by
	 * kill_and_free_device() or handed over to the zombie list with a
	 * pending kill job. it must NOT be freed again here - falling
	 * through to 'oops' after a successful kill was a double free. */
	aio->errnum = errnum;
	return QSE_NULL;

oops:
	QSE_MMGR_FREE (aio->mmgr, dev);
	return QSE_NULL;
}
/* invoke the device's kill method with the given force level and free
 * the device when it succeeds or when force is 2 or greater.
 * when the kill fails with force below 2, the device is kept and placed
 * on the zombie list (if not already there) and -1 is returned.
 * returns 0 once the device has been freed. the device must be neither
 * active nor halted. */
static int kill_and_free_device (qse_aio_dev_t* dev, int force)
{
	qse_aio_t* aio = dev->aio;

	QSE_ASSERT (!(dev->dev_capa & QSE_AIO_DEV_CAPA_ACTIVE));
	QSE_ASSERT (!(dev->dev_capa & QSE_AIO_DEV_CAPA_HALTED));

	if (dev->dev_mth->kill(dev, force) <= -1 && force < 2)
	{
		/* not forced to free. keep it alive as a zombie */
		if (!(dev->dev_capa & QSE_AIO_DEV_CAPA_ZOMBIE))
		{
			APPEND_DEVICE_TO_LIST (&aio->zmbdev, dev);
			dev->dev_capa |= QSE_AIO_DEV_CAPA_ZOMBIE;
		}
		return -1;
	}

	if (dev->dev_capa & QSE_AIO_DEV_CAPA_ZOMBIE)
	{
		/* detach it from the zombie device list before freeing */
		UNLINK_DEVICE_FROM_LIST (&aio->zmbdev, dev);
		dev->dev_capa &= ~QSE_AIO_DEV_CAPA_ZOMBIE;
	}

	QSE_MMGR_FREE (aio->mmgr, dev);
	return 0;
}
/* timer handler that retries destroying a zombie device (job->ctx).
 * a failed gentle kill reschedules itself; if rescheduling fails too,
 * the device is destroyed by force, giving up and freeing regardless
 * once a stop has been requested. */
static void kill_zombie_job_handler (qse_aio_t* aio, const qse_ntime_t* now, qse_aio_tmrjob_t* job)
{
	qse_aio_dev_t* zombie = (qse_aio_dev_t*)job->ctx;

	QSE_ASSERT (zombie->dev_capa & QSE_AIO_DEV_CAPA_ZOMBIE);

	if (kill_and_free_device (zombie, 0) >= 0) return;
	if (schedule_kill_zombie_job (zombie) >= 0) return;

	/* i have no choice but to free up the device by force */
	while (kill_and_free_device (zombie, 1) <= -1)
	{
		if (aio->stopreq == QSE_AIO_STOPREQ_NONE) continue;

		/* i can't wait until destruction attempt gets
		 * fully successful. there is a chance that some
		 * resources can leak inside the device */
		kill_and_free_device (zombie, 2);
		break;
	}
}
/* schedule kill_zombie_job_handler() to run for dev 3 seconds from now.
 * returns 0 on success, -1 if the timer job couldn't be inserted. */
static int schedule_kill_zombie_job (qse_aio_dev_t* dev)
{
	qse_aio_tmrjob_t job;
	qse_ntime_t delay;

	QSE_MEMSET (&job, 0, QSE_SIZEOF(job));
	job.ctx = dev;
	job.handler = kill_zombie_job_handler;
	/*job.idxptr = &rdev->tmridx_kill_zombie;*/

	qse_inittime (&delay, 3, 0); /* TODO: take it from configuration */
	qse_gettime (&job.when);
	qse_addtime (&job.when, &delay, &job.when);

	if (qse_aio_instmrjob (dev->aio, &job) == QSE_AIO_TMRIDX_INVALID) return -1;
	return 0;
}
/* destroy a device. pending write requests are discarded without their
 * callbacks, the device is removed from the active or halted list and
 * from the multiplexer, and its kill method is run. a device whose kill
 * fails becomes a zombie with a retry job scheduled; if scheduling fails,
 * destruction is forced (see the loop below).
 * NOTE(review): for a device that is already a zombie, this schedules
 * another kill job on failure while an earlier one may still be pending,
 * and the scheduled jobs don't record their index, so they can't be
 * cancelled if the kill succeeds here - confirm callers never pass
 * zombies. */
void qse_aio_killdev (qse_aio_t* aio, qse_aio_dev_t* dev)
{
QSE_ASSERT (aio == dev->aio);
if (dev->dev_capa & QSE_AIO_DEV_CAPA_ZOMBIE)
{
/* a zombie is already off the device lists and the multiplexer */
QSE_ASSERT (QSE_AIO_WQ_ISEMPTY(&dev->wq));
goto kill_device;
}
/* clear pending send requests */
while (!QSE_AIO_WQ_ISEMPTY(&dev->wq))
{
qse_aio_wq_t* q;
q = QSE_AIO_WQ_HEAD(&dev->wq);
unlink_wq (aio, q);
QSE_MMGR_FREE (aio->mmgr, q);
}
if (dev->dev_capa & QSE_AIO_DEV_CAPA_HALTED)
{
/* this device is in the halted state.
* unlink it from the halted device list */
UNLINK_DEVICE_FROM_LIST (&aio->hltdev, dev);
dev->dev_capa &= ~QSE_AIO_DEV_CAPA_HALTED;
}
else
{
QSE_ASSERT (dev->dev_capa & QSE_AIO_DEV_CAPA_ACTIVE);
UNLINK_DEVICE_FROM_LIST (&aio->actdev, dev);
dev->dev_capa &= ~QSE_AIO_DEV_CAPA_ACTIVE;
}
/* best effort: a failure to stop watching is ignored */
qse_aio_dev_watch (dev, QSE_AIO_DEV_WATCH_STOP, 0);
kill_device:
if (kill_and_free_device(dev, 0) <= -1)
{
QSE_ASSERT (dev->dev_capa & QSE_AIO_DEV_CAPA_ZOMBIE);
if (schedule_kill_zombie_job (dev) <= -1)
{
/* i have no choice but to free up the device by force */
while (kill_and_free_device (dev, 1) <= -1)
{
if (aio->stopreq != QSE_AIO_STOPREQ_NONE)
{
/* i can't wait until destruction attempt gets
* fully successful. there is a chance that some
* resources can leak inside the device */
kill_and_free_device (dev, 2);
break;
}
}
}
}
}
/* move an active device to the halted device list. halted devices are
 * killed at the end of the current event loop iteration. calling this on
 * a device that is not active does nothing. */
void qse_aio_dev_halt (qse_aio_dev_t* dev)
{
	qse_aio_t* aio;

	if (!(dev->dev_capa & QSE_AIO_DEV_CAPA_ACTIVE)) return;

	aio = dev->aio;
	UNLINK_DEVICE_FROM_LIST (&aio->actdev, dev);
	APPEND_DEVICE_TO_LIST (&aio->hltdev, dev);
	dev->dev_capa = (dev->dev_capa & ~QSE_AIO_DEV_CAPA_ACTIVE) | QSE_AIO_DEV_CAPA_HALTED;
}
/* forward an ioctl request to the device's ioctl method.
 * fails with QSE_AIO_ENOSUP when the device provides no such method. */
int qse_aio_dev_ioctl (qse_aio_dev_t* dev, int cmd, void* arg)
{
	if (!dev->dev_mth->ioctl)
	{
		dev->aio->errnum = QSE_AIO_ENOSUP; /* TODO: different error code ? */
		return -1;
	}
	return dev->dev_mth->ioctl (dev, cmd, arg);
}
/* start, renew, update or stop multiplexer watching for a device.
 * START  - register the device watching input only.
 * RENEW  - watch input always, and output only if writes are queued.
 * UPDATE - watch what the caller asks for in events.
 * STOP   - unregister the device.
 * only QSE_AIO_DEV_EVENT_IN and QSE_AIO_DEV_EVENT_OUT are honored in
 * events. directions that are closed, disabled (input) or not supported
 * by the device capability are never watched. virtual devices are
 * ignored. returns 0 on success, -1 with dev->aio->errnum set. */
int qse_aio_dev_watch (qse_aio_dev_t* dev, qse_aio_dev_watch_cmd_t cmd, int events)
{
	int mux_cmd;
	int new_capa;

	/* a virtual device doesn't perform actual I/O. unlike a device
	 * without QSE_AIO_DEV_CAPA_IN and QSE_AIO_DEV_CAPA_OUT, which still
	 * gets watched for hangup and error, it never reaches the
	 * multiplexer. */
	if (dev->dev_capa & QSE_AIO_DEV_CAPA_VIRTUAL) return 0;

	switch (cmd)
	{
		case QSE_AIO_DEV_WATCH_START:
			events = QSE_AIO_DEV_EVENT_IN;
			mux_cmd = MUX_CMD_INSERT;
			break;

		case QSE_AIO_DEV_WATCH_RENEW:
			events = QSE_AIO_DEV_EVENT_IN;
			if (!QSE_AIO_WQ_ISEMPTY(&dev->wq)) events |= QSE_AIO_DEV_EVENT_OUT;
			mux_cmd = MUX_CMD_UPDATE;
			break;

		case QSE_AIO_DEV_WATCH_UPDATE:
			mux_cmd = MUX_CMD_UPDATE;
			break;

		case QSE_AIO_DEV_WATCH_STOP:
			events = 0;
			mux_cmd = MUX_CMD_DELETE;
			break;

		default:
			dev->aio->errnum = QSE_AIO_EINVAL;
			return -1;
	}

	new_capa = dev->dev_capa & ~(DEV_CAPA_ALL_WATCHED);

	if ((events & QSE_AIO_DEV_EVENT_IN) &&
	    (dev->dev_capa & QSE_AIO_DEV_CAPA_IN) &&
	    !(dev->dev_capa & (QSE_AIO_DEV_CAPA_IN_CLOSED | QSE_AIO_DEV_CAPA_IN_DISABLED)))
	{
		new_capa |= QSE_AIO_DEV_CAPA_IN_WATCHED;
		if (dev->dev_capa & QSE_AIO_DEV_CAPA_PRI) new_capa |= QSE_AIO_DEV_CAPA_PRI_WATCHED;
	}

	if ((events & QSE_AIO_DEV_EVENT_OUT) &&
	    (dev->dev_capa & QSE_AIO_DEV_CAPA_OUT) &&
	    !(dev->dev_capa & QSE_AIO_DEV_CAPA_OUT_CLOSED))
	{
		new_capa |= QSE_AIO_DEV_CAPA_OUT_WATCHED;
	}

	/* an update leaving the watched set unchanged needs no system call */
	if (mux_cmd != MUX_CMD_UPDATE ||
	    (new_capa & DEV_CAPA_ALL_WATCHED) != (dev->dev_capa & DEV_CAPA_ALL_WATCHED))
	{
		if (mux_control (dev, mux_cmd, dev->dev_mth->getsyshnd(dev), new_capa) <= -1) return -1;
	}

	dev->dev_capa = new_capa;
	return 0;
}
/* enable (enabled != 0) or disable reading on a device.
 * outside the event loop (aio->in_exec == 0), the multiplexer watch is
 * renewed immediately whenever the input-watching state has to change:
 * enabling while input is not watched, or disabling while it is.
 * otherwise only aio->renew_watch is raised so that the renewal happens
 * after the current event handling. (the conditions used to be inverted,
 * so the cases that actually needed a change were deferred and, outside
 * the event loop, could be left unapplied.)
 * returns 0 on success, -1 with dev->aio->errnum set when the input is
 * closed or the watch renewal fails. */
int qse_aio_dev_read (qse_aio_dev_t* dev, int enabled)
{
	int in_watched;

	if (dev->dev_capa & QSE_AIO_DEV_CAPA_IN_CLOSED)
	{
		dev->aio->errnum = QSE_AIO_ENOCAPA;
		return -1;
	}

	in_watched = (dev->dev_capa & QSE_AIO_DEV_CAPA_IN_WATCHED) != 0;

	if (enabled)
	{
		dev->dev_capa &= ~QSE_AIO_DEV_CAPA_IN_DISABLED;
		/* input watching must be turned on if it's off */
		if (!dev->aio->in_exec && !in_watched) goto renew_watch_now;
	}
	else
	{
		dev->dev_capa |= QSE_AIO_DEV_CAPA_IN_DISABLED;
		/* input watching must be turned off if it's on */
		if (!dev->aio->in_exec && in_watched) goto renew_watch_now;
	}

	dev->aio->renew_watch = 1;
	return 0;

renew_watch_now:
	if (qse_aio_dev_watch (dev, QSE_AIO_DEV_WATCH_RENEW, 0) <= -1) return -1;
	return 0;
}
/* timer handler for a queued write request (job->ctx) that wasn't
 * flushed in time. the request is dropped and reported to on_write with
 * a length of -1 and aio->errnum set to QSE_AIO_ETMOUT; the device is
 * halted if the callback returns failure.
 * the request is unlinked from the write queue BEFORE the callback runs,
 * matching handle_event(). if the callback destroyed the device,
 * qse_aio_killdev() would otherwise free the still-linked request and
 * the unlink/free that used to follow the callback would touch freed
 * memory. */
static void on_write_timeout (qse_aio_t* aio, const qse_ntime_t* now, qse_aio_tmrjob_t* job)
{
	qse_aio_wq_t* q;
	qse_aio_dev_t* dev;
	int x;

	q = (qse_aio_wq_t*)job->ctx;
	dev = q->dev;

	/* the fired job is expected to have reset the index through idxptr */
	QSE_ASSERT (q->tmridx == QSE_AIO_TMRIDX_INVALID);
	QSE_AIO_WQ_UNLINK(q);

	dev->aio->errnum = QSE_AIO_ETMOUT;
	x = dev->dev_evcb->on_write (dev, -1, q->ctx, &q->dstaddr); 

	QSE_MMGR_FREE (aio->mmgr, q);

	if (x <= -1) qse_aio_dev_halt (dev);
}
/* common implementation of qse_aio_dev_write() and qse_aio_dev_timedwrite().
 * when the write queue is empty, the data is written right away:
 *  - a stream device keeps writing until everything is sent; a
 *    zero-length request closes the output side.
 *  - a non-stream device writes once; a partial write counts as done.
 * whatever can't be written now is copied (with dstaddr) into a queued
 * request, provided the device has QSE_AIO_DEV_CAPA_OUT_QUEUED. a
 * positive tmout arms a timer that drops the request and reports -1 to
 * on_write.
 * returns 1 when written immediately (on_write already called), 0 when
 * queued, and -1 on failure with dev->aio->errnum set. */
static int __dev_write (qse_aio_dev_t* dev, const void* data, qse_aio_iolen_t len, const qse_ntime_t* tmout, void* wrctx, const qse_aio_devaddr_t* dstaddr)
{
	const qse_uint8_t* uptr;
	qse_aio_iolen_t urem, ulen;
	qse_aio_wq_t* q;
	int x;

	if (dev->dev_capa & QSE_AIO_DEV_CAPA_OUT_CLOSED)
	{
		dev->aio->errnum = QSE_AIO_ENOCAPA;
		return -1;
	}

	uptr = data;
	urem = len;

	if (!QSE_AIO_WQ_ISEMPTY(&dev->wq)) 
	{
		/* the writing queue is not empty. 
		 * enqueue this request immediately */
		goto enqueue_data;
	}

	if (dev->dev_capa & QSE_AIO_DEV_CAPA_STREAM)
	{
		/* use the do..while() loop to be able to send a zero-length data */
		do
		{
			ulen = urem;
			/* write from uptr, not data. after a partial write, uptr
			 * points past the bytes already sent; passing data here
			 * would resend the already-written prefix */
			x = dev->dev_mth->write (dev, uptr, &ulen, dstaddr);
			if (x <= -1) return -1;
			else if (x == 0) 
			{
				/* [NOTE] 
				 * the write queue is empty at this moment. a zero-length 
				 * request for a stream device can still get enqueued if the
				 * write callback returns 0 though i can't figure out if there
				 * is a compelling reason to do so 
				 */
				goto enqueue_data; /* enqueue remaining data */
			}
			else 
			{
				urem -= ulen;
				uptr += ulen;
			}
		}
		while (urem > 0);

		if (len <= 0) /* original length */
		{
			/* a zero-length writing request is to close the writing end */
			dev->dev_capa |= QSE_AIO_DEV_CAPA_OUT_CLOSED;
		}

		if (dev->dev_evcb->on_write (dev, len, wrctx, dstaddr) <= -1) return -1;
	}
	else
	{
		ulen = urem;

		x = dev->dev_mth->write (dev, uptr, &ulen, dstaddr);
		if (x <= -1) return -1;
		else if (x == 0) goto enqueue_data;

		/* partial writing is still considered ok for a non-stream device */
		if (dev->dev_evcb->on_write (dev, ulen, wrctx, dstaddr) <= -1) return -1;
	}

	return 1; /* written immediately and called on_write callback */

enqueue_data:
	if (!(dev->dev_capa & QSE_AIO_DEV_CAPA_OUT_QUEUED)) 
	{
		/* writing queuing is not requested. so return failure */
		dev->aio->errnum = QSE_AIO_ENOCAPA;
		return -1;
	}

	/* queue the remaining data. the request, the destination address
	 * and the data live in a single allocation */
	q = (qse_aio_wq_t*)QSE_MMGR_ALLOC (dev->aio->mmgr, QSE_SIZEOF(*q) + (dstaddr? dstaddr->len: 0) + urem);
	if (!q)
	{
		dev->aio->errnum = QSE_AIO_ENOMEM;
		return -1;
	}

	q->tmridx = QSE_AIO_TMRIDX_INVALID;
	q->dev = dev;
	q->ctx = wrctx;

	if (dstaddr)
	{
		q->dstaddr.ptr = (qse_uint8_t*)(q + 1);
		q->dstaddr.len = dstaddr->len;
		QSE_MEMCPY (q->dstaddr.ptr, dstaddr->ptr, dstaddr->len);
	}
	else
	{
		q->dstaddr.len = 0;
	}

	q->ptr = (qse_uint8_t*)(q + 1) + q->dstaddr.len;
	q->len = urem;
	q->olen = len;
	QSE_MEMCPY (q->ptr, uptr, urem);

	if (tmout && qse_ispostime(tmout))
	{
		qse_aio_tmrjob_t tmrjob;

		QSE_MEMSET (&tmrjob, 0, QSE_SIZEOF(tmrjob));
		tmrjob.ctx = q;
		qse_gettime (&tmrjob.when);
		qse_addtime (&tmrjob.when, tmout, &tmrjob.when);
		tmrjob.handler = on_write_timeout;
		tmrjob.idxptr = &q->tmridx;

		q->tmridx = qse_aio_instmrjob (dev->aio, &tmrjob);
		if (q->tmridx == QSE_AIO_TMRIDX_INVALID) 
		{
			QSE_MMGR_FREE (dev->aio->mmgr, q);
			return -1;
		}
	}

	QSE_AIO_WQ_ENQ (&dev->wq, q);
	if (!dev->aio->in_exec && !(dev->dev_capa & QSE_AIO_DEV_CAPA_OUT_WATCHED))
	{
		/* if output is not being watched, arrange to do so */
		if (qse_aio_dev_watch (dev, QSE_AIO_DEV_WATCH_RENEW, 0) <= -1)
		{
			unlink_wq (dev->aio, q);
			QSE_MMGR_FREE (dev->aio->mmgr, q);
			return -1;
		}
	}
	else
	{
		/* renewal is deferred until the current event handling ends */
		dev->aio->renew_watch = 1;
	}

	return 0; /* request pushed to a write queue. */
}
/* write to a device without a timeout on the queued remainder.
 * returns 1 if written immediately, 0 if queued, -1 on failure. */
int qse_aio_dev_write (qse_aio_dev_t* dev, const void* data, qse_aio_iolen_t len, void* wrctx, const qse_aio_devaddr_t* dstaddr)
{
	const qse_ntime_t* no_timeout = QSE_NULL;
	return __dev_write (dev, data, len, no_timeout, wrctx, dstaddr);
}
/* write to a device; a queued remainder not flushed within tmout is
 * dropped and reported to on_write with a length of -1. a QSE_NULL tmout,
 * or one for which qse_ispostime() is false, arms no timer.
 * returns 1 if written immediately, 0 if queued, -1 on failure. */
int qse_aio_dev_timedwrite (qse_aio_dev_t* dev, const void* data, qse_aio_iolen_t len, const qse_ntime_t* tmout, void* wrctx, const qse_aio_devaddr_t* dstaddr)
{
	int n;
	n = __dev_write (dev, data, len, tmout, wrctx, dstaddr);
	return n;
}
/* put a system handle into non-blocking mode with fcntl(O_NONBLOCK).
 * returns 0 on success, -1 with aio->errnum set on failure or when the
 * platform lacks the required fcntl() flags (QSE_AIO_ENOSUP). */
int qse_aio_makesyshndasync (qse_aio_t* aio, qse_aio_syshnd_t hnd)
{
#if defined(F_GETFL) && defined(F_SETFL) && defined(O_NONBLOCK)
	int fl;

	fl = fcntl (hnd, F_GETFL);
	if (fl >= 0) fl = fcntl (hnd, F_SETFL, fl | O_NONBLOCK);
	if (fl <= -1)
	{
		aio->errnum = qse_aio_syserrtoerrnum (errno);
		return -1;
	}

	return 0;
#else
	aio->errnum = QSE_AIO_ENOSUP;
	return -1;
#endif
}
qse_aio_errnum_t qse_aio_syserrtoerrnum (int no)
{
switch (no)
{
case ENOMEM:
return QSE_AIO_ENOMEM;
case EINVAL:
return QSE_AIO_EINVAL;
case EEXIST:
return QSE_AIO_EEXIST;
case ENOENT:
return QSE_AIO_ENOENT;
case EMFILE:
return QSE_AIO_EMFILE;
#if defined(ENFILE)
case ENFILE:
return QSE_AIO_ENFILE;
#endif
#if defined(EWOULDBLOCK) && defined(EAGAIN) && (EWOULDBLOCK != EAGAIN)
case EAGAIN:
case EWOULDBLOCK:
return QSE_AIO_EAGAIN;
#elif defined(EAGAIN)
case EAGAIN:
return QSE_AIO_EAGAIN;
#elif defined(EWOULDBLOCK)
case EWOULDBLOCK:
return QSE_AIO_EAGAIN;
#endif
#if defined(ECONNREFUSED)
case ECONNREFUSED:
return QSE_AIO_ECONRF;
#endif
#if defined(ECONNRESETD)
case ECONNRESET:
return QSE_AIO_ECONRS;
#endif
#if defined(EPERM)
case EPERM:
return QSE_AIO_EPERM;
#endif
default:
return QSE_AIO_ESYSERR;
}
}