adding writev support

This commit is contained in:
hyung-hwan 2020-02-22 18:24:49 +00:00
parent 052e10694d
commit 75887e4f74
6 changed files with 391 additions and 13 deletions

View File

@ -50,7 +50,7 @@
# if defined(HAVE_OPENSSL_ENGINE_H)
# include <openssl/engine.h>
# endif
//# define USE_SSL
# define USE_SSL
#endif
/* ========================================================================= */
@ -160,6 +160,12 @@ static void tcp_sck_on_disconnect (mio_dev_sck_t* tcp)
static void tcp_sck_on_connect (mio_dev_sck_t* tcp)
{
mio_bch_t buf1[128], buf2[128];
mio_iovec_t iov[] =
{
{ "hello", 5 },
{ "world", 5 },
{ "mio test", 8 }
};
mio_skadtobcstr (tcp->mio, &tcp->localaddr, buf1, MIO_COUNTOF(buf1), MIO_SKAD_TO_BCSTR_ADDR | MIO_SKAD_TO_BCSTR_PORT);
mio_skadtobcstr (tcp->mio, &tcp->remoteaddr, buf2, MIO_COUNTOF(buf2), MIO_SKAD_TO_BCSTR_ADDR | MIO_SKAD_TO_BCSTR_PORT);
@ -173,7 +179,7 @@ static void tcp_sck_on_connect (mio_dev_sck_t* tcp)
MIO_INFO3 (tcp->mio, "DEVICE accepted client device... .LOCAL %hs REMOTE %hs SCK: %d\n", buf1, buf2, tcp->sck);
}
if (mio_dev_sck_write(tcp, "hello", 5, MIO_NULL, MIO_NULL) <= -1)
if (mio_dev_sck_writev(tcp, iov, MIO_COUNTOF(iov), MIO_NULL, MIO_NULL) <= -1)
{
mio_dev_sck_halt (tcp);
}

View File

@ -454,6 +454,14 @@ MIO_EXPORT int mio_dev_sck_write (
const mio_skad_t* dstaddr
);
MIO_EXPORT int mio_dev_sck_writev (
mio_dev_sck_t* dev,
mio_iovec_t* iov,
mio_iolen_t iovcnt,
void* wrctx,
const mio_skad_t* dstaddr
);
MIO_EXPORT int mio_dev_sck_timedwrite (
mio_dev_sck_t* dev,
const void* data,
@ -466,7 +474,7 @@ MIO_EXPORT int mio_dev_sck_timedwrite (
MIO_EXPORT int mio_dev_sck_timedwritev (
mio_dev_sck_t* dev,
const mio_iovec_t* iov,
mio_iovec_t* iov,
mio_iolen_t iovcnt,
const mio_ntime_t* tmout,
void* wrctx,

View File

@ -1136,7 +1136,7 @@ static int __dev_write (mio_dev_t* dev, const void* data, mio_iolen_t len, const
if (len <= 0) /* original length */
{
/* a zero-length writing request is to close the writing end */
/* a zero-length writing request is to close the writing end. this causes further write request to fail */
dev->dev_cap |= MIO_DEV_CAP_OUT_CLOSED;
}
@ -1159,7 +1159,7 @@ static int __dev_write (mio_dev_t* dev, const void* data, mio_iolen_t len, const
/* read the comment in the 'if' block above for why i enqueue the write completion event
* instead of calling the event callback here...
* --> if (dev->dev_evcb->on_write(dev, ulen, wrctx, dstaddr) <= -1) return -1; */
* ---> if (dev->dev_evcb->on_write(dev, ulen, wrctx, dstaddr) <= -1) return -1; */
goto enqueue_completed_write;
}
@ -1269,20 +1269,235 @@ enqueue_completed_write:
return 0;
}
static int __dev_writev (mio_dev_t* dev, mio_iovec_t* iov, mio_iolen_t iovcnt, const mio_ntime_t* tmout, void* wrctx, const mio_devaddr_t* dstaddr)
{
mio_t* mio = dev->mio;
mio_iolen_t urem, len;
mio_iolen_t index = 0, i, j;
mio_wq_t* q;
mio_cwq_t* cwq;
mio_oow_t cwq_extra_aligned, cwqfl_index;
int x;
if (dev->dev_cap & MIO_DEV_CAP_OUT_CLOSED)
{
mio_seterrbfmt (mio, MIO_ENOCAPA, "unable to write to closed device");
return -1;
}
len = 0;
for (i = 0; i < iovcnt; i++) len += iov[i].iov_len;
if (!MIO_WQ_ISEMPTY(&dev->wq))
{
/* the writing queue is not empty.
* enqueue this request immediately */
urem = len;
goto enqueue_data;
}
if (dev->dev_cap & MIO_DEV_CAP_STREAM)
{
/* use the do..while() loop to be able to send a zero-length data */
mio_iolen_t backup_index = -1, dcnt;
mio_iovec_t backup;
do
{
dcnt = iovcnt - index;
x = dev->dev_mth->writev(dev, &iov[index], &dcnt, dstaddr);
if (x <= -1) return -1;
else if (x == 0)
{
/* [NOTE]
* the write queue is empty at this moment. a zero-length
* request for a stream device can still get enqueued if the
* write callback returns 0 though i can't figure out if there
* is a compelling reason to do so
*/
goto enqueue_data; /* enqueue remaining data */
}
urem -= dcnt;
while (index < iovcnt && (mio_oow_t)dcnt >= iov[index].iov_len)
dcnt -= iov[index++].iov_len;
if (index == iovcnt) break;
if (backup_index != index)
{
if (backup_index >= 0) iov[backup_index] = backup;
backup = iov[index];
backup_index = index;
}
iov[index].iov_ptr = (void*)((mio_uint8_t*)iov[index].iov_ptr + x);
iov[index].iov_len -= dcnt;
}
while (1);
if (backup_index >= 0) iov[backup_index] = backup;
if (iovcnt <= 0) /* original vector count */
{
/* a zero-length writing request is to close the writing end. this causes further write request to fail */
dev->dev_cap |= MIO_DEV_CAP_OUT_CLOSED;
}
/* if i trigger the write completion callback here, the performance
* may increase, but there can be annoying recursion issues if the
* callback requests another writing operation. it's imperative to
* delay the callback until this write function is finished.
* ---> if (dev->dev_evcb->on_write(dev, len, wrctx, dstaddr) <= -1) return -1; */
goto enqueue_completed_write;
}
else
{
mio_iolen_t dcnt;
dcnt = iovcnt;
x = dev->dev_mth->writev(dev, iov, &dcnt, dstaddr);
if (x <= -1) return -1;
else if (x == 0) goto enqueue_data;
urem -= dcnt;
/* partial writing is still considered ok for a non-stream device. */
/* read the comment in the 'if' block above for why i enqueue the write completion event
* instead of calling the event callback here...
* ---> if (dev->dev_evcb->on_write(dev, ulen, wrctx, dstaddr) <= -1) return -1; */
goto enqueue_completed_write;
}
return 1; /* written immediately and called on_write callback */
enqueue_data:
if (dev->dev_cap & MIO_DEV_CAP_OUT_UNQUEUEABLE)
{
/* writing queuing is not requested. so return failure */
mio_seterrbfmt (mio, MIO_ENOCAPA, "device incapable of queuing");
return -1;
}
/* queue the remaining data*/
q = (mio_wq_t*)mio_allocmem(mio, MIO_SIZEOF(*q) + (dstaddr? dstaddr->len: 0) + urem);
if (!q) return -1;
q->tmridx = MIO_TMRIDX_INVALID;
q->dev = dev;
q->ctx = wrctx;
if (dstaddr)
{
q->dstaddr.ptr = (mio_uint8_t*)(q + 1);
q->dstaddr.len = dstaddr->len;
MIO_MEMCPY (q->dstaddr.ptr, dstaddr->ptr, dstaddr->len);
}
else
{
q->dstaddr.len = 0;
}
q->ptr = (mio_uint8_t*)(q + 1) + q->dstaddr.len;
q->len = urem;
q->olen = len;
for (i = index, j = 0; i < iovcnt; i++)
{
MIO_MEMCPY (&q->ptr[j], iov[i].iov_ptr, iov[i].iov_len);
j += iov[i].iov_len;
}
if (tmout && MIO_IS_POS_NTIME(tmout))
{
mio_tmrjob_t tmrjob;
MIO_MEMSET (&tmrjob, 0, MIO_SIZEOF(tmrjob));
tmrjob.ctx = q;
mio_gettime (mio, &tmrjob.when);
MIO_ADD_NTIME (&tmrjob.when, &tmrjob.when, tmout);
tmrjob.handler = on_write_timeout;
tmrjob.idxptr = &q->tmridx;
q->tmridx = mio_instmrjob(mio, &tmrjob);
if (q->tmridx == MIO_TMRIDX_INVALID)
{
mio_freemem (mio, q);
return -1;
}
}
MIO_WQ_ENQ (&dev->wq, q);
if (!(dev->dev_cap & MIO_DEV_CAP_OUT_WATCHED))
{
/* if output is not being watched, arrange to do so */
if (mio_dev_watch(dev, MIO_DEV_WATCH_RENEW, 0) <= -1)
{
unlink_wq (mio, q);
mio_freemem (mio, q);
return -1;
}
}
return 0; /* request pused to a write queue. */
enqueue_completed_write:
/* queue the remaining data*/
cwq_extra_aligned = (dstaddr? dstaddr->len: 0);
cwq_extra_aligned = MIO_ALIGN_POW2(cwq_extra_aligned, MIO_CWQFL_ALIGN);
cwqfl_index = cwq_extra_aligned / MIO_CWQFL_SIZE;
if (cwqfl_index < MIO_COUNTOF(mio->cwqfl) && mio->cwqfl[cwqfl_index])
{
/* take an available cwq object from the free cwq list */
cwq = dev->mio->cwqfl[cwqfl_index];
dev->mio->cwqfl[cwqfl_index] = cwq->next;
}
else
{
cwq = (mio_cwq_t*)mio_allocmem(mio, MIO_SIZEOF(*cwq) + cwq_extra_aligned);
if (!cwq) return -1;
}
MIO_MEMSET (cwq, 0, MIO_SIZEOF(*cwq));
cwq->dev = dev;
cwq->ctx = wrctx;
if (dstaddr)
{
cwq->dstaddr.ptr = (mio_uint8_t*)(cwq + 1);
cwq->dstaddr.len = dstaddr->len;
MIO_MEMCPY (cwq->dstaddr.ptr, dstaddr->ptr, dstaddr->len);
}
else
{
cwq->dstaddr.len = 0;
}
cwq->olen = len;
MIO_CWQ_ENQ (&dev->mio->cwq, cwq);
dev->cw_count++; /* increment the number of complete write operations */
return 0;
}
int mio_dev_write (mio_dev_t* dev, const void* data, mio_iolen_t len, void* wrctx, const mio_devaddr_t* dstaddr)
{
return __dev_write(dev, data, len, MIO_NULL, wrctx, dstaddr);
}
int mio_dev_writev (mio_dev_t* dev, mio_iovec_t* iov, mio_iolen_t iovcnt, void* wrctx, const mio_devaddr_t* dstaddr)
{
return __dev_writev(dev, iov, iovcnt, MIO_NULL, wrctx, dstaddr);
}
int mio_dev_timedwrite (mio_dev_t* dev, const void* data, mio_iolen_t len, const mio_ntime_t* tmout, void* wrctx, const mio_devaddr_t* dstaddr)
{
return __dev_write(dev, data, len, tmout, wrctx, dstaddr);
}
int mio_dev_timedwritev (mio_dev_t* dev, const mio_iovec_t* iov, mio_iolen_t iovcnt, const mio_ntime_t* tmout, void* wrctx, const mio_devaddr_t* dstaddr)
int mio_dev_timedwritev (mio_dev_t* dev, mio_iovec_t* iov, mio_iolen_t iovcnt, const mio_ntime_t* tmout, void* wrctx, const mio_devaddr_t* dstaddr)
{
//return __dev_write(dev, data, len, tmout, wrctx, dstaddr);
return -1;
return __dev_writev(dev, iov, iovcnt, tmout, wrctx, dstaddr);
}
/* -------------------------------------------------------------------------- */

View File

@ -193,6 +193,7 @@ struct mio_dev_mth_t
/* ------------------------------------------------------------------ */
int (*write) (mio_dev_t* dev, const void* data, mio_iolen_t* len, const mio_devaddr_t* dstaddr);
int (*writev) (mio_dev_t* dev, const mio_iovec_t* iov, mio_iolen_t* iovcnt, const mio_devaddr_t* dstaddr);
/* ------------------------------------------------------------------ */
int (*ioctl) (mio_dev_t* dev, int cmd, void* arg);
@ -803,6 +804,13 @@ MIO_EXPORT int mio_dev_write (
const mio_devaddr_t* dstaddr
);
MIO_EXPORT int mio_dev_writev (
mio_dev_t* dev,
mio_iovec_t* iov,
mio_iolen_t iovcnt,
void* wrctx,
const mio_devaddr_t* dstaddr
);
MIO_EXPORT int mio_dev_timedwrite (
mio_dev_t* dev,
@ -816,7 +824,7 @@ MIO_EXPORT int mio_dev_timedwrite (
MIO_EXPORT int mio_dev_timedwritev (
mio_dev_t* dev,
const mio_iovec_t* iov,
mio_iovec_t* iov,
mio_iolen_t iovcnt,
const mio_ntime_t* tmout,
void* wrctx,

View File

@ -576,6 +576,24 @@ static int dev_pro_write_slave (mio_dev_t* dev, const void* data, mio_iolen_t* l
return 1;
}
static int dev_pro_writev_slave (mio_dev_t* dev, const mio_iovec_t* iov, mio_iolen_t* iovcnt, const mio_devaddr_t* dstaddr)
{
mio_dev_pro_slave_t* pro = (mio_dev_pro_slave_t*)dev;
ssize_t x;
x = writev(pro->pfd, iov, *iovcnt);
if (x <= -1)
{
if (errno == EINPROGRESS || errno == EWOULDBLOCK || errno == EAGAIN) return 0; /* no data can be written */
if (errno == EINTR) return 0;
mio_seterrwithsyserr (pro->mio, 0, errno);
return -1;
}
*iovcnt = x;
return 1;
}
static mio_syshnd_t dev_pro_getsyshnd (mio_dev_t* dev)
{
return MIO_SYSHND_INVALID;
@ -638,6 +656,7 @@ static mio_dev_mth_t dev_pro_methods =
dev_pro_kill_master,
dev_pro_getsyshnd,
MIO_NULL,
MIO_NULL,
MIO_NULL,
dev_pro_ioctl
@ -651,6 +670,7 @@ static mio_dev_mth_t dev_pro_methods_slave =
dev_pro_read_slave,
dev_pro_write_slave,
dev_pro_writev_slave,
dev_pro_ioctl
};

View File

@ -548,6 +548,88 @@ static int dev_sck_write_stateful (mio_dev_t* dev, const void* data, mio_iolen_t
return 1;
}
static int dev_sck_writev_stateful (mio_dev_t* dev, const mio_iovec_t* iov, mio_iolen_t* iovcnt, const mio_devaddr_t* dstaddr)
{
mio_t* mio = dev->mio;
mio_dev_sck_t* rdev = (mio_dev_sck_t*)dev;
#if 0 && defined(USE_SSL)
if (rdev->ssl)
{
int x;
if (*iovcnt <= 0)
{
/* it's a writing finish indicator. close the writing end of
* the socket, probably leaving it in the half-closed state */
if ((x = SSL_shutdown((SSL*)rdev->ssl)) == -1)
{
set_ssl_error (mio, SSL_get_error((SSL*)rdev->ssl, x));
return -1;
}
return 1;
}
x = SSL_write((SSL*)rdev->ssl, data, *len);
if (x <= -1)
{
int err = SSL_get_error ((SSL*)rdev->ssl, x);
if (err == SSL_ERROR_WANT_READ || err == SSL_ERROR_WANT_WRITE) return 0;
set_ssl_error (mio, err);
return -1;
}
*len = x;
}
else
{
#endif
ssize_t x;
int flags = 0;
struct msghdr msg;
if (*iovcnt <= 0)
{
/* it's a writing finish indicator. close the writing end of
* the socket, probably leaving it in the half-closed state */
if (shutdown(rdev->sck, SHUT_WR) == -1)
{
mio_seterrwithsyserr (mio, 0, errno);
return -1;
}
return 1;
}
/* TODO: flags MSG_DONTROUTE, MSG_DONTWAIT, MSG_MORE, MSG_OOB, MSG_NOSIGNAL */
#if defined(MSG_NOSIGNAL)
flags |= MSG_NOSIGNAL;
#endif
#if defined(HAVE_SENDMSG)
MIO_MEMSET (&msg, 0, MIO_SIZEOF(msg));
msg.msg_iov = (struct iovec*)iov;
msg.msg_iovlen = *iovcnt;
x = sendmsg(rdev->sck, &msg, flags);
#else
x = writev(rdev->sck, (const struct iovec*)iov, *iovcnt);
#endif
if (x == -1)
{
if (errno == EINPROGRESS || errno == EWOULDBLOCK || errno == EAGAIN) return 0; /* no data can be written */
if (errno == EINTR) return 0;
mio_seterrwithsyserr (mio, 0, errno);
return -1;
}
*iovcnt = x;
#if 0 && defined(USE_SSL)
}
#endif
return 1;
}
static int dev_sck_write_stateless (mio_dev_t* dev, const void* data, mio_iolen_t* len, const mio_devaddr_t* dstaddr)
{
mio_t* mio = dev->mio;
@ -567,6 +649,36 @@ static int dev_sck_write_stateless (mio_dev_t* dev, const void* data, mio_iolen_
return 1;
}
static int dev_sck_writev_stateless (mio_dev_t* dev, const mio_iovec_t* iov, mio_iolen_t* iovcnt, const mio_devaddr_t* dstaddr)
{
mio_t* mio = dev->mio;
mio_dev_sck_t* rdev = (mio_dev_sck_t*)dev;
struct msghdr msg;
ssize_t x;
MIO_MEMSET (&msg, 0, MIO_SIZEOF(msg));
if (MIO_LIKELY(dstaddr))
{
msg.msg_name = dstaddr->ptr;
msg.msg_namelen = dstaddr->len;
}
msg.msg_iov = (struct iovec*)iov;
msg.msg_iovlen = *iovcnt;
x = sendmsg(rdev->sck, &msg, 0);
if (x <= -1)
{
if (errno == EINPROGRESS || errno == EWOULDBLOCK || errno == EAGAIN) return 0; /* no data can be written */
if (errno == EINTR) return 0;
mio_seterrwithsyserr (mio, 0, errno);
return -1;
}
*iovcnt = x;
return 1;
}
#if defined(USE_SSL)
static int do_ssl (mio_dev_sck_t* dev, int (*ssl_func)(SSL*))
@ -1039,6 +1151,7 @@ static mio_dev_mth_t dev_sck_methods_stateless =
dev_sck_read_stateless,
dev_sck_write_stateless,
dev_sck_writev_stateless,
dev_sck_ioctl, /* ioctl */
};
@ -1051,6 +1164,7 @@ static mio_dev_mth_t dev_sck_methods_stateful =
dev_sck_read_stateful,
dev_sck_write_stateful,
dev_sck_writev_stateful,
dev_sck_ioctl, /* ioctl */
};
@ -1062,6 +1176,7 @@ static mio_dev_mth_t dev_mth_clisck =
dev_sck_read_stateful,
dev_sck_write_stateful,
dev_sck_writev_stateful,
dev_sck_ioctl
};
/* ========================================================================= */
@ -1595,16 +1710,22 @@ int mio_dev_sck_write (mio_dev_sck_t* dev, const void* data, mio_iolen_t dlen, v
return mio_dev_write((mio_dev_t*)dev, data, dlen, wrctx, skad_to_devaddr(dev, dstaddr, &devaddr));
}
int mio_dev_sck_writev (mio_dev_sck_t* dev, mio_iovec_t* iov, mio_iolen_t iovcnt, void* wrctx, const mio_skad_t* dstaddr)
{
mio_devaddr_t devaddr;
return mio_dev_writev((mio_dev_t*)dev, iov, iovcnt, wrctx, skad_to_devaddr(dev, dstaddr, &devaddr));
}
int mio_dev_sck_timedwrite (mio_dev_sck_t* dev, const void* data, mio_iolen_t dlen, const mio_ntime_t* tmout, void* wrctx, const mio_skad_t* dstaddr)
{
mio_devaddr_t devaddr;
return mio_dev_timedwrite((mio_dev_t*)dev, data, dlen, tmout, wrctx, skad_to_devaddr(dev, dstaddr, &devaddr));
}
int mio_dev_sck_timedwritev (mio_dev_sck_t* dev, const mio_iovec_t* iov, mio_iolen_t iovcnt, const mio_ntime_t* tmout, void* wrctx, const mio_skad_t* dstaddr)
int mio_dev_sck_timedwritev (mio_dev_sck_t* dev, mio_iovec_t* iov, mio_iolen_t iovcnt, const mio_ntime_t* tmout, void* wrctx, const mio_skad_t* dstaddr)
{
mio_devaddr_t devaddr;
return mio_dev_timedwrite((mio_dev_t*)dev, iov, iovcnt, tmout, wrctx, skad_to_devaddr(dev, dstaddr, &devaddr));
return mio_dev_timedwritev((mio_dev_t*)dev, iov, iovcnt, tmout, wrctx, skad_to_devaddr(dev, dstaddr, &devaddr));
}