cleaning up mio_dev_thr_t code

This commit is contained in:
hyung-hwan 2020-05-22 19:06:57 +00:00
parent 9c95db02e5
commit c441abdbfb
4 changed files with 207 additions and 241 deletions

View File

@ -30,6 +30,7 @@
#include <mio-sck.h>
#include <mio-pro.h>
#include <mio-pipe.h>
#include <mio-thr.h>
#include <mio-dns.h>
#include <mio-nwif.h>
#include <mio-http.h>
@ -638,6 +639,36 @@ static void pipe_on_close (mio_dev_pipe_t* dev, mio_dev_pipe_sid_t sid)
{
MIO_INFO1 (dev->mio, "PIPE[%d] CLOSED \n", (int)sid);
}
static int thr_on_read (mio_dev_thr_t* dev, const void* data, mio_iolen_t dlen)
{
MIO_INFO3 (dev->mio, "THR READ FROM THR - %d bytes - [%.*s]\n", (int)dlen, (int)dlen, data);
}
static int thr_on_write (mio_dev_thr_t* dev, mio_iolen_t wrlen, void* wrctx)
{
MIO_INFO1 (dev->mio, "THR WRITTEN TO THR - %d bytes\n", (int)wrlen);
}
static void thr_on_close (mio_dev_thr_t* dev, mio_dev_thr_sid_t sid)
{
MIO_INFO1 (dev->mio, "THR[%d] CLOSED \n", (int)sid);
}
static void thr_func (mio_dev_thr_t* dev, mio_dev_thr_iopair_t* iop, void* cx)
{
mio_bch_t buf[5];
ssize_t n;
while ((n = read(iop->rfd, buf, MIO_COUNTOF(buf)))> 0) write (iop->wfd, buf, n);
while (1)
{
sleep (1);
write (iop->wfd, "THR LOOPING", 11);
}
}
/* ========================================================================= */
static void on_dnc_resolve(mio_svc_dnc_t* dnc, mio_dns_msg_t* reqmsg, mio_errnum_t status, const void* data, mio_oow_t dlen)
@ -1213,8 +1244,21 @@ if (!mio_svc_dnc_resolve(dnc, "google.com", MIO_DNS_RRT_SOA, MIO_SVC_DNC_RESOLVE
mi.on_read = pipe_on_read;
mi.on_write = pipe_on_write;
mi.on_close = pipe_on_close;
pp = mio_dev_pipe_make (mio, 0, &mi);
mio_dev_pipe_write (pp, "hello, world", 12, MIO_NULL);
pp = mio_dev_pipe_make(mio, 0, &mi);
mio_dev_pipe_write (pp, "this is good", 12, MIO_NULL);
}
{
mio_dev_thr_t* tt;
mio_dev_thr_make_t mi;
mi.thr_func = thr_func;
mi.thr_ctx = MIO_NULL;
mi.on_read = thr_on_read;
mi.on_write = thr_on_write;
mi.on_close = thr_on_close;
tt = mio_dev_thr_make(mio, 0, &mi);
mio_dev_thr_write (tt, "hello, world", 12, MIO_NULL);
mio_dev_thr_write (tt, MIO_NULL, 0, MIO_NULL);
}
mio_loop (mio);

View File

@ -1652,111 +1652,6 @@ mio_bch_t* mio_svc_htts_dupmergepaths (mio_svc_htts_t* htts, const mio_bch_t* ba
/* ----------------------------------------------------------------- */
#if 0
typedef void (*mio_svc_htts_rsrc_cgi_t) (
int rfd,
int wfd
);
struct mio_svc_htts_rsrc_cgi_peer_t
{
int rfd;
int wfd;
};
typedef struct mio_svc_htts_rsrc_cgi_peer_t mio_svc_htts_rsrc_cgi_peer_t;
enum mio_svc_htts_rsrc_cgi_type_t
{
MIO_SVC_HTTS_RSRC_CGI_TYPE_FUNC,
MIO_SVC_HTTS_RSRC_CGI_TYPE_PROC
};
typedef enum mio_svc_htts_rsrc_cgi_type_t mio_svc_htts_rsrc_cgi_type_t;
struct rsrc_cgi_xtn_t
{
mio_svc_htts_rsrc_cgi_type_t type;
int rfd;
int wfd;
mio_svc_htts_rsrc_cgi_t handler;
pthread_t thr;
mio_svc_htts_rsrc_cgi_peer_t peer;
};
typedef struct rsrc_cgi_xtn_t rsrc_cgi_xtn_t;
static void rsrc_cgi_on_kill (mio_svc_htts_rsrc_t* rsrc)
{
rsrc_cgi_xtn_t* cgi = (rsrc_cgi_xtn_t*)mio_svc_htts_rsrc_getxtn(rsrc);
close (cgi->rfd); cgi->rfd = -1;
close (cgi->wfd); cgi->wfd = -1;
switch (cgi->type)
{
case MIO_SVC_HTTS_RSRC_CGI_TYPE_FUNC:
/* TODO: check cgi->thr is valid.
* non-blocking way? if alive, kill gracefully?? */
pthread_join (cgi->thr, MIO_NULL);
break;
case MIO_SVC_HTTS_RSRC_CGI_TYPE_PROC:
/* TODO:
waitpid with no wait
still alive kill
waitpid with no wait.
*/
break;
}
}
static void* cgi_thr_func (void* ctx)
{
rsrc_cgi_xtn_t* func = (rsrc_cgi_xtn_t*)ctx;
func->handler (func->peer.rfd, func->peer.wfd);
close (func->peer.rfd); func->peer.rfd = -1;
close (func->peer.wfd); func->peer.wfd = -1;
return MIO_NULL;
}
int mio_svc_htts_sendcgi (mio_svc_htts_t* htts, mio_dev_sck_t* csck, mio_svc_htts_rsrc_cgi_t handler, mio_htre_t* req)
{
mio_svc_htts_rsrc_t* rsrc = MIO_NULL;
rsrc_cgi_xtn_t* cgi = MIO_NULL;
int pfd[2];
rsrc = mio_svc_htts_rsrc_make(htts, csck, rsrc_cgi_on_kill, MIO_SIZEOF(*cgi));
if (MIO_UNLIKELY(!rsrc)) goto oops;
cgi = mio_svc_htts_rsrc_getxtn(rsrc);
cgi->type = MIO_SVC_HTTS_RSRC_CGI_TYPE_FUNC;
cgi->handler = handler;
cgi->rfd = -1;
cgi->wfd = -1;
cgi->peer.rfd = -1;
cgi->peer.wfd = -1;
if (pipe(pfd) == -1) goto oops;
cgi->peer.rfd = pfd[0];
cgi->wfd = pfd[1];
if (pipe(pfd) == -1) goto oops;
cgi->rfd = pfd[0];
cgi->peer.wfd = pfd[1];
if (pthread_create(&cgi->thr, MIO_NULL, cgi_thr_func, cgi) != 0) goto oops;
return 0;
oops:
if (cgi)
{
if (cgi->peer.rfd >= 0) { close (cgi->peer.rfd); cgi->peer.rfd = -1; }
if (cgi->peer.wfd >= 0) { close (cgi->peer.wfd); cgi->peer.wfd = -1; }
if (cgi->rfd >= 0) { close (cgi->rfd); cgi->rfd = -1; }
if (cgi->wfd >= 0) { close (cgi->wfd); cgi->wfd = -1; }
}
if (rsrc) mio_svc_htts_rsrc_kill (rsrc);
return -1;
}
/* ----------------------------------------------------------------- */
int mio_svc_htts_sendrsrc (mio_svc_htts_t* htts, mio_dev_sck_t* csck, mio_svc_htts_rsrc_t* rsrc, int status_code, mio_http_method_t method, const mio_http_version_t* version, int keepalive)

View File

@ -57,23 +57,30 @@ typedef void (*mio_dev_thr_on_close_t) (
mio_dev_thr_sid_t sid
);
typedef int (*mio_dev_thr_func_t) (
mio_dev_thr_t* dev,
mio_syshnd_t rfd,
mio_syshnd_t wfd,
void* ctx
struct mio_dev_thr_iopair_t
{
mio_syshnd_t rfd;
mio_syshnd_t wfd;
};
typedef struct mio_dev_thr_iopair_t mio_dev_thr_iopair_t;
typedef void (*mio_dev_thr_func_t) (
mio_t* mio,
mio_dev_thr_iopair_t* iop,
void* ctx
);
typedef struct mio_dev_thr_info_t mio_dev_thr_info_t;
struct mio_dev_thr_t
{
MIO_DEV_HEADER;
int flags;
mio_syshnd_t thr_fd[2];
mio_dev_thr_slave_t* slave[2];
int slave_count;
mio_dev_thr_func_t thr_func;
mio_dev_thr_info_t* thr_info;
mio_dev_thr_on_read_t on_read;
mio_dev_thr_on_write_t on_write;
mio_dev_thr_on_close_t on_close;
@ -87,18 +94,11 @@ struct mio_dev_thr_slave_t
mio_dev_thr_t* master; /* parent device */
};
enum mio_dev_thr_make_flag_t
{
MIO_DEV_THR_WRITEIN = (1 << 0),
MIO_DEV_THR_READOUT = (1 << 1),
};
typedef enum mio_dev_thr_make_flag_t mio_dev_thr_make_flag_t;
typedef struct mio_dev_thr_make_t mio_dev_thr_make_t;
struct mio_dev_thr_make_t
{
int flags; /**< bitwise-ORed of mio_dev_thr_make_flag_t enumerators */
mio_dev_thr_func_t thr_func;
void* thr_ctx;
mio_dev_thr_on_write_t on_write; /* mandatory */
mio_dev_thr_on_read_t on_read; /* mandatory */
mio_dev_thr_on_close_t on_close; /* optional */

View File

@ -48,23 +48,57 @@ static mio_dev_thr_slave_t* make_slave (mio_t* mio, slave_info_t* si);
/* ========================================================================= */
struct thr_info_t
struct mio_dev_thr_info_t
{
mio_dev_thr_t* dev;
mio_t* mio;
mio_dev_thr_func_t thr_func;
mio_syshnd_t rfd;
mio_syshnd_t wfd;
mio_dev_thr_iopair_t thr_iop;
void* thr_ctx;
pthread_t thr_hnd;
int thr_done;
mio_tmridx_t cleanup_tmridx;
};
typedef struct thr_info_t thr_info_t;
typedef struct mio_dev_thr_info_t mio_dev_thr_info_t;
static void free_thr_info (mio_t* mio, mio_dev_thr_info_t* ti)
{
if (ti->thr_iop.rfd != MIO_SYSHND_INVALID)
{
close (ti->thr_iop.rfd);
ti->thr_iop.rfd = MIO_SYSHND_INVALID;
}
if (ti->thr_iop.wfd != MIO_SYSHND_INVALID)
{
close (ti->thr_iop.wfd);
ti->thr_iop.wfd = MIO_SYSHND_INVALID;
}
mio_freemem (mio, ti);
}
static void mark_thr_done (void* ctx)
{
mio_dev_thr_info_t* ti = (mio_dev_thr_info_t*)ctx;
printf ("QQQQQQQQQQQQQQQQQQQQQQQQQQQQQQQQQ\n");
ti->thr_done = 1;
}
static void* run_thr_func (void* ctx)
{
thr_info_t* ti = (thr_info_t*)ctx;
//ti->thr_func (ti->dev, ti->rfd, ti->wfd, );
close (ti->rfd);
close (ti->wfd);
mio_freemem (ti->dev->mio, ti);
mio_dev_thr_info_t* ti = (mio_dev_thr_info_t*)ctx;
/* i assume the thread is cancellable, and of the deferred cancellation type by default */
/*int dummy;
pthread_setcancelstate (PTHREAD_CANCEL_ENABLE, &dummy);
pthread_setcanceltype (PTHREAD_CANCEL_DEFERRED, &dummy);*/
pthread_cleanup_push (mark_thr_done, ti);
ti->thr_func (ti->mio, &ti->thr_iop, ti->thr_ctx);
pthread_cleanup_pop (1);
pthread_exit (MIO_NULL);
return MIO_NULL;
}
@ -75,129 +109,80 @@ static int dev_thr_make_master (mio_dev_t* dev, void* ctx)
mio_dev_thr_t* rdev = (mio_dev_thr_t*)dev;
mio_dev_thr_make_t* info = (mio_dev_thr_make_t*)ctx;
mio_syshnd_t pfds[4] = { MIO_SYSHND_INVALID, MIO_SYSHND_INVALID, MIO_SYSHND_INVALID, MIO_SYSHND_INVALID };
int i, minidx = -1, maxidx = -1;
pid_t pid;
slave_info_t si;
int i;
if (info->flags & MIO_DEV_THR_WRITEIN)
if (pipe(&pfds[0]) == -1 || pipe(&pfds[2]) == -1)
{
if (pipe(&pfds[0]) == -1)
{
mio_seterrwithsyserr (mio, 0, errno);
goto oops;
}
minidx = 0; maxidx = 1;
}
if (info->flags & MIO_DEV_THR_READOUT)
{
if (pipe(&pfds[2]) == -1)
{
mio_seterrwithsyserr (mio, 0, errno);
goto oops;
}
if (minidx == -1) minidx = 2;
maxidx = 3;
}
if (maxidx == -1)
{
mio_seterrnum (mio, MIO_EINVAL);
mio_seterrwithsyserr (mio, 0, errno);
goto oops;
}
if (mio_makesyshndasync(mio, pfds[1]) <= -1 ||
mio_makesyshndasync(mio, pfds[2]) <= -1) goto oops;
if (info->flags & MIO_DEV_THR_WRITEIN)
{
/*
* 0123
* rw--
* X
* WRITE => 1
*/
if (mio_makesyshndasync(mio, pfds[1]) <= -1) goto oops;
}
si.mi = info;
si.pfd = pfds[1];
si.dev_cap = MIO_DEV_CAP_OUT | MIO_DEV_CAP_STREAM;
si.id = MIO_DEV_THR_IN;
if (info->flags & MIO_DEV_THR_READOUT)
{
/*
* 0123
* --rw
* X
* READ => 2
*/
if (mio_makesyshndasync(mio, pfds[2]) <= -1) goto oops;
}
rdev->slave[MIO_DEV_THR_IN] = make_slave(mio, &si);
if (!rdev->slave[MIO_DEV_THR_IN]) goto oops;
if (pfds[1] != MIO_SYSHND_INVALID)
{
slave_info_t si;
pfds[1] = MIO_SYSHND_INVALID;
rdev->slave_count++;
si.mi = info;
si.pfd = pfds[1];
si.dev_cap = MIO_DEV_CAP_OUT | MIO_DEV_CAP_STREAM;
si.id = MIO_DEV_THR_IN;
si.mi = info;
si.pfd = pfds[2];
si.dev_cap = MIO_DEV_CAP_IN | MIO_DEV_CAP_STREAM;
si.id = MIO_DEV_THR_OUT;
rdev->slave[MIO_DEV_THR_IN] = make_slave(mio, &si);
if (!rdev->slave[MIO_DEV_THR_IN]) goto oops;
rdev->slave[MIO_DEV_THR_OUT] = make_slave(mio, &si);
if (!rdev->slave[MIO_DEV_THR_OUT]) goto oops;
pfds[1] = MIO_SYSHND_INVALID;
rdev->slave_count++;
}
if (pfds[2] != MIO_SYSHND_INVALID)
{
slave_info_t si;
si.mi = info;
si.pfd = pfds[2];
si.dev_cap = MIO_DEV_CAP_IN | MIO_DEV_CAP_STREAM;
si.id = MIO_DEV_THR_OUT;
rdev->slave[MIO_DEV_THR_OUT] = make_slave(mio, &si);
if (!rdev->slave[MIO_DEV_THR_OUT]) goto oops;
pfds[2] = MIO_SYSHND_INVALID;
rdev->slave_count++;
}
pfds[2] = MIO_SYSHND_INVALID;
rdev->slave_count++;
for (i = 0; i < MIO_COUNTOF(rdev->slave); i++)
{
if (rdev->slave[i]) rdev->slave[i]->master = rdev;
}
/* CREATE THREAD. DATA. CREATE THREAD .. */
{
pthread_t thr;
pthread_attr_t attr;
int n;
thr_info_t* ti;
ti = mio_callocmem(mio, MIO_SIZEOF(*ti));
if (MIO_UNLIKELY(!ti)) goto oops;
pthread_attr_init (&attr);
pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED);
n = pthread_create(&thr, &attr, info->thr_func, ti);
pthread_attr_destroy (&attr);
if (n != 0)
{
mio_freemem (mio, ti);
goto oops;
}
}
rdev->dev_cap = MIO_DEV_CAP_VIRTUAL; /* the master device doesn't perform I/O */
rdev->flags = info->flags;
rdev->thr_func = info->thr_func;
rdev->on_read = info->on_read;
rdev->on_write = info->on_write;
rdev->on_close = info->on_close;
rdev->thr_fd[0] = pfds[0];
rdev->thr_fd[1] = pfds[3];
/* ---------------------------------------------------------- */
{
int n;
mio_dev_thr_info_t* ti;
ti = mio_callocmem(mio, MIO_SIZEOF(*ti));
if (MIO_UNLIKELY(!ti)) goto oops;
ti->mio = mio;
ti->thr_iop.rfd = pfds[0];
ti->thr_iop.wfd = pfds[3];
ti->thr_func = info->thr_func;
ti->thr_ctx = info->thr_ctx;
rdev->thr_info = ti;
n = pthread_create(&ti->thr_hnd, MIO_NULL, run_thr_func, ti);
if (n != 0)
{
rdev->thr_info = MIO_NULL;
mio_freemem (mio, ti);
goto oops;
}
}
/* ---------------------------------------------------------- */
return 0;
oops:
for (i = minidx; i < maxidx; i++)
for (i = 0; i < MIO_COUNTOF(pfds); i++)
{
if (pfds[i] != MIO_SYSHND_INVALID) close (pfds[i]);
}
@ -230,12 +215,27 @@ static int dev_thr_make_slave (mio_dev_t* dev, void* ctx)
return 0;
}
static void check_and_free_thr_info (mio_t* mio, const mio_ntime_t* now, mio_tmrjob_t* job)
{
mio_dev_thr_info_t* ti = (mio_dev_thr_info_t*)job->ctx;
if (ti->thr_done)
{
free_thr_info (mio, ti);
}
else
{
}
}
static int dev_thr_kill_master (mio_dev_t* dev, int force)
{
mio_t* mio = dev->mio;
mio_dev_thr_t* rdev = (mio_dev_thr_t*)dev;
int i, status;
pid_t wpid;
mio_dev_thr_info_t* ti;
int i;
ti = rdev->thr_info;
pthread_cancel (ti->thr_hnd);
if (rdev->slave_count > 0)
{
@ -256,7 +256,38 @@ static int dev_thr_kill_master (mio_dev_t* dev, int force)
}
}
/* TODO: may have to kill thread... */
if (ti->thr_done)
{
printf ("THREAD DONE>...111\n");
pthread_detach (ti->thr_hnd); /* pthread_join() may be blocking. */
free_thr_info (mio, ti);
}
else
{
mio_tmrjob_t tmrjob;
MIO_MEMSET (&tmrjob, 0, MIO_SIZEOF(tmrjob));
tmrjob.ctx = ti;
mio_gettime (mio, &tmrjob.when);
tmrjob.when.sec++;
tmrjob.handler = check_and_free_thr_info;
tmrjob.idxptr = &ti->cleanup_tmridx;
if (ti->thr_done)
{
printf ("THREAD DONE>...222\n");
pthread_detach (ti->thr_hnd); /* pthread_join() may be blocking. */
free_thr_info (mio, ti);
}
else
{
printf ("THREAD NOT DONE>...222\n");
//ti->cleanup_tmridx = mio_instmrjob(mio, &tmrjob);
pthread_join (ti->thr_hnd, MIO_NULL); /* pthread_join() may be blocking. */
mio_freemem (mio, ti);
//mio_instmrjob (mio,
}
}
rdev->thr_info = MIO_NULL;
if (rdev->on_close) rdev->on_close (rdev, MIO_DEV_THR_MASTER);
return 0;
@ -633,8 +664,6 @@ void mio_dev_thr_halt (mio_dev_thr_t* dev)
int mio_dev_thr_read (mio_dev_thr_t* dev, int enabled)
{
mio_t* mio = dev->mio;
if (dev->slave[MIO_DEV_THR_OUT])
{
return mio_dev_read((mio_dev_t*)dev->slave[MIO_DEV_THR_OUT], enabled);
@ -648,15 +677,13 @@ int mio_dev_thr_read (mio_dev_thr_t* dev, int enabled)
int mio_dev_thr_timedread (mio_dev_thr_t* dev, int enabled, const mio_ntime_t* tmout)
{
mio_t* mio = dev->mio;
if (dev->slave[MIO_DEV_THR_OUT])
{
return mio_dev_timedread((mio_dev_t*)dev->slave[MIO_DEV_THR_OUT], enabled, tmout);
}
else
{
mio_seterrnum (mio, MIO_ENOCAPA); /* TODO: is it the right error number? */
mio_seterrnum (dev->mio, MIO_ENOCAPA); /* TODO: is it the right error number? */
return -1;
}
}