From 115031d2ceea04bccba981e359caad6fbce245d9 Mon Sep 17 00:00:00 2001 From: hyung-hwan Date: Thu, 17 Jan 2019 08:09:19 +0000 Subject: [PATCH] added moo_dev_timedread() --- mio/lib/main.c | 32 ++++++++------ mio/lib/mio-pro.c | 2 +- mio/lib/mio-sck.c | 4 -- mio/lib/mio-sck.h | 15 ++++--- mio/lib/mio.c | 105 ++++++++++++++++++++++++++++++++++++++++++---- mio/lib/mio.h | 3 +- 6 files changed, 129 insertions(+), 32 deletions(-) diff --git a/mio/lib/main.c b/mio/lib/main.c index f5d1a6e..415df6f 100644 --- a/mio/lib/main.c +++ b/mio/lib/main.c @@ -189,11 +189,12 @@ printf ("DEVICE accepted client device... .LOCAL %s:%d REMOTE %s:%d\n", buf1, mi static int tcp_sck_on_write (mio_dev_sck_t* tcp, mio_iolen_t wrlen, void* wrctx, const mio_sckaddr_t* dstaddr) { tcp_server_t* ts; + mio_ntime_t tmout; if (wrlen <= -1) { printf ("TCP_SCK_ON_WRITE SEDING TIMED OUT...........\n"); - mio_dev_sck_halt(tcp); + mio_dev_sck_halt (tcp); } else { @@ -204,19 +205,24 @@ else // if (ts->tally >= 2) mio_dev_sck_halt (tcp); printf ("TCP_SCK_ON_WRITE ENABLING READING..............................\n"); - mio_dev_sck_read (tcp, 1); - - //mio_dev_sck_timedread (tcp, 1, 1000); + mio_inittime (&tmout, 5, 0); + //mio_dev_sck_read (tcp, 1); + mio_dev_sck_timedread (tcp, 1, &tmout); } return 0; } -/* TODO: serialize callbacks... */ static int tcp_sck_on_read (mio_dev_sck_t* tcp, const void* buf, mio_iolen_t len, const mio_sckaddr_t* srcaddr) { int n; - if (len <= 0) + if (len <= -1) + { + printf ("TCP_SCK_ON_READ STREAM DEVICE: TIMED OUT...\n"); + mio_dev_sck_halt (tcp); + return 0; + } + else if (len <= 0) { printf ("TCP_SCK_ON_READ STREAM DEVICE: EOF RECEIVED...\n"); /* no outstanding request. but EOF */ @@ -574,7 +580,7 @@ int main (int argc, char* argv[]) SSL_library_init (); #endif - mio = mio_open (&mmgr, 0, 512, MIO_NULL); + mio = mio_open(&mmgr, 0, 512, MIO_NULL); if (!mio) { printf ("Cannot open mio\n"); @@ -635,7 +641,7 @@ int main (int argc, char* argv[]) mio_inittime (&tcp_conn.connect_tmout, 5, 0); tcp_conn.options = MIO_DEV_SCK_CONNECT_SSL; - if (mio_dev_sck_connect (tcp[0], &tcp_conn) <= -1) + if (mio_dev_sck_connect(tcp[0], &tcp_conn) <= -1) { printf ("mio_dev_sck_connect() failed....\n"); /* carry on regardless of failure */ @@ -662,7 +668,7 @@ int main (int argc, char* argv[]) mio_sckaddr_initforip4 (&tcp_bind.localaddr, 1234, MIO_NULL); tcp_bind.options = MIO_DEV_SCK_BIND_REUSEADDR; - if (mio_dev_sck_bind (tcp[1],&tcp_bind) <= -1) + if (mio_dev_sck_bind(tcp[1],&tcp_bind) <= -1) { printf ("tcp[1] mio_dev_sck_bind() failed....\n"); goto oops; @@ -670,7 +676,7 @@ int main (int argc, char* argv[]) tcp_lstn.backlogs = 100; - if (mio_dev_sck_listen (tcp[1], &tcp_lstn) <= -1) + if (mio_dev_sck_listen(tcp[1], &tcp_lstn) <= -1) { printf ("tcp[1] mio_dev_sck_listen() failed....\n"); goto oops; @@ -684,7 +690,7 @@ int main (int argc, char* argv[]) tcp_make.on_connect = tcp_sck_on_connect; tcp_make.on_disconnect = tcp_sck_on_disconnect; - tcp[2] = mio_dev_sck_make (mio, MIO_SIZEOF(tcp_server_t), &tcp_make); + tcp[2] = mio_dev_sck_make(mio, MIO_SIZEOF(tcp_server_t), &tcp_make); if (!tcp[2]) { printf ("Cannot make tcp\n"); @@ -732,7 +738,7 @@ for (i = 0; i < 5; i++) pro_make.on_write = pro_on_write; pro_make.on_close = pro_on_close; - pro = mio_dev_pro_make (mio, 0, &pro_make); + pro = mio_dev_pro_make(mio, 0, &pro_make); if (!pro) { printf ("CANNOT CREATE PROCESS PIPE\n"); @@ -775,7 +781,7 @@ int main (int argc, char* argv[]) mio_dev_sck_connect_t tcp_conn; tcp_server_t* ts; - mio = mio_open (&mmgr, 0, 512, MIO_NULL); + mio = mio_open(&mmgr, 0, 512, MIO_NULL); if (!mio) { printf ("Cannot open mio\n"); diff --git a/mio/lib/mio-pro.c b/mio/lib/mio-pro.c index 5261a9a..fc79759 100644 --- a/mio/lib/mio-pro.c +++ b/mio/lib/mio-pro.c @@ -806,7 +806,7 @@ int mio_dev_pro_timedwrite (mio_dev_pro_t* dev, const void* data, mio_iolen_t dl { if (dev->slave[0]) { - return mio_dev_timedwrite ((mio_dev_t*)dev->slave[0], data, dlen, tmout, wrctx, MIO_NULL); + return mio_dev_timedwrite((mio_dev_t*)dev->slave[0], data, dlen, tmout, wrctx, MIO_NULL); } else { diff --git a/mio/lib/mio-sck.c b/mio/lib/mio-sck.c index a4cfef0..1d724de 100644 --- a/mio/lib/mio-sck.c +++ b/mio/lib/mio-sck.c @@ -1712,10 +1712,6 @@ int mio_dev_sck_timedwrite (mio_dev_sck_t* dev, const void* data, mio_iolen_t dl } - - - - /* ========================================================================= */ mio_uint16_t mio_checksumip (const void* hdr, mio_oow_t len) diff --git a/mio/lib/mio-sck.h b/mio/lib/mio-sck.h index f4bf608..cad3d15 100644 --- a/mio/lib/mio-sck.h +++ b/mio/lib/mio-sck.h @@ -516,7 +516,7 @@ MIO_EXPORT void mio_sckaddr_initforeth ( MIO_EXPORT mio_dev_sck_t* mio_dev_sck_make ( mio_t* mio, - mio_oow_t xtnsize, + mio_oow_t xtnsize, const mio_dev_sck_make_t* info ); @@ -537,18 +537,18 @@ MIO_EXPORT int mio_dev_sck_listen ( MIO_EXPORT int mio_dev_sck_write ( mio_dev_sck_t* dev, - const void* data, + const void* data, mio_iolen_t len, - void* wrctx, + void* wrctx, const mio_sckaddr_t* dstaddr ); MIO_EXPORT int mio_dev_sck_timedwrite ( mio_dev_sck_t* dev, - const void* data, + const void* data, mio_iolen_t len, const mio_ntime_t* tmout, - void* wrctx, + void* wrctx, const mio_sckaddr_t* dstaddr ); @@ -564,10 +564,15 @@ static MIO_INLINE int mio_dev_sck_read (mio_dev_sck_t* sck, int enabled) return mio_dev_read((mio_dev_t*)sck, enabled); } +static MIO_INLINE int mio_dev_sck_timedread (mio_dev_sck_t* sck, int enabled, mio_ntime_t* tmout) +{ + return mio_dev_timedread((mio_dev_t*)sck, enabled, tmout); +} #else #define mio_dev_sck_halt(sck) mio_dev_halt((mio_dev_t*)sck) #define mio_dev_sck_read(sck,enabled) mio_dev_read((mio_dev_t*)sck, enabled) +#define mio_dev_sck_timedread(sck,enabled,tmout) mio_dev_timedread((mio_dev_t*)sck, enabled, tmout) #endif diff --git a/mio/lib/mio.c b/mio/lib/mio.c index ecbd910..fe31c13 100644 --- a/mio/lib/mio.c +++ b/mio/lib/mio.c @@ -502,7 +502,7 @@ static MIO_INLINE void handle_event (mio_dev_t* dev, int events, int rdhup) * <= -1 - failure. kill the device. * == 0 - ok. but don't invoke recv() or send(). * >= 1 - everything is ok. */ - x = dev->dev_evcb->ready (dev, xevents); + x = dev->dev_evcb->ready(dev, xevents); if (x <= -1) { mio_dev_halt (dev); @@ -515,7 +515,7 @@ static MIO_INLINE void handle_event (mio_dev_t* dev, int events, int rdhup) { /* urgent data */ /* TODO: implement urgent data handling */ - /*x = dev->dev_mth->urgread (dev, mio->bugbuf, &len);*/ + /*x = dev->dev_mth->urgread(dev, mio->bugbuf, &len);*/ } if (dev && (events & MIO_DEV_EVENT_OUT)) @@ -535,7 +535,7 @@ static MIO_INLINE void handle_event (mio_dev_t* dev, int events, int rdhup) send_leftover: ulen = urem; - x = dev->dev_mth->write (dev, uptr, &ulen, &q->dstaddr); + x = dev->dev_mth->write(dev, uptr, &ulen, &q->dstaddr); if (x <= -1) { mio_dev_halt (dev); @@ -569,7 +569,7 @@ static MIO_INLINE void handle_event (mio_dev_t* dev, int events, int rdhup) } unlink_wq (mio, q); - y = dev->dev_evcb->on_write (dev, q->olen, q->ctx, &q->dstaddr); + y = dev->dev_evcb->on_write(dev, q->olen, q->ctx, &q->dstaddr); MIO_MMGR_FREE (mio->mmgr, q); if (y <= -1) @@ -624,7 +624,7 @@ static MIO_INLINE void handle_event (mio_dev_t* dev, int events, int rdhup) while (1) { len = MIO_COUNTOF(mio->bigbuf); - x = dev->dev_mth->read (dev, mio->bigbuf, &len, &srcaddr); + x = dev->dev_mth->read(dev, mio->bigbuf, &len, &srcaddr); if (x <= -1) { mio_dev_halt (dev); @@ -646,7 +646,7 @@ static MIO_INLINE void handle_event (mio_dev_t* dev, int events, int rdhup) mio->renew_watch = 1; /* call the on_read callback to report EOF */ - if (dev->dev_evcb->on_read (dev, mio->bigbuf, len, &srcaddr) <= -1 || + if (dev->dev_evcb->on_read(dev, mio->bigbuf, len, &srcaddr) <= -1 || (dev->dev_capa & MIO_DEV_CAPA_OUT_CLOSED)) { /* 1. input ended and its reporting failed or @@ -665,7 +665,7 @@ static MIO_INLINE void handle_event (mio_dev_t* dev, int events, int rdhup) * when x == 0 or <= -1. you can */ /* data available */ - y = dev->dev_evcb->on_read (dev, mio->bigbuf, len, &srcaddr); + y = dev->dev_evcb->on_read(dev, mio->bigbuf, len, &srcaddr); if (y <= -1) { mio_dev_halt (dev); @@ -717,7 +717,7 @@ static MIO_INLINE void handle_event (mio_dev_t* dev, int events, int rdhup) } skip_evcb: - if (dev && mio->renew_watch && mio_dev_watch (dev, MIO_DEV_WATCH_RENEW, 0) <= -1) + if (dev && mio->renew_watch && mio_dev_watch(dev, MIO_DEV_WATCH_RENEW, 0) <= -1) { mio_dev_halt (dev); dev = MIO_NULL; @@ -926,6 +926,7 @@ mio_dev_t* mio_makedev (mio_t* mio, mio_oow_t dev_size, mio_dev_mth_t* dev_mth, dev->dev_capa = MIO_DEV_CAPA_IN | MIO_DEV_CAPA_OUT; dev->dev_mth = dev_mth; dev->dev_evcb = dev_evcb; + dev->rtmridx = MIO_TMRIDX_INVALID; MIO_WQ_INIT (&dev->wq); dev->cw_count = 0; @@ -1080,9 +1081,18 @@ void mio_killdev (mio_t* mio, mio_dev_t* dev) if (dev->dev_capa & MIO_DEV_CAPA_ZOMBIE) { MIO_ASSERT (MIO_WQ_ISEMPTY(&dev->wq)); + MIO_ASSERT (dev->cw_count == 0); + MIO_ASSERT (dev->rtmridx == MIO_TMRIDX_INVALID); goto kill_device; } + if (dev->rtmridx != MIO_TMRIDX_INVALID) + { +printf ("REMOVING.... TIMER FOR DEV\n"); + mio_deltmrjob (mio, dev->rtmridx); + dev->rtmridx = MIO_TMRIDX_INVALID; + } + /* clear completed write event queues - TODO: do i need to fire these? */ if (dev->cw_count > 0) { @@ -1248,8 +1258,81 @@ int mio_dev_watch (mio_dev_t* dev, mio_dev_watch_cmd_t cmd, int events) return 0; } +static void on_read_timeout (mio_t* mio, const mio_ntime_t* now, mio_tmrjob_t* job) +{ + mio_dev_t* dev; + int x; + + dev = (mio_dev_t*)job->ctx; + + dev->mio->errnum = MIO_ETMOUT; + x = dev->dev_evcb->on_read(dev, MIO_NULL, -1, MIO_NULL); + + MIO_ASSERT (dev->rtmridx == MIO_TMRIDX_INVALID); + + if (x <= -1) mio_dev_halt (dev); +} + +static int __dev_read (mio_dev_t* dev, int enabled, const mio_ntime_t* tmout, void* rdctx) +{ + if (dev->dev_capa & MIO_DEV_CAPA_IN_CLOSED) + { + dev->mio->errnum = MIO_ENOCAPA; + return -1; + } + + if (enabled) + { + dev->dev_capa &= ~MIO_DEV_CAPA_IN_DISABLED; + if (!dev->mio->in_exec && (dev->dev_capa & MIO_DEV_CAPA_IN_WATCHED)) goto renew_watch_now; + } + else + { + dev->dev_capa |= MIO_DEV_CAPA_IN_DISABLED; + if (!dev->mio->in_exec && !(dev->dev_capa & MIO_DEV_CAPA_IN_WATCHED)) goto renew_watch_now; + } + + dev->mio->renew_watch = 1; + goto update_timer; + +renew_watch_now: + if (mio_dev_watch(dev, MIO_DEV_WATCH_RENEW, 0) <= -1) return -1; + goto update_timer; + +update_timer: + if (dev->rtmridx != MIO_TMRIDX_INVALID) + { + /* read timeout already on the socket. remove it first */ + mio_deltmrjob (dev->mio, dev->rtmridx); + dev->rtmridx = MIO_TMRIDX_INVALID; + } + + if (tmout && mio_ispostime(tmout)) + { + mio_tmrjob_t tmrjob; + + MIO_MEMSET (&tmrjob, 0, MIO_SIZEOF(tmrjob)); + tmrjob.ctx = dev; + mio_gettime (&tmrjob.when); + mio_addtime (&tmrjob.when, tmout, &tmrjob.when); + tmrjob.handler = on_read_timeout; + tmrjob.idxptr = &dev->rtmridx; + + dev->rtmridx = mio_instmrjob(dev->mio, &tmrjob); + if (dev->rtmridx == MIO_TMRIDX_INVALID) + { + /* if timer registration fails, timeout will never be triggered */ + return -1; + } + } + return 0; +} + int mio_dev_read (mio_dev_t* dev, int enabled) { + return __dev_read(dev, enabled, MIO_NULL, MIO_NULL); + +#if 0 if (dev->dev_capa & MIO_DEV_CAPA_IN_CLOSED) { dev->mio->errnum = MIO_ENOCAPA; @@ -1273,6 +1356,12 @@ int mio_dev_read (mio_dev_t* dev, int enabled) renew_watch_now: if (mio_dev_watch(dev, MIO_DEV_WATCH_RENEW, 0) <= -1) return -1; return 0; +#endif +} + +int mio_dev_timedread (mio_dev_t* dev, int enabled, const mio_ntime_t* tmout) +{ + return __dev_read(dev, enabled, tmout, MIO_NULL); } static void on_write_timeout (mio_t* mio, const mio_ntime_t* now, mio_tmrjob_t* job) diff --git a/mio/lib/mio.h b/mio/lib/mio.h index 12f70a7..2d4ea52 100644 --- a/mio/lib/mio.h +++ b/mio/lib/mio.h @@ -132,7 +132,7 @@ typedef void (*mio_tmrjob_handler_t) ( struct mio_tmrjob_t { - void* ctx; + void* ctx; mio_ntime_t when; mio_tmrjob_handler_t handler; mio_tmridx_t* idxptr; /* pointer to the index holder */ @@ -307,6 +307,7 @@ struct mio_wq_t int dev_capa; \ mio_dev_mth_t* dev_mth; \ mio_dev_evcb_t* dev_evcb; \ + mio_tmridx_t rtmridx; \ mio_wq_t wq; \ mio_oow_t cw_count; \ mio_dev_t* dev_prev; \