From 01ffcf973d63d141d4d07f204b1bd5edb7667a17 Mon Sep 17 00:00:00 2001 From: hyung-hwan Date: Thu, 4 Feb 2016 15:06:20 +0000 Subject: [PATCH] added stio_dev_halt() enhanced stio_dev_watch() and stio_exec() to correct various event handling --- stio/lib/main.c | 78 +++- stio/lib/stio-prv.h | 3 +- stio/lib/stio-sck.c | 7 + stio/lib/stio-tcp.c | 59 +-- stio/lib/stio-tcp.h | 8 +- stio/lib/stio-tmr.c | 17 + stio/lib/stio-udp.c | 6 +- stio/lib/stio-udp.h | 2 +- stio/lib/stio.c | 854 +++++++++++++++++++++++++------------------- stio/lib/stio.h | 60 ++-- 10 files changed, 671 insertions(+), 423 deletions(-) diff --git a/stio/lib/main.c b/stio/lib/main.c index 0a1205f..471eb4b 100644 --- a/stio/lib/main.c +++ b/stio/lib/main.c @@ -37,9 +37,26 @@ #include #include +struct mmgr_stat_t +{ + stio_size_t total_count; +}; + +typedef struct mmgr_stat_t mmgr_stat_t; + +static mmgr_stat_t mmgr_stat; + static void* mmgr_alloc (stio_mmgr_t* mmgr, stio_size_t size) { - return malloc (size); + if (((mmgr_stat_t*)mmgr->ctx)->total_count > 100) + { +printf ("CRITICAL ERROR ---> too many heap chunks...\n"); + return STIO_NULL; + } + + void* x = malloc (size); + if (x) ((mmgr_stat_t*)mmgr->ctx)->total_count++; + return x; } static void* mmgr_realloc (stio_mmgr_t* mmgr, void* ptr, stio_size_t size) @@ -49,17 +66,25 @@ static void* mmgr_realloc (stio_mmgr_t* mmgr, void* ptr, stio_size_t size) static void mmgr_free (stio_mmgr_t* mmgr, void* ptr) { + ((mmgr_stat_t*)mmgr->ctx)->total_count--; return free (ptr); } + static stio_mmgr_t mmgr = { mmgr_alloc, mmgr_realloc, mmgr_free, - STIO_NULL + &mmgr_stat }; +struct tcp_server_t +{ + int tally; +}; +typedef struct tcp_server_t tcp_server_t; + static void tcp_on_disconnect (stio_dev_tcp_t* tcp) { if (tcp->state & STIO_DEV_TCP_CONNECTING) @@ -101,12 +126,33 @@ printf ("device accepted client device... .asdfjkasdfkljasdlfkjasdj...\n"); static int tcp_on_write (stio_dev_tcp_t* tcp, void* wrctx) { - printf (">>> TCP SENT MESSAGE\n"); + tcp_server_t* ts; + + ts = (tcp_server_t*)(tcp + 1); + printf (">>> TCP SENT MESSAGE %d\n", ts->tally); + + ts->tally++; +// if (ts->tally >= 2) stio_dev_tcp_halt (tcp); + + //stio_dev_tcp_read (tcp); + +printf ("ENABLING READING..............................\n"); + stio_dev_read (tcp, 1); return 0; } static int tcp_on_read (stio_dev_tcp_t* tcp, const void* buf, stio_len_t len) { + if (len <= 0) + { + printf ("STREAM DEVICE: EOF RECEIVED...\n"); + /* no outstanding request. but EOF */ + stio_dev_tcp_halt (tcp); + return 0; + } + +printf ("on read %d\n", (int)len); + int n; static char a ='A'; char* xxx = malloc (1000000); @@ -114,7 +160,18 @@ memset (xxx, a++ ,1000000); //return stio_dev_tcp_write (tcp, "HELLO", 5, STIO_NULL); n = stio_dev_tcp_write (tcp, xxx, 1000000, STIO_NULL); free (xxx); -return n; + + if (n <= -1) return -1; + + /* post the write finisher */ + n = stio_dev_tcp_write (tcp, STIO_NULL, 0, STIO_NULL); + if (n <= -1) return -1; + +printf ("DISABLING READING..............................\n"); + stio_dev_read (tcp, 0); + return 0; + +/* return 1; let the main loop to read more greedily without consulint the multiplexer */ } static stio_t* g_stio; @@ -135,6 +192,7 @@ int main () stio_dev_tcp_connect_t tcp_conn; stio_dev_tcp_listen_t tcp_lstn; stio_dev_tcp_make_t tcp_make; + tcp_server_t* ts; stio = stio_open (&mmgr, 0, 512, STIO_NULL); if (!stio) @@ -172,19 +230,21 @@ int main () memcpy (&tcp_make.addr, &sin, STIO_SIZEOF(sin)); tcp_make.on_write = tcp_on_write; tcp_make.on_read = tcp_on_read; - tcp[0] = stio_dev_tcp_make (stio, 0, &tcp_make); + tcp[0] = stio_dev_tcp_make (stio, STIO_SIZEOF(tcp_server_t), &tcp_make); if (!tcp[0]) { printf ("Cannot make tcp\n"); goto oops; } + ts = (tcp_server_t*)(tcp[0] + 1); + ts->tally = 0; memset (&sin, 0, STIO_SIZEOF(sin)); sin.sin_family = AF_INET; sin.sin_port = htons(9999); - inet_pton (sin.sin_family, "192.168.1.1", &sin.sin_addr); - //inet_pton (sin.sin_family, "127.0.0.1", &sin.sin_addr); + //inet_pton (sin.sin_family, "192.168.1.1", &sin.sin_addr); + inet_pton (sin.sin_family, "127.0.0.1", &sin.sin_addr); memset (&tcp_conn, 0, STIO_SIZEOF(tcp_conn)); memcpy (&tcp_conn.addr, &sin, STIO_SIZEOF(sin)); @@ -205,12 +265,14 @@ int main () tcp_make.on_write = tcp_on_write; tcp_make.on_read = tcp_on_read; - tcp[1] = stio_dev_tcp_make (stio, 0, &tcp_make); + tcp[1] = stio_dev_tcp_make (stio, STIO_SIZEOF(tcp_server_t), &tcp_make); if (!tcp[1]) { printf ("Cannot make tcp\n"); goto oops; } + ts = (tcp_server_t*)(tcp[1] + 1); + ts->tally = 0; tcp_lstn.backlogs = 100; tcp_lstn.on_connect = tcp_on_connect; diff --git a/stio/lib/stio-prv.h b/stio/lib/stio-prv.h index 72ca2ff..315994c 100644 --- a/stio/lib/stio-prv.h +++ b/stio/lib/stio-prv.h @@ -70,9 +70,10 @@ struct stio_t stio_dev_t* tail; } dev; /* normal devices */ - stio_dev_t* rdev; /* ruined device list - singly linked list */ + stio_dev_t* hdev; /* halted device list - singly linked list */ stio_uint8_t bigbuf[65535]; /* TODO: make this dynamic depending on devices added. device may indicate a buffer size required??? */ + int renew_watch; struct { diff --git a/stio/lib/stio-sck.c b/stio/lib/stio-sck.c index 14a4a2f..fdf5240 100644 --- a/stio/lib/stio-sck.c +++ b/stio/lib/stio-sck.c @@ -45,6 +45,13 @@ void stio_closeasyncsck (stio_t* stio, stio_sckhnd_t sck) #endif } +#if 0 +int stio_shutasyncsck (stio_t* stio, stio_sckhnd_t sck, int how) +{ + shutdown (sck, how); +} +#endif + int stio_makesckasync (stio_t* stio, stio_sckhnd_t sck) { return stio_makesyshndasync (stio, (stio_syshnd_t)sck); diff --git a/stio/lib/stio-tcp.c b/stio/lib/stio-tcp.c index bacb099..57ab776 100644 --- a/stio/lib/stio-tcp.c +++ b/stio/lib/stio-tcp.c @@ -47,8 +47,11 @@ static int tcp_make (stio_dev_t* dev, void* ctx) tcp->sck = stio_openasyncsck (dev->stio, family, SOCK_STREAM); if (tcp->sck == STIO_SCKHND_INVALID) goto oops; - //setsockopt (udp->sck, SOL_SOCKET, SO_REUSEADDR, ...); - // TRANSPARENT, ETC. +/* TODO: + setsockopt (udp->sck, SOL_SOCKET, SO_REUSEADDR, ...); + TRANSPARENT, ETC. + */ + iv = 1; if (setsockopt (tcp->sck, SOL_SOCKET, SO_REUSEADDR, &iv, STIO_SIZEOF(iv)) == -1 || bind (tcp->sck, (struct sockaddr*)&arg->addr, len) == -1) @@ -57,6 +60,7 @@ static int tcp_make (stio_dev_t* dev, void* ctx) goto oops; } + tcp->dev_capa = STIO_DEV_CAPA_IN | STIO_DEV_CAPA_OUT | STIO_DEV_CAPA_STREAM; tcp->on_write = arg->on_write; tcp->on_read = arg->on_read; tcp->tmridx_connect = STIO_TMRIDX_INVALID; @@ -110,14 +114,13 @@ static stio_syshnd_t tcp_getsyshnd (stio_dev_t* dev) return (stio_syshnd_t)tcp->sck; } - static int tcp_read (stio_dev_t* dev, void* buf, stio_len_t* len) { stio_dev_tcp_t* tcp = (stio_dev_tcp_t*)dev; ssize_t x; x = recv (tcp->sck, buf, *len, 0); - if (x <= -1) + if (x == -1) { if (errno == EINPROGRESS || errno == EWOULDBLOCK) return 0; /* no data available */ if (errno == EINTR) return 0; @@ -134,13 +137,26 @@ static int tcp_write (stio_dev_t* dev, const void* data, stio_len_t* len) stio_dev_tcp_t* tcp = (stio_dev_tcp_t*)dev; ssize_t x; int flags = 0; - + + if (*len <= 0) + { + /* it's a writing finish indicator. close the writing end of + * the socket, probably leaving it in the half-closed state */ + if (shutdown (tcp->sck, SHUT_WR) == -1) + { + tcp->stio->errnum = stio_syserrtoerrnum(errno); + return -1; + } + + return 1; + } + /* TODO: flags MSG_DONTROUTE, MSG_DONTWAIT, MSG_MORE, MSG_OOB, MSG_NOSIGNAL */ #if defined(MSG_NOSIGNAL) flags |= MSG_NOSIGNAL; #endif x = sendto (tcp->sck, data, *len, flags, STIO_NULL, 0); - if (x <= -1) + if (x == -1) { if (errno == EINPROGRESS || errno == EWOULDBLOCK) return 0; /* no data can be written */ if (errno == EINTR) return 0; @@ -163,7 +179,7 @@ static void tmr_connect_handle (stio_t* stio, const stio_ntime_t* now, stio_tmrj * doesn't need to be deleted when it gets connected for this check * here. this libarary, however, deletes the job when it gets * connected. */ - stio_dev_tcp_kill (tcp); + stio_dev_tcp_halt (tcp); } } @@ -391,11 +407,7 @@ printf ("TCP READY...%p\n", dev); tcp->state &= ~STIO_DEV_TCP_CONNECTING; tcp->state |= STIO_DEV_TCP_CONNECTED; - if (stio_dev_watch ((stio_dev_t*)tcp, STIO_DEV_WATCH_UPDATE, STIO_DEV_EVENT_IN) <= -1) - { -printf ("CAANOT MANIPULTE WATCHER ...\n"); - return -1; - } + if (stio_dev_watch ((stio_dev_t*)tcp, STIO_DEV_WATCH_RENEW, 0) <= -1) return -1; if (tcp->tmridx_connect != STIO_TMRIDX_INVALID) { @@ -455,20 +467,22 @@ printf ("CAANOT MANIPULTE WATCHER ...\n"); return -1; } - /* addr is the address of the peer */ - /* local addresss is inherited from the server */ - clitcp = (stio_dev_tcp_t*)stio_makedev (tcp->stio, STIO_SIZEOF(*tcp), &tcp_acc_mth, tcp->dev_evcb, &clisck); + /* use tcp->dev_size when instantiating a client tcp device + * instead of STIO_SIZEOF(stio_dev_tcp_t). therefore, the + * extension area as big as that of the master tcp device + * is created in the client tcp device */ + clitcp = (stio_dev_tcp_t*)stio_makedev (tcp->stio, tcp->dev_size, &tcp_acc_mth, tcp->dev_evcb, &clisck); if (!clitcp) { close (clisck); return -1; } + clitcp->dev_capa |= STIO_DEV_CAPA_IN | STIO_DEV_CAPA_OUT | STIO_DEV_CAPA_STREAM; clitcp->state |= STIO_DEV_TCP_ACCEPTED; clitcp->peer = peer; /*clitcp->parent = tcp;*/ - /* inherit some event handlers from the parent. * you can still change them inside the on_connect handler */ clitcp->on_connect = tcp->on_connect; @@ -477,7 +491,8 @@ printf ("CAANOT MANIPULTE WATCHER ...\n"); clitcp->on_read = tcp->on_read; clitcp->tmridx_connect = STIO_TMRIDX_INVALID; - if (clitcp->on_connect (clitcp) <= -1) stio_dev_tcp_kill (clitcp); + if (clitcp->on_connect (clitcp) <= -1) stio_dev_tcp_halt (clitcp); + return 0; /* success but don't invoke on_read() */ } } @@ -520,11 +535,6 @@ stio_dev_tcp_t* stio_dev_tcp_make (stio_t* stio, stio_size_t xtnsize, const stio return (stio_dev_tcp_t*)stio_makedev (stio, STIO_SIZEOF(stio_dev_tcp_t) + xtnsize, &tcp_mth, &tcp_evcb, (void*)data); } -void stio_dev_tcp_kill (stio_dev_tcp_t* tcp) -{ - stio_killdev (tcp->stio, (stio_dev_t*)tcp); -} - int stio_dev_tcp_bind (stio_dev_tcp_t* tcp, stio_dev_tcp_bind_t* bind) { return stio_dev_ioctl ((stio_dev_t*)tcp, STIO_DEV_TCP_BIND, bind); @@ -544,3 +554,8 @@ int stio_dev_tcp_write (stio_dev_tcp_t* tcp, const void* data, stio_len_t len, v { return stio_dev_write ((stio_dev_t*)tcp, data, len, wrctx); } + +int stio_dev_tcp_halt (stio_dev_tcp_t* tcp) +{ + stio_dev_halt ((stio_dev_t*)tcp); +} diff --git a/stio/lib/stio-tcp.h b/stio/lib/stio-tcp.h index cc73fa7..f3fec0b 100644 --- a/stio/lib/stio-tcp.h +++ b/stio/lib/stio-tcp.h @@ -135,10 +135,6 @@ STIO_EXPORT stio_dev_tcp_t* stio_dev_tcp_make ( const stio_dev_tcp_make_t* data ); -STIO_EXPORT void stio_dev_tcp_kill ( - stio_dev_tcp_t* tcp -); - STIO_EXPORT int stio_dev_tcp_bind ( stio_dev_tcp_t* tcp, stio_dev_tcp_bind_t* bind @@ -160,6 +156,10 @@ STIO_EXPORT int stio_dev_tcp_write ( void* wrctx ); +STIO_EXPORT int stio_dev_tcp_halt ( + stio_dev_tcp_t* tcp +); + #ifdef __cplusplus } #endif diff --git a/stio/lib/stio-tmr.c b/stio/lib/stio-tmr.c index 629dfb2..2a79d3f 100644 --- a/stio/lib/stio-tmr.c +++ b/stio/lib/stio-tmr.c @@ -46,10 +46,18 @@ static stio_tmridx_t sift_up (stio_t* stio, stio_tmridx_t index, int notify) if (index > 0 && YOUNGER_THAN(&stio->tmr.jobs[index], &stio->tmr.jobs[parent])) { stio_tmrjob_t item; +#if defined(STIO_USE_TMRJOB_IDXPTR) + /* nothing */ +#else stio_size_t old_index; +#endif item = stio->tmr.jobs[index]; +#if defined(STIO_USE_TMRJOB_IDXPTR) + /* nothing */ +#else old_index = index; +#endif do { @@ -89,10 +97,19 @@ static stio_tmridx_t sift_down (stio_t* stio, stio_tmridx_t index, int notify) if (index < base) /* at least 1 child is under the 'index' positmrn */ { stio_tmrjob_t item; +#if defined(STIO_USE_TMRJOB_IDXPTR) + /* nothing */ +#else stio_size_t old_index; +#endif item = stio->tmr.jobs[index]; + +#if defined(STIO_USE_TMRJOB_IDXPTR) + /* nothing */ +#else old_index = index; +#endif do { diff --git a/stio/lib/stio-udp.c b/stio/lib/stio-udp.c index 8732514..544fc35 100644 --- a/stio/lib/stio-udp.c +++ b/stio/lib/stio-udp.c @@ -180,14 +180,12 @@ static stio_dev_evcb_t udp_evcb = udp_on_write }; - stio_dev_udp_t* stio_dev_udp_make (stio_t* stio, stio_size_t xtnsize, const stio_dev_udp_make_t* data) { return (stio_dev_udp_t*)stio_makedev (stio, STIO_SIZEOF(stio_dev_udp_t) + xtnsize, &udp_mth, &udp_evcb, (void*)data); } - -void stio_dev_udp_kill (stio_dev_udp_t* udp) +void stio_dev_udp_halt (stio_dev_udp_t* udp) { - stio_killdev (udp->stio, (stio_dev_t*)udp); + stio_dev_halt ((stio_dev_t*)udp); } diff --git a/stio/lib/stio-udp.h b/stio/lib/stio-udp.h index 83b2a2e..1d8d16b 100644 --- a/stio/lib/stio-udp.h +++ b/stio/lib/stio-udp.h @@ -66,7 +66,7 @@ STIO_EXPORT stio_dev_udp_t* stio_dev_udp_make ( const stio_dev_udp_make_t* data ); -STIO_EXPORT void stio_dev_udp_kill ( +STIO_EXPORT void stio_dev_udp_halt ( stio_dev_udp_t* udp ); diff --git a/stio/lib/stio.c b/stio/lib/stio.c index 999c3e5..22a9f87 100644 --- a/stio/lib/stio.c +++ b/stio/lib/stio.c @@ -24,7 +24,6 @@ THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ - #include "stio-prv.h" #include @@ -32,6 +31,8 @@ #include #include +#define DEV_CAPA_ALL_WATCHED (STIO_DEV_CAPA_IN_WATCHED | STIO_DEV_CAPA_OUT_WATCHED | STIO_DEV_CAPA_PRI_WATCHED) + stio_t* stio_open (stio_mmgr_t* mmgr, stio_size_t xtnsize, stio_size_t tmrcapa, stio_errnum_t* errnum) { stio_t* stio; @@ -104,124 +105,6 @@ void stio_fini (stio_t* stio) close (stio->mux); } -stio_dev_t* stio_makedev (stio_t* stio, stio_size_t dev_size, stio_dev_mth_t* dev_mth, stio_dev_evcb_t* dev_evcb, void* make_ctx) -{ - stio_dev_t* dev; - - if (dev_size < STIO_SIZEOF(stio_dev_t)) - { - stio->errnum = STIO_EINVAL; - return STIO_NULL; - } - - dev = STIO_MMGR_ALLOC (stio->mmgr, dev_size); - if (!dev) - { - stio->errnum = STIO_ENOMEM; - return STIO_NULL; - } - - STIO_MEMSET (dev, 0, dev_size); - dev->stio = stio; - /* default capability. dev->dev_mth->make() can change this. - * stio_dev_watch() is affected by the capability change. */ - dev->dev_capa = STIO_DEV_CAPA_IN | STIO_DEV_CAPA_OUT; - dev->dev_mth = dev_mth; - dev->dev_evcb = dev_evcb; - STIO_WQ_INIT(&dev->wq); - - /* call the callback function first */ - stio->errnum = STIO_ENOERR; - if (dev->dev_mth->make (dev, make_ctx) <= -1) - { - if (stio->errnum == STIO_ENOERR) stio->errnum = STIO_EDEVMAKE; - goto oops; - } - -#if defined(_WIN32) - if (CreateIoCompletionPort ((HANDLE)dev->dev_mth->getsyshnd(dev), stio->iocp, STIO_IOCP_KEY, 0) == NULL) - { - /* TODO: set errnum from GetLastError()... */ - goto oops_after_make; - } -#else - if (stio_dev_watch (dev, STIO_DEV_WATCH_START, STIO_DEV_EVENT_IN) <= -1) goto oops_after_make; -#endif - - /* and place the new dev at the back */ - if (stio->dev.tail) stio->dev.tail->dev_next = dev; - else stio->dev.head = dev; - dev->dev_prev = stio->dev.tail; - stio->dev.tail = dev; - - return dev; - -oops_after_make: - dev->dev_mth->kill (dev); -oops: - STIO_MMGR_FREE (stio->mmgr, dev); - return STIO_NULL; -} - -void stio_killdev (stio_t* stio, stio_dev_t* dev) -{ - STIO_ASSERT (stio == dev->stio); - - /* clear pending send requests */ - while (!STIO_WQ_ISEMPTY(&dev->wq)) - { - stio_wq_t* wq; - - wq = STIO_WQ_HEAD(&dev->wq); - STIO_WQ_DEQ (&dev->wq); - - STIO_MMGR_FREE (stio->mmgr, wq); - } - - /* delink the dev object */ - if (!(dev->dev_capa & STIO_DEV_CAPA_RUINED)) - { - if (dev->dev_prev) - dev->dev_prev->dev_next = dev->dev_next; - else - stio->dev.head = dev->dev_next; - - if (dev->dev_next) - dev->dev_next->dev_prev = dev->dev_prev; - else - stio->dev.tail = dev->dev_prev; - } - - stio_dev_watch (dev, STIO_DEV_WATCH_STOP, 0); - - /* and call the callback function */ - dev->dev_mth->kill (dev); - - STIO_MMGR_FREE (stio->mmgr, dev); -} - -void stio_ruindev (stio_t* stio, stio_dev_t* dev) -{ - if (!(dev->dev_capa & STIO_DEV_CAPA_RUINED)) - { - /* delink the dev object from the device list */ - if (dev->dev_prev) - dev->dev_prev->dev_next = dev->dev_next; - else - stio->dev.head = dev->dev_next; - if (dev->dev_next) - dev->dev_next->dev_prev = dev->dev_prev; - else - stio->dev.tail = dev->dev_prev; - - /* place it at the beginning of the ruined device list */ - dev->dev_prev = STIO_NULL; - dev->dev_next = stio->rdev; - stio->rdev = dev; - - dev->dev_capa |= STIO_DEV_CAPA_RUINED; - } -} int stio_prologue (stio_t* stio) { @@ -234,11 +117,246 @@ void stio_epilogue (stio_t* stio) /* TODO: */ } +STIO_INLINE static void handle_event (stio_t* stio, stio_size_t i) +{ + stio_dev_t* dev; + + dev = stio->revs[i].data.ptr; + + if (dev->dev_evcb->ready) + { + int x, events = 0; + + if (stio->revs[i].events & EPOLLIN) events |= STIO_DEV_EVENT_IN; + if (stio->revs[i].events & EPOLLOUT) events |= STIO_DEV_EVENT_OUT; + if (stio->revs[i].events & EPOLLPRI) events |= STIO_DEV_EVENT_PRI; + if (stio->revs[i].events & EPOLLERR) events |= STIO_DEV_EVENT_ERR; + if (stio->revs[i].events & EPOLLHUP) events |= STIO_DEV_EVENT_HUP; + #if defined(EPOLLRDHUP) + else if (stio->revs[i].events & EPOLLRDHUP) events |= STIO_DEV_EVENT_HUP; + #endif + + /* return value of ready() + * <= -1 - failure. kill the device. + * == 0 - ok. but don't invoke recv() or send(). + * >= 1 - everything is ok. */ + x = dev->dev_evcb->ready (dev, events); + if (x <= -1) + { + stio_dev_halt (dev); + return; + } + else if (x == 0) goto skip_evcb; + } + + if (dev && stio->revs[i].events & EPOLLPRI) + { + /* urgent data */ +/* TODO: urgent data.... */ + /*x = dev->dev_mth->urgrecv (dev, stio->bugbuf, &len);*/ +printf ("has urgent data...\n"); + } + + if (dev && stio->revs[i].events & EPOLLOUT) + { + /* write pending requests */ + while (!STIO_WQ_ISEMPTY(&dev->wq)) + { + stio_wq_t* q; + const stio_uint8_t* uptr; + stio_len_t urem, ulen; + int x; + + q = STIO_WQ_HEAD(&dev->wq); + + uptr = q->ptr; + urem = q->len; + + send_leftover: + ulen = urem; + x = dev->dev_mth->write (dev, uptr, &ulen); + if (x <= -1) + { + stio_dev_halt (dev); + dev = STIO_NULL; + break; + } + else if (x == 0) + { + /* keep the left-over */ + STIO_MEMMOVE (q->ptr, uptr, urem); + q->len = urem; + break; + } + else + { + uptr += ulen; + urem -= ulen; + + if (urem <= 0) + { + /* finished writing a single write request */ + int y, out_closed = 0; + + if (q->len <= 0 && (dev->dev_capa & STIO_DEV_CAPA_STREAM)) + { + /* it was a zero-length write request. + * for a stream, it is to close the output. */ + dev->dev_capa |= STIO_DEV_CAPA_OUT_CLOSED; + stio->renew_watch = 1; + out_closed = 1; + } + + STIO_WQ_UNLINK (q); /* STIO_WQ_DEQ(&dev->wq); */ + y = dev->dev_evcb->on_write (dev, q->ctx); + STIO_MMGR_FREE (dev->stio->mmgr, q); + + if (y <= -1) + { + stio_dev_halt (dev); + dev = STIO_NULL; + break; + } + + if (out_closed) + { + /* drain all pending requests. + * callbacks are skipped for drained requests */ + while (!STIO_WQ_ISEMPTY(&dev->wq)) + { + q = STIO_WQ_HEAD(&dev->wq); + STIO_WQ_UNLINK (q); + STIO_MMGR_FREE (dev->stio->mmgr, q); + } + break; + } + } + else goto send_leftover; + } + } + + if (dev && STIO_WQ_ISEMPTY(&dev->wq)) + { + /* no pending request to write */ + if ((dev->dev_capa & STIO_DEV_CAPA_IN_CLOSED) && + (dev->dev_capa & STIO_DEV_CAPA_OUT_CLOSED)) + { + stio_dev_halt (dev); + dev = STIO_NULL; + } + else + { + stio->renew_watch = 1; + } + } + } + + if (dev && stio->revs[i].events & EPOLLIN) + { + stio_len_t len; + int x; + + /* the devices are all non-blocking. read as much as possible + * if on_read callback returns 1 or greater. read only once + * if the on_read calllback returns 0. */ + while (1) + { + len = STIO_COUNTOF(stio->bigbuf); + x = dev->dev_mth->read (dev, stio->bigbuf, &len); + if (x <= -1) + { + stio_dev_halt (dev); + dev = STIO_NULL; + break; + } + else if (x == 0) + { + /* no data is available - EWOULDBLOCK or something similar */ + break; + } + else if (x >= 1) + { + if (len <= 0 && (dev->dev_capa & STIO_DEV_CAPA_STREAM)) + { + /* EOF received. for a stream device, a zero-length + * read is interpreted as EOF. */ + dev->dev_capa |= STIO_DEV_CAPA_IN_CLOSED; + stio->renew_watch = 1; + + /* call the on_read callback to report EOF */ + if (dev->dev_evcb->on_read (dev, stio->bigbuf, len) <= -1 || + (dev->dev_capa & STIO_DEV_CAPA_OUT_CLOSED)) + { + /* 1. input ended and its reporting failed or + * 2. input ended and no writing is possible */ + stio_dev_halt (dev); + dev = STIO_NULL; + } + + /* since EOF is received, reading can't be greedy */ + break; + } + else + { + int y; + /* TODO: for a stream device, merge received data if bigbuf isn't full and fire the on_read callback + * when x == 0 or <= -1. you can */ + + /* data available */ + y = dev->dev_evcb->on_read (dev, stio->bigbuf, len); + if (y <= -1) + { + stio_dev_halt (dev); + dev = STIO_NULL; + break; + } + else if (y == 0) + { + /* don't be greedy. read only once + * for this loop iteration */ + break; + } + } + } + } + } + + if (dev && stio->revs[i].events & (EPOLLERR | EPOLLHUP)) + { + /* if error or hangup has been reported on the device, + * halt the device. this check is performed after + * EPOLLIN or EPOLLOUT check because EPOLLERR or EPOLLHUP + * can be set together with EPOLLIN or EPOLLOUT. */ + dev->dev_capa |= STIO_DEV_CAPA_IN_CLOSED | STIO_DEV_CAPA_OUT_CLOSED; + stio->renew_watch = 1; + } +#if defined(EPOLLRDHUP) + else if (dev && stio->revs[i].events & EPOLLRDHUP) + { + + } +#endif + + if (dev && + (dev->dev_capa & STIO_DEV_CAPA_IN_CLOSED) && + (dev->dev_capa & STIO_DEV_CAPA_OUT_CLOSED)) + { + stio_dev_halt (dev); + dev = STIO_NULL; + } + +skip_evcb: + if (dev && stio->renew_watch && stio_dev_watch (dev, STIO_DEV_WATCH_RENEW, 0) <= -1) + { + stio_dev_halt (dev); + dev = STIO_NULL; + } +} + int stio_exec (stio_t* stio) { stio_ntime_t tmout; - #if defined(_WIN32) ULONG nentries, i; #else @@ -282,215 +400,20 @@ int stio_exec (stio_t* stio) } /* TODO: merge events??? for the same descriptor */ + for (i = 0; i < nentries; i++) { - stio_dev_t* dev; - - dev = stio->revs[i].data.ptr; - - if (dev->dev_evcb->ready) - { - int x, events = 0; - - if (stio->revs[i].events & EPOLLERR) events |= STIO_DEV_EVENT_ERR; - if (stio->revs[i].events & EPOLLHUP) events |= STIO_DEV_EVENT_HUP; - if (stio->revs[i].events & EPOLLIN) events |= STIO_DEV_EVENT_IN; - if (stio->revs[i].events & EPOLLOUT) events |= STIO_DEV_EVENT_OUT; - if (stio->revs[i].events & EPOLLPRI) events |= STIO_DEV_EVENT_PRI; - - #if defined(EPOLLRDHUP) - /* interprete EPOLLRDHUP the same way as EPOLLHUP. - * - * when EPOLLRDHUP is set, EPOLLIN or EPOLLPRI or both are - * assumed to be set as EPOLLRDHUP is requested only if - * STIO_DEV_WATCH_IN is set for stio_dev_watch(). - * in linux, when EPOLLRDHUP is set, EPOLLIN is set together - * if it's requested together. it seems to be safe to have - * the following assertion. however, let me commect it out - * in case the assumption above is not right. - * STIO_ASSERT (events & (STIO_DEV_EVENT_IN | STIO_DEV_EVENT_PRI)); - */ - if (stio->revs[i].events & EPOLLRDHUP) events |= STIO_DEV_EVENT_HUP; - #endif - - /* return value of ready() - * <= -1 - failure. kill the device. - * == 0 - ok. but don't invoke recv() or send(). - * >= 1 - everything is ok. */ - if ((x = dev->dev_evcb->ready (dev, events)) <= -1) - { - stio_ruindev (stio, dev); - dev = STIO_NULL; - } - else if (x >= 1) - { - goto invoke_evcb; - } - } - else - { - invoke_evcb: - if (dev && stio->revs[i].events & EPOLLPRI) - { - /* urgent data */ -/* TODO: urgent data.... */ - /*x = dev->dev_mth->urgrecv (dev, stio->bugbuf, &len);*/ - printf ("has urgent data...\n"); - } - - if (dev && stio->revs[i].events & EPOLLIN) - { - stio_len_t len; - int x; - - /* the devices are all non-blocking. so read as much as possible */ - /* TODO: limit the number of iterations? */ - while (1) - { - len = STIO_COUNTOF(stio->bigbuf); - x = dev->dev_mth->read (dev, stio->bigbuf, &len); - - if (x <= -1) - { - stio_ruindev (stio, dev); - dev = STIO_NULL; - break; - } - else if (x == 0) - { - /* no data is available - EWOULDBLOCK or something similar */ - break; - } - else if (x >= 1) - { - if (len <= 0) - { - /* EOF received. delay killing until output has been handled. */ - if (STIO_WQ_ISEMPTY(&dev->wq)) - { - /* no pending writes - ruin the device to kill it eventually */ - stio_ruindev (stio, dev); - /* don't set 'dev' to STIO_NULL so that - * output handling can kick in if EPOLLOUT is set */ - } - else - { - /* it might be in a half-open state */ - dev->dev_capa &= ~(STIO_DEV_CAPA_IN | STIO_DEV_CAPA_PRI); - - /* disable the input watching */ - if (stio_dev_watch (dev, STIO_DEV_WATCH_UPDATE, STIO_DEV_EVENT_OUT) <= -1) - { - stio_ruindev (stio, dev); - dev = STIO_NULL; - } - } - - break; - } - else - { - /* TODO: for a stream device, merge received data if bigbuf isn't full and fire the on_read callback - * when x == 0 or <= -1. you can */ - - /* data available */ - if (dev->dev_evcb->on_read (dev, stio->bigbuf, len) <= -1) - { - stio_ruindev (stio, dev); - dev = STIO_NULL; - break; - } - } - } - } - } - - if (dev && stio->revs[i].events & EPOLLOUT) - { - while (!STIO_WQ_ISEMPTY(&dev->wq)) - { - stio_wq_t* q; - const stio_uint8_t* uptr; - stio_len_t urem, ulen; - int x; - - q = STIO_WQ_HEAD(&dev->wq); - - uptr = q->ptr; - urem = q->len; - - send_leftover: - ulen = urem; - x = dev->dev_mth->write (dev, uptr, &ulen); - if (x <= -1) - { - /* TODO: error handling? call callback? or what? */ - stio_killdev (stio, dev); - dev = STIO_NULL; - break; - } - else if (x == 0) - { - /* keep the left-over */ - STIO_MEMMOVE (q->ptr, uptr, urem); - q->len = urem; - break; - } - else - { - uptr += ulen; - urem -= ulen; - - if (urem <= 0) - { - int y; - - STIO_WQ_UNLINK (q); /* STIO_WQ_DEQ(&dev->wq); */ - y = dev->dev_evcb->on_write (dev, q->ctx); - STIO_MMGR_FREE (dev->stio->mmgr, q); - - if (y <= -1) - { - stio_killdev (stio, dev); - dev = STIO_NULL; - break; - } - } - else goto send_leftover; - } - } - - if (dev && STIO_WQ_ISEMPTY(&dev->wq)) - { - /* no pending request to write. - * watch input only. disable output watching */ - if (dev->dev_capa & STIO_DEV_CAPA_IN) - { - if (stio_dev_watch (dev, STIO_DEV_WATCH_UPDATE, STIO_DEV_EVENT_IN) <= -1) - { - stio_ruindev (stio, dev); - dev = STIO_NULL; - } - } - else - { - /* the device is not capable of reading. - * finish the device */ - stio_ruindev (stio, dev); - dev = STIO_NULL; - } - } - } - } + stio->renew_watch = 0; + handle_event (stio, i); } - /* kill all ruined devices */ - while (stio->rdev) + /* kill all halted devices */ + while (stio->hdev) { stio_dev_t* next; - next = stio->rdev->dev_next; - stio_killdev (stio, stio->rdev); - stio->rdev = next; + next = stio->hdev->dev_next; + stio_killdev (stio, stio->hdev); + stio->hdev = next; } #endif @@ -508,8 +431,21 @@ int stio_loop (stio_t* stio) if (!stio->dev.head) return 0; stio->stopreq = 0; + stio->renew_watch = 0; + if (stio_prologue (stio) <= -1) return -1; + if (stio->renew_watch) + { +printf (">>> GLOBAL WATCHIN RENEWAL....\n"); + stio_dev_t* dev; + for (dev = stio->dev.head; dev; dev = dev->dev_next) + { + stio_dev_watch (dev, STIO_DEV_WATCH_RENEW, 0); + } + stio->renew_watch = 0; + } + while (!stio->stopreq && stio->dev.head) { if (stio_exec (stio) <= -1) break; @@ -520,6 +456,137 @@ int stio_loop (stio_t* stio) return 0; } + +stio_dev_t* stio_makedev (stio_t* stio, stio_size_t dev_size, stio_dev_mth_t* dev_mth, stio_dev_evcb_t* dev_evcb, void* make_ctx) +{ + stio_dev_t* dev; + + if (dev_size < STIO_SIZEOF(stio_dev_t)) + { + stio->errnum = STIO_EINVAL; + return STIO_NULL; + } + + dev = STIO_MMGR_ALLOC (stio->mmgr, dev_size); + if (!dev) + { + stio->errnum = STIO_ENOMEM; + return STIO_NULL; + } + + STIO_MEMSET (dev, 0, dev_size); + dev->stio = stio; + dev->dev_size = dev_size; + /* default capability. dev->dev_mth->make() can change this. + * stio_dev_watch() is affected by the capability change. */ + dev->dev_capa = STIO_DEV_CAPA_IN | STIO_DEV_CAPA_OUT; + dev->dev_mth = dev_mth; + dev->dev_evcb = dev_evcb; + STIO_WQ_INIT(&dev->wq); + + /* call the callback function first */ + stio->errnum = STIO_ENOERR; + if (dev->dev_mth->make (dev, make_ctx) <= -1) + { + if (stio->errnum == STIO_ENOERR) stio->errnum = STIO_EDEVMAKE; + goto oops; + } + + /* set some internal capability bits according to the capabilities + * removed by the device making callback for convenience sake. */ + if (!(dev->dev_capa & STIO_DEV_CAPA_IN)) dev->dev_capa |= STIO_DEV_CAPA_IN_CLOSED; + if (!(dev->dev_capa & STIO_DEV_CAPA_OUT)) dev->dev_capa |= STIO_DEV_CAPA_OUT_CLOSED; + +#if defined(_WIN32) + if (CreateIoCompletionPort ((HANDLE)dev->dev_mth->getsyshnd(dev), stio->iocp, STIO_IOCP_KEY, 0) == NULL) + { + /* TODO: set errnum from GetLastError()... */ + goto oops_after_make; + } +#else + if (stio_dev_watch (dev, STIO_DEV_WATCH_START, 0) <= -1) goto oops_after_make; +#endif + + /* and place the new dev at the back */ + if (stio->dev.tail) stio->dev.tail->dev_next = dev; + else stio->dev.head = dev; + dev->dev_prev = stio->dev.tail; + stio->dev.tail = dev; + + return dev; + +oops_after_make: + dev->dev_mth->kill (dev); +oops: + STIO_MMGR_FREE (stio->mmgr, dev); + return STIO_NULL; +} + +void stio_killdev (stio_t* stio, stio_dev_t* dev) +{ + STIO_ASSERT (stio == dev->stio); + + /* clear pending send requests */ + while (!STIO_WQ_ISEMPTY(&dev->wq)) + { + stio_wq_t* wq; + + wq = STIO_WQ_HEAD(&dev->wq); + STIO_WQ_DEQ (&dev->wq); + + STIO_MMGR_FREE (stio->mmgr, wq); + } + + /* delink the dev object */ + if (!(dev->dev_capa & STIO_DEV_CAPA_HALTED)) + { + if (dev->dev_prev) + dev->dev_prev->dev_next = dev->dev_next; + else + stio->dev.head = dev->dev_next; + + if (dev->dev_next) + dev->dev_next->dev_prev = dev->dev_prev; + else + stio->dev.tail = dev->dev_prev; + } + + stio_dev_watch (dev, STIO_DEV_WATCH_STOP, 0); + + /* and call the callback function */ + dev->dev_mth->kill (dev); + + STIO_MMGR_FREE (stio->mmgr, dev); +} + +void stio_dev_halt (stio_dev_t* dev) +{ + if (!(dev->dev_capa & STIO_DEV_CAPA_HALTED)) + { + stio_t* stio; + + stio = dev->stio; + + /* delink the dev object from the device list */ + if (dev->dev_prev) + dev->dev_prev->dev_next = dev->dev_next; + else + stio->dev.head = dev->dev_next; + if (dev->dev_next) + dev->dev_next->dev_prev = dev->dev_prev; + else + stio->dev.tail = dev->dev_prev; + + /* place it at the beginning of the halted device list. + * the halted device list is singly linked. */ + dev->dev_prev = STIO_NULL; + dev->dev_next = stio->hdev; + stio->hdev = dev; + + dev->dev_capa |= STIO_DEV_CAPA_HALTED; + } +} + int stio_dev_ioctl (stio_dev_t* dev, int cmd, void* arg) { if (dev->dev_mth->ioctl) return dev->dev_mth->ioctl (dev, cmd, arg); @@ -531,47 +598,30 @@ int stio_dev_watch (stio_dev_t* dev, stio_dev_watch_cmd_t cmd, int events) { struct epoll_event ev; int epoll_op; - - /* this function honors STIO_DEV_EVENT_IN and STIO_DEV_EVENT_OUT only - * as valid input event bits. it intends to provide simple abstraction - * by reducing the variety of event bits that the caller has to handle. */ - ev.events = EPOLLHUP | EPOLLERR; - - if (events & STIO_DEV_EVENT_IN) - { - if (dev->dev_capa & STIO_DEV_CAPA_IN) - { - ev.events |= EPOLLIN; - #if defined(EPOLLRDHUP) - ev.events |= EPOLLRDHUP; - #endif - } - if (dev->dev_capa & STIO_DEV_CAPA_PRI) - { - ev.events |= EPOLLPRI; - #if defined(EPOLLRDHUP) - ev.events |= EPOLLRDHUP; - #endif - } - } - - if (events & STIO_DEV_EVENT_OUT) - { - if (dev->dev_capa & STIO_DEV_CAPA_OUT) ev.events |= EPOLLOUT; - } + int dev_capa; ev.data.ptr = dev; switch (cmd) { case STIO_DEV_WATCH_START: + /* upon start, only input watching is requested */ + events = STIO_DEV_EVENT_IN; epoll_op = EPOLL_CTL_ADD; break; + case STIO_DEV_WATCH_RENEW: + /* auto-renwal mode. input watching is requested all the time. + * output watching is requested only if there're enqueued + * data for writing. */ + events = STIO_DEV_EVENT_IN; + if (!STIO_WQ_ISEMPTY(&dev->wq)) events |= STIO_DEV_EVENT_OUT; case STIO_DEV_WATCH_UPDATE: + /* honor event watching requests as given by the caller */ epoll_op = EPOLL_CTL_MOD; break; case STIO_DEV_WATCH_STOP: + events = 0; /* override events */ epoll_op = EPOLL_CTL_DEL; break; @@ -580,23 +630,76 @@ int stio_dev_watch (stio_dev_t* dev, stio_dev_watch_cmd_t cmd, int events) return -1; } - if (epoll_ctl (dev->stio->mux, epoll_op, dev->dev_mth->getsyshnd(dev), &ev) == -1) + dev_capa = dev->dev_capa; + dev_capa &= ~(DEV_CAPA_ALL_WATCHED); + + /* this function honors STIO_DEV_EVENT_IN and STIO_DEV_EVENT_OUT only + * as valid input event bits. it intends to provide simple abstraction + * by reducing the variety of event bits that the caller has to handle. */ + ev.events = EPOLLHUP | EPOLLERR /*| EPOLLET*/; + + if ((events & STIO_DEV_EVENT_IN) && !(dev->dev_capa & (STIO_DEV_CAPA_IN_CLOSED | STIO_DEV_CAPA_IN_DISABLED))) { - dev->stio->errnum = stio_syserrtoerrnum(errno); - return -1; + if (dev->dev_capa & STIO_DEV_CAPA_IN) + { + ev.events |= EPOLLIN; + #if defined(EPOLLRDHUP) + ev.events |= EPOLLRDHUP; + #endif + if (dev->dev_capa & STIO_DEV_CAPA_PRI) + { + ev.events |= EPOLLPRI; + dev_capa |= STIO_DEV_CAPA_PRI_WATCHED; + } + + dev_capa |= STIO_DEV_CAPA_IN_WATCHED; + } } + if ((events & STIO_DEV_EVENT_OUT) && !(dev->dev_capa & STIO_DEV_CAPA_OUT_CLOSED)) + { + if (dev->dev_capa & STIO_DEV_CAPA_OUT) + { + ev.events |= EPOLLOUT; + dev_capa |= STIO_DEV_CAPA_OUT_WATCHED; + } + } + + if (epoll_op == EPOLL_CTL_MOD && (dev_capa & DEV_CAPA_ALL_WATCHED) == (dev->dev_capa & DEV_CAPA_ALL_WATCHED)) + { + /* skip calling epoll_ctl */ + } + else + { + if (epoll_ctl (dev->stio->mux, epoll_op, dev->dev_mth->getsyshnd(dev), &ev) == -1) + { + dev->stio->errnum = stio_syserrtoerrnum(errno); + return -1; + } + } + + dev->dev_capa = dev_capa; return 0; } +void stio_dev_read (stio_dev_t* dev, int enabled) +{ + if (enabled) + dev->dev_capa &= ~STIO_DEV_CAPA_IN_DISABLED; + else + dev->dev_capa |= STIO_DEV_CAPA_IN_DISABLED; + + dev->stio->renew_watch = 1; +} + int stio_dev_write (stio_dev_t* dev, const void* data, stio_len_t len, void* wrctx) { const stio_uint8_t* uptr; stio_len_t urem, ulen; stio_wq_t* q; - int x, wq_empty; + int x; - if (!(dev->dev_capa & STIO_DEV_CAPA_OUT)) + if (dev->dev_capa & STIO_DEV_CAPA_OUT_CLOSED) { dev->stio->errnum = STIO_ENOCAPA; return -1; @@ -605,27 +708,49 @@ int stio_dev_write (stio_dev_t* dev, const void* data, stio_len_t len, void* wrc uptr = data; urem = len; - wq_empty = STIO_WQ_ISEMPTY(&dev->wq); - if (!wq_empty) goto enqueue_data; + if (!STIO_WQ_ISEMPTY(&dev->wq)) + { + /* the writing queue is not empty. + * enqueue this request immediately */ + goto enqueue_data; + } - while (urem > 0) + /* use the do..while() loop to be able to send a zero-length data */ + do { ulen = urem; x = dev->dev_mth->write (dev, data, &ulen); if (x <= -1) return -1; - else if (x == 0) goto enqueue_data; /* enqueue remaining data */ + else if (x == 0) + { + /* [NOTE] + * the write queue is empty at this moment. a zero-length + * request for a stream device can still get enqueued if the + * write callback returns 0 though i can't figure out if there + * is a compelling reason to do so + */ + goto enqueue_data; /* enqueue remaining data */ + } else { urem -= ulen; uptr += ulen; } } + while (urem > 0); + + if (len <= 0 && (dev->dev_capa & STIO_DEV_CAPA_STREAM)) + { + /* a zero-length writing request is to close the writing end */ + dev->dev_capa |= STIO_DEV_CAPA_OUT_CLOSED; + } dev->dev_evcb->on_write (dev, wrctx); return 0; enqueue_data: /* queue the remaining data*/ +printf ("ENQUEING DATA...\n"); q = (stio_wq_t*)STIO_MMGR_ALLOC (dev->stio->mmgr, STIO_SIZEOF(*q) + urem); if (!q) { @@ -639,15 +764,20 @@ enqueue_data: STIO_MEMCPY (q->ptr, uptr, urem); STIO_WQ_ENQ (&dev->wq, q); - if (wq_empty) + + dev->stio->renew_watch = 1; +#if 0 + if (!(dev->dev_capa & STIO_DEV_CAPA_OUT_WATCHED)) { - if (stio_dev_watch (dev, STIO_DEV_WATCH_UPDATE, STIO_DEV_EVENT_IN | STIO_DEV_EVENT_OUT) <= -1) + /* if output is not being watched, arrange to do so */ + if (stio_dev_watch (dev, STIO_DEV_WATCH_RENEW, 0) <= -1) { STIO_WQ_UNLINK (q); /* unlink the ENQed item */ STIO_MMGR_FREE (dev->stio->mmgr, q); return -1; } } +#endif return 0; } diff --git a/stio/lib/stio.h b/stio/lib/stio.h index 4115ded..9051fdc 100644 --- a/stio/lib/stio.h +++ b/stio/lib/stio.h @@ -97,8 +97,7 @@ struct stio_dev_mth_t /* ------------------------------------------------------------------ */ /* return -1 on failure, 0 if no data is availble, 1 otherwise. * when returning 1, *len must be sent to the length of data read. - * if *len is set to 0, it's treated as EOF. - * it must not kill the device */ + * if *len is set to 0, it's treated as EOF. */ int (*read) (stio_dev_t* dev, void* data, stio_len_t* len); /* ------------------------------------------------------------------ */ @@ -113,12 +112,12 @@ struct stio_dev_evcb_t { /* return -1 on failure. 0 or 1 on success. * when 0 is returned, it doesn't attempt to perform actual I/O. - * when 1 is returned, it attempts to perform actual I/O. - * it must not kill the device */ + * when 1 is returned, it attempts to perform actual I/O. */ int (*ready) (stio_dev_t* dev, int events); - /* return -1 on failure, 0 on success - * it must not kill the device */ + /* return -1 on failure, 0 or 1 on success. + * when 0 is returned, the main loop stops the attempt to read more data. + * when 1 is returned, the main loop attempts to read more data without*/ int (*on_read) (stio_dev_t* dev, const void* data, stio_len_t len); /* return -1 on failure, 0 on success. @@ -126,14 +125,16 @@ struct stio_dev_evcb_t int (*on_write) (stio_dev_t* dev, void* wrctx); }; +typedef enum stio_wq_type_t stio_wq_type_t; + struct stio_wq_t { - stio_wq_t* next; - stio_wq_t* prev; + stio_wq_t* next; + stio_wq_t* prev; - stio_uint8_t* ptr; - stio_len_t len; - void* ctx; + stio_uint8_t* ptr; + stio_len_t len; + void* ctx; }; #define STIO_WQ_INIT(wq) ((wq)->next = (wq)->prev = (wq)) @@ -177,6 +178,7 @@ struct stio_wq_t #define STIO_DEV_HEADERS \ stio_t* stio; \ + stio_size_t dev_size; \ int dev_capa; \ stio_dev_mth_t* dev_mth; \ stio_dev_evcb_t* dev_evcb; \ @@ -191,13 +193,24 @@ struct stio_dev_t enum stio_dev_capa_t { - STIO_DEV_CAPA_IN = (1 << 0), - STIO_DEV_CAPA_OUT = (1 << 1), - STIO_DEV_CAPA_PRI = (1 << 2), - STIO_DEV_CAPA_STREAM = (1 << 3), + STIO_DEV_CAPA_IN = (1 << 0), + STIO_DEV_CAPA_OUT = (1 << 1), + + /* #STIO_DEV_CAPA_PRI is meaningful only if #STIO_DEV_CAPA_IN is set */ + STIO_DEV_CAPA_PRI = (1 << 2), + + /*STIO_DEV_CAPA_HALFOPEN = (1 << 3),*/ + STIO_DEV_CAPA_STREAM = (1 << 4), /* internal use only. never set this bit to the dev_capa field */ - STIO_DEV_CAPA_RUINED = (1 << 15) + STIO_DEV_CAPA_IN_DISABLED = (1 << 9), + STIO_DEV_CAPA_IN_CLOSED = (1 << 10), + STIO_DEV_CAPA_OUT_CLOSED = (1 << 11), + STIO_DEV_CAPA_IN_WATCHED = (1 << 12), + STIO_DEV_CAPA_OUT_WATCHED = (1 << 13), + STIO_DEV_CAPA_PRI_WATCHED = (1 << 14), /**< can be set only if STIO_DEV_CAPA_IN_WATCHED is set */ + + STIO_DEV_CAPA_HALTED = (1 << 15) }; typedef enum stio_dev_capa_t stio_dev_capa_t; @@ -205,6 +218,7 @@ enum stio_dev_watch_cmd_t { STIO_DEV_WATCH_START, STIO_DEV_WATCH_UPDATE, + STIO_DEV_WATCH_RENEW, /* automatic update */ STIO_DEV_WATCH_STOP }; typedef enum stio_dev_watch_cmd_t stio_dev_watch_cmd_t; @@ -287,11 +301,6 @@ STIO_EXPORT void stio_killdev ( stio_dev_t* dev ); -STIO_EXPORT void stio_ruindev ( - stio_t* stio, - stio_dev_t* dev -); - STIO_EXPORT int stio_dev_ioctl ( stio_dev_t* dev, int cmd, @@ -305,6 +314,11 @@ STIO_EXPORT int stio_dev_watch ( int events ); +STIO_EXPORT void stio_dev_read ( + stio_dev_t* dev, + int enabled +); + STIO_EXPORT int stio_dev_write ( stio_dev_t* dev, const void* data, @@ -312,6 +326,10 @@ STIO_EXPORT int stio_dev_write ( void* wrctx ); +STIO_EXPORT void stio_dev_halt ( + stio_dev_t* dev +); + #ifdef __cplusplus } #endif