From 306384370c0d501b50699bfe18ad0a72b5658ea7 Mon Sep 17 00:00:00 2001 From: hyung-hwan Date: Mon, 14 Jan 2019 08:33:14 +0000 Subject: [PATCH] changed to push backward the write completion event into the main loop --- mio/lib/main.c | 57 +++++++++++++++-------- mio/lib/mio-prv.h | 2 + mio/lib/mio-sck.c | 4 +- mio/lib/mio-sck.h | 2 +- mio/lib/mio.c | 86 ++++++++++++++++++++++++++++++----- mio/lib/mio.h | 113 +++++++++++++++++++++++++++++++++------------- 6 files changed, 199 insertions(+), 65 deletions(-) diff --git a/mio/lib/main.c b/mio/lib/main.c index 111dfba..0e002fc 100644 --- a/mio/lib/main.c +++ b/mio/lib/main.c @@ -174,15 +174,15 @@ static int tcp_sck_on_connect (mio_dev_sck_t* tcp) if (tcp->state & MIO_DEV_SCK_CONNECTED) { -printf ("device connected to a remote server... LOCAL %s:%d REMOTE %s:%d.", buf1, mio_getsckaddrport(&tcp->localaddr), buf2, mio_getsckaddrport(&tcp->remoteaddr)); +printf ("DEVICE connected to a remote server... LOCAL %s:%d REMOTE %s:%d.", buf1, mio_getsckaddrport(&tcp->localaddr), buf2, mio_getsckaddrport(&tcp->remoteaddr)); } else if (tcp->state & MIO_DEV_SCK_ACCEPTED) { -printf ("device accepted client device... .LOCAL %s:%d REMOTE %s:%d\n", buf1, mio_getsckaddrport(&tcp->localaddr), buf2, mio_getsckaddrport(&tcp->remoteaddr)); +printf ("DEVICE accepted client device... .LOCAL %s:%d REMOTE %s:%d\n", buf1, mio_getsckaddrport(&tcp->localaddr), buf2, mio_getsckaddrport(&tcp->remoteaddr)); } - return mio_dev_sck_write (tcp, "hello", 5, MIO_NULL, MIO_NULL); + return mio_dev_sck_write(tcp, "hello", 5, MIO_NULL, MIO_NULL); } static int tcp_sck_on_write (mio_dev_sck_t* tcp, mio_iolen_t wrlen, void* wrctx, const mio_sckaddr_t* dstaddr) @@ -191,18 +191,18 @@ static int tcp_sck_on_write (mio_dev_sck_t* tcp, mio_iolen_t wrlen, void* wrctx, if (wrlen <= -1) { -printf ("SEDING TIMED OUT...........\n"); +printf ("TCP_SCK_ON_WRITE SEDING TIMED OUT...........\n"); mio_dev_sck_halt(tcp); } else { ts = (tcp_server_t*)(tcp + 1); - printf (">>> SENT MESSAGE %d of length %ld\n", ts->tally, (long int)wrlen); + printf ("TCP_SCK_ON_WRITE >>> SENT MESSAGE %d of length %ld\n", ts->tally, (long int)wrlen); ts->tally++; // if (ts->tally >= 2) mio_dev_sck_halt (tcp); -printf ("ENABLING READING..............................\n"); +printf ("TCP_SCK_ON_WRITE ENABLING READING..............................\n"); mio_dev_sck_read (tcp, 1); //mio_dev_sck_timedread (tcp, 1, 1000); @@ -210,19 +210,20 @@ printf ("ENABLING READING..............................\n"); return 0; } +/* TODO: serialize callbacks... */ static int tcp_sck_on_read (mio_dev_sck_t* tcp, const void* buf, mio_iolen_t len, const mio_sckaddr_t* srcaddr) { int n; if (len <= 0) { - printf ("STREAM DEVICE: EOF RECEIVED...\n"); + printf ("TCP_SCK_ON_READ STREAM DEVICE: EOF RECEIVED...\n"); /* no outstanding request. but EOF */ mio_dev_sck_halt (tcp); return 0; } -printf ("on read %d\n", (int)len); +printf ("TCP_SCK_ON_READ read %d\n", (int)len); { mio_ntime_t tmout; @@ -231,6 +232,7 @@ static char a ='A'; char* xxx = malloc (1000000); memset (xxx, a++ ,1000000); +printf ("TCP_SCK_ON_READ initiating write... of %d\n", 1000000); //return mio_dev_sck_write (tcp, "HELLO", 5, MIO_NULL); mio_inittime (&tmout, 5, 0); n = mio_dev_sck_timedwrite (tcp, xxx, 1000000, &tmout, MIO_NULL, MIO_NULL); @@ -240,12 +242,12 @@ free (xxx); if (n <= -1) return -1; } - -printf ("DISABLING READING..............................\n"); +printf ("TCP_SCK_ON_READ DISABLING READING..............................\n"); mio_dev_sck_read (tcp, 0); - /* post the write finisher */ - n = mio_dev_sck_write (tcp, MIO_NULL, 0, MIO_NULL, MIO_NULL); +printf ("TCP_SCK_ON_READ WRITING 0.............................\n"); + /* post the write finisher - close the writing end */ + n = mio_dev_sck_write(tcp, MIO_NULL, 0, MIO_NULL, MIO_NULL); if (n <= -1) return -1; return 0; @@ -299,9 +301,10 @@ static int arp_sck_on_write (mio_dev_sck_t* dev, mio_iolen_t wrlen, void* wrctx, return 0; } -static void arp_sck_on_connect (mio_dev_sck_t* dev) +static int arp_sck_on_connect (mio_dev_sck_t* dev) { printf ("STARTING UP ARP SOCKET %d...\n", dev->sck); + return 0; } static void arp_sck_on_disconnect (mio_dev_sck_t* dev) @@ -335,7 +338,7 @@ static int setup_arp_tester (mio_t* mio) mio_sckaddr_initforeth (ðdst, if_nametoindex("wlan0"), (mio_ethaddr_t*)"\xAA\xBB\xFF\xCC\xDD\xFF"); - memset (ðarp, 0, sizeof(etharp)); + memset (ðarp, 0, MIO_SIZEOF(etharp)); memcpy (etharp.ethhdr.source, "\xB8\x6B\x23\x9C\x10\x76", MIO_ETHADDR_LEN); //memcpy (etharp.ethhdr.dest, "\xFF\xFF\xFF\xFF\xFF\xFF", MIO_ETHADDR_LEN); @@ -350,8 +353,8 @@ static int setup_arp_tester (mio_t* mio) memcpy (etharp.arppld.sha, "\xB8\x6B\x23\x9C\x10\x76", MIO_ETHADDR_LEN); - if (mio_dev_sck_write(sck, ðarp, sizeof(etharp), NULL, ðdst) <= -1) - //if (mio_dev_sck_write (sck, ðarp.arphdr, sizeof(etharp) - sizeof(etharp.ethhdr), NULL, ðaddr) <= -1) + if (mio_dev_sck_write(sck, ðarp, MIO_SIZEOF(etharp), MIO_NULL, ðdst) <= -1) + //if (mio_dev_sck_write (sck, ðarp.arphdr, MIO_SIZEOF(etharp) - MIO_SIZEOF(etharp.ethhdr), MIO_NULL, ðaddr) <= -1) { printf ("CANNOT WRITE ARP...\n"); } @@ -392,7 +395,7 @@ static void send_icmp (mio_dev_sck_t* dev, mio_uint16_t seq) memset (&buf[MIO_SIZEOF(*icmphdr)], 'A', MIO_SIZEOF(buf) - MIO_SIZEOF(*icmphdr)); icmphdr->checksum = mio_checksumip (icmphdr, MIO_SIZEOF(buf)); - if (mio_dev_sck_write (dev, buf, MIO_SIZEOF(buf), NULL, &dstaddr) <= -1) + if (mio_dev_sck_write (dev, buf, MIO_SIZEOF(buf), MIO_NULL, &dstaddr) <= -1) { printf ("CANNOT WRITE ICMP...\n"); mio_dev_sck_halt (dev); @@ -543,6 +546,7 @@ static int setup_ping4_tester (mio_t* mio) /* ========================================================================= */ +#if 0 static mio_t* g_mio; static void handle_signal (int sig) @@ -761,11 +765,15 @@ oops: return -1; } -#if 0 +#endif +#if 1 int main (int argc, char* argv[]) { mio_t* mio = MIO_NULL; mio_dev_sck_t* tcpsvr; + mio_dev_sck_make_t tcp_make; + mio_dev_sck_connect_t tcp_conn; + tcp_server_t* ts; mio = mio_open (&mmgr, 0, 512, MIO_NULL); if (!mio) @@ -789,7 +797,18 @@ int main (int argc, char* argv[]) ts = (tcp_server_t*)(tcpsvr + 1); ts->tally = 0; - + + memset (&tcp_conn, 0, MIO_SIZEOF(tcp_conn)); +{ + in_addr_t ia = inet_addr("127.0.0.1"); + mio_sckaddr_initforip4 (&tcp_conn.remoteaddr, 9999, (mio_ip4addr_t*)&ia); +} + mio_inittime (&tcp_conn.connect_tmout, 5, 0); + tcp_conn.options = 0; + if (mio_dev_sck_connect(tcpsvr, &tcp_conn) <= -1) + { + } + #if 0 while (1) { diff --git a/mio/lib/mio-prv.h b/mio/lib/mio-prv.h index 015bd0f..3026208 100644 --- a/mio/lib/mio-prv.h +++ b/mio/lib/mio-prv.h @@ -78,6 +78,8 @@ struct mio_t mio_tmrjob_t* jobs; } tmr; + mio_cwq_t cwq; + /* platform specific fields below */ #if defined(_WIN32) HANDLE iocp; diff --git a/mio/lib/mio-sck.c b/mio/lib/mio-sck.c index 2a29a26..369f9c0 100644 --- a/mio/lib/mio-sck.c +++ b/mio/lib/mio-sck.c @@ -995,7 +995,7 @@ fcntl (rdev->sck, F_SETFL, flags | O_NONBLOCK); { if (errno == EINPROGRESS || errno == EWOULDBLOCK || errno == EAGAIN) { - if (mio_dev_watch ((mio_dev_t*)rdev, MIO_DEV_WATCH_UPDATE, MIO_DEV_EVENT_IN | MIO_DEV_EVENT_OUT) <= -1) + if (mio_dev_watch((mio_dev_t*)rdev, MIO_DEV_WATCH_UPDATE, MIO_DEV_EVENT_IN | MIO_DEV_EVENT_OUT) <= -1) { /* watcher update failure. it's critical */ mio_stop (rdev->mio, MIO_STOPREQ_WATCHER_ERROR); @@ -1031,7 +1031,7 @@ fcntl (rdev->sck, F_SETFL, flags | O_NONBLOCK); rdev->mio->errnum = mio_syserrtoerrnum(errno); oops_connect: - if (mio_dev_watch ((mio_dev_t*)rdev, MIO_DEV_WATCH_UPDATE, MIO_DEV_EVENT_IN) <= -1) + if (mio_dev_watch((mio_dev_t*)rdev, MIO_DEV_WATCH_UPDATE, MIO_DEV_EVENT_IN) <= -1) { /* watcher update failure. it's critical */ mio_stop (rdev->mio, MIO_STOPREQ_WATCHER_ERROR); diff --git a/mio/lib/mio-sck.h b/mio/lib/mio-sck.h index cc21897..bd715ea 100644 --- a/mio/lib/mio-sck.h +++ b/mio/lib/mio-sck.h @@ -558,7 +558,7 @@ static MIO_INLINE void mio_dev_sck_halt (mio_dev_sck_t* sck) static MIO_INLINE int mio_dev_sck_read (mio_dev_sck_t* sck, int enabled) { - return mio_dev_read ((mio_dev_t*)sck, enabled); + return mio_dev_read((mio_dev_t*)sck, enabled); } #else diff --git a/mio/lib/mio.c b/mio/lib/mio.c index 3272915..d148c4c 100644 --- a/mio/lib/mio.c +++ b/mio/lib/mio.c @@ -382,6 +382,7 @@ int mio_init (mio_t* mio, mio_mmgr_t* mmgr, mio_oow_t tmrcapa) } mio->tmr.capa = tmrcapa; + MIO_CWQ_INIT (&mio->cwq); return 0; } @@ -394,6 +395,16 @@ void mio_fini (mio_t* mio) mio_dev_t* tail; } diehard; + /* clear completed write event queues - TODO: do i need to fire these? */ + while (!MIO_CWQ_ISEMPTY(&mio->cwq)) + { + mio_cwq_t* cwq; + cwq = MIO_CWQ_HEAD(&mio->cwq); + MIO_CWQ_UNLINK (cwq); + MIO_MMGR_FREE (mio->mmgr, cwq); + } + + /* kill all registered devices */ while (mio->actdev.head) { @@ -723,11 +734,21 @@ static MIO_INLINE int __exec (mio_t* mio) /*if (!mio->actdev.head) return 0;*/ + /* execute callbacks for completed write operations */ + while (!MIO_CWQ_ISEMPTY(&mio->cwq)) + { + mio_cwq_t* cwq; + cwq = MIO_CWQ_HEAD(&mio->cwq); + if (cwq->dev->dev_evcb->on_write(cwq->dev, cwq->olen, cwq->ctx, &cwq->dstaddr) <= -1) return -1; + MIO_CWQ_UNLINK (cwq); + MIO_MMGR_FREE (mio->mmgr, cwq); + } + /* execute the scheduled jobs before checking devices with the * multiplexer. the scheduled jobs can safely destroy the devices */ mio_firetmrjobs (mio, MIO_NULL, MIO_NULL); - if (mio_gettmrtmout (mio, MIO_NULL, &tmout) <= -1) + if (mio_gettmrtmout(mio, MIO_NULL, &tmout) <= -1) { /* defaults to 1 second if timeout can't be acquired */ tmout.sec = 1; /* TODO: make the default timeout configurable */ @@ -750,7 +771,7 @@ static MIO_INLINE int __exec (mio_t* mio) mux = (mio_mux_t*)mio->mux; - nentries = poll (mux->pd.pfd, mux->pd.size, MIO_SECNSEC_TO_MSEC(tmout.sec, tmout.nsec)); + nentries = poll(mux->pd.pfd, mux->pd.size, MIO_SECNSEC_TO_MSEC(tmout.sec, tmout.nsec)); if (nentries == -1) { if (errno == EINTR) return 0; @@ -782,7 +803,7 @@ static MIO_INLINE int __exec (mio_t* mio) mux = (mio_mux_t*)mio->mux; - nentries = epoll_wait (mux->hnd, mux->revs, MIO_COUNTOF(mux->revs), MIO_SECNSEC_TO_MSEC(tmout.sec, tmout.nsec)); + nentries = epoll_wait(mux->hnd, mux->revs, MIO_COUNTOF(mux->revs), MIO_SECNSEC_TO_MSEC(tmout.sec, tmout.nsec)); if (nentries == -1) { if (errno == EINTR) return 0; /* it's actually ok */ @@ -887,7 +908,7 @@ mio_dev_t* mio_makedev (mio_t* mio, mio_oow_t dev_size, mio_dev_mth_t* dev_mth, dev->dev_capa = MIO_DEV_CAPA_IN | MIO_DEV_CAPA_OUT; dev->dev_mth = dev_mth; dev->dev_evcb = dev_evcb; - MIO_WQ_INIT(&dev->wq); + MIO_WQ_INIT (&dev->wq); /* call the callback function first */ mio->errnum = MIO_ENOERR; @@ -909,7 +930,7 @@ mio_dev_t* mio_makedev (mio_t* mio, mio_oow_t dev_size, mio_dev_mth_t* dev_mth, if (!(dev->dev_capa & MIO_DEV_CAPA_OUT)) dev->dev_capa |= MIO_DEV_CAPA_OUT_CLOSED; #if defined(_WIN32) - if (CreateIoCompletionPort ((HANDLE)dev->dev_mth->getsyshnd(dev), mio->iocp, MIO_IOCP_KEY, 0) == NULL) + if (CreateIoCompletionPort((HANDLE)dev->dev_mth->getsyshnd(dev), mio->iocp, MIO_IOCP_KEY, 0) == NULL) { /* TODO: set errnum from GetLastError()... */ goto oops_after_make; @@ -1043,6 +1064,17 @@ void mio_killdev (mio_t* mio, mio_dev_t* dev) goto kill_device; } +#if 0 + /* clear completed write event queues - TODO: do i need to fire these? */ + while (!MIO_CWQ_ISEMPTY(&dev->cwq)) + { + mio_cwq_t* q; + q = MIO_CWQ_HEAD(&dev->cwq); + MIO_CWQ_UNLINK (q); + MIO_MMGR_FREE (mio->mmgr, q); + } +#endif + /* clear pending send requests */ while (!MIO_WQ_ISEMPTY(&dev->wq)) { @@ -1184,7 +1216,7 @@ int mio_dev_watch (mio_dev_t* dev, mio_dev_watch_cmd_t cmd, int events) } else { - if (mux_control (dev, mux_cmd, dev->dev_mth->getsyshnd(dev), dev_capa) <= -1) return -1; + if (mux_control(dev, mux_cmd, dev->dev_mth->getsyshnd(dev), dev_capa) <= -1) return -1; } dev->dev_capa = dev_capa; @@ -1214,7 +1246,7 @@ int mio_dev_read (mio_dev_t* dev, int enabled) return 0; renew_watch_now: - if (mio_dev_watch (dev, MIO_DEV_WATCH_RENEW, 0) <= -1) return -1; + if (mio_dev_watch(dev, MIO_DEV_WATCH_RENEW, 0) <= -1) return -1; return 0; } @@ -1228,7 +1260,7 @@ static void on_write_timeout (mio_t* mio, const mio_ntime_t* now, mio_tmrjob_t* dev = q->dev; dev->mio->errnum = MIO_ETMOUT; - x = dev->dev_evcb->on_write (dev, -1, q->ctx, &q->dstaddr); + x = dev->dev_evcb->on_write(dev, -1, q->ctx, &q->dstaddr); MIO_ASSERT (q->tmridx == MIO_TMRIDX_INVALID); MIO_WQ_UNLINK(q); @@ -1242,6 +1274,7 @@ static int __dev_write (mio_dev_t* dev, const void* data, mio_iolen_t len, const const mio_uint8_t* uptr; mio_iolen_t urem, ulen; mio_wq_t* q; + mio_cwq_t* cwq; int x; if (dev->dev_capa & MIO_DEV_CAPA_OUT_CLOSED) @@ -1266,7 +1299,7 @@ static int __dev_write (mio_dev_t* dev, const void* data, mio_iolen_t len, const do { ulen = urem; - x = dev->dev_mth->write (dev, data, &ulen, dstaddr); + x = dev->dev_mth->write(dev, data, &ulen, dstaddr); if (x <= -1) return -1; else if (x == 0) { @@ -1292,7 +1325,8 @@ static int __dev_write (mio_dev_t* dev, const void* data, mio_iolen_t len, const dev->dev_capa |= MIO_DEV_CAPA_OUT_CLOSED; } - if (dev->dev_evcb->on_write(dev, len, wrctx, dstaddr) <= -1) return -1; + //if (dev->dev_evcb->on_write(dev, len, wrctx, dstaddr) <= -1) return -1; + goto enqueue_completed_write; } else { @@ -1303,7 +1337,8 @@ static int __dev_write (mio_dev_t* dev, const void* data, mio_iolen_t len, const else if (x == 0) goto enqueue_data; /* partial writing is still considered ok for a non-stream device */ - if (dev->dev_evcb->on_write (dev, ulen, wrctx, dstaddr) <= -1) return -1; + //if (dev->dev_evcb->on_write(dev, ulen, wrctx, dstaddr) <= -1) return -1; + goto enqueue_completed_write; } return 1; /* written immediately and called on_write callback */ @@ -1317,7 +1352,7 @@ enqueue_data: } /* queue the remaining data*/ - q = (mio_wq_t*)MIO_MMGR_ALLOC (dev->mio->mmgr, MIO_SIZEOF(*q) + (dstaddr? dstaddr->len: 0) + urem); + q = (mio_wq_t*)MIO_MMGR_ALLOC(dev->mio->mmgr, MIO_SIZEOF(*q) + (dstaddr? dstaddr->len: 0) + urem); if (!q) { dev->mio->errnum = MIO_ENOMEM; @@ -1380,6 +1415,33 @@ enqueue_data: } return 0; /* request pused to a write queue. */ + +enqueue_completed_write: + /* queue the remaining data*/ + cwq = (mio_cwq_t*)MIO_MMGR_ALLOC(dev->mio->mmgr, MIO_SIZEOF(*cwq) + (dstaddr? dstaddr->len: 0)); + if (!cwq) + { + dev->mio->errnum = MIO_ENOMEM; + return -1; + } + + cwq->dev = dev; + cwq->ctx = wrctx; + if (dstaddr) + { + cwq->dstaddr.ptr = (mio_uint8_t*)(cwq + 1); + cwq->dstaddr.len = dstaddr->len; + MIO_MEMCPY (cwq->dstaddr.ptr, dstaddr->ptr, dstaddr->len); + } + else + { + cwq->dstaddr.len = 0; + } + + cwq->olen = len; + + MIO_CWQ_ENQ (&dev->mio->cwq, cwq); + return 0; } int mio_dev_write (mio_dev_t* dev, const void* data, mio_iolen_t len, void* wrctx, const mio_devaddr_t* dstaddr) diff --git a/mio/lib/mio.h b/mio/lib/mio.h index b544612..b35c6fa 100644 --- a/mio/lib/mio.h +++ b/mio/lib/mio.h @@ -75,7 +75,9 @@ typedef struct mio_dev_t mio_dev_t; typedef struct mio_dev_mth_t mio_dev_mth_t; typedef struct mio_dev_evcb_t mio_dev_evcb_t; +typedef struct mio_q_t mio_q_t; typedef struct mio_wq_t mio_wq_t; +typedef struct mio_cwq_t mio_cwq_t; typedef mio_intptr_t mio_iolen_t; /* NOTE: this is a signed type */ enum mio_errnum_t @@ -196,6 +198,79 @@ struct mio_dev_evcb_t int (*on_write) (mio_dev_t* dev, mio_iolen_t wrlen, void* wrctx, const mio_devaddr_t* dstaddr); }; +struct mio_q_t +{ + mio_q_t* next; + mio_q_t* prev; +}; + +#define MIO_Q_INIT(q) ((q)->next = (q)->prev = (q)) +#define MIO_Q_TAIL(q) ((q)->prev) +#define MIO_Q_HEAD(q) ((q)->next) +#define MIO_Q_ISEMPTY(q) (MIO_Q_HEAD(q) == (q)) +#define MIO_Q_ISNODE(q,x) ((q) != (x)) +#define MIO_Q_ISHEAD(q,x) (MIO_Q_HEAD(q) == (x)) +#define MIO_Q_ISTAIL(q,x) (MIO_Q_TAIL(q) == (x)) + +#define MIO_Q_NEXT(x) ((x)->next) +#define MIO_Q_PREV(x) ((x)->prev) + +#define MIO_Q_LINK(p,x,n) do { \ + mio_q_t* pp = (p), * nn = (n); \ + (x)->prev = (p); \ + (x)->next = (n); \ + nn->prev = (x); \ + pp->next = (x); \ +} while (0) + +#define MIO_Q_UNLINK(x) do { \ + mio_q_t* pp = (x)->prev, * nn = (x)->next; \ + nn->prev = pp; pp->next = nn; \ +} while (0) + +#define MIO_Q_REPL(o,n) do { \ + mio_q_t* oo = (o), * nn = (n); \ + nn->next = oo->next; \ + nn->next->prev = nn; \ + nn->prev = oo->prev; \ + nn->prev->next = nn; \ +} while (0) + +/* insert an item at the back of the queue */ +/*#define MIO_Q_ENQ(wq,x) MIO_Q_LINK(MIO_Q_TAIL(wq), x, MIO_Q_TAIL(wq)->next)*/ +#define MIO_Q_ENQ(wq,x) MIO_Q_LINK(MIO_Q_TAIL(wq), x, wq) + +/* remove an item in the front from the queue */ +#define MIO_Q_DEQ(wq) MIO_Q_UNLINK(MIO_Q_HEAD(wq)) + +/* completed write queue */ +struct mio_cwq_t +{ + mio_cwq_t* next; + mio_cwq_t* prev; + + mio_iolen_t olen; + void* ctx; + mio_dev_t* dev; + mio_devaddr_t dstaddr; +}; + +#define MIO_CWQ_INIT(cwq) ((cwq)->next = (cwq)->prev = (cwq)) +#define MIO_CWQ_TAIL(cwq) ((cwq)->prev) +#define MIO_CWQ_HEAD(cwq) ((cwq)->next) +#define MIO_CWQ_ISEMPTY(cwq) (MIO_CWQ_HEAD(cwq) == (cwq)) +#define MIO_CWQ_ISNODE(cwq,x) ((cwq) != (x)) +#define MIO_CWQ_ISHEAD(cwq,x) (MIO_CWQ_HEAD(cwq) == (x)) +#define MIO_CWQ_ISTAIL(cwq,x) (MIO_CWQ_TAIL(cwq) == (x)) +#define MIO_CWQ_NEXT(x) ((x)->next) +#define MIO_CWQ_PREV(x) ((x)->prev) +#define MIO_CWQ_LINK(p,x,n) MIO_Q_LINK((mio_q_t*)p,(mio_q_t*)x,(mio_q_t*)n) +#define MIO_CWQ_UNLINK(x) MIO_Q_UNLINK((mio_q_t*)x) +#define MIO_CWQ_REPL(o,n) MIO_CWQ_REPL(o,n); +#define MIO_CWQ_ENQ(cwq,x) MIO_CWQ_LINK(MIO_CWQ_TAIL(cwq), (mio_q_t*)x, cwq) +#define MIO_CWQ_DEQ(cwq) MIO_CWQ_UNLINK(MIO_CWQ_HEAD(cwq)) + +/* write queue */ struct mio_wq_t { mio_wq_t* next; @@ -204,7 +279,7 @@ struct mio_wq_t mio_iolen_t olen; /* original data length */ mio_uint8_t* ptr; /* pointer to data */ mio_iolen_t len; /* remaining data length */ - void* ctx; + void* ctx; mio_dev_t* dev; /* back-pointer to the device */ mio_tmridx_t tmridx; @@ -218,42 +293,18 @@ struct mio_wq_t #define MIO_WQ_ISNODE(wq,x) ((wq) != (x)) #define MIO_WQ_ISHEAD(wq,x) (MIO_WQ_HEAD(wq) == (x)) #define MIO_WQ_ISTAIL(wq,x) (MIO_WQ_TAIL(wq) == (x)) - #define MIO_WQ_NEXT(x) ((x)->next) #define MIO_WQ_PREV(x) ((x)->prev) - -#define MIO_WQ_LINK(p,x,n) do { \ - mio_wq_t* pp = (p), * nn = (n); \ - (x)->prev = (p); \ - (x)->next = (n); \ - nn->prev = (x); \ - pp->next = (x); \ -} while (0) - -#define MIO_WQ_UNLINK(x) do { \ - mio_wq_t* pp = (x)->prev, * nn = (x)->next; \ - nn->prev = pp; pp->next = nn; \ -} while (0) - -#define MIO_WQ_REPL(o,n) do { \ - mio_wq_t* oo = (o), * nn = (n); \ - nn->next = oo->next; \ - nn->next->prev = nn; \ - nn->prev = oo->prev; \ - nn->prev->next = nn; \ -} while (0) - -/* insert an item at the back of the queue */ -/*#define MIO_WQ_ENQ(wq,x) MIO_WQ_LINK(MIO_WQ_TAIL(wq), x, MIO_WQ_TAIL(wq)->next)*/ -#define MIO_WQ_ENQ(wq,x) MIO_WQ_LINK(MIO_WQ_TAIL(wq), x, wq) - -/* remove an item in the front from the queue */ +#define MIO_WQ_LINK(p,x,n) MIO_Q_LINK((mio_q_t*)p,(mio_q_t*)x,(mio_q_t*)n) +#define MIO_WQ_UNLINK(x) MIO_Q_UNLINK((mio_q_t*)x) +#define MIO_WQ_REPL(o,n) MIO_WQ_REPL(o,n); +#define MIO_WQ_ENQ(wq,x) MIO_WQ_LINK(MIO_WQ_TAIL(wq), (mio_q_t*)x, wq) #define MIO_WQ_DEQ(wq) MIO_WQ_UNLINK(MIO_WQ_HEAD(wq)) #define MIO_DEV_HEADERS \ mio_t* mio; \ - mio_oow_t dev_size; \ - int dev_capa; \ + mio_oow_t dev_size; \ + int dev_capa; \ mio_dev_mth_t* dev_mth; \ mio_dev_evcb_t* dev_evcb; \ mio_wq_t wq; \