added stio_dev_timedwrite()

This commit is contained in:
hyung-hwan 2016-02-05 13:25:59 +00:00
parent 01ffcf973d
commit 5f3a0dac28
10 changed files with 405 additions and 232 deletions

View File

@ -124,20 +124,28 @@ printf ("device accepted client device... .asdfjkasdfkljasdlfkjasdj...\n");
}
static int tcp_on_write (stio_dev_tcp_t* tcp, void* wrctx)
static int tcp_on_write (stio_dev_tcp_t* tcp, stio_len_t wrlen, void* wrctx)
{
tcp_server_t* ts;
if (wrlen <= -1)
{
printf ("SEDING TIMED OUT...........\n");
stio_dev_tcp_halt(tcp);
}
else
{
ts = (tcp_server_t*)(tcp + 1);
printf (">>> TCP SENT MESSAGE %d\n", ts->tally);
printf (">>> SENT MESSAGE %d of length %ld\n", ts->tally, (long int)wrlen);
ts->tally++;
// if (ts->tally >= 2) stio_dev_tcp_halt (tcp);
//stio_dev_tcp_read (tcp);
printf ("ENABLING READING..............................\n");
stio_dev_read (tcp, 1);
stio_dev_tcp_read (tcp, 1);
}
return 0;
}
@ -153,12 +161,14 @@ static int tcp_on_read (stio_dev_tcp_t* tcp, const void* buf, stio_len_t len)
printf ("on read %d\n", (int)len);
stio_ntime_t tmout;
int n;
static char a ='A';
char* xxx = malloc (1000000);
memset (xxx, a++ ,1000000);
//return stio_dev_tcp_write (tcp, "HELLO", 5, STIO_NULL);
n = stio_dev_tcp_write (tcp, xxx, 1000000, STIO_NULL);
stio_inittime (&tmout, 1, 0);
n = stio_dev_tcp_timedwrite (tcp, xxx, 1000000, &tmout, STIO_NULL);
free (xxx);
if (n <= -1) return -1;
@ -168,7 +178,7 @@ free (xxx);
if (n <= -1) return -1;
printf ("DISABLING READING..............................\n");
stio_dev_read (tcp, 0);
stio_dev_tcp_read (tcp, 0);
return 0;
/* return 1; let the main loop to read more greedily without consulint the multiplexer */
@ -243,12 +253,12 @@ int main ()
memset (&sin, 0, STIO_SIZEOF(sin));
sin.sin_family = AF_INET;
sin.sin_port = htons(9999);
//inet_pton (sin.sin_family, "192.168.1.1", &sin.sin_addr);
inet_pton (sin.sin_family, "127.0.0.1", &sin.sin_addr);
inet_pton (sin.sin_family, "192.168.1.4", &sin.sin_addr);
//inet_pton (sin.sin_family, "127.0.0.1", &sin.sin_addr);
memset (&tcp_conn, 0, STIO_SIZEOF(tcp_conn));
memcpy (&tcp_conn.addr, &sin, STIO_SIZEOF(sin));
tcp_conn.timeout.sec = 5;
tcp_conn.tmout.sec = 5;
tcp_conn.on_connect = tcp_on_connect;
tcp_conn.on_disconnect = tcp_on_disconnect;
if (stio_dev_tcp_connect (tcp[0], &tcp_conn) <= -1)

View File

@ -416,8 +416,8 @@ struct stio_mmgr_t
# define STIO_INLINE inline
# define STIO_HAVE_INLINE
#elif defined(__GNUC__) && defined(__GNUC_GNU_INLINE__)
/* gcc disables inline when -std=c89 or -ansi is used.
* so use __inline__ supported by gcc regardless of the options */
/* gcc disables inline when -std=c89 or -ansi is used.
* so use __inline__ supported by gcc regardless of the options */
# define STIO_INLINE /*extern*/ __inline__
# define STIO_HAVE_INLINE
#else
@ -426,8 +426,6 @@ struct stio_mmgr_t
#endif
/**
* The STIO_TYPE_IS_SIGNED() macro determines if a type is signed.
* \code

View File

@ -233,12 +233,13 @@ void stio_dev_pro_kill (stio_dev_pro_t* pro)
stio_killdev (pro->stio, (stio_dev_t*)pro);
}
#if 0
int stio_dev_pro_write (stio_dev_pro_t* pro, const void* data, stio_len_t len, void* wrctx)
{
return stio_dev_write ((stio_dev_t*)pro, data, len, wrctx);
}
#if 0
stio_dev_pro_t* stio_dev_pro_getdev (stio_dev_pro_t* pro, stio_dev_pro_type_t type)
{
switch (type)

View File

@ -73,7 +73,9 @@ struct stio_t
stio_dev_t* hdev; /* halted device list - singly linked list */
stio_uint8_t bigbuf[65535]; /* TODO: make this dynamic depending on devices added. device may indicate a buffer size required??? */
int renew_watch;
unsigned int renew_watch: 1;
unsigned int in_exec: 1;
struct
{
@ -144,15 +146,6 @@ struct stio_t
#define STIO_SEC_TO_USEC(sec) ((sec) * STIO_USECS_PER_SEC)
#define STIO_USEC_TO_SEC(usec) ((usec) / STIO_USECS_PER_SEC)
#define stio_inittime(x,s,ns) (((x)->sec = (s)), ((x)->nsec = (ns)))
#define stio_cleartime(x) stio_inittime(x,0,0)
/*#define stio_cleartime(x) ((x)->sec = (x)->nsec = 0)*/
#define stio_cmptime(x,y) \
(((x)->sec == (y)->sec)? ((x)->nsec - (y)->nsec): \
((x)->sec - (y)->sec))
#define stio_iszerotime(x) ((x)->sec == 0 && (x)->nsec == 0)
#ifdef __cplusplus
extern "C" {
@ -168,52 +161,6 @@ stio_errnum_t stio_syserrtoerrnum (
);
/**
* The stio_gettime() function gets the current time.
*/
STIO_EXPORT void stio_gettime (
stio_ntime_t* nt
);
/**
* The stio_addtime() function adds x and y and stores the result in z
*/
STIO_EXPORT void stio_addtime (
const stio_ntime_t* x,
const stio_ntime_t* y,
stio_ntime_t* z
);
/**
* The stio_subtime() function subtract y from x and stores the result in z.
*/
STIO_EXPORT void stio_subtime (
const stio_ntime_t* x,
const stio_ntime_t* y,
stio_ntime_t* z
);
/**
* The stio_instmrjob() function schedules a new event.
*
* \return #STIO_TMRIDX_INVALID on failure, valid index on success.
*/
stio_tmridx_t stio_instmrjob (
stio_t* stio,
const stio_tmrjob_t* job
);
stio_tmridx_t stio_updtmrjob (
stio_t* stio,
stio_tmridx_t index,
const stio_tmrjob_t* job
);
void stio_deltmrjob (
stio_t* stio,
stio_tmridx_t index
);
void stio_cleartmrjobs (
stio_t* stio
@ -221,24 +168,17 @@ void stio_cleartmrjobs (
void stio_firetmrjobs (
stio_t* stio,
const stio_ntime_t* tm,
const stio_ntime_t* tmbase,
stio_size_t* firecnt
);
int stio_gettmrtmout (
stio_t* stio,
const stio_ntime_t* tm,
const stio_ntime_t* tmbase,
stio_ntime_t* tmout
);
/**
* The stio_gettmrjob() function returns the
* pointer to the registered event at the given index.
*/
stio_tmrjob_t* stio_gettmrjob (
stio_t* stio,
stio_tmridx_t index
);
#ifdef __cplusplus
}
#endif

View File

@ -51,7 +51,6 @@ static int tcp_make (stio_dev_t* dev, void* ctx)
setsockopt (udp->sck, SOL_SOCKET, SO_REUSEADDR, ...);
TRANSPARENT, ETC.
*/
iv = 1;
if (setsockopt (tcp->sck, SOL_SOCKET, SO_REUSEADDR, &iv, STIO_SIZEOF(iv)) == -1 ||
bind (tcp->sck, (struct sockaddr*)&arg->addr, len) == -1)
@ -255,12 +254,12 @@ static int tcp_ioctl (stio_dev_t* dev, int cmd, void* arg)
{
stio_tmrjob_t tmrjob;
if (!stio_iszerotime(&conn->timeout))
if (!stio_isnegtime(&conn->tmout))
{
STIO_MEMSET (&tmrjob, 0, STIO_SIZEOF(tmrjob));
tmrjob.ctx = tcp;
stio_gettime (&tmrjob.when);
stio_addtime (&tmrjob.when, &conn->timeout, &tmrjob.when);
stio_addtime (&tmrjob.when, &conn->tmout, &tmrjob.when);
tmrjob.handler = tmr_connect_handle;
#if defined(STIO_USE_TMRJOB_IDXPTR)
tmrjob.idxptr = &tcp->tmridx_connect;
@ -273,7 +272,8 @@ static int tcp_ioctl (stio_dev_t* dev, int cmd, void* arg)
if (tcp->tmridx_connect == STIO_TMRIDX_INVALID)
{
stio_dev_watch ((stio_dev_t*)tcp, STIO_DEV_WATCH_UPDATE, STIO_DEV_EVENT_IN);
/* event manipulation failure can't be handled properly. so ignore it */
/* event manipulation failure can't be handled properly. so ignore it.
* anyway, it's already in a failure condition */
return -1;
}
}
@ -517,10 +517,10 @@ static int tcp_on_read (stio_dev_t* dev, const void* data, stio_len_t len)
return tcp->on_read (tcp, data, len);
}
static int tcp_on_write (stio_dev_t* dev, void* wrctx)
static int tcp_on_write (stio_dev_t* dev, stio_len_t wrlen, void* wrctx)
{
stio_dev_tcp_t* tcp = (stio_dev_tcp_t*)dev;
return tcp->on_write (tcp, wrctx);
return tcp->on_write (tcp, wrlen, wrctx);
}
static stio_dev_evcb_t tcp_evcb =
@ -550,12 +550,3 @@ int stio_dev_tcp_listen (stio_dev_tcp_t* tcp, stio_dev_tcp_listen_t* lstn)
return stio_dev_ioctl ((stio_dev_t*)tcp, STIO_DEV_TCP_LISTEN, lstn);
}
int stio_dev_tcp_write (stio_dev_tcp_t* tcp, const void* data, stio_len_t len, void* wrctx)
{
return stio_dev_write ((stio_dev_t*)tcp, data, len, wrctx);
}
int stio_dev_tcp_halt (stio_dev_tcp_t* tcp)
{
stio_dev_halt ((stio_dev_t*)tcp);
}

View File

@ -54,7 +54,7 @@ typedef void (*stio_dev_tcp_on_accepted_t) (stio_dev_tcp_t* dev, stio_dev_tcp_t*
typedef void (*stio_dev_tcp_on_disconnect_t) (stio_dev_tcp_t* dev);
typedef int (*stio_dev_tcp_on_read_t) (stio_dev_tcp_t* dev, const void* data, stio_len_t len);
typedef int (*stio_dev_tcp_on_write_t) (stio_dev_tcp_t* dev, void* wrctx);
typedef int (*stio_dev_tcp_on_write_t) (stio_dev_tcp_t* dev, stio_len_t wrlen, void* wrctx);
struct stio_dev_tcp_t
{
@ -104,7 +104,7 @@ typedef struct stio_dev_tcp_connect_t stio_dev_tcp_connect_t;
struct stio_dev_tcp_connect_t
{
stio_sckadr_t addr;
stio_ntime_t timeout; /* connect timeout */
stio_ntime_t tmout; /* connect timeout */
stio_dev_tcp_on_connect_t on_connect;
stio_dev_tcp_on_disconnect_t on_disconnect;
};
@ -149,16 +149,37 @@ STIO_EXPORT int stio_dev_tcp_listen (
stio_dev_tcp_listen_t* lstn
);
STIO_EXPORT int stio_dev_tcp_write (
stio_dev_tcp_t* tcp,
const void* data,
stio_len_t len,
void* wrctx
);
STIO_EXPORT int stio_dev_tcp_halt (
stio_dev_tcp_t* tcp
);
#if defined(STIO_HAVE_INLINE)
static STIO_INLINE int stio_dev_tcp_read (stio_dev_tcp_t* tcp, int enabled)
{
return stio_dev_read ((stio_dev_t*)tcp, enabled);
}
static STIO_INLINE int stio_dev_tcp_write (stio_dev_tcp_t* tcp, const void* data, stio_len_t len, void* wrctx)
{
return stio_dev_write ((stio_dev_t*)tcp, data, len, wrctx);
}
static STIO_INLINE int stio_dev_tcp_timedwrite (stio_dev_tcp_t* tcp, const void* data, stio_len_t len, const stio_ntime_t* tmout, void* wrctx)
{
return stio_dev_timedwrite ((stio_dev_t*)tcp, data, len, tmout, wrctx);
}
static STIO_INLINE void stio_dev_tcp_halt (stio_dev_tcp_t* tcp)
{
stio_dev_halt ((stio_dev_t*)tcp);
}
#else
#define stio_dev_tcp_read(tcp,enabled) stio_dev_read((stio_dev_t*)tcp, enabled)
#define stio_dev_tcp_write(tcp,data,len,wrctx) stio_dev_write((stio_dev_t*)tcp, data, len, wrctx)
#define stio_dev_tcp_timedwrite(tcp,data,len,tmout,wrctx) stio_dev_timedwrite((stio_dev_t*)tcp, data, len, tmout, wrctx)
#define stio_dev_tcp_halt(tcp) stio_dev_halt((stio_dev_t*)tcp)
#endif
#ifdef __cplusplus
}

View File

@ -167,7 +167,7 @@ printf ("dATA received %d bytes\n", (int)len);
}
static int udp_on_write (stio_dev_t* dev, void* msgid)
static int udp_on_write (stio_dev_t* dev, stio_len_t wrlen, void* wrctx)
{
return 0;
@ -184,8 +184,3 @@ stio_dev_udp_t* stio_dev_udp_make (stio_t* stio, stio_size_t xtnsize, const stio
{
return (stio_dev_udp_t*)stio_makedev (stio, STIO_SIZEOF(stio_dev_udp_t) + xtnsize, &udp_mth, &udp_evcb, (void*)data);
}
void stio_dev_udp_halt (stio_dev_udp_t* udp)
{
stio_dev_halt ((stio_dev_t*)udp);
}

View File

@ -66,10 +66,36 @@ STIO_EXPORT stio_dev_udp_t* stio_dev_udp_make (
const stio_dev_udp_make_t* data
);
STIO_EXPORT void stio_dev_udp_halt (
stio_dev_udp_t* udp
);
#if defined(STIO_HAVE_INLINE)
static STIO_INLINE int stio_dev_udp_read (stio_dev_udp_t* udp, int enabled)
{
return stio_dev_read ((stio_dev_t*)udp, enabled);
}
static STIO_INLINE int stio_dev_udp_write (stio_dev_udp_t* udp, const void* data, stio_len_t len, void* wrctx)
{
return stio_dev_write ((stio_dev_t*)udp, data, len, wrctx);
}
static STIO_INLINE int stio_dev_udp_timedwrite (stio_dev_udp_t* udp, const void* data, stio_len_t len, const stio_ntime_t* tmout, void* wrctx)
{
return stio_dev_timedwrite ((stio_dev_t*)udp, data, len, tmout, wrctx);
}
static STIO_INLINE void stio_dev_udp_halt (stio_dev_udp_t* udp)
{
stio_dev_halt ((stio_dev_t*)udp);
}
#else
#define stio_dev_udp_read(udp,enabled) stio_dev_read((stio_dev_t*)udp, enabled)
#define stio_dev_udp_write(udp,data,len,wrctx) stio_dev_write((stio_dev_t*)udp, data, len, wrctx)
#define stio_dev_udp_timedwrite(udp,data,len,tmout,wrctx) stio_dev_timedwrite((stio_dev_t*)udp, data, len, tmout, wrctx)
#define stio_dev_udp_halt(udp) stio_dev_halt((stio_dev_t*)udp)
#endif
#ifdef __cplusplus
}
#endif

View File

@ -117,12 +117,25 @@ void stio_epilogue (stio_t* stio)
/* TODO: */
}
STIO_INLINE static void handle_event (stio_t* stio, stio_size_t i)
static STIO_INLINE void unlink_wq (stio_t* stio, stio_wq_t* q)
{
if (q->tmridx != STIO_TMRIDX_INVALID)
{
stio_deltmrjob (stio, q->tmridx);
STIO_ASSERT (q->tmridx == STIO_TMRIDX_INVALID);
}
STIO_WQ_UNLINK (q);
}
static STIO_INLINE void handle_event (stio_t* stio, stio_size_t i)
{
stio_dev_t* dev;
stio->renew_watch = 0;
dev = stio->revs[i].data.ptr;
STIO_ASSERT (stio == dev->stio);
if (dev->dev_evcb->ready)
{
int x, events = 0;
@ -153,7 +166,7 @@ STIO_INLINE static void handle_event (stio_t* stio, stio_size_t i)
{
/* urgent data */
/* TODO: urgent data.... */
/*x = dev->dev_mth->urgrecv (dev, stio->bugbuf, &len);*/
/*x = dev->dev_mth->urgread (dev, stio->bugbuf, &len);*/
printf ("has urgent data...\n");
}
@ -207,9 +220,9 @@ printf ("has urgent data...\n");
out_closed = 1;
}
STIO_WQ_UNLINK (q); /* STIO_WQ_DEQ(&dev->wq); */
y = dev->dev_evcb->on_write (dev, q->ctx);
STIO_MMGR_FREE (dev->stio->mmgr, q);
unlink_wq (stio, q);
y = dev->dev_evcb->on_write (dev, q->olen, q->ctx);
STIO_MMGR_FREE (stio->mmgr, q);
if (y <= -1)
{
@ -225,7 +238,7 @@ printf ("has urgent data...\n");
while (!STIO_WQ_ISEMPTY(&dev->wq))
{
q = STIO_WQ_HEAD(&dev->wq);
STIO_WQ_UNLINK (q);
unlink_wq (stio, q);
STIO_MMGR_FREE (dev->stio->mmgr, q);
}
break;
@ -321,28 +334,39 @@ printf ("has urgent data...\n");
}
}
if (dev && stio->revs[i].events & (EPOLLERR | EPOLLHUP))
{
/* if error or hangup has been reported on the device,
* halt the device. this check is performed after
* EPOLLIN or EPOLLOUT check because EPOLLERR or EPOLLHUP
* can be set together with EPOLLIN or EPOLLOUT. */
dev->dev_capa |= STIO_DEV_CAPA_IN_CLOSED | STIO_DEV_CAPA_OUT_CLOSED;
stio->renew_watch = 1;
}
#if defined(EPOLLRDHUP)
else if (dev && stio->revs[i].events & EPOLLRDHUP)
if (dev)
{
if (stio->revs[i].events & (EPOLLERR | EPOLLHUP))
{
/* if error or hangup has been reported on the device,
* halt the device. this check is performed after
* EPOLLIN or EPOLLOUT check because EPOLLERR or EPOLLHUP
* can be set together with EPOLLIN or EPOLLOUT. */
dev->dev_capa |= STIO_DEV_CAPA_IN_CLOSED | STIO_DEV_CAPA_OUT_CLOSED;
stio->renew_watch = 1;
}
#if defined(EPOLLRDHUP)
else if (dev && stio->revs[i].events & EPOLLRDHUP)
{
if (stio->revs[i].events & (EPOLLIN | EPOLLOUT | EPOLLPRI))
{
/* it may be a half-open state. don't do anything here
* to let the next read detect EOF */
}
else
{
dev->dev_capa |= STIO_DEV_CAPA_IN_CLOSED | STIO_DEV_CAPA_OUT_CLOSED;
stio->renew_watch = 1;
}
}
#endif
}
#endif
if (dev &&
(dev->dev_capa & STIO_DEV_CAPA_IN_CLOSED) &&
(dev->dev_capa & STIO_DEV_CAPA_OUT_CLOSED))
{
stio_dev_halt (dev);
dev = STIO_NULL;
if ((dev->dev_capa & STIO_DEV_CAPA_IN_CLOSED) &&
(dev->dev_capa & STIO_DEV_CAPA_OUT_CLOSED))
{
stio_dev_halt (dev);
dev = STIO_NULL;
}
}
skip_evcb:
@ -353,7 +377,7 @@ skip_evcb:
}
}
int stio_exec (stio_t* stio)
static STIO_INLINE int __exec (stio_t* stio)
{
stio_ntime_t tmout;
@ -403,7 +427,6 @@ int stio_exec (stio_t* stio)
for (i = 0; i < nentries; i++)
{
stio->renew_watch = 0;
handle_event (stio, i);
}
@ -421,6 +444,17 @@ int stio_exec (stio_t* stio)
return 0;
}
int stio_exec (stio_t* stio)
{
int n;
stio->in_exec = 1;
n = __exec (stio);
stio->in_exec = 0;
return n;
}
void stio_stop (stio_t* stio)
{
stio->stopreq = 1;
@ -435,17 +469,6 @@ int stio_loop (stio_t* stio)
if (stio_prologue (stio) <= -1) return -1;
if (stio->renew_watch)
{
printf (">>> GLOBAL WATCHIN RENEWAL....\n");
stio_dev_t* dev;
for (dev = stio->dev.head; dev; dev = dev->dev_next)
{
stio_dev_watch (dev, STIO_DEV_WATCH_RENEW, 0);
}
stio->renew_watch = 0;
}
while (!stio->stopreq && stio->dev.head)
{
if (stio_exec (stio) <= -1) break;
@ -529,12 +552,10 @@ void stio_killdev (stio_t* stio, stio_dev_t* dev)
/* clear pending send requests */
while (!STIO_WQ_ISEMPTY(&dev->wq))
{
stio_wq_t* wq;
wq = STIO_WQ_HEAD(&dev->wq);
STIO_WQ_DEQ (&dev->wq);
STIO_MMGR_FREE (stio->mmgr, wq);
stio_wq_t* q;
q = STIO_WQ_HEAD(&dev->wq);
unlink_wq (stio, q);
STIO_MMGR_FREE (stio->mmgr, q);
}
/* delink the dev object */
@ -682,17 +703,53 @@ int stio_dev_watch (stio_dev_t* dev, stio_dev_watch_cmd_t cmd, int events)
return 0;
}
void stio_dev_read (stio_dev_t* dev, int enabled)
int stio_dev_read (stio_dev_t* dev, int enabled)
{
if (dev->dev_capa & STIO_DEV_CAPA_IN_CLOSED)
{
dev->stio->errnum = STIO_ENOCAPA;
return -1;
}
if (enabled)
{
dev->dev_capa &= ~STIO_DEV_CAPA_IN_DISABLED;
if (!dev->stio->in_exec && (dev->dev_capa & STIO_DEV_CAPA_IN_WATCHED)) goto renew_watch_now;
}
else
{
dev->dev_capa |= STIO_DEV_CAPA_IN_DISABLED;
if (!dev->stio->in_exec && !(dev->dev_capa & STIO_DEV_CAPA_IN_WATCHED)) goto renew_watch_now;
}
dev->stio->renew_watch = 1;
return 0;
renew_watch_now:
if (stio_dev_watch (dev, STIO_DEV_WATCH_RENEW, 0) <= -1) return -1;
return 0;
}
int stio_dev_write (stio_dev_t* dev, const void* data, stio_len_t len, void* wrctx)
static void on_write_timeout (stio_t* stio, const stio_ntime_t* now, stio_tmrjob_t* job)
{
stio_wq_t* q;
stio_dev_t* dev;
int x;
q = (stio_wq_t*)job->ctx;
dev = q->dev;
dev->stio->errnum = STIO_ETMOUT;
x = dev->dev_evcb->on_write (dev, -1, q->ctx);
STIO_ASSERT (q->tmridx == STIO_TMRIDX_INVALID);
STIO_WQ_UNLINK(q);
STIO_MMGR_FREE (stio->mmgr, q);
if (x <= -1) stio_dev_halt (dev);
}
static int __dev_write (stio_dev_t* dev, const void* data, stio_len_t len, const stio_ntime_t* tmout, void* wrctx)
{
const stio_uint8_t* uptr;
stio_len_t urem, ulen;
@ -715,42 +772,55 @@ int stio_dev_write (stio_dev_t* dev, const void* data, stio_len_t len, void* wrc
goto enqueue_data;
}
/* use the do..while() loop to be able to send a zero-length data */
do
if (dev->dev_capa & STIO_DEV_CAPA_STREAM)
{
/* use the do..while() loop to be able to send a zero-length data */
do
{
ulen = urem;
x = dev->dev_mth->write (dev, data, &ulen);
if (x <= -1) return -1;
else if (x == 0)
{
/* [NOTE]
* the write queue is empty at this moment. a zero-length
* request for a stream device can still get enqueued if the
* write callback returns 0 though i can't figure out if there
* is a compelling reason to do so
*/
goto enqueue_data; /* enqueue remaining data */
}
else
{
urem -= ulen;
uptr += ulen;
}
}
while (urem > 0);
if (len <= 0) /* original length */
{
/* a zero-length writing request is to close the writing end */
dev->dev_capa |= STIO_DEV_CAPA_OUT_CLOSED;
}
if (dev->dev_evcb->on_write (dev, len, wrctx) <= -1) return -1;
}
else
{
ulen = urem;
x = dev->dev_mth->write (dev, data, &ulen);
if (x <= -1) return -1;
else if (x == 0)
{
/* [NOTE]
* the write queue is empty at this moment. a zero-length
* request for a stream device can still get enqueued if the
* write callback returns 0 though i can't figure out if there
* is a compelling reason to do so
*/
goto enqueue_data; /* enqueue remaining data */
}
else
{
urem -= ulen;
uptr += ulen;
}
}
while (urem > 0);
else if (x == 0) goto enqueue_data;
if (len <= 0 && (dev->dev_capa & STIO_DEV_CAPA_STREAM))
{
/* a zero-length writing request is to close the writing end */
dev->dev_capa |= STIO_DEV_CAPA_OUT_CLOSED;
/* partial writing is still considered ok for a non-stream device */
if (dev->dev_evcb->on_write (dev, ulen, wrctx) <= -1) return -1;
}
dev->dev_evcb->on_write (dev, wrctx);
return 0;
return 1; /* written immediately and called on_write callback */
enqueue_data:
/* queue the remaining data*/
printf ("ENQUEING DATA...\n");
q = (stio_wq_t*)STIO_MMGR_ALLOC (dev->stio->mmgr, STIO_SIZEOF(*q) + urem);
if (!q)
{
@ -758,28 +828,60 @@ printf ("ENQUEING DATA...\n");
return -1;
}
q->tmridx = STIO_TMRIDX_INVALID;
q->dev = dev;
q->ctx = wrctx;
q->ptr = (stio_uint8_t*)(q + 1);
q->len = urem;
q->olen = len;
STIO_MEMCPY (q->ptr, uptr, urem);
STIO_WQ_ENQ (&dev->wq, q);
dev->stio->renew_watch = 1;
#if 0
if (!(dev->dev_capa & STIO_DEV_CAPA_OUT_WATCHED))
if (tmout && !stio_isnegtime(tmout))
{
/* if output is not being watched, arrange to do so */
if (stio_dev_watch (dev, STIO_DEV_WATCH_RENEW, 0) <= -1)
stio_tmrjob_t tmrjob;
STIO_MEMSET (&tmrjob, 0, STIO_SIZEOF(tmrjob));
tmrjob.ctx = q;
stio_gettime (&tmrjob.when);
stio_addtime (&tmrjob.when, tmout, &tmrjob.when);
tmrjob.handler = on_write_timeout;
tmrjob.idxptr = &q->tmridx;
q->tmridx = stio_instmrjob (dev->stio, &tmrjob);
if (q->tmridx == STIO_TMRIDX_INVALID)
{
STIO_WQ_UNLINK (q); /* unlink the ENQed item */
STIO_MMGR_FREE (dev->stio->mmgr, q);
return -1;
}
}
#endif
return 0;
STIO_WQ_ENQ (&dev->wq, q);
if (!dev->stio->in_exec && !(dev->dev_capa & STIO_DEV_CAPA_OUT_WATCHED))
{
/* if output is not being watched, arrange to do so */
if (stio_dev_watch (dev, STIO_DEV_WATCH_RENEW, 0) <= -1)
{
unlink_wq (dev->stio, q);
STIO_MMGR_FREE (dev->stio->mmgr, q);
return -1;
}
}
else
{
dev->stio->renew_watch = 1;
}
return 0; /* request pused to a write queue. */
}
int stio_dev_write (stio_dev_t* dev, const void* data, stio_len_t len, void* wrctx)
{
return __dev_write (dev, data, len, STIO_NULL, wrctx);
}
int stio_dev_timedwrite (stio_dev_t* dev, const void* data, stio_len_t len, const stio_ntime_t* tmout, void* wrctx)
{
return __dev_write (dev, data, len, tmout, wrctx);
}
int stio_makesyshndasync (stio_t* stio, stio_syshnd_t hnd)

View File

@ -48,7 +48,6 @@ struct stio_ntime_t
#define STIO_SYSHND_INVALID (-1)
#endif
/* ------------------------------------------------------------------------- */
typedef struct stio_t stio_t;
typedef struct stio_dev_t stio_dev_t;
@ -56,8 +55,7 @@ typedef struct stio_dev_mth_t stio_dev_mth_t;
typedef struct stio_dev_evcb_t stio_dev_evcb_t;
typedef struct stio_wq_t stio_wq_t;
typedef unsigned int stio_len_t; /* TODO: remove it? */
typedef stio_intptr_t stio_len_t; /* NOTE: this is a signed type */
enum stio_errnum_t
{
@ -65,12 +63,13 @@ enum stio_errnum_t
STIO_ENOMEM,
STIO_EINVAL,
STIO_ENOENT,
STIO_ENOSUP, /* not supported */
STIO_EMFILE,
STIO_ENOSUP, /* not supported */
STIO_EMFILE, /* too many open files */
STIO_ENFILE,
STIO_ECONRF, /* connection refused */
STIO_ECONRS, /* connection reset */
STIO_ENOCAPA, /* no capability */
STIO_ECONRF, /* connection refused */
STIO_ECONRS, /* connection reset */
STIO_ENOCAPA, /* no capability */
STIO_ETMOUT, /* timed out */
STIO_EDEVMAKE,
STIO_EDEVERR,
@ -81,6 +80,21 @@ enum stio_errnum_t
typedef enum stio_errnum_t stio_errnum_t;
typedef struct stio_tmrjob_t stio_tmrjob_t;
typedef stio_size_t stio_tmridx_t;
typedef void (*stio_tmrjob_handler_t) (
stio_t* stio,
const stio_ntime_t* now,
stio_tmrjob_t* tmrjob
);
typedef void (*stio_tmrjob_updater_t) (
stio_t* stio,
stio_tmridx_t old_index,
stio_tmridx_t new_index,
stio_tmrjob_t* tmrjob
);
struct stio_dev_mth_t
{
@ -121,20 +135,24 @@ struct stio_dev_evcb_t
int (*on_read) (stio_dev_t* dev, const void* data, stio_len_t len);
/* return -1 on failure, 0 on success.
* it must not kill the device */
int (*on_write) (stio_dev_t* dev, void* wrctx);
* wrlen is the length of data written. it is the length of the originally
* posted writing request for a stream device. For a non stream device, it
* may be shorter than the originally posted length. */
int (*on_write) (stio_dev_t* dev, stio_len_t wrlen, void* wrctx);
};
typedef enum stio_wq_type_t stio_wq_type_t;
struct stio_wq_t
{
stio_wq_t* next;
stio_wq_t* prev;
stio_len_t olen; /* original length */
stio_uint8_t* ptr;
stio_len_t len;
void* ctx;
stio_dev_t* dev; /* back-pointer to the device */
stio_tmridx_t tmridx;
};
#define STIO_WQ_INIT(wq) ((wq)->next = (wq)->prev = (wq))
@ -235,22 +253,6 @@ enum stio_dev_event_t
typedef enum stio_dev_event_t stio_dev_event_t;
typedef struct stio_tmrjob_t stio_tmrjob_t;
typedef stio_size_t stio_tmridx_t;
typedef void (*stio_tmrjob_handler_t) (
stio_t* stio,
const stio_ntime_t* now,
stio_tmrjob_t* tmrjob
);
typedef void (*stio_tmrjob_updater_t) (
stio_t* stio,
stio_tmridx_t old_index,
stio_tmridx_t new_index,
stio_tmrjob_t* tmrjob
);
#ifdef __cplusplus
extern "C" {
#endif
@ -314,22 +316,109 @@ STIO_EXPORT int stio_dev_watch (
int events
);
STIO_EXPORT void stio_dev_read (
STIO_EXPORT int stio_dev_read (
stio_dev_t* dev,
int enabled
);
/**
* The stio_dev_write() function posts a writing request.
* It attempts to write data immediately if there is no pending requests.
* If writing fails, it returns -1. If writing succeeds, it calls the
* on_write callback. If the callback fails, it returns -1. If the callback
* succeeds, it returns 1. If no immediate writing is possible, the request
* is enqueued to a pending request list. If enqueing gets successful,
* it returns 0. otherwise it returns -1.
*/
STIO_EXPORT int stio_dev_write (
stio_dev_t* dev,
const void* data,
stio_len_t len,
void* wrctx
stio_dev_t* dev,
const void* data,
stio_len_t len,
void* wrctx
);
STIO_EXPORT int stio_dev_timedwrite (
stio_dev_t* dev,
const void* data,
stio_len_t len,
const stio_ntime_t* tmout,
void* wrctx
);
STIO_EXPORT void stio_dev_halt (
stio_dev_t* dev
);
/* ------------------------------------------------ */
#define stio_inittime(x,s,ns) (((x)->sec = (s)), ((x)->nsec = (ns)))
#define stio_cleartime(x) stio_inittime(x,0,0)
#define stio_cmptime(x,y) \
(((x)->sec == (y)->sec)? ((x)->nsec - (y)->nsec): \
((x)->sec - (y)->sec))
/* if time has been normalized properly, nsec must be equal to or
* greater than 0. */
#define stio_isnegtime(x) ((x)->sec < 0)
/**
* The stio_gettime() function gets the current time.
*/
STIO_EXPORT void stio_gettime (
stio_ntime_t* nt
);
/**
* The stio_addtime() function adds x and y and stores the result in z
*/
STIO_EXPORT void stio_addtime (
const stio_ntime_t* x,
const stio_ntime_t* y,
stio_ntime_t* z
);
/**
* The stio_subtime() function subtract y from x and stores the result in z.
*/
STIO_EXPORT void stio_subtime (
const stio_ntime_t* x,
const stio_ntime_t* y,
stio_ntime_t* z
);
/**
* The stio_instmrjob() function schedules a new event.
*
* \return #STIO_TMRIDX_INVALID on failure, valid index on success.
*/
STIO_EXPORT stio_tmridx_t stio_instmrjob (
stio_t* stio,
const stio_tmrjob_t* job
);
STIO_EXPORT stio_tmridx_t stio_updtmrjob (
stio_t* stio,
stio_tmridx_t index,
const stio_tmrjob_t* job
);
STIO_EXPORT void stio_deltmrjob (
stio_t* stio,
stio_tmridx_t index
);
/**
* The stio_gettmrjob() function returns the
* pointer to the registered event at the given index.
*/
STIO_EXPORT stio_tmrjob_t* stio_gettmrjob (
stio_t* stio,
stio_tmridx_t index
);
#ifdef __cplusplus
}
#endif