enhanced the IO multiplexer code to utilize kqueue()/kevent() if available

This commit is contained in:
hyung-hwan 2021-07-25 17:52:05 +00:00
parent 5294db4d20
commit 9327c06745
5 changed files with 172 additions and 60 deletions

View File

@ -353,7 +353,7 @@ int hio_setoption (hio_t* hio, hio_option_t id, const void* value)
{
hio_uch_t* v1;
hio_bch_t* v2;
hio_ucs_t* v = (hio_bcs_t*)value;
hio_ucs_t* v = (hio_ucs_t*)value;
v1 = hio_dupuchars(hio, v->ptr, v->len);
if (HIO_UNLIKELY(!v1)) return -1;
@ -1243,7 +1243,7 @@ int hio_dev_watch (hio_dev_t* dev, hio_dev_watch_cmd_t cmd, int events)
}
/*ev.data.ptr = dev;*/
dev_cap = dev->dev_cap & ~(DEV_CAP_ALL_WATCHED | HIO_DEV_CAP_WATCH_SUSPENDED); /* UGLY to use HIO_DEV_CAP_WATCH_SUSPENDED here */
dev_cap = dev->dev_cap & ~(DEV_CAP_ALL_WATCHED | HIO_DEV_CAP_WATCH_SUSPENDED | HIO_DEV_CAP_WATCH_REREG_REQUIRED); /* UGLY to use HIO_DEV_CAP_WATCH_SUSPENDED and HIO_DEV_CAP_WATCH_REREG_REQUIRED here */
switch (cmd)
{
@ -1314,7 +1314,7 @@ int hio_dev_watch (hio_dev_t* dev, hio_dev_watch_cmd_t cmd, int events)
}
/* UGLY. HIO_DEV_CAP_WATCH_SUSPENDED may be set/unset by hio_sys_ctrlmux. I need this to reflect it */
dev->dev_cap = dev_cap | (dev->dev_cap & HIO_DEV_CAP_WATCH_SUSPENDED);
dev->dev_cap = dev_cap | (dev->dev_cap & (HIO_DEV_CAP_WATCH_SUSPENDED | HIO_DEV_CAP_WATCH_REREG_REQUIRED));
return 0;
}

View File

@ -431,7 +431,8 @@ enum hio_dev_cap_t
HIO_DEV_CAP_ZOMBIE = (1 << 17),
HIO_DEV_CAP_RENEW_REQUIRED = (1 << 18),
HIO_DEV_CAP_WATCH_STARTED = (1 << 19),
HIO_DEV_CAP_WATCH_SUSPENDED = (1 << 20)
HIO_DEV_CAP_WATCH_SUSPENDED = (1 << 20),
HIO_DEV_CAP_WATCH_REREG_REQUIRED = (1 << 21),
};
typedef enum hio_dev_cap_t hio_dev_cap_t;

View File

@ -1366,6 +1366,22 @@ fcntl (rdev->hnd, F_SETFL, flags | O_NONBLOCK);
return -1;
}
if (rdev->dev_cap & HIO_DEV_CAP_WATCH_REREG_REQUIRED)
{
/* On NetBSD, the listening socket added before listen()
* doesn't generate an event even if a new connection is ready
* to be accepted. */
/* TODO: need to keep the old watch flags before STOP and
* use the flags witn START */
if (hio_dev_watch(rdev, HIO_DEV_WATCH_STOP, 0) <= -1 ||
hio_dev_watch(rdev, HIO_DEV_WATCH_START, 0) <= -1)
{
hio_stop (hio, HIO_STOPREQ_WATCHER_ERROR);
return -1;
}
}
rdev->tmout = lstn->accept_tmout;
HIO_DEV_SCK_SET_PROGRESS (rdev, HIO_DEV_SCK_LISTENING);
@ -1652,7 +1668,21 @@ static int accept_incoming_connection (hio_dev_sck_t* rdev)
/* this is a server(lisening) socket */
#if defined(SOCK_NONBLOCK) && defined(SOCK_CLOEXEC) && defined(HAVE_ACCEPT4)
#if defined(SOCK_NONBLOCK) && defined(SOCK_CLOEXEC) && defined(HAVE_PACCEPT)
flags = SOCK_NONBLOCK | SOCK_CLOEXEC;
addrlen = HIO_SIZEOF(remoteaddr);
clisck = paccept(rdev->hnd, (struct sockaddr*)&remoteaddr, &addrlen, HIO_NULL, flags);
if (clisck <= -1)
{
if (errno != ENOSYS) goto accept_error;
/* go on for the normal 3-parameter accept */
}
else
{
goto accept_done;
}
#elif defined(SOCK_NONBLOCK) && defined(SOCK_CLOEXEC) && defined(HAVE_ACCEPT4)
flags = SOCK_NONBLOCK | SOCK_CLOEXEC;
addrlen = HIO_SIZEOF(remoteaddr);
@ -2002,7 +2032,7 @@ static int dev_evcb_sck_on_read_qx (hio_dev_t* dev, const void* data, hio_iolen_
{
if (make_accepted_client_connection(rdev, qxmsg->syshnd, &qxmsg->remoteaddr, qxmsg->scktype) <= -1)
{
printf ("unable to accept new client connection %d\n", qxmsg->syshnd);
/*printf ("unable to accept new client connection %d\n", qxmsg->syshnd);*/
return (rdev->state & HIO_DEV_SCK_LENIENT)? 0: -1;
}
}

View File

@ -111,6 +111,13 @@ int hio_sys_initmux (hio_t* hio)
}
#endif
/* register the control pipe */
{
struct kevent chlist;
EV_SET(&chlist, mux->ctrlp[0], EVFILT_READ, EV_ADD | EV_ENABLE, 0, 0, 0);
kevent(mux->kq, &chlist, 1, HIO_NULL, 0, HIO_NULL);
}
#elif defined(USE_EPOLL)
#if defined(HAVE_EPOLL_CREATE1) && defined(EPOLL_CLOEXEC)
@ -186,7 +193,9 @@ void hio_sys_finimux (hio_t* hio)
#elif defined(USE_KQUEUE)
if (mux->ctrlp[0] != HIO_SYSHND_INVALID)
{
/* TODO: */
struct kevent chlist;
EV_SET(&chlist, mux->ctrlp[0], EVFILT_READ, EV_DELETE | EV_DISABLE, 0, 0, 0);
kevent(mux->kq, &chlist, 1, HIO_NULL, 0, HIO_NULL);
}
close (mux->kq);
@ -291,7 +300,6 @@ int hio_sys_ctrlmux (hio_t* hio, hio_sys_mux_cmd_t cmd, hio_dev_t* dev, int dev_
hnd = dev->dev_mth->getsyshnd(dev);
#if 1
if (cmd == HIO_SYS_MUX_CMD_INSERT)
{
if (secure_poll_map_slot_for_hnd(hio, hnd) <= -1) return -1;
@ -304,30 +312,6 @@ int hio_sys_ctrlmux (hio_t* hio, hio_sys_mux_cmd_t cmd, hio_dev_t* dev, int dev_
return -1;
}
}
#else
if (hnd >= mux->map.capa)
{
hio_oow_t new_capa;
hio_oow_t* tmp;
if (cmd != HIO_SYS_MUX_CMD_INSERT)
{
hio_seterrnum (hio, HIO_ENOENT);
return -1;
}
new_capa = HIO_ALIGN_POW2((hnd + 1), 256);
tmp = hio_reallocmem(hio, mux->map.ptr, new_capa * HIO_SIZEOF(*tmp));
if (HIO_UNLIKELY(!tmp)) return -1;
for (idx = mux->map.capa; idx < new_capa; idx++) tmp[idx] = MUX_INDEX_INVALID;
mux->map.ptr = tmp;
mux->map.capa = new_capa;
}
#endif
idx = mux->map.ptr[hnd];
switch (cmd)
@ -340,34 +324,7 @@ int hio_sys_ctrlmux (hio_t* hio, hio_sys_mux_cmd_t cmd, hio_dev_t* dev, int dev_
}
do_insert:
#if 1
if (HIO_UNLIKELY(secure_poll_data_slot_for_insert(hio) <= -1)) return -1;
#else
if (mux->pd.size >= mux->pd.capa)
{
hio_oow_t new_capa;
struct pollfd* tmp1;
hio_dev_t** tmp2;
new_capa = HIO_ALIGN_POW2(mux->pd.size + 1, 256);
tmp1 = hio_reallocmem(hio, mux->pd.pfd, new_capa * HIO_SIZEOF(*tmp1));
if (HIO_UNLIKELY(!tmp1)) return -1;
tmp2 = hio_reallocmem(hio, mux->pd.dptr, new_capa * HIO_SIZEOF(*tmp2));
if (HIO_UNLIKELY(!tmp2))
{
hio_freemem (hio, tmp1);
return -1;
}
mux->pd.pfd = tmp1;
mux->pd.dptr = tmp2;
mux->pd.capa = new_capa;
}
#endif
idx = mux->pd.size++;
mux->pd.pfd[idx].fd = hnd;
@ -453,6 +410,83 @@ int hio_sys_ctrlmux (hio_t* hio, hio_sys_mux_cmd_t cmd, hio_dev_t* dev, int dev_
hio_seterrnum (hio, HIO_EINVAL);
return -1;
}
#elif defined(USE_KQUEUE)
hio_sys_mux_t* mux = &hio->sysdep->mux;
hio_syshnd_t hnd;
struct kevent chlist[2];
int x;
HIO_ASSERT (hio, hio == dev->hio);
hnd = dev->dev_mth->getsyshnd(dev);
switch (cmd)
{
case HIO_SYS_MUX_CMD_INSERT:
{
int i_flag, o_flag;
if (HIO_UNLIKELY(dev->dev_cap & HIO_DEV_CAP_WATCH_SUSPENDED))
{
hio_seterrnum (hio, HIO_EEXIST);
return -1;
}
i_flag = (dev_cap & HIO_DEV_CAP_IN_WATCHED)? EV_ENABLE: EV_DISABLE;
o_flag = (dev_cap & HIO_DEV_CAP_OUT_WATCHED)? EV_ENABLE: EV_DISABLE;
EV_SET (&chlist[1], hnd, EVFILT_READ, EV_ADD | i_flag, 0, 0, dev);
EV_SET (&chlist[0], hnd, EVFILT_WRITE, EV_ADD | o_flag, 0, 0, dev);
x = kevent(mux->kq, chlist, 2, HIO_NULL, 0, HIO_NULL);
if (x >= 0) dev->dev_cap |= HIO_DEV_CAP_WATCH_REREG_REQUIRED; /* ugly hack for the listening sockets in NetBSD */
/* the CMD_INSERT comes with at MIO_DEV_CAP_IN_WATCHD set.
* skip checking to set WATCH_SUSPENDED */
break;
}
case HIO_SYS_MUX_CMD_UPDATE:
{
int i_flag, o_flag;
i_flag = (dev_cap & HIO_DEV_CAP_IN_WATCHED)? EV_ENABLE: EV_DISABLE;
o_flag = (dev_cap & HIO_DEV_CAP_OUT_WATCHED)? EV_ENABLE: EV_DISABLE;
EV_SET (&chlist[0], hnd, EVFILT_READ, EV_ADD | i_flag, 0, 0, dev);
EV_SET (&chlist[1], hnd, EVFILT_WRITE, EV_ADD | o_flag, 0, 0, dev);
x = kevent(mux->kq, chlist, 2, HIO_NULL, 0, HIO_NULL);
if (x >= 0)
{
if (i_flag == EV_DISABLE && o_flag == EV_DISABLE)
dev->dev_cap &= ~HIO_DEV_CAP_WATCH_SUSPENDED;
else
dev->dev_cap |= HIO_DEV_CAP_WATCH_SUSPENDED;
}
break;
}
case HIO_SYS_MUX_CMD_DELETE:
EV_SET (&chlist[0], hnd, EVFILT_READ, EV_DELETE | EV_DISABLE, 0, 0, dev);
EV_SET (&chlist[1], hnd, EVFILT_WRITE, EV_DELETE | EV_DISABLE, 0, 0, dev);
x = kevent(mux->kq, chlist, 2, HIO_NULL, 0, HIO_NULL);
if (x >= 0) dev->dev_cap &= ~HIO_DEV_CAP_WATCH_SUSPENDED; /* just clear this */
break;
default:
hio_seterrnum (hio, HIO_EINVAL);
return -1;
}
if (x <= -1)
{
hio_seterrwithsyserr (hio, 0, errno);
return -1;
}
return 0;
#elif defined(USE_EPOLL)
hio_sys_mux_t* mux = &hio->sysdep->mux;
struct epoll_event ev;
@ -586,6 +620,52 @@ int hio_sys_waitmux (hio_t* hio, const hio_ntime_t* tmout, hio_sys_mux_evtcb_t e
event_handler (hio, dev, events, 0);
}
}
#elif defined(USE_KQUEUE)
hio_sys_mux_t* mux = &hio->sysdep->mux;
struct timespec ts;
int nentries, i;
ts.tv_sec = tmout->sec;
ts.tv_nsec = tmout->nsec;
nentries = kevent(mux->kq, HIO_NULL, 0, mux->revs, HIO_COUNTOF(mux->revs), &ts);
if (nentries <= -1)
{
if (errno == EINTR) return 0; /* it's actually ok */
/* other errors are critical - EBADF, EFAULT, EINVAL */
hio_seterrwithsyserr (hio, 0, errno);
return -1;
}
for (i = 0; i < nentries; i++)
{
int events = 0;
hio_dev_t* dev;
dev = mux->revs[i].udata;
if (HIO_LIKELY(dev))
{
HIO_ASSERT (hio, mux->revs[i].ident == dev->dev_mth->getsyshnd(dev));
if (mux->revs[i].flags & EV_ERROR) events |= HIO_DEV_EVENT_ERR;
if (mux->revs[i].flags & EV_EOF) events |= HIO_DEV_EVENT_HUP;
if (mux->revs[i].filter == EVFILT_READ) events |= HIO_DEV_EVENT_IN;
else if (mux->revs[i].filter == EVFILT_WRITE) events |= HIO_DEV_EVENT_OUT;
if (HIO_LIKELY(events)) event_handler (hio, dev, events, 0);
}
else if (mux->ctrlp[0] != HIO_SYSHND_INVALID)
{
/* internal pipe for signaling */
hio_uint8_t tmp[16];
HIO_ASSERT (hio, mux->revs[i].ident == mux->ctrlp[0]);
while (read(mux->ctrlp[0], tmp, HIO_SIZEOF(tmp)) > 0) ;
}
}
#elif defined(USE_EPOLL)

View File

@ -72,7 +72,7 @@ struct hio_sys_mux_t
#elif defined(USE_KQUEUE)
struct hio_sys_mutx_t
struct hio_sys_mux_t
{
int kq;
@ -80,6 +80,7 @@ struct hio_sys_mutx_t
int ctrlp[2];
};
#elif defined(USE_EPOLL)
struct hio_sys_mux_t