added stio_dev_halt()

enhanced stio_dev_watch() and stio_exec() to correct various event handling
This commit is contained in:
hyung-hwan 2016-02-04 15:06:20 +00:00
parent 8768bf4063
commit 01ffcf973d
10 changed files with 671 additions and 423 deletions

View File

@ -37,9 +37,26 @@
#include <arpa/inet.h> #include <arpa/inet.h>
#include <signal.h> #include <signal.h>
struct mmgr_stat_t
{
stio_size_t total_count;
};
typedef struct mmgr_stat_t mmgr_stat_t;
static mmgr_stat_t mmgr_stat;
static void* mmgr_alloc (stio_mmgr_t* mmgr, stio_size_t size) static void* mmgr_alloc (stio_mmgr_t* mmgr, stio_size_t size)
{ {
return malloc (size); if (((mmgr_stat_t*)mmgr->ctx)->total_count > 100)
{
printf ("CRITICAL ERROR ---> too many heap chunks...\n");
return STIO_NULL;
}
void* x = malloc (size);
if (x) ((mmgr_stat_t*)mmgr->ctx)->total_count++;
return x;
} }
static void* mmgr_realloc (stio_mmgr_t* mmgr, void* ptr, stio_size_t size) static void* mmgr_realloc (stio_mmgr_t* mmgr, void* ptr, stio_size_t size)
@ -49,17 +66,25 @@ static void* mmgr_realloc (stio_mmgr_t* mmgr, void* ptr, stio_size_t size)
static void mmgr_free (stio_mmgr_t* mmgr, void* ptr) static void mmgr_free (stio_mmgr_t* mmgr, void* ptr)
{ {
((mmgr_stat_t*)mmgr->ctx)->total_count--;
return free (ptr); return free (ptr);
} }
static stio_mmgr_t mmgr = static stio_mmgr_t mmgr =
{ {
mmgr_alloc, mmgr_alloc,
mmgr_realloc, mmgr_realloc,
mmgr_free, mmgr_free,
STIO_NULL &mmgr_stat
}; };
struct tcp_server_t
{
int tally;
};
typedef struct tcp_server_t tcp_server_t;
static void tcp_on_disconnect (stio_dev_tcp_t* tcp) static void tcp_on_disconnect (stio_dev_tcp_t* tcp)
{ {
if (tcp->state & STIO_DEV_TCP_CONNECTING) if (tcp->state & STIO_DEV_TCP_CONNECTING)
@ -101,12 +126,33 @@ printf ("device accepted client device... .asdfjkasdfkljasdlfkjasdj...\n");
static int tcp_on_write (stio_dev_tcp_t* tcp, void* wrctx) static int tcp_on_write (stio_dev_tcp_t* tcp, void* wrctx)
{ {
printf (">>> TCP SENT MESSAGE\n"); tcp_server_t* ts;
ts = (tcp_server_t*)(tcp + 1);
printf (">>> TCP SENT MESSAGE %d\n", ts->tally);
ts->tally++;
// if (ts->tally >= 2) stio_dev_tcp_halt (tcp);
//stio_dev_tcp_read (tcp);
printf ("ENABLING READING..............................\n");
stio_dev_read (tcp, 1);
return 0; return 0;
} }
static int tcp_on_read (stio_dev_tcp_t* tcp, const void* buf, stio_len_t len) static int tcp_on_read (stio_dev_tcp_t* tcp, const void* buf, stio_len_t len)
{ {
if (len <= 0)
{
printf ("STREAM DEVICE: EOF RECEIVED...\n");
/* no outstanding request. but EOF */
stio_dev_tcp_halt (tcp);
return 0;
}
printf ("on read %d\n", (int)len);
int n; int n;
static char a ='A'; static char a ='A';
char* xxx = malloc (1000000); char* xxx = malloc (1000000);
@ -114,7 +160,18 @@ memset (xxx, a++ ,1000000);
//return stio_dev_tcp_write (tcp, "HELLO", 5, STIO_NULL); //return stio_dev_tcp_write (tcp, "HELLO", 5, STIO_NULL);
n = stio_dev_tcp_write (tcp, xxx, 1000000, STIO_NULL); n = stio_dev_tcp_write (tcp, xxx, 1000000, STIO_NULL);
free (xxx); free (xxx);
return n;
if (n <= -1) return -1;
/* post the write finisher */
n = stio_dev_tcp_write (tcp, STIO_NULL, 0, STIO_NULL);
if (n <= -1) return -1;
printf ("DISABLING READING..............................\n");
stio_dev_read (tcp, 0);
return 0;
/* return 1; let the main loop to read more greedily without consulint the multiplexer */
} }
static stio_t* g_stio; static stio_t* g_stio;
@ -135,6 +192,7 @@ int main ()
stio_dev_tcp_connect_t tcp_conn; stio_dev_tcp_connect_t tcp_conn;
stio_dev_tcp_listen_t tcp_lstn; stio_dev_tcp_listen_t tcp_lstn;
stio_dev_tcp_make_t tcp_make; stio_dev_tcp_make_t tcp_make;
tcp_server_t* ts;
stio = stio_open (&mmgr, 0, 512, STIO_NULL); stio = stio_open (&mmgr, 0, 512, STIO_NULL);
if (!stio) if (!stio)
@ -172,19 +230,21 @@ int main ()
memcpy (&tcp_make.addr, &sin, STIO_SIZEOF(sin)); memcpy (&tcp_make.addr, &sin, STIO_SIZEOF(sin));
tcp_make.on_write = tcp_on_write; tcp_make.on_write = tcp_on_write;
tcp_make.on_read = tcp_on_read; tcp_make.on_read = tcp_on_read;
tcp[0] = stio_dev_tcp_make (stio, 0, &tcp_make); tcp[0] = stio_dev_tcp_make (stio, STIO_SIZEOF(tcp_server_t), &tcp_make);
if (!tcp[0]) if (!tcp[0])
{ {
printf ("Cannot make tcp\n"); printf ("Cannot make tcp\n");
goto oops; goto oops;
} }
ts = (tcp_server_t*)(tcp[0] + 1);
ts->tally = 0;
memset (&sin, 0, STIO_SIZEOF(sin)); memset (&sin, 0, STIO_SIZEOF(sin));
sin.sin_family = AF_INET; sin.sin_family = AF_INET;
sin.sin_port = htons(9999); sin.sin_port = htons(9999);
inet_pton (sin.sin_family, "192.168.1.1", &sin.sin_addr); //inet_pton (sin.sin_family, "192.168.1.1", &sin.sin_addr);
//inet_pton (sin.sin_family, "127.0.0.1", &sin.sin_addr); inet_pton (sin.sin_family, "127.0.0.1", &sin.sin_addr);
memset (&tcp_conn, 0, STIO_SIZEOF(tcp_conn)); memset (&tcp_conn, 0, STIO_SIZEOF(tcp_conn));
memcpy (&tcp_conn.addr, &sin, STIO_SIZEOF(sin)); memcpy (&tcp_conn.addr, &sin, STIO_SIZEOF(sin));
@ -205,12 +265,14 @@ int main ()
tcp_make.on_write = tcp_on_write; tcp_make.on_write = tcp_on_write;
tcp_make.on_read = tcp_on_read; tcp_make.on_read = tcp_on_read;
tcp[1] = stio_dev_tcp_make (stio, 0, &tcp_make); tcp[1] = stio_dev_tcp_make (stio, STIO_SIZEOF(tcp_server_t), &tcp_make);
if (!tcp[1]) if (!tcp[1])
{ {
printf ("Cannot make tcp\n"); printf ("Cannot make tcp\n");
goto oops; goto oops;
} }
ts = (tcp_server_t*)(tcp[1] + 1);
ts->tally = 0;
tcp_lstn.backlogs = 100; tcp_lstn.backlogs = 100;
tcp_lstn.on_connect = tcp_on_connect; tcp_lstn.on_connect = tcp_on_connect;

View File

@ -70,9 +70,10 @@ struct stio_t
stio_dev_t* tail; stio_dev_t* tail;
} dev; /* normal devices */ } dev; /* normal devices */
stio_dev_t* rdev; /* ruined device list - singly linked list */ stio_dev_t* hdev; /* halted device list - singly linked list */
stio_uint8_t bigbuf[65535]; /* TODO: make this dynamic depending on devices added. device may indicate a buffer size required??? */ stio_uint8_t bigbuf[65535]; /* TODO: make this dynamic depending on devices added. device may indicate a buffer size required??? */
int renew_watch;
struct struct
{ {

View File

@ -45,6 +45,13 @@ void stio_closeasyncsck (stio_t* stio, stio_sckhnd_t sck)
#endif #endif
} }
#if 0
int stio_shutasyncsck (stio_t* stio, stio_sckhnd_t sck, int how)
{
shutdown (sck, how);
}
#endif
int stio_makesckasync (stio_t* stio, stio_sckhnd_t sck) int stio_makesckasync (stio_t* stio, stio_sckhnd_t sck)
{ {
return stio_makesyshndasync (stio, (stio_syshnd_t)sck); return stio_makesyshndasync (stio, (stio_syshnd_t)sck);

View File

@ -47,8 +47,11 @@ static int tcp_make (stio_dev_t* dev, void* ctx)
tcp->sck = stio_openasyncsck (dev->stio, family, SOCK_STREAM); tcp->sck = stio_openasyncsck (dev->stio, family, SOCK_STREAM);
if (tcp->sck == STIO_SCKHND_INVALID) goto oops; if (tcp->sck == STIO_SCKHND_INVALID) goto oops;
//setsockopt (udp->sck, SOL_SOCKET, SO_REUSEADDR, ...); /* TODO:
// TRANSPARENT, ETC. setsockopt (udp->sck, SOL_SOCKET, SO_REUSEADDR, ...);
TRANSPARENT, ETC.
*/
iv = 1; iv = 1;
if (setsockopt (tcp->sck, SOL_SOCKET, SO_REUSEADDR, &iv, STIO_SIZEOF(iv)) == -1 || if (setsockopt (tcp->sck, SOL_SOCKET, SO_REUSEADDR, &iv, STIO_SIZEOF(iv)) == -1 ||
bind (tcp->sck, (struct sockaddr*)&arg->addr, len) == -1) bind (tcp->sck, (struct sockaddr*)&arg->addr, len) == -1)
@ -57,6 +60,7 @@ static int tcp_make (stio_dev_t* dev, void* ctx)
goto oops; goto oops;
} }
tcp->dev_capa = STIO_DEV_CAPA_IN | STIO_DEV_CAPA_OUT | STIO_DEV_CAPA_STREAM;
tcp->on_write = arg->on_write; tcp->on_write = arg->on_write;
tcp->on_read = arg->on_read; tcp->on_read = arg->on_read;
tcp->tmridx_connect = STIO_TMRIDX_INVALID; tcp->tmridx_connect = STIO_TMRIDX_INVALID;
@ -110,14 +114,13 @@ static stio_syshnd_t tcp_getsyshnd (stio_dev_t* dev)
return (stio_syshnd_t)tcp->sck; return (stio_syshnd_t)tcp->sck;
} }
static int tcp_read (stio_dev_t* dev, void* buf, stio_len_t* len) static int tcp_read (stio_dev_t* dev, void* buf, stio_len_t* len)
{ {
stio_dev_tcp_t* tcp = (stio_dev_tcp_t*)dev; stio_dev_tcp_t* tcp = (stio_dev_tcp_t*)dev;
ssize_t x; ssize_t x;
x = recv (tcp->sck, buf, *len, 0); x = recv (tcp->sck, buf, *len, 0);
if (x <= -1) if (x == -1)
{ {
if (errno == EINPROGRESS || errno == EWOULDBLOCK) return 0; /* no data available */ if (errno == EINPROGRESS || errno == EWOULDBLOCK) return 0; /* no data available */
if (errno == EINTR) return 0; if (errno == EINTR) return 0;
@ -134,13 +137,26 @@ static int tcp_write (stio_dev_t* dev, const void* data, stio_len_t* len)
stio_dev_tcp_t* tcp = (stio_dev_tcp_t*)dev; stio_dev_tcp_t* tcp = (stio_dev_tcp_t*)dev;
ssize_t x; ssize_t x;
int flags = 0; int flags = 0;
if (*len <= 0)
{
/* it's a writing finish indicator. close the writing end of
* the socket, probably leaving it in the half-closed state */
if (shutdown (tcp->sck, SHUT_WR) == -1)
{
tcp->stio->errnum = stio_syserrtoerrnum(errno);
return -1;
}
return 1;
}
/* TODO: flags MSG_DONTROUTE, MSG_DONTWAIT, MSG_MORE, MSG_OOB, MSG_NOSIGNAL */ /* TODO: flags MSG_DONTROUTE, MSG_DONTWAIT, MSG_MORE, MSG_OOB, MSG_NOSIGNAL */
#if defined(MSG_NOSIGNAL) #if defined(MSG_NOSIGNAL)
flags |= MSG_NOSIGNAL; flags |= MSG_NOSIGNAL;
#endif #endif
x = sendto (tcp->sck, data, *len, flags, STIO_NULL, 0); x = sendto (tcp->sck, data, *len, flags, STIO_NULL, 0);
if (x <= -1) if (x == -1)
{ {
if (errno == EINPROGRESS || errno == EWOULDBLOCK) return 0; /* no data can be written */ if (errno == EINPROGRESS || errno == EWOULDBLOCK) return 0; /* no data can be written */
if (errno == EINTR) return 0; if (errno == EINTR) return 0;
@ -163,7 +179,7 @@ static void tmr_connect_handle (stio_t* stio, const stio_ntime_t* now, stio_tmrj
* doesn't need to be deleted when it gets connected for this check * doesn't need to be deleted when it gets connected for this check
* here. this libarary, however, deletes the job when it gets * here. this libarary, however, deletes the job when it gets
* connected. */ * connected. */
stio_dev_tcp_kill (tcp); stio_dev_tcp_halt (tcp);
} }
} }
@ -391,11 +407,7 @@ printf ("TCP READY...%p\n", dev);
tcp->state &= ~STIO_DEV_TCP_CONNECTING; tcp->state &= ~STIO_DEV_TCP_CONNECTING;
tcp->state |= STIO_DEV_TCP_CONNECTED; tcp->state |= STIO_DEV_TCP_CONNECTED;
if (stio_dev_watch ((stio_dev_t*)tcp, STIO_DEV_WATCH_UPDATE, STIO_DEV_EVENT_IN) <= -1) if (stio_dev_watch ((stio_dev_t*)tcp, STIO_DEV_WATCH_RENEW, 0) <= -1) return -1;
{
printf ("CAANOT MANIPULTE WATCHER ...\n");
return -1;
}
if (tcp->tmridx_connect != STIO_TMRIDX_INVALID) if (tcp->tmridx_connect != STIO_TMRIDX_INVALID)
{ {
@ -455,20 +467,22 @@ printf ("CAANOT MANIPULTE WATCHER ...\n");
return -1; return -1;
} }
/* addr is the address of the peer */ /* use tcp->dev_size when instantiating a client tcp device
/* local addresss is inherited from the server */ * instead of STIO_SIZEOF(stio_dev_tcp_t). therefore, the
clitcp = (stio_dev_tcp_t*)stio_makedev (tcp->stio, STIO_SIZEOF(*tcp), &tcp_acc_mth, tcp->dev_evcb, &clisck); * extension area as big as that of the master tcp device
* is created in the client tcp device */
clitcp = (stio_dev_tcp_t*)stio_makedev (tcp->stio, tcp->dev_size, &tcp_acc_mth, tcp->dev_evcb, &clisck);
if (!clitcp) if (!clitcp)
{ {
close (clisck); close (clisck);
return -1; return -1;
} }
clitcp->dev_capa |= STIO_DEV_CAPA_IN | STIO_DEV_CAPA_OUT | STIO_DEV_CAPA_STREAM;
clitcp->state |= STIO_DEV_TCP_ACCEPTED; clitcp->state |= STIO_DEV_TCP_ACCEPTED;
clitcp->peer = peer; clitcp->peer = peer;
/*clitcp->parent = tcp;*/ /*clitcp->parent = tcp;*/
/* inherit some event handlers from the parent. /* inherit some event handlers from the parent.
* you can still change them inside the on_connect handler */ * you can still change them inside the on_connect handler */
clitcp->on_connect = tcp->on_connect; clitcp->on_connect = tcp->on_connect;
@ -477,7 +491,8 @@ printf ("CAANOT MANIPULTE WATCHER ...\n");
clitcp->on_read = tcp->on_read; clitcp->on_read = tcp->on_read;
clitcp->tmridx_connect = STIO_TMRIDX_INVALID; clitcp->tmridx_connect = STIO_TMRIDX_INVALID;
if (clitcp->on_connect (clitcp) <= -1) stio_dev_tcp_kill (clitcp); if (clitcp->on_connect (clitcp) <= -1) stio_dev_tcp_halt (clitcp);
return 0; /* success but don't invoke on_read() */ return 0; /* success but don't invoke on_read() */
} }
} }
@ -520,11 +535,6 @@ stio_dev_tcp_t* stio_dev_tcp_make (stio_t* stio, stio_size_t xtnsize, const stio
return (stio_dev_tcp_t*)stio_makedev (stio, STIO_SIZEOF(stio_dev_tcp_t) + xtnsize, &tcp_mth, &tcp_evcb, (void*)data); return (stio_dev_tcp_t*)stio_makedev (stio, STIO_SIZEOF(stio_dev_tcp_t) + xtnsize, &tcp_mth, &tcp_evcb, (void*)data);
} }
void stio_dev_tcp_kill (stio_dev_tcp_t* tcp)
{
stio_killdev (tcp->stio, (stio_dev_t*)tcp);
}
int stio_dev_tcp_bind (stio_dev_tcp_t* tcp, stio_dev_tcp_bind_t* bind) int stio_dev_tcp_bind (stio_dev_tcp_t* tcp, stio_dev_tcp_bind_t* bind)
{ {
return stio_dev_ioctl ((stio_dev_t*)tcp, STIO_DEV_TCP_BIND, bind); return stio_dev_ioctl ((stio_dev_t*)tcp, STIO_DEV_TCP_BIND, bind);
@ -544,3 +554,8 @@ int stio_dev_tcp_write (stio_dev_tcp_t* tcp, const void* data, stio_len_t len, v
{ {
return stio_dev_write ((stio_dev_t*)tcp, data, len, wrctx); return stio_dev_write ((stio_dev_t*)tcp, data, len, wrctx);
} }
int stio_dev_tcp_halt (stio_dev_tcp_t* tcp)
{
stio_dev_halt ((stio_dev_t*)tcp);
}

View File

@ -135,10 +135,6 @@ STIO_EXPORT stio_dev_tcp_t* stio_dev_tcp_make (
const stio_dev_tcp_make_t* data const stio_dev_tcp_make_t* data
); );
STIO_EXPORT void stio_dev_tcp_kill (
stio_dev_tcp_t* tcp
);
STIO_EXPORT int stio_dev_tcp_bind ( STIO_EXPORT int stio_dev_tcp_bind (
stio_dev_tcp_t* tcp, stio_dev_tcp_t* tcp,
stio_dev_tcp_bind_t* bind stio_dev_tcp_bind_t* bind
@ -160,6 +156,10 @@ STIO_EXPORT int stio_dev_tcp_write (
void* wrctx void* wrctx
); );
STIO_EXPORT int stio_dev_tcp_halt (
stio_dev_tcp_t* tcp
);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -46,10 +46,18 @@ static stio_tmridx_t sift_up (stio_t* stio, stio_tmridx_t index, int notify)
if (index > 0 && YOUNGER_THAN(&stio->tmr.jobs[index], &stio->tmr.jobs[parent])) if (index > 0 && YOUNGER_THAN(&stio->tmr.jobs[index], &stio->tmr.jobs[parent]))
{ {
stio_tmrjob_t item; stio_tmrjob_t item;
#if defined(STIO_USE_TMRJOB_IDXPTR)
/* nothing */
#else
stio_size_t old_index; stio_size_t old_index;
#endif
item = stio->tmr.jobs[index]; item = stio->tmr.jobs[index];
#if defined(STIO_USE_TMRJOB_IDXPTR)
/* nothing */
#else
old_index = index; old_index = index;
#endif
do do
{ {
@ -89,10 +97,19 @@ static stio_tmridx_t sift_down (stio_t* stio, stio_tmridx_t index, int notify)
if (index < base) /* at least 1 child is under the 'index' positmrn */ if (index < base) /* at least 1 child is under the 'index' positmrn */
{ {
stio_tmrjob_t item; stio_tmrjob_t item;
#if defined(STIO_USE_TMRJOB_IDXPTR)
/* nothing */
#else
stio_size_t old_index; stio_size_t old_index;
#endif
item = stio->tmr.jobs[index]; item = stio->tmr.jobs[index];
#if defined(STIO_USE_TMRJOB_IDXPTR)
/* nothing */
#else
old_index = index; old_index = index;
#endif
do do
{ {

View File

@ -180,14 +180,12 @@ static stio_dev_evcb_t udp_evcb =
udp_on_write udp_on_write
}; };
stio_dev_udp_t* stio_dev_udp_make (stio_t* stio, stio_size_t xtnsize, const stio_dev_udp_make_t* data) stio_dev_udp_t* stio_dev_udp_make (stio_t* stio, stio_size_t xtnsize, const stio_dev_udp_make_t* data)
{ {
return (stio_dev_udp_t*)stio_makedev (stio, STIO_SIZEOF(stio_dev_udp_t) + xtnsize, &udp_mth, &udp_evcb, (void*)data); return (stio_dev_udp_t*)stio_makedev (stio, STIO_SIZEOF(stio_dev_udp_t) + xtnsize, &udp_mth, &udp_evcb, (void*)data);
} }
void stio_dev_udp_halt (stio_dev_udp_t* udp)
void stio_dev_udp_kill (stio_dev_udp_t* udp)
{ {
stio_killdev (udp->stio, (stio_dev_t*)udp); stio_dev_halt ((stio_dev_t*)udp);
} }

View File

@ -66,7 +66,7 @@ STIO_EXPORT stio_dev_udp_t* stio_dev_udp_make (
const stio_dev_udp_make_t* data const stio_dev_udp_make_t* data
); );
STIO_EXPORT void stio_dev_udp_kill ( STIO_EXPORT void stio_dev_udp_halt (
stio_dev_udp_t* udp stio_dev_udp_t* udp
); );

View File

@ -24,7 +24,6 @@
THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/ */
#include "stio-prv.h" #include "stio-prv.h"
#include <sys/epoll.h> #include <sys/epoll.h>
@ -32,6 +31,8 @@
#include <unistd.h> #include <unistd.h>
#include <errno.h> #include <errno.h>
#define DEV_CAPA_ALL_WATCHED (STIO_DEV_CAPA_IN_WATCHED | STIO_DEV_CAPA_OUT_WATCHED | STIO_DEV_CAPA_PRI_WATCHED)
stio_t* stio_open (stio_mmgr_t* mmgr, stio_size_t xtnsize, stio_size_t tmrcapa, stio_errnum_t* errnum) stio_t* stio_open (stio_mmgr_t* mmgr, stio_size_t xtnsize, stio_size_t tmrcapa, stio_errnum_t* errnum)
{ {
stio_t* stio; stio_t* stio;
@ -104,124 +105,6 @@ void stio_fini (stio_t* stio)
close (stio->mux); close (stio->mux);
} }
stio_dev_t* stio_makedev (stio_t* stio, stio_size_t dev_size, stio_dev_mth_t* dev_mth, stio_dev_evcb_t* dev_evcb, void* make_ctx)
{
stio_dev_t* dev;
if (dev_size < STIO_SIZEOF(stio_dev_t))
{
stio->errnum = STIO_EINVAL;
return STIO_NULL;
}
dev = STIO_MMGR_ALLOC (stio->mmgr, dev_size);
if (!dev)
{
stio->errnum = STIO_ENOMEM;
return STIO_NULL;
}
STIO_MEMSET (dev, 0, dev_size);
dev->stio = stio;
/* default capability. dev->dev_mth->make() can change this.
* stio_dev_watch() is affected by the capability change. */
dev->dev_capa = STIO_DEV_CAPA_IN | STIO_DEV_CAPA_OUT;
dev->dev_mth = dev_mth;
dev->dev_evcb = dev_evcb;
STIO_WQ_INIT(&dev->wq);
/* call the callback function first */
stio->errnum = STIO_ENOERR;
if (dev->dev_mth->make (dev, make_ctx) <= -1)
{
if (stio->errnum == STIO_ENOERR) stio->errnum = STIO_EDEVMAKE;
goto oops;
}
#if defined(_WIN32)
if (CreateIoCompletionPort ((HANDLE)dev->dev_mth->getsyshnd(dev), stio->iocp, STIO_IOCP_KEY, 0) == NULL)
{
/* TODO: set errnum from GetLastError()... */
goto oops_after_make;
}
#else
if (stio_dev_watch (dev, STIO_DEV_WATCH_START, STIO_DEV_EVENT_IN) <= -1) goto oops_after_make;
#endif
/* and place the new dev at the back */
if (stio->dev.tail) stio->dev.tail->dev_next = dev;
else stio->dev.head = dev;
dev->dev_prev = stio->dev.tail;
stio->dev.tail = dev;
return dev;
oops_after_make:
dev->dev_mth->kill (dev);
oops:
STIO_MMGR_FREE (stio->mmgr, dev);
return STIO_NULL;
}
void stio_killdev (stio_t* stio, stio_dev_t* dev)
{
STIO_ASSERT (stio == dev->stio);
/* clear pending send requests */
while (!STIO_WQ_ISEMPTY(&dev->wq))
{
stio_wq_t* wq;
wq = STIO_WQ_HEAD(&dev->wq);
STIO_WQ_DEQ (&dev->wq);
STIO_MMGR_FREE (stio->mmgr, wq);
}
/* delink the dev object */
if (!(dev->dev_capa & STIO_DEV_CAPA_RUINED))
{
if (dev->dev_prev)
dev->dev_prev->dev_next = dev->dev_next;
else
stio->dev.head = dev->dev_next;
if (dev->dev_next)
dev->dev_next->dev_prev = dev->dev_prev;
else
stio->dev.tail = dev->dev_prev;
}
stio_dev_watch (dev, STIO_DEV_WATCH_STOP, 0);
/* and call the callback function */
dev->dev_mth->kill (dev);
STIO_MMGR_FREE (stio->mmgr, dev);
}
void stio_ruindev (stio_t* stio, stio_dev_t* dev)
{
if (!(dev->dev_capa & STIO_DEV_CAPA_RUINED))
{
/* delink the dev object from the device list */
if (dev->dev_prev)
dev->dev_prev->dev_next = dev->dev_next;
else
stio->dev.head = dev->dev_next;
if (dev->dev_next)
dev->dev_next->dev_prev = dev->dev_prev;
else
stio->dev.tail = dev->dev_prev;
/* place it at the beginning of the ruined device list */
dev->dev_prev = STIO_NULL;
dev->dev_next = stio->rdev;
stio->rdev = dev;
dev->dev_capa |= STIO_DEV_CAPA_RUINED;
}
}
int stio_prologue (stio_t* stio) int stio_prologue (stio_t* stio)
{ {
@ -234,11 +117,246 @@ void stio_epilogue (stio_t* stio)
/* TODO: */ /* TODO: */
} }
STIO_INLINE static void handle_event (stio_t* stio, stio_size_t i)
{
stio_dev_t* dev;
dev = stio->revs[i].data.ptr;
if (dev->dev_evcb->ready)
{
int x, events = 0;
if (stio->revs[i].events & EPOLLIN) events |= STIO_DEV_EVENT_IN;
if (stio->revs[i].events & EPOLLOUT) events |= STIO_DEV_EVENT_OUT;
if (stio->revs[i].events & EPOLLPRI) events |= STIO_DEV_EVENT_PRI;
if (stio->revs[i].events & EPOLLERR) events |= STIO_DEV_EVENT_ERR;
if (stio->revs[i].events & EPOLLHUP) events |= STIO_DEV_EVENT_HUP;
#if defined(EPOLLRDHUP)
else if (stio->revs[i].events & EPOLLRDHUP) events |= STIO_DEV_EVENT_HUP;
#endif
/* return value of ready()
* <= -1 - failure. kill the device.
* == 0 - ok. but don't invoke recv() or send().
* >= 1 - everything is ok. */
x = dev->dev_evcb->ready (dev, events);
if (x <= -1)
{
stio_dev_halt (dev);
return;
}
else if (x == 0) goto skip_evcb;
}
if (dev && stio->revs[i].events & EPOLLPRI)
{
/* urgent data */
/* TODO: urgent data.... */
/*x = dev->dev_mth->urgrecv (dev, stio->bugbuf, &len);*/
printf ("has urgent data...\n");
}
if (dev && stio->revs[i].events & EPOLLOUT)
{
/* write pending requests */
while (!STIO_WQ_ISEMPTY(&dev->wq))
{
stio_wq_t* q;
const stio_uint8_t* uptr;
stio_len_t urem, ulen;
int x;
q = STIO_WQ_HEAD(&dev->wq);
uptr = q->ptr;
urem = q->len;
send_leftover:
ulen = urem;
x = dev->dev_mth->write (dev, uptr, &ulen);
if (x <= -1)
{
stio_dev_halt (dev);
dev = STIO_NULL;
break;
}
else if (x == 0)
{
/* keep the left-over */
STIO_MEMMOVE (q->ptr, uptr, urem);
q->len = urem;
break;
}
else
{
uptr += ulen;
urem -= ulen;
if (urem <= 0)
{
/* finished writing a single write request */
int y, out_closed = 0;
if (q->len <= 0 && (dev->dev_capa & STIO_DEV_CAPA_STREAM))
{
/* it was a zero-length write request.
* for a stream, it is to close the output. */
dev->dev_capa |= STIO_DEV_CAPA_OUT_CLOSED;
stio->renew_watch = 1;
out_closed = 1;
}
STIO_WQ_UNLINK (q); /* STIO_WQ_DEQ(&dev->wq); */
y = dev->dev_evcb->on_write (dev, q->ctx);
STIO_MMGR_FREE (dev->stio->mmgr, q);
if (y <= -1)
{
stio_dev_halt (dev);
dev = STIO_NULL;
break;
}
if (out_closed)
{
/* drain all pending requests.
* callbacks are skipped for drained requests */
while (!STIO_WQ_ISEMPTY(&dev->wq))
{
q = STIO_WQ_HEAD(&dev->wq);
STIO_WQ_UNLINK (q);
STIO_MMGR_FREE (dev->stio->mmgr, q);
}
break;
}
}
else goto send_leftover;
}
}
if (dev && STIO_WQ_ISEMPTY(&dev->wq))
{
/* no pending request to write */
if ((dev->dev_capa & STIO_DEV_CAPA_IN_CLOSED) &&
(dev->dev_capa & STIO_DEV_CAPA_OUT_CLOSED))
{
stio_dev_halt (dev);
dev = STIO_NULL;
}
else
{
stio->renew_watch = 1;
}
}
}
if (dev && stio->revs[i].events & EPOLLIN)
{
stio_len_t len;
int x;
/* the devices are all non-blocking. read as much as possible
* if on_read callback returns 1 or greater. read only once
* if the on_read calllback returns 0. */
while (1)
{
len = STIO_COUNTOF(stio->bigbuf);
x = dev->dev_mth->read (dev, stio->bigbuf, &len);
if (x <= -1)
{
stio_dev_halt (dev);
dev = STIO_NULL;
break;
}
else if (x == 0)
{
/* no data is available - EWOULDBLOCK or something similar */
break;
}
else if (x >= 1)
{
if (len <= 0 && (dev->dev_capa & STIO_DEV_CAPA_STREAM))
{
/* EOF received. for a stream device, a zero-length
* read is interpreted as EOF. */
dev->dev_capa |= STIO_DEV_CAPA_IN_CLOSED;
stio->renew_watch = 1;
/* call the on_read callback to report EOF */
if (dev->dev_evcb->on_read (dev, stio->bigbuf, len) <= -1 ||
(dev->dev_capa & STIO_DEV_CAPA_OUT_CLOSED))
{
/* 1. input ended and its reporting failed or
* 2. input ended and no writing is possible */
stio_dev_halt (dev);
dev = STIO_NULL;
}
/* since EOF is received, reading can't be greedy */
break;
}
else
{
int y;
/* TODO: for a stream device, merge received data if bigbuf isn't full and fire the on_read callback
* when x == 0 or <= -1. you can */
/* data available */
y = dev->dev_evcb->on_read (dev, stio->bigbuf, len);
if (y <= -1)
{
stio_dev_halt (dev);
dev = STIO_NULL;
break;
}
else if (y == 0)
{
/* don't be greedy. read only once
* for this loop iteration */
break;
}
}
}
}
}
if (dev && stio->revs[i].events & (EPOLLERR | EPOLLHUP))
{
/* if error or hangup has been reported on the device,
* halt the device. this check is performed after
* EPOLLIN or EPOLLOUT check because EPOLLERR or EPOLLHUP
* can be set together with EPOLLIN or EPOLLOUT. */
dev->dev_capa |= STIO_DEV_CAPA_IN_CLOSED | STIO_DEV_CAPA_OUT_CLOSED;
stio->renew_watch = 1;
}
#if defined(EPOLLRDHUP)
else if (dev && stio->revs[i].events & EPOLLRDHUP)
{
}
#endif
if (dev &&
(dev->dev_capa & STIO_DEV_CAPA_IN_CLOSED) &&
(dev->dev_capa & STIO_DEV_CAPA_OUT_CLOSED))
{
stio_dev_halt (dev);
dev = STIO_NULL;
}
skip_evcb:
if (dev && stio->renew_watch && stio_dev_watch (dev, STIO_DEV_WATCH_RENEW, 0) <= -1)
{
stio_dev_halt (dev);
dev = STIO_NULL;
}
}
int stio_exec (stio_t* stio) int stio_exec (stio_t* stio)
{ {
stio_ntime_t tmout; stio_ntime_t tmout;
#if defined(_WIN32) #if defined(_WIN32)
ULONG nentries, i; ULONG nentries, i;
#else #else
@ -282,215 +400,20 @@ int stio_exec (stio_t* stio)
} }
/* TODO: merge events??? for the same descriptor */ /* TODO: merge events??? for the same descriptor */
for (i = 0; i < nentries; i++) for (i = 0; i < nentries; i++)
{ {
stio_dev_t* dev; stio->renew_watch = 0;
handle_event (stio, i);
dev = stio->revs[i].data.ptr;
if (dev->dev_evcb->ready)
{
int x, events = 0;
if (stio->revs[i].events & EPOLLERR) events |= STIO_DEV_EVENT_ERR;
if (stio->revs[i].events & EPOLLHUP) events |= STIO_DEV_EVENT_HUP;
if (stio->revs[i].events & EPOLLIN) events |= STIO_DEV_EVENT_IN;
if (stio->revs[i].events & EPOLLOUT) events |= STIO_DEV_EVENT_OUT;
if (stio->revs[i].events & EPOLLPRI) events |= STIO_DEV_EVENT_PRI;
#if defined(EPOLLRDHUP)
/* interprete EPOLLRDHUP the same way as EPOLLHUP.
*
* when EPOLLRDHUP is set, EPOLLIN or EPOLLPRI or both are
* assumed to be set as EPOLLRDHUP is requested only if
* STIO_DEV_WATCH_IN is set for stio_dev_watch().
* in linux, when EPOLLRDHUP is set, EPOLLIN is set together
* if it's requested together. it seems to be safe to have
* the following assertion. however, let me commect it out
* in case the assumption above is not right.
* STIO_ASSERT (events & (STIO_DEV_EVENT_IN | STIO_DEV_EVENT_PRI));
*/
if (stio->revs[i].events & EPOLLRDHUP) events |= STIO_DEV_EVENT_HUP;
#endif
/* return value of ready()
* <= -1 - failure. kill the device.
* == 0 - ok. but don't invoke recv() or send().
* >= 1 - everything is ok. */
if ((x = dev->dev_evcb->ready (dev, events)) <= -1)
{
stio_ruindev (stio, dev);
dev = STIO_NULL;
}
else if (x >= 1)
{
goto invoke_evcb;
}
}
else
{
invoke_evcb:
if (dev && stio->revs[i].events & EPOLLPRI)
{
/* urgent data */
/* TODO: urgent data.... */
/*x = dev->dev_mth->urgrecv (dev, stio->bugbuf, &len);*/
printf ("has urgent data...\n");
}
if (dev && stio->revs[i].events & EPOLLIN)
{
stio_len_t len;
int x;
/* the devices are all non-blocking. so read as much as possible */
/* TODO: limit the number of iterations? */
while (1)
{
len = STIO_COUNTOF(stio->bigbuf);
x = dev->dev_mth->read (dev, stio->bigbuf, &len);
if (x <= -1)
{
stio_ruindev (stio, dev);
dev = STIO_NULL;
break;
}
else if (x == 0)
{
/* no data is available - EWOULDBLOCK or something similar */
break;
}
else if (x >= 1)
{
if (len <= 0)
{
/* EOF received. delay killing until output has been handled. */
if (STIO_WQ_ISEMPTY(&dev->wq))
{
/* no pending writes - ruin the device to kill it eventually */
stio_ruindev (stio, dev);
/* don't set 'dev' to STIO_NULL so that
* output handling can kick in if EPOLLOUT is set */
}
else
{
/* it might be in a half-open state */
dev->dev_capa &= ~(STIO_DEV_CAPA_IN | STIO_DEV_CAPA_PRI);
/* disable the input watching */
if (stio_dev_watch (dev, STIO_DEV_WATCH_UPDATE, STIO_DEV_EVENT_OUT) <= -1)
{
stio_ruindev (stio, dev);
dev = STIO_NULL;
}
}
break;
}
else
{
/* TODO: for a stream device, merge received data if bigbuf isn't full and fire the on_read callback
* when x == 0 or <= -1. you can */
/* data available */
if (dev->dev_evcb->on_read (dev, stio->bigbuf, len) <= -1)
{
stio_ruindev (stio, dev);
dev = STIO_NULL;
break;
}
}
}
}
}
if (dev && stio->revs[i].events & EPOLLOUT)
{
while (!STIO_WQ_ISEMPTY(&dev->wq))
{
stio_wq_t* q;
const stio_uint8_t* uptr;
stio_len_t urem, ulen;
int x;
q = STIO_WQ_HEAD(&dev->wq);
uptr = q->ptr;
urem = q->len;
send_leftover:
ulen = urem;
x = dev->dev_mth->write (dev, uptr, &ulen);
if (x <= -1)
{
/* TODO: error handling? call callback? or what? */
stio_killdev (stio, dev);
dev = STIO_NULL;
break;
}
else if (x == 0)
{
/* keep the left-over */
STIO_MEMMOVE (q->ptr, uptr, urem);
q->len = urem;
break;
}
else
{
uptr += ulen;
urem -= ulen;
if (urem <= 0)
{
int y;
STIO_WQ_UNLINK (q); /* STIO_WQ_DEQ(&dev->wq); */
y = dev->dev_evcb->on_write (dev, q->ctx);
STIO_MMGR_FREE (dev->stio->mmgr, q);
if (y <= -1)
{
stio_killdev (stio, dev);
dev = STIO_NULL;
break;
}
}
else goto send_leftover;
}
}
if (dev && STIO_WQ_ISEMPTY(&dev->wq))
{
/* no pending request to write.
* watch input only. disable output watching */
if (dev->dev_capa & STIO_DEV_CAPA_IN)
{
if (stio_dev_watch (dev, STIO_DEV_WATCH_UPDATE, STIO_DEV_EVENT_IN) <= -1)
{
stio_ruindev (stio, dev);
dev = STIO_NULL;
}
}
else
{
/* the device is not capable of reading.
* finish the device */
stio_ruindev (stio, dev);
dev = STIO_NULL;
}
}
}
}
} }
/* kill all ruined devices */ /* kill all halted devices */
while (stio->rdev) while (stio->hdev)
{ {
stio_dev_t* next; stio_dev_t* next;
next = stio->rdev->dev_next; next = stio->hdev->dev_next;
stio_killdev (stio, stio->rdev); stio_killdev (stio, stio->hdev);
stio->rdev = next; stio->hdev = next;
} }
#endif #endif
@ -508,8 +431,21 @@ int stio_loop (stio_t* stio)
if (!stio->dev.head) return 0; if (!stio->dev.head) return 0;
stio->stopreq = 0; stio->stopreq = 0;
stio->renew_watch = 0;
if (stio_prologue (stio) <= -1) return -1; if (stio_prologue (stio) <= -1) return -1;
if (stio->renew_watch)
{
printf (">>> GLOBAL WATCHIN RENEWAL....\n");
stio_dev_t* dev;
for (dev = stio->dev.head; dev; dev = dev->dev_next)
{
stio_dev_watch (dev, STIO_DEV_WATCH_RENEW, 0);
}
stio->renew_watch = 0;
}
while (!stio->stopreq && stio->dev.head) while (!stio->stopreq && stio->dev.head)
{ {
if (stio_exec (stio) <= -1) break; if (stio_exec (stio) <= -1) break;
@ -520,6 +456,137 @@ int stio_loop (stio_t* stio)
return 0; return 0;
} }
stio_dev_t* stio_makedev (stio_t* stio, stio_size_t dev_size, stio_dev_mth_t* dev_mth, stio_dev_evcb_t* dev_evcb, void* make_ctx)
{
stio_dev_t* dev;
if (dev_size < STIO_SIZEOF(stio_dev_t))
{
stio->errnum = STIO_EINVAL;
return STIO_NULL;
}
dev = STIO_MMGR_ALLOC (stio->mmgr, dev_size);
if (!dev)
{
stio->errnum = STIO_ENOMEM;
return STIO_NULL;
}
STIO_MEMSET (dev, 0, dev_size);
dev->stio = stio;
dev->dev_size = dev_size;
/* default capability. dev->dev_mth->make() can change this.
* stio_dev_watch() is affected by the capability change. */
dev->dev_capa = STIO_DEV_CAPA_IN | STIO_DEV_CAPA_OUT;
dev->dev_mth = dev_mth;
dev->dev_evcb = dev_evcb;
STIO_WQ_INIT(&dev->wq);
/* call the callback function first */
stio->errnum = STIO_ENOERR;
if (dev->dev_mth->make (dev, make_ctx) <= -1)
{
if (stio->errnum == STIO_ENOERR) stio->errnum = STIO_EDEVMAKE;
goto oops;
}
/* set some internal capability bits according to the capabilities
* removed by the device making callback for convenience sake. */
if (!(dev->dev_capa & STIO_DEV_CAPA_IN)) dev->dev_capa |= STIO_DEV_CAPA_IN_CLOSED;
if (!(dev->dev_capa & STIO_DEV_CAPA_OUT)) dev->dev_capa |= STIO_DEV_CAPA_OUT_CLOSED;
#if defined(_WIN32)
if (CreateIoCompletionPort ((HANDLE)dev->dev_mth->getsyshnd(dev), stio->iocp, STIO_IOCP_KEY, 0) == NULL)
{
/* TODO: set errnum from GetLastError()... */
goto oops_after_make;
}
#else
if (stio_dev_watch (dev, STIO_DEV_WATCH_START, 0) <= -1) goto oops_after_make;
#endif
/* and place the new dev at the back */
if (stio->dev.tail) stio->dev.tail->dev_next = dev;
else stio->dev.head = dev;
dev->dev_prev = stio->dev.tail;
stio->dev.tail = dev;
return dev;
oops_after_make:
dev->dev_mth->kill (dev);
oops:
STIO_MMGR_FREE (stio->mmgr, dev);
return STIO_NULL;
}
void stio_killdev (stio_t* stio, stio_dev_t* dev)
{
STIO_ASSERT (stio == dev->stio);
/* clear pending send requests */
while (!STIO_WQ_ISEMPTY(&dev->wq))
{
stio_wq_t* wq;
wq = STIO_WQ_HEAD(&dev->wq);
STIO_WQ_DEQ (&dev->wq);
STIO_MMGR_FREE (stio->mmgr, wq);
}
/* delink the dev object */
if (!(dev->dev_capa & STIO_DEV_CAPA_HALTED))
{
if (dev->dev_prev)
dev->dev_prev->dev_next = dev->dev_next;
else
stio->dev.head = dev->dev_next;
if (dev->dev_next)
dev->dev_next->dev_prev = dev->dev_prev;
else
stio->dev.tail = dev->dev_prev;
}
stio_dev_watch (dev, STIO_DEV_WATCH_STOP, 0);
/* and call the callback function */
dev->dev_mth->kill (dev);
STIO_MMGR_FREE (stio->mmgr, dev);
}
void stio_dev_halt (stio_dev_t* dev)
{
if (!(dev->dev_capa & STIO_DEV_CAPA_HALTED))
{
stio_t* stio;
stio = dev->stio;
/* delink the dev object from the device list */
if (dev->dev_prev)
dev->dev_prev->dev_next = dev->dev_next;
else
stio->dev.head = dev->dev_next;
if (dev->dev_next)
dev->dev_next->dev_prev = dev->dev_prev;
else
stio->dev.tail = dev->dev_prev;
/* place it at the beginning of the halted device list.
* the halted device list is singly linked. */
dev->dev_prev = STIO_NULL;
dev->dev_next = stio->hdev;
stio->hdev = dev;
dev->dev_capa |= STIO_DEV_CAPA_HALTED;
}
}
int stio_dev_ioctl (stio_dev_t* dev, int cmd, void* arg) int stio_dev_ioctl (stio_dev_t* dev, int cmd, void* arg)
{ {
if (dev->dev_mth->ioctl) return dev->dev_mth->ioctl (dev, cmd, arg); if (dev->dev_mth->ioctl) return dev->dev_mth->ioctl (dev, cmd, arg);
@ -531,47 +598,30 @@ int stio_dev_watch (stio_dev_t* dev, stio_dev_watch_cmd_t cmd, int events)
{ {
struct epoll_event ev; struct epoll_event ev;
int epoll_op; int epoll_op;
int dev_capa;
/* this function honors STIO_DEV_EVENT_IN and STIO_DEV_EVENT_OUT only
* as valid input event bits. it intends to provide simple abstraction
* by reducing the variety of event bits that the caller has to handle. */
ev.events = EPOLLHUP | EPOLLERR;
if (events & STIO_DEV_EVENT_IN)
{
if (dev->dev_capa & STIO_DEV_CAPA_IN)
{
ev.events |= EPOLLIN;
#if defined(EPOLLRDHUP)
ev.events |= EPOLLRDHUP;
#endif
}
if (dev->dev_capa & STIO_DEV_CAPA_PRI)
{
ev.events |= EPOLLPRI;
#if defined(EPOLLRDHUP)
ev.events |= EPOLLRDHUP;
#endif
}
}
if (events & STIO_DEV_EVENT_OUT)
{
if (dev->dev_capa & STIO_DEV_CAPA_OUT) ev.events |= EPOLLOUT;
}
ev.data.ptr = dev; ev.data.ptr = dev;
switch (cmd) switch (cmd)
{ {
case STIO_DEV_WATCH_START: case STIO_DEV_WATCH_START:
/* upon start, only input watching is requested */
events = STIO_DEV_EVENT_IN;
epoll_op = EPOLL_CTL_ADD; epoll_op = EPOLL_CTL_ADD;
break; break;
case STIO_DEV_WATCH_RENEW:
/* auto-renwal mode. input watching is requested all the time.
* output watching is requested only if there're enqueued
* data for writing. */
events = STIO_DEV_EVENT_IN;
if (!STIO_WQ_ISEMPTY(&dev->wq)) events |= STIO_DEV_EVENT_OUT;
case STIO_DEV_WATCH_UPDATE: case STIO_DEV_WATCH_UPDATE:
/* honor event watching requests as given by the caller */
epoll_op = EPOLL_CTL_MOD; epoll_op = EPOLL_CTL_MOD;
break; break;
case STIO_DEV_WATCH_STOP: case STIO_DEV_WATCH_STOP:
events = 0; /* override events */
epoll_op = EPOLL_CTL_DEL; epoll_op = EPOLL_CTL_DEL;
break; break;
@ -580,23 +630,76 @@ int stio_dev_watch (stio_dev_t* dev, stio_dev_watch_cmd_t cmd, int events)
return -1; return -1;
} }
if (epoll_ctl (dev->stio->mux, epoll_op, dev->dev_mth->getsyshnd(dev), &ev) == -1) dev_capa = dev->dev_capa;
dev_capa &= ~(DEV_CAPA_ALL_WATCHED);
/* this function honors STIO_DEV_EVENT_IN and STIO_DEV_EVENT_OUT only
* as valid input event bits. it intends to provide simple abstraction
* by reducing the variety of event bits that the caller has to handle. */
ev.events = EPOLLHUP | EPOLLERR /*| EPOLLET*/;
if ((events & STIO_DEV_EVENT_IN) && !(dev->dev_capa & (STIO_DEV_CAPA_IN_CLOSED | STIO_DEV_CAPA_IN_DISABLED)))
{ {
dev->stio->errnum = stio_syserrtoerrnum(errno); if (dev->dev_capa & STIO_DEV_CAPA_IN)
return -1; {
ev.events |= EPOLLIN;
#if defined(EPOLLRDHUP)
ev.events |= EPOLLRDHUP;
#endif
if (dev->dev_capa & STIO_DEV_CAPA_PRI)
{
ev.events |= EPOLLPRI;
dev_capa |= STIO_DEV_CAPA_PRI_WATCHED;
}
dev_capa |= STIO_DEV_CAPA_IN_WATCHED;
}
} }
if ((events & STIO_DEV_EVENT_OUT) && !(dev->dev_capa & STIO_DEV_CAPA_OUT_CLOSED))
{
if (dev->dev_capa & STIO_DEV_CAPA_OUT)
{
ev.events |= EPOLLOUT;
dev_capa |= STIO_DEV_CAPA_OUT_WATCHED;
}
}
if (epoll_op == EPOLL_CTL_MOD && (dev_capa & DEV_CAPA_ALL_WATCHED) == (dev->dev_capa & DEV_CAPA_ALL_WATCHED))
{
/* skip calling epoll_ctl */
}
else
{
if (epoll_ctl (dev->stio->mux, epoll_op, dev->dev_mth->getsyshnd(dev), &ev) == -1)
{
dev->stio->errnum = stio_syserrtoerrnum(errno);
return -1;
}
}
dev->dev_capa = dev_capa;
return 0; return 0;
} }
void stio_dev_read (stio_dev_t* dev, int enabled)
{
if (enabled)
dev->dev_capa &= ~STIO_DEV_CAPA_IN_DISABLED;
else
dev->dev_capa |= STIO_DEV_CAPA_IN_DISABLED;
dev->stio->renew_watch = 1;
}
int stio_dev_write (stio_dev_t* dev, const void* data, stio_len_t len, void* wrctx) int stio_dev_write (stio_dev_t* dev, const void* data, stio_len_t len, void* wrctx)
{ {
const stio_uint8_t* uptr; const stio_uint8_t* uptr;
stio_len_t urem, ulen; stio_len_t urem, ulen;
stio_wq_t* q; stio_wq_t* q;
int x, wq_empty; int x;
if (!(dev->dev_capa & STIO_DEV_CAPA_OUT)) if (dev->dev_capa & STIO_DEV_CAPA_OUT_CLOSED)
{ {
dev->stio->errnum = STIO_ENOCAPA; dev->stio->errnum = STIO_ENOCAPA;
return -1; return -1;
@ -605,27 +708,49 @@ int stio_dev_write (stio_dev_t* dev, const void* data, stio_len_t len, void* wrc
uptr = data; uptr = data;
urem = len; urem = len;
wq_empty = STIO_WQ_ISEMPTY(&dev->wq); if (!STIO_WQ_ISEMPTY(&dev->wq))
if (!wq_empty) goto enqueue_data; {
/* the writing queue is not empty.
* enqueue this request immediately */
goto enqueue_data;
}
while (urem > 0) /* use the do..while() loop to be able to send a zero-length data */
do
{ {
ulen = urem; ulen = urem;
x = dev->dev_mth->write (dev, data, &ulen); x = dev->dev_mth->write (dev, data, &ulen);
if (x <= -1) return -1; if (x <= -1) return -1;
else if (x == 0) goto enqueue_data; /* enqueue remaining data */ else if (x == 0)
{
/* [NOTE]
* the write queue is empty at this moment. a zero-length
* request for a stream device can still get enqueued if the
* write callback returns 0 though i can't figure out if there
* is a compelling reason to do so
*/
goto enqueue_data; /* enqueue remaining data */
}
else else
{ {
urem -= ulen; urem -= ulen;
uptr += ulen; uptr += ulen;
} }
} }
while (urem > 0);
if (len <= 0 && (dev->dev_capa & STIO_DEV_CAPA_STREAM))
{
/* a zero-length writing request is to close the writing end */
dev->dev_capa |= STIO_DEV_CAPA_OUT_CLOSED;
}
dev->dev_evcb->on_write (dev, wrctx); dev->dev_evcb->on_write (dev, wrctx);
return 0; return 0;
enqueue_data: enqueue_data:
/* queue the remaining data*/ /* queue the remaining data*/
printf ("ENQUEING DATA...\n");
q = (stio_wq_t*)STIO_MMGR_ALLOC (dev->stio->mmgr, STIO_SIZEOF(*q) + urem); q = (stio_wq_t*)STIO_MMGR_ALLOC (dev->stio->mmgr, STIO_SIZEOF(*q) + urem);
if (!q) if (!q)
{ {
@ -639,15 +764,20 @@ enqueue_data:
STIO_MEMCPY (q->ptr, uptr, urem); STIO_MEMCPY (q->ptr, uptr, urem);
STIO_WQ_ENQ (&dev->wq, q); STIO_WQ_ENQ (&dev->wq, q);
if (wq_empty)
dev->stio->renew_watch = 1;
#if 0
if (!(dev->dev_capa & STIO_DEV_CAPA_OUT_WATCHED))
{ {
if (stio_dev_watch (dev, STIO_DEV_WATCH_UPDATE, STIO_DEV_EVENT_IN | STIO_DEV_EVENT_OUT) <= -1) /* if output is not being watched, arrange to do so */
if (stio_dev_watch (dev, STIO_DEV_WATCH_RENEW, 0) <= -1)
{ {
STIO_WQ_UNLINK (q); /* unlink the ENQed item */ STIO_WQ_UNLINK (q); /* unlink the ENQed item */
STIO_MMGR_FREE (dev->stio->mmgr, q); STIO_MMGR_FREE (dev->stio->mmgr, q);
return -1; return -1;
} }
} }
#endif
return 0; return 0;
} }

View File

@ -97,8 +97,7 @@ struct stio_dev_mth_t
/* ------------------------------------------------------------------ */ /* ------------------------------------------------------------------ */
/* return -1 on failure, 0 if no data is availble, 1 otherwise. /* return -1 on failure, 0 if no data is availble, 1 otherwise.
* when returning 1, *len must be sent to the length of data read. * when returning 1, *len must be sent to the length of data read.
* if *len is set to 0, it's treated as EOF. * if *len is set to 0, it's treated as EOF. */
* it must not kill the device */
int (*read) (stio_dev_t* dev, void* data, stio_len_t* len); int (*read) (stio_dev_t* dev, void* data, stio_len_t* len);
/* ------------------------------------------------------------------ */ /* ------------------------------------------------------------------ */
@ -113,12 +112,12 @@ struct stio_dev_evcb_t
{ {
/* return -1 on failure. 0 or 1 on success. /* return -1 on failure. 0 or 1 on success.
* when 0 is returned, it doesn't attempt to perform actual I/O. * when 0 is returned, it doesn't attempt to perform actual I/O.
* when 1 is returned, it attempts to perform actual I/O. * when 1 is returned, it attempts to perform actual I/O. */
* it must not kill the device */
int (*ready) (stio_dev_t* dev, int events); int (*ready) (stio_dev_t* dev, int events);
/* return -1 on failure, 0 on success /* return -1 on failure, 0 or 1 on success.
* it must not kill the device */ * when 0 is returned, the main loop stops the attempt to read more data.
* when 1 is returned, the main loop attempts to read more data without*/
int (*on_read) (stio_dev_t* dev, const void* data, stio_len_t len); int (*on_read) (stio_dev_t* dev, const void* data, stio_len_t len);
/* return -1 on failure, 0 on success. /* return -1 on failure, 0 on success.
@ -126,14 +125,16 @@ struct stio_dev_evcb_t
int (*on_write) (stio_dev_t* dev, void* wrctx); int (*on_write) (stio_dev_t* dev, void* wrctx);
}; };
typedef enum stio_wq_type_t stio_wq_type_t;
struct stio_wq_t struct stio_wq_t
{ {
stio_wq_t* next; stio_wq_t* next;
stio_wq_t* prev; stio_wq_t* prev;
stio_uint8_t* ptr; stio_uint8_t* ptr;
stio_len_t len; stio_len_t len;
void* ctx; void* ctx;
}; };
#define STIO_WQ_INIT(wq) ((wq)->next = (wq)->prev = (wq)) #define STIO_WQ_INIT(wq) ((wq)->next = (wq)->prev = (wq))
@ -177,6 +178,7 @@ struct stio_wq_t
#define STIO_DEV_HEADERS \ #define STIO_DEV_HEADERS \
stio_t* stio; \ stio_t* stio; \
stio_size_t dev_size; \
int dev_capa; \ int dev_capa; \
stio_dev_mth_t* dev_mth; \ stio_dev_mth_t* dev_mth; \
stio_dev_evcb_t* dev_evcb; \ stio_dev_evcb_t* dev_evcb; \
@ -191,13 +193,24 @@ struct stio_dev_t
enum stio_dev_capa_t enum stio_dev_capa_t
{ {
STIO_DEV_CAPA_IN = (1 << 0), STIO_DEV_CAPA_IN = (1 << 0),
STIO_DEV_CAPA_OUT = (1 << 1), STIO_DEV_CAPA_OUT = (1 << 1),
STIO_DEV_CAPA_PRI = (1 << 2),
STIO_DEV_CAPA_STREAM = (1 << 3), /* #STIO_DEV_CAPA_PRI is meaningful only if #STIO_DEV_CAPA_IN is set */
STIO_DEV_CAPA_PRI = (1 << 2),
/*STIO_DEV_CAPA_HALFOPEN = (1 << 3),*/
STIO_DEV_CAPA_STREAM = (1 << 4),
/* internal use only. never set this bit to the dev_capa field */ /* internal use only. never set this bit to the dev_capa field */
STIO_DEV_CAPA_RUINED = (1 << 15) STIO_DEV_CAPA_IN_DISABLED = (1 << 9),
STIO_DEV_CAPA_IN_CLOSED = (1 << 10),
STIO_DEV_CAPA_OUT_CLOSED = (1 << 11),
STIO_DEV_CAPA_IN_WATCHED = (1 << 12),
STIO_DEV_CAPA_OUT_WATCHED = (1 << 13),
STIO_DEV_CAPA_PRI_WATCHED = (1 << 14), /**< can be set only if STIO_DEV_CAPA_IN_WATCHED is set */
STIO_DEV_CAPA_HALTED = (1 << 15)
}; };
typedef enum stio_dev_capa_t stio_dev_capa_t; typedef enum stio_dev_capa_t stio_dev_capa_t;
@ -205,6 +218,7 @@ enum stio_dev_watch_cmd_t
{ {
STIO_DEV_WATCH_START, STIO_DEV_WATCH_START,
STIO_DEV_WATCH_UPDATE, STIO_DEV_WATCH_UPDATE,
STIO_DEV_WATCH_RENEW, /* automatic update */
STIO_DEV_WATCH_STOP STIO_DEV_WATCH_STOP
}; };
typedef enum stio_dev_watch_cmd_t stio_dev_watch_cmd_t; typedef enum stio_dev_watch_cmd_t stio_dev_watch_cmd_t;
@ -287,11 +301,6 @@ STIO_EXPORT void stio_killdev (
stio_dev_t* dev stio_dev_t* dev
); );
STIO_EXPORT void stio_ruindev (
stio_t* stio,
stio_dev_t* dev
);
STIO_EXPORT int stio_dev_ioctl ( STIO_EXPORT int stio_dev_ioctl (
stio_dev_t* dev, stio_dev_t* dev,
int cmd, int cmd,
@ -305,6 +314,11 @@ STIO_EXPORT int stio_dev_watch (
int events int events
); );
STIO_EXPORT void stio_dev_read (
stio_dev_t* dev,
int enabled
);
STIO_EXPORT int stio_dev_write ( STIO_EXPORT int stio_dev_write (
stio_dev_t* dev, stio_dev_t* dev,
const void* data, const void* data,
@ -312,6 +326,10 @@ STIO_EXPORT int stio_dev_write (
void* wrctx void* wrctx
); );
STIO_EXPORT void stio_dev_halt (
stio_dev_t* dev
);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif