enhanced nwio

This commit is contained in:
hyung-hwan 2012-04-29 15:26:44 +00:00
parent 005471b505
commit 9425ec0730
8 changed files with 200 additions and 137 deletions

View File

@ -894,19 +894,14 @@ static qse_mmgr_t debug_mmgr =
}; };
#endif #endif
static qse_ssize_t nwio_handler ( static qse_ssize_t nwio_handler_open (
qse_awk_rtx_t* rtx, qse_awk_rio_cmd_t cmd, qse_awk_rio_arg_t* riod, qse_awk_rtx_t* rtx, qse_awk_rio_arg_t* riod, int flags, qse_nwad_t* nwad)
qse_char_t* data, qse_size_t size, qse_nwad_t* nwad)
{ {
switch (cmd)
{
case QSE_AWK_RIO_OPEN:
{
qse_nwio_t* handle; qse_nwio_t* handle;
handle = qse_nwio_open ( handle = qse_nwio_open (
qse_awk_rtx_getmmgr(rtx), 0, nwad, qse_awk_rtx_getmmgr(rtx), 0, nwad,
QSE_NWIO_TEXT | QSE_NWIO_IGNOREMBWCERR | flags | QSE_NWIO_TEXT | QSE_NWIO_IGNOREMBWCERR |
QSE_NWIO_READNORETRY | QSE_NWIO_WRITENORETRY QSE_NWIO_READNORETRY | QSE_NWIO_WRITENORETRY
); );
if (handle == QSE_NULL) return -1; if (handle == QSE_NULL) return -1;
@ -918,39 +913,83 @@ static qse_ssize_t nwio_handler (
} }
#endif #endif
riod->handle = (void*)handle; riod->handle2 = (void*)handle;
return 1; return 1;
}
static qse_ssize_t nwio_handler_rest (
qse_awk_rtx_t* rtx, qse_awk_rio_cmd_t cmd, qse_awk_rio_arg_t* riod,
qse_char_t* data, qse_size_t size)
{
switch (cmd)
{
case QSE_AWK_RIO_OPEN:
{
qse_awk_rtx_seterrnum (rtx, QSE_AWK_EINTERN, QSE_NULL);
return -1;
} }
case QSE_AWK_RIO_CLOSE: case QSE_AWK_RIO_CLOSE:
{ {
qse_nwio_close ((qse_nwio_t*)riod->handle); qse_nwio_close ((qse_nwio_t*)riod->handle2);
riod->handle = QSE_NULL; riod->handle2 = QSE_NULL;
return 0; return 0;
} }
case QSE_AWK_RIO_READ: case QSE_AWK_RIO_READ:
{ {
return qse_nwio_read ((qse_nwio_t*)riod->handle, data, size); return qse_nwio_read ((qse_nwio_t*)riod->handle2, data, size);
} }
case QSE_AWK_RIO_WRITE: case QSE_AWK_RIO_WRITE:
{ {
return qse_nwio_write ((qse_nwio_t*)riod->handle, data, size); return qse_nwio_write ((qse_nwio_t*)riod->handle2, data, size);
} }
case QSE_AWK_RIO_FLUSH: case QSE_AWK_RIO_FLUSH:
{ {
/*if (riod->mode == QSE_AWK_RIO_PIPE_READ) return -1;*/ /*if (riod->mode == QSE_AWK_RIO_PIPE_READ) return -1;*/
return qse_nwio_flush ((qse_nwio_t*)riod->handle); return qse_nwio_flush ((qse_nwio_t*)riod->handle2);
} }
case QSE_AWK_RIO_NEXT: case QSE_AWK_RIO_NEXT:
{ {
qse_awk_rtx_seterrnum (rtx, QSE_AWK_EINTERN, QSE_NULL);
return -1; return -1;
} }
} }
qse_awk_rtx_seterrnum (rtx, QSE_AWK_EINTERN, QSE_NULL);
return -1;
}
static int parse_pipe_uri (const qse_char_t* uri, int* flags, qse_nwad_t* nwad)
{
static struct
{
qse_char_t* prefix;
qse_size_t len;
int flags;
} x[] =
{
{ QSE_T("tcp://"), 6, QSE_NWIO_TCP },
{ QSE_T("udp://"), 6, QSE_NWIO_UDP },
{ QSE_T("tcpd://"), 7, QSE_NWIO_TCP | QSE_NWIO_PASSIVE },
{ QSE_T("udpd://"), 7, QSE_NWIO_UDP | QSE_NWIO_PASSIVE }
};
int i;
for (i = 0; i < QSE_COUNTOF(x); i++)
{
if (qse_strzcmp (uri, x[i].prefix, x[i].len) == 0)
{
if (qse_strtonwad (uri + x[i].len, nwad) <= -1) return -1;
*flags = x[i].flags;
return 0;
}
}
return -1; return -1;
} }
@ -959,13 +998,16 @@ static qse_ssize_t new_pipe_handler (
qse_char_t* data, qse_size_t size) qse_char_t* data, qse_size_t size)
{ {
struct rtx_xtn_t* xtn; struct rtx_xtn_t* xtn;
int flags;
qse_nwad_t nwad; qse_nwad_t nwad;
xtn = qse_awk_rtx_getxtnstd (rtx); xtn = qse_awk_rtx_getxtnstd (rtx);
if (qse_strtonwad (riod->name, &nwad) >= 0) if (cmd == QSE_AWK_RIO_OPEN && parse_pipe_uri (riod->name, &flags, &nwad) >= 0)
return nwio_handler (rtx, cmd, riod, data, size, &nwad); return nwio_handler_open (rtx, riod, flags, &nwad);
else if (riod->handle2)
return nwio_handler_rest (rtx, cmd, riod, data, size);
else
return xtn->old_pipe_handler (rtx, cmd, riod, data, size); return xtn->old_pipe_handler (rtx, cmd, riod, data, size);
} }

View File

@ -542,6 +542,7 @@ struct qse_awk_rio_arg_t
qse_char_t* name; /**< name of I/O object */ qse_char_t* name; /**< name of I/O object */
qse_awk_rio_rwcmode_t rwcmode; /**< closing mode for rwpipe */ qse_awk_rio_rwcmode_t rwcmode; /**< closing mode for rwpipe */
void* handle; /**< I/O handle set by a handler */ void* handle; /**< I/O handle set by a handler */
void* handle2; /**< secondary I/O handle set by a handler */
/*-- from here down, internal use only --*/ /*-- from here down, internal use only --*/
int type; int type;

View File

@ -253,9 +253,6 @@ qse_ssize_t qse_fio_read (
/** /**
* The qse_fio_write() function writes data. * The qse_fio_write() function writes data.
* If QSE_FIO_TEXT is used and the size parameter is (qse_size_t)-1,
* the function treats the data parameter as a pointer to a null-terminated
* string.
*/ */
qse_ssize_t qse_fio_write ( qse_ssize_t qse_fio_write (
qse_fio_t* fio, qse_fio_t* fio,

View File

@ -37,14 +37,14 @@ enum qse_nwio_flag_t
QSE_NWIO_NOAUTOFLUSH = (1 << 2), QSE_NWIO_NOAUTOFLUSH = (1 << 2),
/* normal open flags */ /* normal open flags */
QSE_NWIO_READ = (1 << 8), QSE_NWIO_PASSIVE = (1 << 8),
QSE_NWIO_WRITE = (1 << 9), QSE_NWIO_TCP = (1 << 9),
QSE_NWIO_UDP = (1 << 10),
/** do not reread if read has been interrupted */ /** do not reread if read has been interrupted */
QSE_NWIO_READNORETRY = (1 << 10), QSE_NWIO_READNORETRY = (1 << 14),
/** do not rewrite if write has been interrupted */ /** do not rewrite if write has been interrupted */
QSE_NWIO_WRITENORETRY = (1 << 11), QSE_NWIO_WRITENORETRY = (1 << 15),
QSE_NWIO_LISTEN = (1 << 12)
}; };
enum qse_nwio_errnum_t enum qse_nwio_errnum_t
@ -89,6 +89,7 @@ struct qse_nwio_t
qse_nwio_errnum_t errnum; qse_nwio_errnum_t errnum;
qse_nwio_hnd_t handle; qse_nwio_hnd_t handle;
qse_tio_t* tio; qse_tio_t* tio;
int status;
}; };
#define QSE_NWIO_HANDLE(nwio) ((nwio)->handle) #define QSE_NWIO_HANDLE(nwio) ((nwio)->handle)

View File

@ -303,7 +303,8 @@ qse_pio_pid_t qse_pio_getchild (
); );
/** /**
* The qse_pio_read() fucntion reads data. * The qse_pio_read() fucntion reads at most @a size bytes/characters
* and stores them to the buffer pointed to by @a buf.
* @return -1 on failure, 0 on EOF, data length read on success * @return -1 on failure, 0 on EOF, data length read on success
*/ */
qse_ssize_t qse_pio_read ( qse_ssize_t qse_pio_read (
@ -314,9 +315,10 @@ qse_ssize_t qse_pio_read (
); );
/** /**
* The qse_pio_write() function writes data. * The qse_pio_write() function writes up @a size bytes/characters
* If @a size is zero, qse_pio_write() closes the the writing * from the buffer pointed to by @a data. If #QSE_PIO_TEXT is used
* stream causing the child process reach the end of the stream. * and the @a size parameter is (qse_size_t)-1, the function treats
* the @a data parameter as a pointer to a null-terminated string.
* @return -1 on failure, data length written on success * @return -1 on failure, data length written on success
*/ */
qse_ssize_t qse_pio_write ( qse_ssize_t qse_pio_write (

View File

@ -36,7 +36,6 @@ enum qse_sio_flag_t
* qse_fio_flag_t enumerators. you can use values between * qse_fio_flag_t enumerators. you can use values between
* (1<<0) and (1<<7) inclusive reserved in qse_fio_flag_t. * (1<<0) and (1<<7) inclusive reserved in qse_fio_flag_t.
* the range is represented by QSE_FIO_RESERVED. */ * the range is represented by QSE_FIO_RESERVED. */
QSE_SIO_URI = (1 << 0),
QSE_SIO_IGNOREMBWCERR = (1 << 1), QSE_SIO_IGNOREMBWCERR = (1 << 1),
QSE_SIO_NOAUTOFLUSH = (1 << 2), QSE_SIO_NOAUTOFLUSH = (1 << 2),

View File

@ -6,8 +6,8 @@
QSE is free software: you can redistribute it and/or modify QSE is free software: you can redistribute it and/or modify
it under the terms of the GNU Lesser General Public License as it under the terms of the GNU Lesser General Public License as
published by the Free Software Foundation, either vernwion 3 of published by the Free Software Foundation, either version 3 of
the License, or (at your option) any later vernwion. the License, or (at your option) any later version.
QSE is distributed in the hope that it will be useful, QSE is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of but WITHOUT ANY WARRANTY; without even the implied warranty of
@ -33,6 +33,16 @@
# include <netinet/in.h> # include <netinet/in.h>
#endif #endif
enum
{
UDP_CONNECT_NEEDED = (1 << 0)
};
union sockaddr_t
{
struct sockaddr_in in4;
struct sockaddr_in6 in6;
};
static qse_ssize_t socket_input ( static qse_ssize_t socket_input (
qse_tio_t* tio, qse_tio_cmd_t cmd, void* buf, qse_size_t size); qse_tio_t* tio, qse_tio_cmd_t cmd, void* buf, qse_size_t size);
@ -260,13 +270,14 @@ void qse_nwio_close (qse_nwio_t* nwio)
int qse_nwio_init ( int qse_nwio_init (
qse_nwio_t* nwio, qse_mmgr_t* mmgr, const qse_nwad_t* nwad, int flags) qse_nwio_t* nwio, qse_mmgr_t* mmgr, const qse_nwad_t* nwad, int flags)
{ {
union union sockaddr_t addr;
{ #ifdef HAVE_SOCKLEN_T
struct sockaddr_in in4; socklen_t addrlen;
struct sockaddr_in6 in6; #else
} addr;
int addrlen; int addrlen;
#endif
int family; int family;
int type;
QSE_MEMSET (nwio, 0, QSE_SIZEOF(*nwio)); QSE_MEMSET (nwio, 0, QSE_SIZEOF(*nwio));
nwio->mmgr = mmgr; nwio->mmgr = mmgr;
@ -281,7 +292,15 @@ int qse_nwio_init (
#elif defined(__DOS__) #elif defined(__DOS__)
/* TODO: */ /* TODO: */
#else #else
nwio->handle = socket (family, SOCK_STREAM, 0); if (flags & QSE_NWIO_TCP) type = SOCK_STREAM;
else if (flags & QSE_NWIO_UDP) type = SOCK_DGRAM;
else
{
nwio->errnum = QSE_NWIO_EINVAL;
return -1;
}
nwio->handle = socket (family, type, 0);
if (nwio->handle <= -1) if (nwio->handle <= -1)
{ {
nwio->errnum = syserr_to_errnum (errno); nwio->errnum = syserr_to_errnum (errno);
@ -295,28 +314,39 @@ int qse_nwio_init (
} }
#endif #endif
if (flags & QSE_NWIO_LISTEN) if (flags & QSE_NWIO_PASSIVE)
{ {
qse_nwio_hnd_t handle; qse_nwio_hnd_t handle;
if (bind (nwio->handle, (struct sockaddr*)&addr, addrlen) <= -1 || if (bind (nwio->handle, (struct sockaddr*)&addr, addrlen) <= -1)
listen (nwio->handle, 10) <= -1)
{ {
nwio->errnum = syserr_to_errnum (errno); nwio->errnum = syserr_to_errnum (errno);
goto oops; goto oops;
} }
/* TODO: socklen_t */ if (flags & QSE_NWIO_TCP)
handle = accept (nwio->handle, &addr, &addrlen); {
if (listen (nwio->handle, 10) <= -1)
{
nwio->errnum = syserr_to_errnum (errno);
goto oops;
}
handle = accept (nwio->handle, (struct sockaddr*)&addr, &addrlen);
if (handle <= -1) if (handle <= -1)
{ {
nwio->errnum = syserr_to_errnum (errno); nwio->errnum = syserr_to_errnum (errno);
goto oops; goto oops;
} }
close (nwio->handle); QSE_CLOSE (nwio->handle);
nwio->handle = handle; nwio->handle = handle;
} }
else if (flags & QSE_NWIO_UDP)
{
nwio->status |= UDP_CONNECT_NEEDED;
}
}
else else
{ {
if (connect (nwio->handle, (struct sockaddr*)&addr, addrlen) <= -1) if (connect (nwio->handle, (struct sockaddr*)&addr, addrlen) <= -1)
@ -365,7 +395,7 @@ oops:
#elif defined(__DOS__) #elif defined(__DOS__)
/* TODO: */ /* TODO: */
#else #else
close (nwio->handle); QSE_CLOSE (nwio->handle);
#endif #endif
return -1; return -1;
} }
@ -454,50 +484,59 @@ static qse_ssize_t nwio_read (qse_nwio_t* nwio, void* buf, qse_size_t size)
#endif #endif
#if defined(_WIN32) #if defined(_WIN32)
/* TODO: */
if (size > (QSE_TYPE_MAX(qse_ssize_t) & QSE_TYPE_MAX(DWORD)))
size = QSE_TYPE_MAX(qse_ssize_t) & QSE_TYPE_MAX(DWORD);
if (ReadFile(nwio->handle, buf, (DWORD)size, &count, QSE_NULL) == FALSE)
{
/* ReadFile receives ERROR_BROKEN_PIPE when the write end
* is closed in the child process */
if (GetLastError() == ERROR_BROKEN_PIPE) return 0;
nwio->errnum = syserr_to_errnum(GetLastError());
return -1;
}
return (qse_ssize_t)count;
#elif defined(__OS2__) #elif defined(__OS2__)
/* TODO: */
if (size > (QSE_TYPE_MAX(qse_ssize_t) & QSE_TYPE_MAX(ULONG)))
size = QSE_TYPE_MAX(qse_ssize_t) & QSE_TYPE_MAX(ULONG);
rc = DosRead (nwio->handle, buf, (ULONG)size, &count);
if (rc != NO_ERROR)
{
if (rc == ERROR_BROKEN_PIPE) return 0; /* TODO: check this */
nwio->errnum = syserr_to_errnum(rc);
return -1;
}
return (qse_ssize_t)count;
#elif defined(__DOS__) #elif defined(__DOS__)
/* TODO: verify this */ /* TODO: */
if (size > (QSE_TYPE_MAX(qse_ssize_t) & QSE_TYPE_MAX(unsigned int)))
size = QSE_TYPE_MAX(qse_ssize_t) & QSE_TYPE_MAX(unsigned int);
n = read (nwio->handle, buf, size);
if (n <= -1) nwio->errnum = syserr_to_errnum(errno);
return n;
#else #else
if (size > (QSE_TYPE_MAX(qse_ssize_t) & QSE_TYPE_MAX(size_t))) if (size > (QSE_TYPE_MAX(qse_ssize_t) & QSE_TYPE_MAX(size_t)))
size = QSE_TYPE_MAX(qse_ssize_t) & QSE_TYPE_MAX(size_t); size = QSE_TYPE_MAX(qse_ssize_t) & QSE_TYPE_MAX(size_t);
reread: reread:
if (nwio->status & UDP_CONNECT_NEEDED)
{
union sockaddr_t addr;
#ifdef HAVE_SOCKLEN_T
socklen_t addrlen;
#else
int addrlen;
#endif
addrlen = QSE_SIZEOF(addr);
n = recvfrom (
nwio->handle, buf, size, 0,
(struct sockaddr*)&addr, &addrlen);
if (n <= -1)
{
if (errno == EINTR)
{
if (nwio->flags & QSE_NWIO_READNORETRY)
nwio->errnum = QSE_NWIO_EINTR;
else goto reread;
}
else
{
nwio->errnum = syserr_to_errnum (errno);
}
}
else if (n >= 1)
{
/* for udp, it just creates a stream with the
* first sender */
if (connect (nwio->handle, (struct sockaddr*)&addr, addrlen) <= -1)
{
nwio->errnum = syserr_to_errnum (errno);
return -1;
}
nwio->status &= ~UDP_CONNECT_NEEDED;
}
}
else
{
n = recv (nwio->handle, buf, size, 0); n = recv (nwio->handle, buf, size, 0);
if (n <= -1) if (n <= -1)
{ {
@ -512,6 +551,7 @@ reread:
nwio->errnum = syserr_to_errnum (errno); nwio->errnum = syserr_to_errnum (errno);
} }
} }
}
return n; return n;
#endif #endif
@ -549,37 +589,15 @@ static qse_ssize_t nwio_write (qse_nwio_t* nwio, const void* data, qse_size_t si
#if defined(_WIN32) #if defined(_WIN32)
if (size > (QSE_TYPE_MAX(qse_ssize_t) & QSE_TYPE_MAX(DWORD))) /* TODO: */
size = QSE_TYPE_MAX(qse_ssize_t) & QSE_TYPE_MAX(DWORD);
if (WriteFile (nwio->handle, data, (DWORD)size, &count, QSE_NULL) == FALSE)
{
nwio->errnum = syserr_to_errnum(GetLastError());
return -1;
}
return (qse_ssize_t)count;
#elif defined(__OS2__) #elif defined(__OS2__)
if (size > (QSE_TYPE_MAX(qse_ssize_t) & QSE_TYPE_MAX(ULONG))) /* TODO: */
size = QSE_TYPE_MAX(qse_ssize_t) & QSE_TYPE_MAX(ULONG);
rc = DosWrite (nwio->handle, (PVOID)data, (ULONG)size, &count);
if (rc != NO_ERROR)
{
nwio->errnum = syserr_to_errnum(rc);
return -1;
}
return (qse_ssize_t)count;
#elif defined(__DOS__) #elif defined(__DOS__)
if (size > (QSE_TYPE_MAX(qse_ssize_t) & QSE_TYPE_MAX(unsigned int))) /* TODO: */
size = QSE_TYPE_MAX(qse_ssize_t) & QSE_TYPE_MAX(unsigned int);
n = write (nwio->handle, data, size);
if (n <= -1) nwio->errnum = syserr_to_errnum (errno);
return n;
#else #else
@ -656,4 +674,3 @@ static qse_ssize_t socket_output (
return 0; return 0;
} }

View File

@ -306,12 +306,16 @@ qse_ssize_t qse_tio_flush (qse_tio_t* tio)
if (n <= -1) if (n <= -1)
{ {
if (tio->errnum == QSE_TIO_ENOERR) tio->errnum = QSE_TIO_EOTHER; if (tio->errnum == QSE_TIO_ENOERR) tio->errnum = QSE_TIO_EOTHER;
if (cur != tio->out.buf.ptr)
{
QSE_MEMCPY (tio->out.buf.ptr, cur, left); QSE_MEMCPY (tio->out.buf.ptr, cur, left);
tio->outbuf_len = left; tio->outbuf_len = left;
}
return -1; return -1;
} }
if (n == 0) if (n == 0)
{ {
if (cur != tio->out.buf.ptr)
QSE_MEMCPY (tio->out.buf.ptr, cur, left); QSE_MEMCPY (tio->out.buf.ptr, cur, left);
break; break;
} }