added qse_mux_setupchan() and qse_mux_interrupt()

This commit is contained in:
hyung-hwan 2020-09-08 16:26:05 +00:00
parent c8144cc4b4
commit 6907c9ca95
3 changed files with 123 additions and 22 deletions

View File

@ -137,6 +137,19 @@ QSE_EXPORT int qse_mux_poll (
const qse_ntime_t* tmout const qse_ntime_t* tmout
); );
QSE_EXPORT int qse_mux_setupchan (
qse_mux_t* mux
);
/**
* The qse_mux_interrupt() function can be use break the blocking call to
* qse_mux_poll() immediately. You must call qse_mux_setupchan() once on
* the initialized mux before using qse_mux_interrupt().
*/
QSE_EXPORT void qse_mux_interrupt (
qse_mux_t* mux
);
#if defined(__cplusplus) #if defined(__cplusplus)
} }
#endif #endif

View File

@ -670,15 +670,21 @@ static int fnc_dup (qse_awk_rtx_t* rtx, const qse_awk_fnc_info_t* fi)
#else #else
if (oflags) if (oflags)
{ {
int nflags = 0; int xflags;
#if defined(O_CLOEXEC) && defined(FD_CLOEXEC) #if defined(O_CLOEXEC) && defined(FD_CLOEXEC)
if (oflags & O_CLOEXEC) nflags |= FD_CLOEXEC; if (oflags & O_CLOEXEC)
{
xflags = fcntl(fd, F_GETFD);
if (xflags >= 0) fcntl(fd, F_SETFD, xflags | FD_CLOEXEC);
}
#endif #endif
#if defined(O_NONBLOCK) && defined(FD_NONBLOCK) #if defined(O_NONBLOCK)
/*if (oflags & O_NONBLOCK) nflags |= FD_NONBLOCK; dup3() doesn't seem to support NONBLOCK. */ /*if (oflags & O_NONBLOCK)
{
xflags = fcntl(fd, F_GETFL);
if (xflags >= 0) fcntl(fd, F_SETFL, xflags | O_NONBLOCK);
} dup3() doesn't seem to support NONBLOCK. */
#endif #endif
if (nflags) fcntl (fd, F_SETFD, nflags);
} }
#endif #endif
sys_node2->ctx.u.fd = fd; /* dup2 or dup3 closes the descriptor implicitly */ sys_node2->ctx.u.fd = fd; /* dup2 or dup3 closes the descriptor implicitly */
@ -799,21 +805,26 @@ static int fnc_pipe (qse_awk_rtx_t* rtx, const qse_awk_fnc_info_t* fi)
#else #else
if (flags > 0) if (flags > 0)
{ {
int nflags = 0; int xflags;
/* needs translation from O_XXXX to FD_XXXX */
#if defined(O_CLOEXEC) && defined(FD_CLOEXEC) #if defined(O_CLOEXEC) && defined(FD_CLOEXEC)
if (flags & O_CLOEXEC) nflags |= FD_CLOEXEC; if (flags & O_CLOEXEC)
#endif
#if defined(O_NONBLOCK) && defined(FD_NONBLOCK)
if (flags & O_NONBLOCK) nflags |= FD_NONBLOCK;
#endif
if (nflags > 0)
{ {
fcntl (fds[0], F_SETFD, nflags); xflags = fcntl(fds[0], F_GETFD);
fcntl (fds[1], F_SETFD, nflags); if (xflags >= 0) fcntl(fds[0], F_SETFD, xflags | FD_CLOEXEC);
xflags = fcntl(fds[1], F_GETFD);
if (xflags >= 0) fcntl(fds[1], F_SETFD, xflags | FD_CLOEXEC);
} }
#endif
#if defined(O_NONBLOCK)
if (flags & O_NONBLOCK)
{
xflags = fcntl(fds[0], F_GETFL);
if (xflags >= 0) fcntl(fds[0], F_SETFL, xflags | O_NONBLOCK);
xflags = fcntl(fds[1], F_GETFL);
if (xflags >= 0) fcntl(fds[1], F_SETFL, xflags | O_NONBLOCK);
}
#endif
} }
#endif #endif
node1 = new_sys_node_fd(rtx, sys_list, fds[0]); node1 = new_sys_node_fd(rtx, sys_list, fds[0]);

View File

@ -85,11 +85,14 @@
# endif # endif
#endif #endif
#define INVALID_CHAN (-1)
struct qse_mux_t struct qse_mux_t
{ {
qse_mmgr_t* mmgr; qse_mmgr_t* mmgr;
qse_mux_errnum_t errnum; qse_mux_errnum_t errnum;
qse_mux_evtcb_t evtcb; qse_mux_evtcb_t evtcb;
int chan[2]; /* pipe channels for simple interaction */
#if defined(USE_SELECT) #if defined(USE_SELECT)
fd_set rset; fd_set rset;
@ -236,7 +239,7 @@ static qse_mux_errnum_t skerr_to_errnum (int e)
case EEXIST: case EEXIST:
return QSE_MUX_EEXIST; return QSE_MUX_EEXIST;
case EINTR: case EINTR:
return QSE_MUX_EINTR; return QSE_MUX_EINTR;
@ -299,6 +302,8 @@ int qse_mux_init (
QSE_MEMSET (mux, 0, QSE_SIZEOF(*mux)); QSE_MEMSET (mux, 0, QSE_SIZEOF(*mux));
mux->mmgr = mmgr; mux->mmgr = mmgr;
mux->evtcb = evtcb; mux->evtcb = evtcb;
mux->chan[0] = INVALID_CHAN;
mux->chan[1] = INVALID_CHAN;
/* epoll_create returns an error and set errno to EINVAL /* epoll_create returns an error and set errno to EINVAL
* if size is 0. Having a positive size greater than 0 * if size is 0. Having a positive size greater than 0
@ -320,7 +325,7 @@ int qse_mux_init (
#endif #endif
if (mux->kq <= -1) if (mux->kq <= -1)
{ {
mux->errnum = skerr_to_errnum (errno); mux->errnum = skerr_to_errnum(errno);
return -1; return -1;
} }
@ -341,7 +346,7 @@ int qse_mux_init (
#endif #endif
if (mux->fd <= -1) if (mux->fd <= -1)
{ {
mux->errnum = skerr_to_errnum (errno); mux->errnum = skerr_to_errnum(errno);
return -1; return -1;
} }
@ -369,6 +374,22 @@ int qse_mux_init (
void qse_mux_fini (qse_mux_t* mux) void qse_mux_fini (qse_mux_t* mux)
{ {
if (mux->chan[0] != INVALID_CHAN)
{
qse_mux_evt_t evt;
QSE_MEMSET (&evt, 0, QSE_SIZEOF(evt));
evt.hnd = mux->chan[0];
evt.mask = QSE_MUX_IN;
/* evt.data = ... */
qse_mux_delete (mux, &evt);
close (mux->chan[0]);
mux->chan[0] = INVALID_CHAN;
}
if (mux->chan[1] != INVALID_CHAN)
{
close (mux->chan[1]);
mux->chan[1] = INVALID_CHAN;
}
#if defined(USE_SELECT) #if defined(USE_SELECT)
FD_ZERO (&mux->rset); FD_ZERO (&mux->rset);
@ -995,6 +1016,12 @@ int qse_mux_poll (qse_mux_t* mux, const qse_ntime_t* tmout)
QSE_ASSERT (evt->hnd == hnd);*/ QSE_ASSERT (evt->hnd == hnd);*/
evt = mux->ee.ptr[i].data.ptr; evt = mux->ee.ptr[i].data.ptr;
if (evt->hnd == mux->chan[0])
{
qse_uint8_t tmp[128];
while (read(evt->hnd, tmp, QSE_SIZEOF(tmp)) > 0) /* nothing */;
continue;
}
xevt = *evt; xevt = *evt;
xevt.mask = 0; xevt.mask = 0;
@ -1041,7 +1068,7 @@ int qse_mux_poll (qse_mux_t* mux, const qse_ntime_t* tmout)
} }
wcount = count - rcount; wcount = count - rcount;
n = os2_select (mux->fdarr, rcount, wcount, 0, tv); n = os2_select(mux->fdarr, rcount, wcount, 0, tv);
if (n <= -1) if (n <= -1)
{ {
mux->errnum = skerr_to_errnum(sock_errno()); mux->errnum = skerr_to_errnum(sock_errno());
@ -1079,3 +1106,53 @@ int qse_mux_poll (qse_mux_t* mux, const qse_ntime_t* tmout)
} }
int qse_mux_setupchan (qse_mux_t* mux)
{
qse_mux_evt_t evt;
if (mux->chan[0] != INVALID_CHAN)
{
mux->errnum = QSE_MUX_EPERM; /* no allowed to call again */
return -1;
}
#if defined(HAVE_PIPE2)
if (pipe2(mux->chan, O_CLOEXEC | O_NONBLOCK) <= -1)
#else
if (pipe(mux->chan) <= -1)
#endif
{
mux->errnum = skerr_to_errnum(errno);
return -1;
}
else
{
#if defined(HAVE_PIPE2)
/* do nothing */
#else
int flags;
flags = fcntl(mux->chan[0], F_GETFD);
if (flags >= 0) fcntl(mux->chan[0], F_SETFD, flags | FD_CLOEXEC);
flags = fcntl(mux->chan[1], F_GETFD);
if (flags >= 0) fcntl(mux->chan[1], F_SETFD, flags | FD_CLOEXEC);
flags = fcntl(mux->chan[0], F_GETFL);
if (flags >= 0) fcntl(mux->chan[0], F_SETFL, flags | O_NONBLOCK);
flags = fcntl(mux->chan[1], F_GETFL);
if (flags >= 0) fcntl(mux->chan[1], F_SETFL, flags | O_NONBLOCK);
#endif
}
QSE_MEMSET (&evt, 0, QSE_SIZEOF(evt));
evt.hnd = mux->chan[0];
evt.mask = QSE_MUX_IN;
/*evt.data = ... */
return qse_mux_insert(mux, &evt);
}
void qse_mux_interrupt (qse_mux_t* mux)
{
if (mux->chan[1] != INVALID_CHAN)
{
write(mux->chan[1], "Q", 1);
}
}