added more sendfile related code

This commit is contained in:
2020-07-19 06:23:43 +00:00
parent e439c29cb8
commit 8655269e07
7 changed files with 300 additions and 64 deletions

View File

@ -37,6 +37,13 @@ static int kill_and_free_device (mio_dev_t* dev, int force);
static void on_read_timeout (mio_t* mio, const mio_ntime_t* now, mio_tmrjob_t* job);
static void on_write_timeout (mio_t* mio, const mio_ntime_t* now, mio_tmrjob_t* job);
struct wq_sendfile_data_t
{
mio_syshnd_t in_fd;
mio_foff_t foff;
};
typedef struct wq_sendfile_data_t wq_sendfile_data_t;
/* ========================================================================= */
static void* mmgr_alloc (mio_mmgr_t* mmgr, mio_oow_t size)
@ -444,7 +451,14 @@ static MIO_INLINE void handle_event (mio_t* mio, mio_dev_t* dev, int events, int
send_leftover:
ulen = urem;
x = dev->dev_mth->write(dev, uptr, &ulen, &q->dstaddr);
if (q->sendfile)
{
x = dev->dev_mth->sendfile(dev, ((wq_sendfile_data_t*)uptr)->in_fd, ((wq_sendfile_data_t*)uptr)->foff, &ulen);
}
else
{
x = dev->dev_mth->write(dev, uptr, &ulen, &q->dstaddr);
}
if (x <= -1)
{
mio_dev_halt (dev);
@ -1291,6 +1305,7 @@ static MIO_INLINE int __enqueue_pending_write (mio_dev_t* dev, mio_iolen_t olen,
q = (mio_wq_t*)mio_allocmem(mio, MIO_SIZEOF(*q) + (dstaddr? dstaddr->len: 0) + urem);
if (MIO_UNLIKELY(!q)) return -1;
q->sendfile = 0;
q->tmridx = MIO_TMRIDX_INVALID;
q->dev = dev;
q->ctx = wrctx;
@ -1349,7 +1364,81 @@ static MIO_INLINE int __enqueue_pending_write (mio_dev_t* dev, mio_iolen_t olen,
return 0; /* request pused to a write queue. */
}
static int __dev_write (mio_dev_t* dev, const void* data, mio_iolen_t len, const mio_ntime_t* tmout, void* wrctx, const mio_devaddr_t* dstaddr)
static MIO_INLINE int __enqueue_pending_sendfile (mio_dev_t* dev, mio_iolen_t olen, mio_iolen_t urem, mio_foff_t foff, mio_syshnd_t in_fd, const mio_ntime_t* tmout, void* wrctx, const mio_devaddr_t* dstaddr)
{
mio_t* mio = dev->mio;
mio_wq_t* q;
if (dev->dev_cap & MIO_DEV_CAP_OUT_UNQUEUEABLE)
{
/* writing queuing is not requested. so return failure */
mio_seterrbfmt (mio, MIO_ENOCAPA, "device incapable of queuing");
return -1;
}
/* queue the remaining data*/
q = (mio_wq_t*)mio_allocmem(mio, MIO_SIZEOF(*q) + (dstaddr? dstaddr->len: 0) + MIO_SIZEOF(wq_sendfile_data_t));
if (MIO_UNLIKELY(!q)) return -1;
q->sendfile = 1;
q->tmridx = MIO_TMRIDX_INVALID;
q->dev = dev;
q->ctx = wrctx;
if (dstaddr)
{
q->dstaddr.ptr = (mio_uint8_t*)(q + 1);
q->dstaddr.len = dstaddr->len;
MIO_MEMCPY (q->dstaddr.ptr, dstaddr->ptr, dstaddr->len);
}
else
{
q->dstaddr.len = 0;
}
q->ptr = (mio_uint8_t*)(q + 1) + q->dstaddr.len;
q->len = urem;
q->olen = olen; /* original length to use when invoking on_write() */
((wq_sendfile_data_t*)q->ptr)->in_fd = in_fd;
((wq_sendfile_data_t*)q->ptr)->foff = foff;
if (tmout && MIO_IS_POS_NTIME(tmout))
{
mio_tmrjob_t tmrjob;
MIO_MEMSET (&tmrjob, 0, MIO_SIZEOF(tmrjob));
tmrjob.ctx = q;
mio_gettime (mio, &tmrjob.when);
MIO_ADD_NTIME (&tmrjob.when, &tmrjob.when, tmout);
tmrjob.handler = on_write_timeout;
tmrjob.idxptr = &q->tmridx;
q->tmridx = mio_instmrjob(mio, &tmrjob);
if (q->tmridx == MIO_TMRIDX_INVALID)
{
mio_freemem (mio, q);
return -1;
}
}
MIO_WQ_ENQ (&dev->wq, q);
if (!(dev->dev_cap & MIO_DEV_CAP_OUT_WATCHED))
{
/* if output is not being watched, arrange to do so */
if (mio_dev_watch(dev, MIO_DEV_WATCH_RENEW, MIO_DEV_EVENT_IN) <= -1)
{
unlink_wq (mio, q);
mio_freemem (mio, q);
return -1;
}
}
return 0; /* request pused to a write queue. */
}
static MIO_INLINE int __dev_write (mio_dev_t* dev, const void* data, mio_iolen_t len, const mio_ntime_t* tmout, void* wrctx, const mio_devaddr_t* dstaddr)
{
mio_t* mio = dev->mio;
const mio_uint8_t* uptr;
@ -1444,7 +1533,7 @@ enqueue_completed_write:
return __enqueue_completed_write(dev, len, wrctx, dstaddr);
}
static int __dev_writev (mio_dev_t* dev, mio_iovec_t* iov, mio_iolen_t iovcnt, const mio_ntime_t* tmout, void* wrctx, const mio_devaddr_t* dstaddr)
static MIO_INLINE int __dev_writev (mio_dev_t* dev, mio_iovec_t* iov, mio_iolen_t iovcnt, const mio_ntime_t* tmout, void* wrctx, const mio_devaddr_t* dstaddr)
{
mio_t* mio = dev->mio;
mio_iolen_t urem, len;
@ -1551,18 +1640,23 @@ enqueue_completed_write:
static int __dev_sendfile (mio_dev_t* dev, mio_syshnd_t in_fd, mio_foff_t foff, mio_iolen_t len, const mio_ntime_t* tmout, void* wrctx)
{
#if 1
mio_t* mio = dev->mio;
mio_foff_t uoff;
mio_iolen_t urem, ulen;
int x;
if (dev->dev_cap & MIO_DEV_CAP_OUT_CLOSED)
if (MIO_UNLIKELY(dev->dev_cap & MIO_DEV_CAP_OUT_CLOSED))
{
mio_seterrbfmt (mio, MIO_ENOCAPA, "unable to sendfile to closed device");
return -1;
}
if (MIO_UNLIKELY(!dev->dev_mth->sendfile))
{
mio_seterrbfmt (mio, MIO_ENOCAPA, "unable to senfile over unsupported device");
return -1;
}
uoff = foff;
urem = len;
@ -1579,7 +1673,7 @@ static int __dev_sendfile (mio_dev_t* dev, mio_syshnd_t in_fd, mio_foff_t foff,
do
{
ulen = urem;
x = dev->dev_mth->sendfile(dev, in_fd, foff, &ulen);
x = dev->dev_mth->sendfile(dev, in_fd, uoff, &ulen);
if (x <= -1) return -1;
else if (x == 0)
{
@ -1626,21 +1720,11 @@ static int __dev_sendfile (mio_dev_t* dev, mio_syshnd_t in_fd, mio_foff_t foff,
return 1; /* written immediately and called on_write callback. but this line will never be reached */
enqueue_data:
#if 0
iov.iov_ptr = (void*)uptr;
iov.iov_len = urem;
return __enqueue_pending_write(dev, len, urem, &iov, 1, 0, tmout, wrctx, dstaddr);
#else
/* TODO: */
return -1;
#endif
printf ("queueing sendfiel...\n");
return __enqueue_pending_sendfile(dev, len, urem, uoff, in_fd, tmout, wrctx, MIO_NULL);
enqueue_completed_write:
return __enqueue_completed_write(dev, len, wrctx, dstaddr);
#else
return 0;
#endif
return __enqueue_completed_write(dev, len, wrctx, MIO_NULL);
}
int mio_dev_write (mio_dev_t* dev, const void* data, mio_iolen_t len, void* wrctx, const mio_devaddr_t* dstaddr)
@ -1653,6 +1737,11 @@ int mio_dev_writev (mio_dev_t* dev, mio_iovec_t* iov, mio_iolen_t iovcnt, void*
return __dev_writev(dev, iov, iovcnt, MIO_NULL, wrctx, dstaddr);
}
int mio_dev_sendfile (mio_dev_t* dev, mio_syshnd_t in_fd, mio_foff_t foff, mio_iolen_t len, void* wrctx)
{
return __dev_sendfile(dev,in_fd, foff, len, MIO_NULL, wrctx);
}
int mio_dev_timedwrite (mio_dev_t* dev, const void* data, mio_iolen_t len, const mio_ntime_t* tmout, void* wrctx, const mio_devaddr_t* dstaddr)
{
return __dev_write(dev, data, len, tmout, wrctx, dstaddr);
@ -1663,6 +1752,11 @@ int mio_dev_timedwritev (mio_dev_t* dev, mio_iovec_t* iov, mio_iolen_t iovcnt, c
return __dev_writev(dev, iov, iovcnt, tmout, wrctx, dstaddr);
}
int mio_dev_timedsendfile (mio_dev_t* dev, mio_syshnd_t in_fd, mio_foff_t foff, mio_iolen_t len, const mio_ntime_t* tmout, void* wrctx)
{
return __dev_sendfile(dev,in_fd, foff, len, tmout, wrctx);
}
/* -------------------------------------------------------------------------- */
void mio_gettime (mio_t* mio, mio_ntime_t* now)