diff --git a/stio/lib/main.c b/stio/lib/main.c index 471eb4b..3d9f40e 100644 --- a/stio/lib/main.c +++ b/stio/lib/main.c @@ -124,20 +124,28 @@ printf ("device accepted client device... .asdfjkasdfkljasdlfkjasdj...\n"); } -static int tcp_on_write (stio_dev_tcp_t* tcp, void* wrctx) +static int tcp_on_write (stio_dev_tcp_t* tcp, stio_len_t wrlen, void* wrctx) { tcp_server_t* ts; +if (wrlen <= -1) +{ +printf ("SEDING TIMED OUT...........\n"); + stio_dev_tcp_halt(tcp); +} +else +{ + + ts = (tcp_server_t*)(tcp + 1); - printf (">>> TCP SENT MESSAGE %d\n", ts->tally); + printf (">>> SENT MESSAGE %d of length %ld\n", ts->tally, (long int)wrlen); ts->tally++; // if (ts->tally >= 2) stio_dev_tcp_halt (tcp); - //stio_dev_tcp_read (tcp); - printf ("ENABLING READING..............................\n"); - stio_dev_read (tcp, 1); + stio_dev_tcp_read (tcp, 1); +} return 0; } @@ -153,12 +161,14 @@ static int tcp_on_read (stio_dev_tcp_t* tcp, const void* buf, stio_len_t len) printf ("on read %d\n", (int)len); +stio_ntime_t tmout; int n; static char a ='A'; char* xxx = malloc (1000000); memset (xxx, a++ ,1000000); //return stio_dev_tcp_write (tcp, "HELLO", 5, STIO_NULL); - n = stio_dev_tcp_write (tcp, xxx, 1000000, STIO_NULL); + stio_inittime (&tmout, 1, 0); + n = stio_dev_tcp_timedwrite (tcp, xxx, 1000000, &tmout, STIO_NULL); free (xxx); if (n <= -1) return -1; @@ -168,7 +178,7 @@ free (xxx); if (n <= -1) return -1; printf ("DISABLING READING..............................\n"); - stio_dev_read (tcp, 0); + stio_dev_tcp_read (tcp, 0); return 0; /* return 1; let the main loop to read more greedily without consulint the multiplexer */ @@ -243,12 +253,12 @@ int main () memset (&sin, 0, STIO_SIZEOF(sin)); sin.sin_family = AF_INET; sin.sin_port = htons(9999); - //inet_pton (sin.sin_family, "192.168.1.1", &sin.sin_addr); - inet_pton (sin.sin_family, "127.0.0.1", &sin.sin_addr); + inet_pton (sin.sin_family, "192.168.1.4", &sin.sin_addr); + //inet_pton (sin.sin_family, "127.0.0.1", &sin.sin_addr); memset (&tcp_conn, 0, STIO_SIZEOF(tcp_conn)); memcpy (&tcp_conn.addr, &sin, STIO_SIZEOF(sin)); - tcp_conn.timeout.sec = 5; + tcp_conn.tmout.sec = 5; tcp_conn.on_connect = tcp_on_connect; tcp_conn.on_disconnect = tcp_on_disconnect; if (stio_dev_tcp_connect (tcp[0], &tcp_conn) <= -1) diff --git a/stio/lib/stio-cmn.h b/stio/lib/stio-cmn.h index 2fb01ee..0da5d46 100644 --- a/stio/lib/stio-cmn.h +++ b/stio/lib/stio-cmn.h @@ -416,8 +416,8 @@ struct stio_mmgr_t # define STIO_INLINE inline # define STIO_HAVE_INLINE #elif defined(__GNUC__) && defined(__GNUC_GNU_INLINE__) - /* gcc disables inline when -std=c89 or -ansi is used. - * so use __inline__ supported by gcc regardless of the options */ + /* gcc disables inline when -std=c89 or -ansi is used. + * so use __inline__ supported by gcc regardless of the options */ # define STIO_INLINE /*extern*/ __inline__ # define STIO_HAVE_INLINE #else @@ -426,8 +426,6 @@ struct stio_mmgr_t #endif - - /** * The STIO_TYPE_IS_SIGNED() macro determines if a type is signed. * \code diff --git a/stio/lib/stio-pro.c b/stio/lib/stio-pro.c index 8fadd5c..606db29 100644 --- a/stio/lib/stio-pro.c +++ b/stio/lib/stio-pro.c @@ -233,12 +233,13 @@ void stio_dev_pro_kill (stio_dev_pro_t* pro) stio_killdev (pro->stio, (stio_dev_t*)pro); } +#if 0 int stio_dev_pro_write (stio_dev_pro_t* pro, const void* data, stio_len_t len, void* wrctx) { return stio_dev_write ((stio_dev_t*)pro, data, len, wrctx); } -#if 0 + stio_dev_pro_t* stio_dev_pro_getdev (stio_dev_pro_t* pro, stio_dev_pro_type_t type) { switch (type) diff --git a/stio/lib/stio-prv.h b/stio/lib/stio-prv.h index 315994c..01fce37 100644 --- a/stio/lib/stio-prv.h +++ b/stio/lib/stio-prv.h @@ -73,7 +73,9 @@ struct stio_t stio_dev_t* hdev; /* halted device list - singly linked list */ stio_uint8_t bigbuf[65535]; /* TODO: make this dynamic depending on devices added. device may indicate a buffer size required??? */ - int renew_watch; + + unsigned int renew_watch: 1; + unsigned int in_exec: 1; struct { @@ -144,15 +146,6 @@ struct stio_t #define STIO_SEC_TO_USEC(sec) ((sec) * STIO_USECS_PER_SEC) #define STIO_USEC_TO_SEC(usec) ((usec) / STIO_USECS_PER_SEC) -#define stio_inittime(x,s,ns) (((x)->sec = (s)), ((x)->nsec = (ns))) -#define stio_cleartime(x) stio_inittime(x,0,0) -/*#define stio_cleartime(x) ((x)->sec = (x)->nsec = 0)*/ -#define stio_cmptime(x,y) \ - (((x)->sec == (y)->sec)? ((x)->nsec - (y)->nsec): \ - ((x)->sec - (y)->sec)) - -#define stio_iszerotime(x) ((x)->sec == 0 && (x)->nsec == 0) - #ifdef __cplusplus extern "C" { @@ -168,52 +161,6 @@ stio_errnum_t stio_syserrtoerrnum ( ); -/** - * The stio_gettime() function gets the current time. - */ -STIO_EXPORT void stio_gettime ( - stio_ntime_t* nt -); - -/** - * The stio_addtime() function adds x and y and stores the result in z - */ -STIO_EXPORT void stio_addtime ( - const stio_ntime_t* x, - const stio_ntime_t* y, - stio_ntime_t* z -); - -/** - * The stio_subtime() function subtract y from x and stores the result in z. - */ -STIO_EXPORT void stio_subtime ( - const stio_ntime_t* x, - const stio_ntime_t* y, - stio_ntime_t* z -); - -/** - * The stio_instmrjob() function schedules a new event. - * - * \return #STIO_TMRIDX_INVALID on failure, valid index on success. - */ - -stio_tmridx_t stio_instmrjob ( - stio_t* stio, - const stio_tmrjob_t* job -); - -stio_tmridx_t stio_updtmrjob ( - stio_t* stio, - stio_tmridx_t index, - const stio_tmrjob_t* job -); - -void stio_deltmrjob ( - stio_t* stio, - stio_tmridx_t index -); void stio_cleartmrjobs ( stio_t* stio @@ -221,24 +168,17 @@ void stio_cleartmrjobs ( void stio_firetmrjobs ( stio_t* stio, - const stio_ntime_t* tm, + const stio_ntime_t* tmbase, stio_size_t* firecnt ); + int stio_gettmrtmout ( stio_t* stio, - const stio_ntime_t* tm, + const stio_ntime_t* tmbase, stio_ntime_t* tmout ); -/** - * The stio_gettmrjob() function returns the - * pointer to the registered event at the given index. - */ -stio_tmrjob_t* stio_gettmrjob ( - stio_t* stio, - stio_tmridx_t index -); #ifdef __cplusplus } #endif diff --git a/stio/lib/stio-tcp.c b/stio/lib/stio-tcp.c index 57ab776..47967f8 100644 --- a/stio/lib/stio-tcp.c +++ b/stio/lib/stio-tcp.c @@ -51,7 +51,6 @@ static int tcp_make (stio_dev_t* dev, void* ctx) setsockopt (udp->sck, SOL_SOCKET, SO_REUSEADDR, ...); TRANSPARENT, ETC. */ - iv = 1; if (setsockopt (tcp->sck, SOL_SOCKET, SO_REUSEADDR, &iv, STIO_SIZEOF(iv)) == -1 || bind (tcp->sck, (struct sockaddr*)&arg->addr, len) == -1) @@ -255,12 +254,12 @@ static int tcp_ioctl (stio_dev_t* dev, int cmd, void* arg) { stio_tmrjob_t tmrjob; - if (!stio_iszerotime(&conn->timeout)) + if (!stio_isnegtime(&conn->tmout)) { STIO_MEMSET (&tmrjob, 0, STIO_SIZEOF(tmrjob)); tmrjob.ctx = tcp; stio_gettime (&tmrjob.when); - stio_addtime (&tmrjob.when, &conn->timeout, &tmrjob.when); + stio_addtime (&tmrjob.when, &conn->tmout, &tmrjob.when); tmrjob.handler = tmr_connect_handle; #if defined(STIO_USE_TMRJOB_IDXPTR) tmrjob.idxptr = &tcp->tmridx_connect; @@ -273,7 +272,8 @@ static int tcp_ioctl (stio_dev_t* dev, int cmd, void* arg) if (tcp->tmridx_connect == STIO_TMRIDX_INVALID) { stio_dev_watch ((stio_dev_t*)tcp, STIO_DEV_WATCH_UPDATE, STIO_DEV_EVENT_IN); - /* event manipulation failure can't be handled properly. so ignore it */ + /* event manipulation failure can't be handled properly. so ignore it. + * anyway, it's already in a failure condition */ return -1; } } @@ -517,10 +517,10 @@ static int tcp_on_read (stio_dev_t* dev, const void* data, stio_len_t len) return tcp->on_read (tcp, data, len); } -static int tcp_on_write (stio_dev_t* dev, void* wrctx) +static int tcp_on_write (stio_dev_t* dev, stio_len_t wrlen, void* wrctx) { stio_dev_tcp_t* tcp = (stio_dev_tcp_t*)dev; - return tcp->on_write (tcp, wrctx); + return tcp->on_write (tcp, wrlen, wrctx); } static stio_dev_evcb_t tcp_evcb = @@ -550,12 +550,3 @@ int stio_dev_tcp_listen (stio_dev_tcp_t* tcp, stio_dev_tcp_listen_t* lstn) return stio_dev_ioctl ((stio_dev_t*)tcp, STIO_DEV_TCP_LISTEN, lstn); } -int stio_dev_tcp_write (stio_dev_tcp_t* tcp, const void* data, stio_len_t len, void* wrctx) -{ - return stio_dev_write ((stio_dev_t*)tcp, data, len, wrctx); -} - -int stio_dev_tcp_halt (stio_dev_tcp_t* tcp) -{ - stio_dev_halt ((stio_dev_t*)tcp); -} diff --git a/stio/lib/stio-tcp.h b/stio/lib/stio-tcp.h index f3fec0b..5b64839 100644 --- a/stio/lib/stio-tcp.h +++ b/stio/lib/stio-tcp.h @@ -54,7 +54,7 @@ typedef void (*stio_dev_tcp_on_accepted_t) (stio_dev_tcp_t* dev, stio_dev_tcp_t* typedef void (*stio_dev_tcp_on_disconnect_t) (stio_dev_tcp_t* dev); typedef int (*stio_dev_tcp_on_read_t) (stio_dev_tcp_t* dev, const void* data, stio_len_t len); -typedef int (*stio_dev_tcp_on_write_t) (stio_dev_tcp_t* dev, void* wrctx); +typedef int (*stio_dev_tcp_on_write_t) (stio_dev_tcp_t* dev, stio_len_t wrlen, void* wrctx); struct stio_dev_tcp_t { @@ -104,7 +104,7 @@ typedef struct stio_dev_tcp_connect_t stio_dev_tcp_connect_t; struct stio_dev_tcp_connect_t { stio_sckadr_t addr; - stio_ntime_t timeout; /* connect timeout */ + stio_ntime_t tmout; /* connect timeout */ stio_dev_tcp_on_connect_t on_connect; stio_dev_tcp_on_disconnect_t on_disconnect; }; @@ -149,16 +149,37 @@ STIO_EXPORT int stio_dev_tcp_listen ( stio_dev_tcp_listen_t* lstn ); -STIO_EXPORT int stio_dev_tcp_write ( - stio_dev_tcp_t* tcp, - const void* data, - stio_len_t len, - void* wrctx -); -STIO_EXPORT int stio_dev_tcp_halt ( - stio_dev_tcp_t* tcp -); +#if defined(STIO_HAVE_INLINE) + +static STIO_INLINE int stio_dev_tcp_read (stio_dev_tcp_t* tcp, int enabled) +{ + return stio_dev_read ((stio_dev_t*)tcp, enabled); +} + +static STIO_INLINE int stio_dev_tcp_write (stio_dev_tcp_t* tcp, const void* data, stio_len_t len, void* wrctx) +{ + return stio_dev_write ((stio_dev_t*)tcp, data, len, wrctx); +} + +static STIO_INLINE int stio_dev_tcp_timedwrite (stio_dev_tcp_t* tcp, const void* data, stio_len_t len, const stio_ntime_t* tmout, void* wrctx) +{ + return stio_dev_timedwrite ((stio_dev_t*)tcp, data, len, tmout, wrctx); +} + + +static STIO_INLINE void stio_dev_tcp_halt (stio_dev_tcp_t* tcp) +{ + stio_dev_halt ((stio_dev_t*)tcp); +} +#else + +#define stio_dev_tcp_read(tcp,enabled) stio_dev_read((stio_dev_t*)tcp, enabled) +#define stio_dev_tcp_write(tcp,data,len,wrctx) stio_dev_write((stio_dev_t*)tcp, data, len, wrctx) +#define stio_dev_tcp_timedwrite(tcp,data,len,tmout,wrctx) stio_dev_timedwrite((stio_dev_t*)tcp, data, len, tmout, wrctx) +#define stio_dev_tcp_halt(tcp) stio_dev_halt((stio_dev_t*)tcp) + +#endif #ifdef __cplusplus } diff --git a/stio/lib/stio-udp.c b/stio/lib/stio-udp.c index 544fc35..119417c 100644 --- a/stio/lib/stio-udp.c +++ b/stio/lib/stio-udp.c @@ -167,7 +167,7 @@ printf ("dATA received %d bytes\n", (int)len); } -static int udp_on_write (stio_dev_t* dev, void* msgid) +static int udp_on_write (stio_dev_t* dev, stio_len_t wrlen, void* wrctx) { return 0; @@ -184,8 +184,3 @@ stio_dev_udp_t* stio_dev_udp_make (stio_t* stio, stio_size_t xtnsize, const stio { return (stio_dev_udp_t*)stio_makedev (stio, STIO_SIZEOF(stio_dev_udp_t) + xtnsize, &udp_mth, &udp_evcb, (void*)data); } - -void stio_dev_udp_halt (stio_dev_udp_t* udp) -{ - stio_dev_halt ((stio_dev_t*)udp); -} diff --git a/stio/lib/stio-udp.h b/stio/lib/stio-udp.h index 1d8d16b..1bc5e6c 100644 --- a/stio/lib/stio-udp.h +++ b/stio/lib/stio-udp.h @@ -66,10 +66,36 @@ STIO_EXPORT stio_dev_udp_t* stio_dev_udp_make ( const stio_dev_udp_make_t* data ); -STIO_EXPORT void stio_dev_udp_halt ( - stio_dev_udp_t* udp -); +#if defined(STIO_HAVE_INLINE) + +static STIO_INLINE int stio_dev_udp_read (stio_dev_udp_t* udp, int enabled) +{ + return stio_dev_read ((stio_dev_t*)udp, enabled); +} + +static STIO_INLINE int stio_dev_udp_write (stio_dev_udp_t* udp, const void* data, stio_len_t len, void* wrctx) +{ + return stio_dev_write ((stio_dev_t*)udp, data, len, wrctx); +} + +static STIO_INLINE int stio_dev_udp_timedwrite (stio_dev_udp_t* udp, const void* data, stio_len_t len, const stio_ntime_t* tmout, void* wrctx) +{ + return stio_dev_timedwrite ((stio_dev_t*)udp, data, len, tmout, wrctx); +} + +static STIO_INLINE void stio_dev_udp_halt (stio_dev_udp_t* udp) +{ + stio_dev_halt ((stio_dev_t*)udp); +} +#else + +#define stio_dev_udp_read(udp,enabled) stio_dev_read((stio_dev_t*)udp, enabled) +#define stio_dev_udp_write(udp,data,len,wrctx) stio_dev_write((stio_dev_t*)udp, data, len, wrctx) +#define stio_dev_udp_timedwrite(udp,data,len,tmout,wrctx) stio_dev_timedwrite((stio_dev_t*)udp, data, len, tmout, wrctx) +#define stio_dev_udp_halt(udp) stio_dev_halt((stio_dev_t*)udp) + +#endif #ifdef __cplusplus } #endif diff --git a/stio/lib/stio.c b/stio/lib/stio.c index 22a9f87..a4bf1a9 100644 --- a/stio/lib/stio.c +++ b/stio/lib/stio.c @@ -117,12 +117,25 @@ void stio_epilogue (stio_t* stio) /* TODO: */ } -STIO_INLINE static void handle_event (stio_t* stio, stio_size_t i) +static STIO_INLINE void unlink_wq (stio_t* stio, stio_wq_t* q) +{ + if (q->tmridx != STIO_TMRIDX_INVALID) + { + stio_deltmrjob (stio, q->tmridx); + STIO_ASSERT (q->tmridx == STIO_TMRIDX_INVALID); + } + STIO_WQ_UNLINK (q); +} + +static STIO_INLINE void handle_event (stio_t* stio, stio_size_t i) { stio_dev_t* dev; + stio->renew_watch = 0; dev = stio->revs[i].data.ptr; + STIO_ASSERT (stio == dev->stio); + if (dev->dev_evcb->ready) { int x, events = 0; @@ -153,7 +166,7 @@ STIO_INLINE static void handle_event (stio_t* stio, stio_size_t i) { /* urgent data */ /* TODO: urgent data.... */ - /*x = dev->dev_mth->urgrecv (dev, stio->bugbuf, &len);*/ + /*x = dev->dev_mth->urgread (dev, stio->bugbuf, &len);*/ printf ("has urgent data...\n"); } @@ -207,9 +220,9 @@ printf ("has urgent data...\n"); out_closed = 1; } - STIO_WQ_UNLINK (q); /* STIO_WQ_DEQ(&dev->wq); */ - y = dev->dev_evcb->on_write (dev, q->ctx); - STIO_MMGR_FREE (dev->stio->mmgr, q); + unlink_wq (stio, q); + y = dev->dev_evcb->on_write (dev, q->olen, q->ctx); + STIO_MMGR_FREE (stio->mmgr, q); if (y <= -1) { @@ -225,7 +238,7 @@ printf ("has urgent data...\n"); while (!STIO_WQ_ISEMPTY(&dev->wq)) { q = STIO_WQ_HEAD(&dev->wq); - STIO_WQ_UNLINK (q); + unlink_wq (stio, q); STIO_MMGR_FREE (dev->stio->mmgr, q); } break; @@ -321,28 +334,39 @@ printf ("has urgent data...\n"); } } - if (dev && stio->revs[i].events & (EPOLLERR | EPOLLHUP)) - { - /* if error or hangup has been reported on the device, - * halt the device. this check is performed after - * EPOLLIN or EPOLLOUT check because EPOLLERR or EPOLLHUP - * can be set together with EPOLLIN or EPOLLOUT. */ - dev->dev_capa |= STIO_DEV_CAPA_IN_CLOSED | STIO_DEV_CAPA_OUT_CLOSED; - stio->renew_watch = 1; - } -#if defined(EPOLLRDHUP) - else if (dev && stio->revs[i].events & EPOLLRDHUP) + if (dev) { + if (stio->revs[i].events & (EPOLLERR | EPOLLHUP)) + { + /* if error or hangup has been reported on the device, + * halt the device. this check is performed after + * EPOLLIN or EPOLLOUT check because EPOLLERR or EPOLLHUP + * can be set together with EPOLLIN or EPOLLOUT. */ + dev->dev_capa |= STIO_DEV_CAPA_IN_CLOSED | STIO_DEV_CAPA_OUT_CLOSED; + stio->renew_watch = 1; + } + #if defined(EPOLLRDHUP) + else if (dev && stio->revs[i].events & EPOLLRDHUP) + { + if (stio->revs[i].events & (EPOLLIN | EPOLLOUT | EPOLLPRI)) + { + /* it may be a half-open state. don't do anything here + * to let the next read detect EOF */ + } + else + { + dev->dev_capa |= STIO_DEV_CAPA_IN_CLOSED | STIO_DEV_CAPA_OUT_CLOSED; + stio->renew_watch = 1; + } + } + #endif - } -#endif - - if (dev && - (dev->dev_capa & STIO_DEV_CAPA_IN_CLOSED) && - (dev->dev_capa & STIO_DEV_CAPA_OUT_CLOSED)) - { - stio_dev_halt (dev); - dev = STIO_NULL; + if ((dev->dev_capa & STIO_DEV_CAPA_IN_CLOSED) && + (dev->dev_capa & STIO_DEV_CAPA_OUT_CLOSED)) + { + stio_dev_halt (dev); + dev = STIO_NULL; + } } skip_evcb: @@ -353,7 +377,7 @@ skip_evcb: } } -int stio_exec (stio_t* stio) +static STIO_INLINE int __exec (stio_t* stio) { stio_ntime_t tmout; @@ -403,7 +427,6 @@ int stio_exec (stio_t* stio) for (i = 0; i < nentries; i++) { - stio->renew_watch = 0; handle_event (stio, i); } @@ -421,6 +444,17 @@ int stio_exec (stio_t* stio) return 0; } +int stio_exec (stio_t* stio) +{ + int n; + + stio->in_exec = 1; + n = __exec (stio); + stio->in_exec = 0; + + return n; +} + void stio_stop (stio_t* stio) { stio->stopreq = 1; @@ -435,17 +469,6 @@ int stio_loop (stio_t* stio) if (stio_prologue (stio) <= -1) return -1; - if (stio->renew_watch) - { -printf (">>> GLOBAL WATCHIN RENEWAL....\n"); - stio_dev_t* dev; - for (dev = stio->dev.head; dev; dev = dev->dev_next) - { - stio_dev_watch (dev, STIO_DEV_WATCH_RENEW, 0); - } - stio->renew_watch = 0; - } - while (!stio->stopreq && stio->dev.head) { if (stio_exec (stio) <= -1) break; @@ -529,12 +552,10 @@ void stio_killdev (stio_t* stio, stio_dev_t* dev) /* clear pending send requests */ while (!STIO_WQ_ISEMPTY(&dev->wq)) { - stio_wq_t* wq; - - wq = STIO_WQ_HEAD(&dev->wq); - STIO_WQ_DEQ (&dev->wq); - - STIO_MMGR_FREE (stio->mmgr, wq); + stio_wq_t* q; + q = STIO_WQ_HEAD(&dev->wq); + unlink_wq (stio, q); + STIO_MMGR_FREE (stio->mmgr, q); } /* delink the dev object */ @@ -682,17 +703,53 @@ int stio_dev_watch (stio_dev_t* dev, stio_dev_watch_cmd_t cmd, int events) return 0; } -void stio_dev_read (stio_dev_t* dev, int enabled) +int stio_dev_read (stio_dev_t* dev, int enabled) { + if (dev->dev_capa & STIO_DEV_CAPA_IN_CLOSED) + { + dev->stio->errnum = STIO_ENOCAPA; + return -1; + } + if (enabled) + { dev->dev_capa &= ~STIO_DEV_CAPA_IN_DISABLED; + if (!dev->stio->in_exec && (dev->dev_capa & STIO_DEV_CAPA_IN_WATCHED)) goto renew_watch_now; + } else + { dev->dev_capa |= STIO_DEV_CAPA_IN_DISABLED; + if (!dev->stio->in_exec && !(dev->dev_capa & STIO_DEV_CAPA_IN_WATCHED)) goto renew_watch_now; + } dev->stio->renew_watch = 1; + return 0; + +renew_watch_now: + if (stio_dev_watch (dev, STIO_DEV_WATCH_RENEW, 0) <= -1) return -1; + return 0; } -int stio_dev_write (stio_dev_t* dev, const void* data, stio_len_t len, void* wrctx) +static void on_write_timeout (stio_t* stio, const stio_ntime_t* now, stio_tmrjob_t* job) +{ + stio_wq_t* q; + stio_dev_t* dev; + int x; + + q = (stio_wq_t*)job->ctx; + dev = q->dev; + + dev->stio->errnum = STIO_ETMOUT; + x = dev->dev_evcb->on_write (dev, -1, q->ctx); + + STIO_ASSERT (q->tmridx == STIO_TMRIDX_INVALID); + STIO_WQ_UNLINK(q); + STIO_MMGR_FREE (stio->mmgr, q); + + if (x <= -1) stio_dev_halt (dev); +} + +static int __dev_write (stio_dev_t* dev, const void* data, stio_len_t len, const stio_ntime_t* tmout, void* wrctx) { const stio_uint8_t* uptr; stio_len_t urem, ulen; @@ -715,42 +772,55 @@ int stio_dev_write (stio_dev_t* dev, const void* data, stio_len_t len, void* wrc goto enqueue_data; } - /* use the do..while() loop to be able to send a zero-length data */ - do + if (dev->dev_capa & STIO_DEV_CAPA_STREAM) + { + /* use the do..while() loop to be able to send a zero-length data */ + do + { + ulen = urem; + x = dev->dev_mth->write (dev, data, &ulen); + if (x <= -1) return -1; + else if (x == 0) + { + /* [NOTE] + * the write queue is empty at this moment. a zero-length + * request for a stream device can still get enqueued if the + * write callback returns 0 though i can't figure out if there + * is a compelling reason to do so + */ + goto enqueue_data; /* enqueue remaining data */ + } + else + { + urem -= ulen; + uptr += ulen; + } + } + while (urem > 0); + + if (len <= 0) /* original length */ + { + /* a zero-length writing request is to close the writing end */ + dev->dev_capa |= STIO_DEV_CAPA_OUT_CLOSED; + } + + if (dev->dev_evcb->on_write (dev, len, wrctx) <= -1) return -1; + } + else { ulen = urem; x = dev->dev_mth->write (dev, data, &ulen); if (x <= -1) return -1; - else if (x == 0) - { - /* [NOTE] - * the write queue is empty at this moment. a zero-length - * request for a stream device can still get enqueued if the - * write callback returns 0 though i can't figure out if there - * is a compelling reason to do so - */ - goto enqueue_data; /* enqueue remaining data */ - } - else - { - urem -= ulen; - uptr += ulen; - } - } - while (urem > 0); + else if (x == 0) goto enqueue_data; - if (len <= 0 && (dev->dev_capa & STIO_DEV_CAPA_STREAM)) - { - /* a zero-length writing request is to close the writing end */ - dev->dev_capa |= STIO_DEV_CAPA_OUT_CLOSED; + /* partial writing is still considered ok for a non-stream device */ + if (dev->dev_evcb->on_write (dev, ulen, wrctx) <= -1) return -1; } - dev->dev_evcb->on_write (dev, wrctx); - return 0; + return 1; /* written immediately and called on_write callback */ enqueue_data: /* queue the remaining data*/ -printf ("ENQUEING DATA...\n"); q = (stio_wq_t*)STIO_MMGR_ALLOC (dev->stio->mmgr, STIO_SIZEOF(*q) + urem); if (!q) { @@ -758,28 +828,60 @@ printf ("ENQUEING DATA...\n"); return -1; } + q->tmridx = STIO_TMRIDX_INVALID; + q->dev = dev; q->ctx = wrctx; q->ptr = (stio_uint8_t*)(q + 1); q->len = urem; + q->olen = len; STIO_MEMCPY (q->ptr, uptr, urem); - STIO_WQ_ENQ (&dev->wq, q); - - dev->stio->renew_watch = 1; -#if 0 - if (!(dev->dev_capa & STIO_DEV_CAPA_OUT_WATCHED)) + if (tmout && !stio_isnegtime(tmout)) { - /* if output is not being watched, arrange to do so */ - if (stio_dev_watch (dev, STIO_DEV_WATCH_RENEW, 0) <= -1) + stio_tmrjob_t tmrjob; + + STIO_MEMSET (&tmrjob, 0, STIO_SIZEOF(tmrjob)); + tmrjob.ctx = q; + stio_gettime (&tmrjob.when); + stio_addtime (&tmrjob.when, tmout, &tmrjob.when); + tmrjob.handler = on_write_timeout; + tmrjob.idxptr = &q->tmridx; + + q->tmridx = stio_instmrjob (dev->stio, &tmrjob); + if (q->tmridx == STIO_TMRIDX_INVALID) { - STIO_WQ_UNLINK (q); /* unlink the ENQed item */ STIO_MMGR_FREE (dev->stio->mmgr, q); return -1; } } -#endif - return 0; + STIO_WQ_ENQ (&dev->wq, q); + if (!dev->stio->in_exec && !(dev->dev_capa & STIO_DEV_CAPA_OUT_WATCHED)) + { + /* if output is not being watched, arrange to do so */ + if (stio_dev_watch (dev, STIO_DEV_WATCH_RENEW, 0) <= -1) + { + unlink_wq (dev->stio, q); + STIO_MMGR_FREE (dev->stio->mmgr, q); + return -1; + } + } + else + { + dev->stio->renew_watch = 1; + } + + return 0; /* request pused to a write queue. */ +} + +int stio_dev_write (stio_dev_t* dev, const void* data, stio_len_t len, void* wrctx) +{ + return __dev_write (dev, data, len, STIO_NULL, wrctx); +} + +int stio_dev_timedwrite (stio_dev_t* dev, const void* data, stio_len_t len, const stio_ntime_t* tmout, void* wrctx) +{ + return __dev_write (dev, data, len, tmout, wrctx); } int stio_makesyshndasync (stio_t* stio, stio_syshnd_t hnd) diff --git a/stio/lib/stio.h b/stio/lib/stio.h index 9051fdc..f86ca46 100644 --- a/stio/lib/stio.h +++ b/stio/lib/stio.h @@ -48,7 +48,6 @@ struct stio_ntime_t #define STIO_SYSHND_INVALID (-1) #endif - /* ------------------------------------------------------------------------- */ typedef struct stio_t stio_t; typedef struct stio_dev_t stio_dev_t; @@ -56,8 +55,7 @@ typedef struct stio_dev_mth_t stio_dev_mth_t; typedef struct stio_dev_evcb_t stio_dev_evcb_t; typedef struct stio_wq_t stio_wq_t; -typedef unsigned int stio_len_t; /* TODO: remove it? */ - +typedef stio_intptr_t stio_len_t; /* NOTE: this is a signed type */ enum stio_errnum_t { @@ -65,12 +63,13 @@ enum stio_errnum_t STIO_ENOMEM, STIO_EINVAL, STIO_ENOENT, - STIO_ENOSUP, /* not supported */ - STIO_EMFILE, + STIO_ENOSUP, /* not supported */ + STIO_EMFILE, /* too many open files */ STIO_ENFILE, - STIO_ECONRF, /* connection refused */ - STIO_ECONRS, /* connection reset */ - STIO_ENOCAPA, /* no capability */ + STIO_ECONRF, /* connection refused */ + STIO_ECONRS, /* connection reset */ + STIO_ENOCAPA, /* no capability */ + STIO_ETMOUT, /* timed out */ STIO_EDEVMAKE, STIO_EDEVERR, @@ -81,6 +80,21 @@ enum stio_errnum_t typedef enum stio_errnum_t stio_errnum_t; +typedef struct stio_tmrjob_t stio_tmrjob_t; +typedef stio_size_t stio_tmridx_t; + +typedef void (*stio_tmrjob_handler_t) ( + stio_t* stio, + const stio_ntime_t* now, + stio_tmrjob_t* tmrjob +); + +typedef void (*stio_tmrjob_updater_t) ( + stio_t* stio, + stio_tmridx_t old_index, + stio_tmridx_t new_index, + stio_tmrjob_t* tmrjob +); struct stio_dev_mth_t { @@ -121,20 +135,24 @@ struct stio_dev_evcb_t int (*on_read) (stio_dev_t* dev, const void* data, stio_len_t len); /* return -1 on failure, 0 on success. - * it must not kill the device */ - int (*on_write) (stio_dev_t* dev, void* wrctx); + * wrlen is the length of data written. it is the length of the originally + * posted writing request for a stream device. For a non stream device, it + * may be shorter than the originally posted length. */ + int (*on_write) (stio_dev_t* dev, stio_len_t wrlen, void* wrctx); }; -typedef enum stio_wq_type_t stio_wq_type_t; - struct stio_wq_t { stio_wq_t* next; stio_wq_t* prev; + stio_len_t olen; /* original length */ stio_uint8_t* ptr; stio_len_t len; void* ctx; + stio_dev_t* dev; /* back-pointer to the device */ + + stio_tmridx_t tmridx; }; #define STIO_WQ_INIT(wq) ((wq)->next = (wq)->prev = (wq)) @@ -235,22 +253,6 @@ enum stio_dev_event_t typedef enum stio_dev_event_t stio_dev_event_t; -typedef struct stio_tmrjob_t stio_tmrjob_t; -typedef stio_size_t stio_tmridx_t; - -typedef void (*stio_tmrjob_handler_t) ( - stio_t* stio, - const stio_ntime_t* now, - stio_tmrjob_t* tmrjob -); - -typedef void (*stio_tmrjob_updater_t) ( - stio_t* stio, - stio_tmridx_t old_index, - stio_tmridx_t new_index, - stio_tmrjob_t* tmrjob -); - #ifdef __cplusplus extern "C" { #endif @@ -314,22 +316,109 @@ STIO_EXPORT int stio_dev_watch ( int events ); -STIO_EXPORT void stio_dev_read ( +STIO_EXPORT int stio_dev_read ( stio_dev_t* dev, int enabled ); +/** + * The stio_dev_write() function posts a writing request. + * It attempts to write data immediately if there is no pending requests. + * If writing fails, it returns -1. If writing succeeds, it calls the + * on_write callback. If the callback fails, it returns -1. If the callback + * succeeds, it returns 1. If no immediate writing is possible, the request + * is enqueued to a pending request list. If enqueing gets successful, + * it returns 0. otherwise it returns -1. + */ STIO_EXPORT int stio_dev_write ( - stio_dev_t* dev, - const void* data, - stio_len_t len, - void* wrctx + stio_dev_t* dev, + const void* data, + stio_len_t len, + void* wrctx +); + + +STIO_EXPORT int stio_dev_timedwrite ( + stio_dev_t* dev, + const void* data, + stio_len_t len, + const stio_ntime_t* tmout, + void* wrctx ); STIO_EXPORT void stio_dev_halt ( stio_dev_t* dev ); + +/* ------------------------------------------------ */ + +#define stio_inittime(x,s,ns) (((x)->sec = (s)), ((x)->nsec = (ns))) +#define stio_cleartime(x) stio_inittime(x,0,0) +#define stio_cmptime(x,y) \ + (((x)->sec == (y)->sec)? ((x)->nsec - (y)->nsec): \ + ((x)->sec - (y)->sec)) + +/* if time has been normalized properly, nsec must be equal to or + * greater than 0. */ +#define stio_isnegtime(x) ((x)->sec < 0) + +/** + * The stio_gettime() function gets the current time. + */ +STIO_EXPORT void stio_gettime ( + stio_ntime_t* nt +); + +/** + * The stio_addtime() function adds x and y and stores the result in z + */ +STIO_EXPORT void stio_addtime ( + const stio_ntime_t* x, + const stio_ntime_t* y, + stio_ntime_t* z +); + +/** + * The stio_subtime() function subtract y from x and stores the result in z. + */ +STIO_EXPORT void stio_subtime ( + const stio_ntime_t* x, + const stio_ntime_t* y, + stio_ntime_t* z +); + +/** + * The stio_instmrjob() function schedules a new event. + * + * \return #STIO_TMRIDX_INVALID on failure, valid index on success. + */ + +STIO_EXPORT stio_tmridx_t stio_instmrjob ( + stio_t* stio, + const stio_tmrjob_t* job +); + +STIO_EXPORT stio_tmridx_t stio_updtmrjob ( + stio_t* stio, + stio_tmridx_t index, + const stio_tmrjob_t* job +); + +STIO_EXPORT void stio_deltmrjob ( + stio_t* stio, + stio_tmridx_t index +); + +/** + * The stio_gettmrjob() function returns the + * pointer to the registered event at the given index. + */ +STIO_EXPORT stio_tmrjob_t* stio_gettmrjob ( + stio_t* stio, + stio_tmridx_t index +); + #ifdef __cplusplus } #endif