From 75887e4f746135006766290cdb344946823579db Mon Sep 17 00:00:00 2001 From: hyung-hwan Date: Sat, 22 Feb 2020 18:24:49 +0000 Subject: [PATCH] adding writev support --- mio/bin/t01.c | 10 ++- mio/lib/mio-sck.h | 10 ++- mio/lib/mio.c | 225 ++++++++++++++++++++++++++++++++++++++++++++-- mio/lib/mio.h | 12 ++- mio/lib/pro.c | 22 ++++- mio/lib/sck.c | 125 +++++++++++++++++++++++++- 6 files changed, 391 insertions(+), 13 deletions(-) diff --git a/mio/bin/t01.c b/mio/bin/t01.c index e38e6c4..cb0021a 100644 --- a/mio/bin/t01.c +++ b/mio/bin/t01.c @@ -50,7 +50,7 @@ # if defined(HAVE_OPENSSL_ENGINE_H) # include # endif -//# define USE_SSL +# define USE_SSL #endif /* ========================================================================= */ @@ -160,6 +160,12 @@ static void tcp_sck_on_disconnect (mio_dev_sck_t* tcp) static void tcp_sck_on_connect (mio_dev_sck_t* tcp) { mio_bch_t buf1[128], buf2[128]; + mio_iovec_t iov[] = + { + { "hello", 5 }, + { "world", 5 }, + { "mio test", 8 } + }; mio_skadtobcstr (tcp->mio, &tcp->localaddr, buf1, MIO_COUNTOF(buf1), MIO_SKAD_TO_BCSTR_ADDR | MIO_SKAD_TO_BCSTR_PORT); mio_skadtobcstr (tcp->mio, &tcp->remoteaddr, buf2, MIO_COUNTOF(buf2), MIO_SKAD_TO_BCSTR_ADDR | MIO_SKAD_TO_BCSTR_PORT); @@ -173,7 +179,7 @@ static void tcp_sck_on_connect (mio_dev_sck_t* tcp) MIO_INFO3 (tcp->mio, "DEVICE accepted client device... 
.LOCAL %hs REMOTE %hs SCK: %d\n", buf1, buf2, tcp->sck); } - if (mio_dev_sck_write(tcp, "hello", 5, MIO_NULL, MIO_NULL) <= -1) + if (mio_dev_sck_writev(tcp, iov, MIO_COUNTOF(iov), MIO_NULL, MIO_NULL) <= -1) { mio_dev_sck_halt (tcp); } diff --git a/mio/lib/mio-sck.h b/mio/lib/mio-sck.h index 4013d50..0be2ad0 100644 --- a/mio/lib/mio-sck.h +++ b/mio/lib/mio-sck.h @@ -454,6 +454,14 @@ MIO_EXPORT int mio_dev_sck_write ( const mio_skad_t* dstaddr ); +MIO_EXPORT int mio_dev_sck_writev ( + mio_dev_sck_t* dev, + mio_iovec_t* iov, + mio_iolen_t iovcnt, + void* wrctx, + const mio_skad_t* dstaddr +); + MIO_EXPORT int mio_dev_sck_timedwrite ( mio_dev_sck_t* dev, const void* data, @@ -466,7 +474,7 @@ MIO_EXPORT int mio_dev_sck_timedwrite ( MIO_EXPORT int mio_dev_sck_timedwritev ( mio_dev_sck_t* dev, - const mio_iovec_t* iov, + mio_iovec_t* iov, mio_iolen_t iovcnt, const mio_ntime_t* tmout, void* wrctx, diff --git a/mio/lib/mio.c b/mio/lib/mio.c index 328f259..aa3ea62 100644 --- a/mio/lib/mio.c +++ b/mio/lib/mio.c @@ -1136,7 +1136,7 @@ static int __dev_write (mio_dev_t* dev, const void* data, mio_iolen_t len, const if (len <= 0) /* original length */ { - /* a zero-length writing request is to close the writing end */ + /* a zero-length writing request is to close the writing end. this causes further write request to fail */ dev->dev_cap |= MIO_DEV_CAP_OUT_CLOSED; } @@ -1159,7 +1159,7 @@ static int __dev_write (mio_dev_t* dev, const void* data, mio_iolen_t len, const /* read the comment in the 'if' block above for why i enqueue the write completion event * instead of calling the event callback here... 
-		 * --> if (dev->dev_evcb->on_write(dev, ulen, wrctx, dstaddr) <= -1) return -1; */
+		 * ---> if (dev->dev_evcb->on_write(dev, ulen, wrctx, dstaddr) <= -1) return -1; */
 		goto enqueue_completed_write;
 	}
 
@@ -1269,20 +1269,265 @@ enqueue_completed_write:
 	return 0;
 }
 
+/*
+ * writes the data scattered over 'iovcnt' elements of 'iov' to a device.
+ *
+ * for a stream device, the writev method of the device is invoked until the
+ * whole vector is consumed. the element consumed partially is adjusted in
+ * place between the calls and is restored before this function returns.
+ * the data that can't be written immediately is copied to the write queue
+ * unless the device is flagged with MIO_DEV_CAP_OUT_UNQUEUEABLE. a positive
+ * 'tmout' arms a timer job for the queued request. writing an empty vector
+ * (iovcnt <= 0) to a stream device marks the output side as closed.
+ *
+ * it returns 0 if the request has been queued or its completion event has
+ * been scheduled, and -1 upon failure.
+ */
+static int __dev_writev (mio_dev_t* dev, mio_iovec_t* iov, mio_iolen_t iovcnt, const mio_ntime_t* tmout, void* wrctx, const mio_devaddr_t* dstaddr)
+{
+	mio_t* mio = dev->mio;
+	mio_iolen_t urem, len;
+	mio_iolen_t index = 0, backup_index = -1, i, j;
+	mio_iovec_t backup;
+	mio_wq_t* q;
+	mio_cwq_t* cwq;
+	mio_oow_t cwq_extra_aligned, cwqfl_index;
+	int x;
+
+	if (dev->dev_cap & MIO_DEV_CAP_OUT_CLOSED)
+	{
+		mio_seterrbfmt (mio, MIO_ENOCAPA, "unable to write to closed device");
+		return -1;
+	}
+
+	len = 0;
+	for (i = 0; i < iovcnt; i++) len += iov[i].iov_len;
+	urem = len; /* nothing has been written yet */
+
+	if (!MIO_WQ_ISEMPTY(&dev->wq))
+	{
+		/* the writing queue is not empty.
+		 * enqueue this request immediately */
+		goto enqueue_data;
+	}
+
+	if (dev->dev_cap & MIO_DEV_CAP_STREAM)
+	{
+		/* use the do..while() loop to be able to send a zero-length data */
+		mio_iolen_t dcnt;
+
+		do
+		{
+			/* on input, dcnt is the number of elements to write.
+			 * on output, it is the number of bytes actually written */
+			dcnt = iovcnt - index;
+			x = dev->dev_mth->writev(dev, &iov[index], &dcnt, dstaddr);
+			if (x <= -1)
+			{
+				if (backup_index >= 0) iov[backup_index] = backup;
+				return -1;
+			}
+			else if (x == 0)
+			{
+				/* [NOTE]
+				 * the write queue is empty at this moment. a zero-length
+				 * request for a stream device can still get enqueued if the
+				 * write callback returns 0 though i can't figure out if there
+				 * is a compelling reason to do so
+				 */
+				goto enqueue_data; /* enqueue remaining data */
+			}
+
+			urem -= dcnt;
+			while (index < iovcnt && (mio_oow_t)dcnt >= iov[index].iov_len)
+				dcnt -= iov[index++].iov_len;
+
+			if (index == iovcnt) break;
+
+			if (backup_index != index)
+			{
+				if (backup_index >= 0) iov[backup_index] = backup;
+				backup = iov[index];
+				backup_index = index;
+			}
+
+			/* skip the bytes of the current element that have been written already */
+			iov[index].iov_ptr = (void*)((mio_uint8_t*)iov[index].iov_ptr + dcnt);
+			iov[index].iov_len -= dcnt;
+		}
+		while (1);
+
+		if (backup_index >= 0) iov[backup_index] = backup;
+
+		if (iovcnt <= 0) /* original vector count */
+		{
+			/* a zero-length writing request is to close the writing end. this causes further write request to fail */
+			dev->dev_cap |= MIO_DEV_CAP_OUT_CLOSED;
+		}
+
+		/* if i trigger the write completion callback here, the performance
+		 * may increase, but there can be annoying recursion issues if the
+		 * callback requests another writing operation. it's imperative to
+		 * delay the callback until this write function is finished.
+		 * ---> if (dev->dev_evcb->on_write(dev, len, wrctx, dstaddr) <= -1) return -1; */
+		goto enqueue_completed_write;
+	}
+	else
+	{
+		mio_iolen_t dcnt;
+
+		dcnt = iovcnt;
+		x = dev->dev_mth->writev(dev, iov, &dcnt, dstaddr);
+		if (x <= -1) return -1;
+		else if (x == 0) goto enqueue_data;
+
+		urem -= dcnt;
+		/* partial writing is still considered ok for a non-stream device. */
+
+		/* read the comment in the 'if' block above for why i enqueue the write completion event
+		 * instead of calling the event callback here...
+		 * ---> if (dev->dev_evcb->on_write(dev, ulen, wrctx, dstaddr) <= -1) return -1; */
+		goto enqueue_completed_write;
+	}
+
+	return 1; /* written immediately and called on_write callback */
+
+enqueue_data:
+	if (dev->dev_cap & MIO_DEV_CAP_OUT_UNQUEUEABLE)
+	{
+		/* writing queuing is not requested. so return failure */
+		if (backup_index >= 0) iov[backup_index] = backup;
+		mio_seterrbfmt (mio, MIO_ENOCAPA, "device incapable of queuing");
+		return -1;
+	}
+
+	/* queue the remaining data */
+	q = (mio_wq_t*)mio_allocmem(mio, MIO_SIZEOF(*q) + (dstaddr? dstaddr->len: 0) + urem);
+	if (!q)
+	{
+		if (backup_index >= 0) iov[backup_index] = backup;
+		return -1;
+	}
+
+	q->tmridx = MIO_TMRIDX_INVALID;
+	q->dev = dev;
+	q->ctx = wrctx;
+
+	if (dstaddr)
+	{
+		q->dstaddr.ptr = (mio_uint8_t*)(q + 1);
+		q->dstaddr.len = dstaddr->len;
+		MIO_MEMCPY (q->dstaddr.ptr, dstaddr->ptr, dstaddr->len);
+	}
+	else
+	{
+		q->dstaddr.len = 0;
+	}
+
+	q->ptr = (mio_uint8_t*)(q + 1) + q->dstaddr.len;
+	q->len = urem;
+	q->olen = len;
+	for (i = index, j = 0; i < iovcnt; i++)
+	{
+		MIO_MEMCPY (&q->ptr[j], iov[i].iov_ptr, iov[i].iov_len);
+		j += iov[i].iov_len;
+	}
+
+	/* the unwritten part is in the queue entry now.
+	 * give the caller's vector back unmodified */
+	if (backup_index >= 0) iov[backup_index] = backup;
+
+	if (tmout && MIO_IS_POS_NTIME(tmout))
+	{
+		mio_tmrjob_t tmrjob;
+
+		MIO_MEMSET (&tmrjob, 0, MIO_SIZEOF(tmrjob));
+		tmrjob.ctx = q;
+		mio_gettime (mio, &tmrjob.when);
+		MIO_ADD_NTIME (&tmrjob.when, &tmrjob.when, tmout);
+		tmrjob.handler = on_write_timeout;
+		tmrjob.idxptr = &q->tmridx;
+
+		q->tmridx = mio_instmrjob(mio, &tmrjob);
+		if (q->tmridx == MIO_TMRIDX_INVALID)
+		{
+			mio_freemem (mio, q);
+			return -1;
+		}
+	}
+
+	MIO_WQ_ENQ (&dev->wq, q);
+	if (!(dev->dev_cap & MIO_DEV_CAP_OUT_WATCHED))
+	{
+		/* if output is not being watched, arrange to do so */
+		if (mio_dev_watch(dev, MIO_DEV_WATCH_RENEW, 0) <= -1)
+		{
+			unlink_wq (mio, q);
+			mio_freemem (mio, q);
+			return -1;
+		}
+	}
+
+	return 0; /* request pushed to a write queue. */
+
+enqueue_completed_write:
+	/* schedule the write completion event */
+	cwq_extra_aligned = (dstaddr? dstaddr->len: 0);
+	cwq_extra_aligned = MIO_ALIGN_POW2(cwq_extra_aligned, MIO_CWQFL_ALIGN);
+	cwqfl_index = cwq_extra_aligned / MIO_CWQFL_SIZE;
+
+	if (cwqfl_index < MIO_COUNTOF(mio->cwqfl) && mio->cwqfl[cwqfl_index])
+	{
+		/* take an available cwq object from the free cwq list */
+		cwq = dev->mio->cwqfl[cwqfl_index];
+		dev->mio->cwqfl[cwqfl_index] = cwq->next;
+	}
+	else
+	{
+		cwq = (mio_cwq_t*)mio_allocmem(mio, MIO_SIZEOF(*cwq) + cwq_extra_aligned);
+		if (!cwq) return -1;
+	}
+
+	MIO_MEMSET (cwq, 0, MIO_SIZEOF(*cwq));
+	cwq->dev = dev;
+	cwq->ctx = wrctx;
+	if (dstaddr)
+	{
+		cwq->dstaddr.ptr = (mio_uint8_t*)(cwq + 1);
+		cwq->dstaddr.len = dstaddr->len;
+		MIO_MEMCPY (cwq->dstaddr.ptr, dstaddr->ptr, dstaddr->len);
+	}
+	else
+	{
+		cwq->dstaddr.len = 0;
+	}
+
+	cwq->olen = len;
+
+	MIO_CWQ_ENQ (&dev->mio->cwq, cwq);
+	dev->cw_count++; /* increment the number of complete write operations */
+	return 0;
+}
+
+
 int mio_dev_write (mio_dev_t* dev, const void* data, mio_iolen_t len, void* wrctx, const mio_devaddr_t* dstaddr)
 {
 	return __dev_write(dev, data, len, MIO_NULL, wrctx, dstaddr);
 }
 
+int mio_dev_writev (mio_dev_t* dev, mio_iovec_t* iov, mio_iolen_t iovcnt, void* wrctx, const mio_devaddr_t* dstaddr)
+{
+	return __dev_writev(dev, iov, iovcnt, MIO_NULL, wrctx, dstaddr);
+}
+
 int mio_dev_timedwrite (mio_dev_t* dev, const void* data, mio_iolen_t len, const mio_ntime_t* tmout, void* wrctx, const mio_devaddr_t* dstaddr)
 {
 	return __dev_write(dev, data, len, tmout, wrctx, dstaddr);
 }
 
-int mio_dev_timedwritev (mio_dev_t* dev, const mio_iovec_t* iov, mio_iolen_t iovcnt, const mio_ntime_t* tmout, void* wrctx, const mio_devaddr_t* dstaddr)
+int mio_dev_timedwritev (mio_dev_t* dev, mio_iovec_t* iov, mio_iolen_t iovcnt, const mio_ntime_t* tmout, void* wrctx, const mio_devaddr_t* dstaddr)
 {
-	//return __dev_write(dev, data, len, tmout, wrctx, dstaddr);
-	return -1;
+	return __dev_writev(dev, iov, iovcnt, tmout, wrctx, dstaddr);
 }
 
 /* 
-------------------------------------------------------------------------- */
diff --git a/mio/lib/mio.h b/mio/lib/mio.h
index 2a77d14..cf5916c 100644
--- a/mio/lib/mio.h
+++ b/mio/lib/mio.h
@@ -193,6 +193,7 @@ struct mio_dev_mth_t
 
 	/* ------------------------------------------------------------------ */
 	int (*write) (mio_dev_t* dev, const void* data, mio_iolen_t* len, const mio_devaddr_t* dstaddr);
+	int (*writev) (mio_dev_t* dev, const mio_iovec_t* iov, mio_iolen_t* iovcnt, const mio_devaddr_t* dstaddr);
 
 	/* ------------------------------------------------------------------ */
 	int (*ioctl) (mio_dev_t* dev, int cmd, void* arg);
@@ -803,6 +804,13 @@ MIO_EXPORT int mio_dev_write (
 	const mio_devaddr_t* dstaddr
 );
 
+MIO_EXPORT int mio_dev_writev (
+	mio_dev_t* dev,
+	mio_iovec_t* iov,
+	mio_iolen_t iovcnt,
+	void* wrctx,
+	const mio_devaddr_t* dstaddr
+);
 
 MIO_EXPORT int mio_dev_timedwrite (
 	mio_dev_t* dev,
@@ -816,11 +824,11 @@ MIO_EXPORT int mio_dev_timedwrite (
 
 MIO_EXPORT int mio_dev_timedwritev (
 	mio_dev_t* dev,
-	const mio_iovec_t* iov,
+	mio_iovec_t* iov,
 	mio_iolen_t iovcnt,
 	const mio_ntime_t* tmout,
 	void* wrctx,
-	const mio_devaddr_t* dstaddr 
+	const mio_devaddr_t* dstaddr
 );
 
 
diff --git a/mio/lib/pro.c b/mio/lib/pro.c
index 94703de..180ae84 100644
--- a/mio/lib/pro.c
+++ b/mio/lib/pro.c
@@ -563,7 +563,7 @@ static int dev_pro_write_slave (mio_dev_t* dev, const void* data, mio_iolen_t* l
 	mio_dev_pro_slave_t* pro = (mio_dev_pro_slave_t*)dev;
 	ssize_t x;
 
-	x = write (pro->pfd, data, *len);
+	x = write(pro->pfd, data, *len);
 	if (x <= -1)
 	{
 		if (errno == EINPROGRESS || errno == EWOULDBLOCK || errno == EAGAIN) return 0; /* no data can be written */
@@ -576,6 +576,28 @@ static int dev_pro_write_slave (mio_dev_t* dev, const void* data, mio_iolen_t* l
 	return 1;
 }
 
+/* writes the vector to the descriptor (pfd) of a slave device.
+ * '*iovcnt' is the number of elements in 'iov' on entry and is set to the
+ * number of bytes written on success.
+ * it returns 1 on success, 0 if the write would block or is interrupted, -1 on failure. */
+static int dev_pro_writev_slave (mio_dev_t* dev, const mio_iovec_t* iov, mio_iolen_t* iovcnt, const mio_devaddr_t* dstaddr)
+{
+	mio_dev_pro_slave_t* pro = (mio_dev_pro_slave_t*)dev;
+	ssize_t x;
+
+	x = writev(pro->pfd, (const struct iovec*)iov, *iovcnt);
+	if (x <= -1)
+	{
+		if (errno == EINPROGRESS || errno == EWOULDBLOCK || errno == EAGAIN) return 0; /* no data can be written */
+		if (errno == EINTR) return 0;
+		mio_seterrwithsyserr (pro->mio, 0, errno);
+		return -1;
+	}
+
+	*iovcnt = x;
+	return 1;
+}
+
 static mio_syshnd_t dev_pro_getsyshnd (mio_dev_t* dev)
 {
 	return MIO_SYSHND_INVALID;
@@ -638,6 +660,7 @@ static mio_dev_mth_t dev_pro_methods =
 	dev_pro_kill_master,
 	dev_pro_getsyshnd,
 
+	MIO_NULL,
 	MIO_NULL,
 	MIO_NULL,
 	dev_pro_ioctl
@@ -651,6 +674,7 @@ static mio_dev_mth_t dev_pro_methods_slave =
 
 	dev_pro_read_slave,
 	dev_pro_write_slave,
+	dev_pro_writev_slave,
 	dev_pro_ioctl
 };
 
diff --git a/mio/lib/sck.c b/mio/lib/sck.c
index fa795e3..8c922d7 100644
--- a/mio/lib/sck.c
+++ b/mio/lib/sck.c
@@ -548,6 +548,109 @@ static int dev_sck_write_stateful (mio_dev_t* dev, const void* data, mio_iolen_t
 	return 1;
 }
 
+
+/* writes the vector to a stateful socket.
+ * '*iovcnt' is the number of elements in 'iov' on entry and is set to the
+ * number of bytes written on success. an empty vector (*iovcnt <= 0) shuts
+ * down the writing end instead.
+ * it returns 1 on success, 0 if the write would block or is interrupted, -1 on failure. */
+static int dev_sck_writev_stateful (mio_dev_t* dev, const mio_iovec_t* iov, mio_iolen_t* iovcnt, const mio_devaddr_t* dstaddr)
+{
+	mio_t* mio = dev->mio;
+	mio_dev_sck_t* rdev = (mio_dev_sck_t*)dev;
+
+#if defined(USE_SSL)
+	if (rdev->ssl)
+	{
+		int x;
+		mio_iolen_t i, nwritten;
+
+		if (*iovcnt <= 0)
+		{
+			/* it's a writing finish indicator. close the writing end of
+			 * the socket, probably leaving it in the half-closed state */
+			if ((x = SSL_shutdown((SSL*)rdev->ssl)) == -1)
+			{
+				set_ssl_error (mio, SSL_get_error((SSL*)rdev->ssl, x));
+				return -1;
+			}
+			return 1;
+		}
+
+		/* openssl has no vectored write. write the elements one by one.
+		 * the data must not bypass the ssl layer and go to the socket as it is */
+		nwritten = 0;
+		for (i = 0; i < *iovcnt; i++)
+		{
+			if (iov[i].iov_len == 0) continue; /* nothing to hand over to SSL_write() */
+
+			x = SSL_write((SSL*)rdev->ssl, iov[i].iov_ptr, iov[i].iov_len);
+			if (x <= 0)
+			{
+				int err = SSL_get_error ((SSL*)rdev->ssl, x);
+				if (err == SSL_ERROR_WANT_READ || err == SSL_ERROR_WANT_WRITE)
+				{
+					if (nwritten > 0) break; /* report what has been written so far */
+					return 0;
+				}
+				set_ssl_error (mio, err);
+				return -1;
+			}
+
+			nwritten += x;
+			if ((mio_oow_t)x < iov[i].iov_len) break; /* short write. let the caller retry the rest */
+		}
+
+		*iovcnt = nwritten;
+	}
+	else
+	{
+#endif
+		ssize_t x;
+		int flags = 0;
+		struct msghdr msg;
+
+		if (*iovcnt <= 0)
+		{
+			/* it's a writing finish indicator. close the writing end of
+			 * the socket, probably leaving it in the half-closed state */
+			if (shutdown(rdev->sck, SHUT_WR) == -1)
+			{
+				mio_seterrwithsyserr (mio, 0, errno);
+				return -1;
+			}
+
+			return 1;
+		}
+
+		/* TODO: flags MSG_DONTROUTE, MSG_DONTWAIT, MSG_MORE, MSG_OOB, MSG_NOSIGNAL */
+	#if defined(MSG_NOSIGNAL)
+		flags |= MSG_NOSIGNAL;
+	#endif
+
+	#if defined(HAVE_SENDMSG)
+		MIO_MEMSET (&msg, 0, MIO_SIZEOF(msg));
+		msg.msg_iov = (struct iovec*)iov;
+		msg.msg_iovlen = *iovcnt;
+		x = sendmsg(rdev->sck, &msg, flags);
+	#else
+		x = writev(rdev->sck, (const struct iovec*)iov, *iovcnt);
+	#endif
+		if (x == -1)
+		{
+			if (errno == EINPROGRESS || errno == EWOULDBLOCK || errno == EAGAIN) return 0; /* no data can be written */
+			if (errno == EINTR) return 0;
+			mio_seterrwithsyserr (mio, 0, errno);
+			return -1;
+		}
+
+		*iovcnt = x;
+#if defined(USE_SSL)
+	}
+#endif
+	return 1;
+}
+
 static int dev_sck_write_stateless (mio_dev_t* dev, const void* data, mio_iolen_t* len, const mio_devaddr_t* dstaddr)
 {
 	mio_t* mio = dev->mio;
@@ -567,6 +670,41 @@ static int dev_sck_write_stateless (mio_dev_t* dev, const void* data, mio_iolen_
 	return 1;
 }
 
+
+/* writes the vector to a stateless socket as a single message with sendmsg().
+ * 'dstaddr', if given, is the destination address of the message.
+ * '*iovcnt' is the number of elements in 'iov' on entry and is set to the
+ * number of bytes written on success.
+ * it returns 1 on success, 0 if the write would block or is interrupted, -1 on failure. */
+static int dev_sck_writev_stateless (mio_dev_t* dev, const mio_iovec_t* iov, mio_iolen_t* iovcnt, const mio_devaddr_t* dstaddr)
+{
+	mio_t* mio = dev->mio;
+	mio_dev_sck_t* rdev = (mio_dev_sck_t*)dev;
+	struct msghdr msg;
+	ssize_t x;
+
+	MIO_MEMSET (&msg, 0, MIO_SIZEOF(msg));
+	if (MIO_LIKELY(dstaddr))
+	{
+		msg.msg_name = dstaddr->ptr;
+		msg.msg_namelen = dstaddr->len;
+	}
+	msg.msg_iov = (struct iovec*)iov;
+	msg.msg_iovlen = *iovcnt;
+
+	x = sendmsg(rdev->sck, &msg, 0);
+	if (x <= -1)
+	{
+		if (errno == EINPROGRESS || errno == EWOULDBLOCK || errno == EAGAIN) return 0; /* no data can be written */
+		if (errno == EINTR) return 0;
+		mio_seterrwithsyserr (mio, 0, errno);
+		return -1;
+	}
+
+	*iovcnt = x;
+	return 1;
+}
+
 #if defined(USE_SSL)
 
 static int do_ssl (mio_dev_sck_t* dev, int (*ssl_func)(SSL*))
@@ -1039,6 +1177,7 @@ static mio_dev_mth_t dev_sck_methods_stateless =
 
 	dev_sck_read_stateless,
 	dev_sck_write_stateless,
+	dev_sck_writev_stateless,
 	dev_sck_ioctl, /* ioctl */
 };
 
@@ -1051,6 +1190,7 @@ static mio_dev_mth_t dev_sck_methods_stateful =
 
 	dev_sck_read_stateful,
 	dev_sck_write_stateful,
+	dev_sck_writev_stateful,
 	dev_sck_ioctl, /* ioctl */
 };
 
@@ -1062,6 +1202,7 @@ static mio_dev_mth_t dev_mth_clisck =
 
 	dev_sck_read_stateful,
 	dev_sck_write_stateful,
+	dev_sck_writev_stateful,
 	dev_sck_ioctl
 };
 /* ========================================================================= */
@@ -1595,16 +1736,22 @@ int mio_dev_sck_write (mio_dev_sck_t* dev, const void* data, mio_iolen_t dlen, v
 	return mio_dev_write((mio_dev_t*)dev, data, dlen, wrctx, skad_to_devaddr(dev, dstaddr, &devaddr));
 }
 
+int mio_dev_sck_writev (mio_dev_sck_t* dev, mio_iovec_t* iov, mio_iolen_t iovcnt, void* wrctx, const mio_skad_t* dstaddr)
+{
+	mio_devaddr_t devaddr;
+	return mio_dev_writev((mio_dev_t*)dev, iov, iovcnt, wrctx, skad_to_devaddr(dev, dstaddr, &devaddr));
+}
+
 int mio_dev_sck_timedwrite (mio_dev_sck_t* dev, const void* data, mio_iolen_t dlen, const mio_ntime_t* tmout, void* wrctx, const mio_skad_t* dstaddr)
 {
 	mio_devaddr_t devaddr;
 	return mio_dev_timedwrite((mio_dev_t*)dev, data, dlen, tmout, wrctx, skad_to_devaddr(dev, dstaddr, &devaddr));
 }
 
-int mio_dev_sck_timedwritev (mio_dev_sck_t* dev, const mio_iovec_t* iov, mio_iolen_t iovcnt, const mio_ntime_t* tmout, void* wrctx, const mio_skad_t* dstaddr)
+int mio_dev_sck_timedwritev (mio_dev_sck_t* dev, mio_iovec_t* iov, mio_iolen_t iovcnt, const mio_ntime_t* tmout, void* wrctx, const mio_skad_t* dstaddr)
 {
 	mio_devaddr_t devaddr;
-	return mio_dev_timedwrite((mio_dev_t*)dev, iov, iovcnt, tmout, wrctx, skad_to_devaddr(dev, dstaddr, &devaddr));
+	return mio_dev_timedwritev((mio_dev_t*)dev, iov, iovcnt, tmout, wrctx, skad_to_devaddr(dev, dstaddr, &devaddr));
 }
 
 