From c441abdbfb6e01eefb271b4a08e9b04223af8d1c Mon Sep 17 00:00:00 2001 From: hyung-hwan Date: Fri, 22 May 2020 19:06:57 +0000 Subject: [PATCH] cleaning up mio_dev_thr_t code --- mio/bin/t01.c | 48 ++++++++- mio/lib/http-svr.c | 105 ------------------ mio/lib/mio-thr.h | 32 +++--- mio/lib/thr.c | 263 +++++++++++++++++++++++++-------------------- 4 files changed, 207 insertions(+), 241 deletions(-) diff --git a/mio/bin/t01.c b/mio/bin/t01.c index 4f27411..b390bff 100644 --- a/mio/bin/t01.c +++ b/mio/bin/t01.c @@ -30,6 +30,7 @@ #include #include #include +#include #include #include #include @@ -638,6 +639,36 @@ static void pipe_on_close (mio_dev_pipe_t* dev, mio_dev_pipe_sid_t sid) { MIO_INFO1 (dev->mio, "PIPE[%d] CLOSED \n", (int)sid); } + + +static int thr_on_read (mio_dev_thr_t* dev, const void* data, mio_iolen_t dlen) +{ + MIO_INFO3 (dev->mio, "THR READ FROM THR - %d bytes - [%.*s]\n", (int)dlen, (int)dlen, data); +} + +static int thr_on_write (mio_dev_thr_t* dev, mio_iolen_t wrlen, void* wrctx) +{ + MIO_INFO1 (dev->mio, "THR WRITTEN TO THR - %d bytes\n", (int)wrlen); +} + +static void thr_on_close (mio_dev_thr_t* dev, mio_dev_thr_sid_t sid) +{ + MIO_INFO1 (dev->mio, "THR[%d] CLOSED \n", (int)sid); +} + +static void thr_func (mio_dev_thr_t* dev, mio_dev_thr_iopair_t* iop, void* cx) +{ + mio_bch_t buf[5]; + ssize_t n; + + while ((n = read(iop->rfd, buf, MIO_COUNTOF(buf)))> 0) write (iop->wfd, buf, n); + + while (1) + { + sleep (1); + write (iop->wfd, "THR LOOPING", 11); + } +} /* ========================================================================= */ static void on_dnc_resolve(mio_svc_dnc_t* dnc, mio_dns_msg_t* reqmsg, mio_errnum_t status, const void* data, mio_oow_t dlen) @@ -1213,8 +1244,21 @@ if (!mio_svc_dnc_resolve(dnc, "google.com", MIO_DNS_RRT_SOA, MIO_SVC_DNC_RESOLVE mi.on_read = pipe_on_read; mi.on_write = pipe_on_write; mi.on_close = pipe_on_close; - pp = mio_dev_pipe_make (mio, 0, &mi); - mio_dev_pipe_write (pp, "hello, world", 12, MIO_NULL); + pp = mio_dev_pipe_make(mio, 0, &mi); + mio_dev_pipe_write (pp, "this is good", 12, MIO_NULL); +} + +{ + mio_dev_thr_t* tt; + mio_dev_thr_make_t mi; + mi.thr_func = thr_func; + mi.thr_ctx = MIO_NULL; + mi.on_read = thr_on_read; + mi.on_write = thr_on_write; + mi.on_close = thr_on_close; + tt = mio_dev_thr_make(mio, 0, &mi); + mio_dev_thr_write (tt, "hello, world", 12, MIO_NULL); + mio_dev_thr_write (tt, MIO_NULL, 0, MIO_NULL); } mio_loop (mio); diff --git a/mio/lib/http-svr.c b/mio/lib/http-svr.c index a27f54d..5f43a9c 100644 --- a/mio/lib/http-svr.c +++ b/mio/lib/http-svr.c @@ -1652,111 +1652,6 @@ mio_bch_t* mio_svc_htts_dupmergepaths (mio_svc_htts_t* htts, const mio_bch_t* ba /* ----------------------------------------------------------------- */ #if 0 -typedef void (*mio_svc_htts_rsrc_cgi_t) ( - int rfd, - int wfd -); - -struct mio_svc_htts_rsrc_cgi_peer_t -{ - int rfd; - int wfd; -}; -typedef struct mio_svc_htts_rsrc_cgi_peer_t mio_svc_htts_rsrc_cgi_peer_t; - -enum mio_svc_htts_rsrc_cgi_type_t -{ - MIO_SVC_HTTS_RSRC_CGI_TYPE_FUNC, - MIO_SVC_HTTS_RSRC_CGI_TYPE_PROC -}; -typedef enum mio_svc_htts_rsrc_cgi_type_t mio_svc_htts_rsrc_cgi_type_t; - -struct rsrc_cgi_xtn_t -{ - mio_svc_htts_rsrc_cgi_type_t type; - int rfd; - int wfd; - - mio_svc_htts_rsrc_cgi_t handler; - pthread_t thr; - mio_svc_htts_rsrc_cgi_peer_t peer; -}; -typedef struct rsrc_cgi_xtn_t rsrc_cgi_xtn_t; - -static void rsrc_cgi_on_kill (mio_svc_htts_rsrc_t* rsrc) -{ - rsrc_cgi_xtn_t* cgi = (rsrc_cgi_xtn_t*)mio_svc_htts_rsrc_getxtn(rsrc); - - close (cgi->rfd); cgi->rfd = -1; - close (cgi->wfd); cgi->wfd = -1; - - switch (cgi->type) - { - case MIO_SVC_HTTS_RSRC_CGI_TYPE_FUNC: -/* TODO: check cgi->thr is valid. - * non-blocking way? if alive, kill gracefully?? */ - pthread_join (cgi->thr, MIO_NULL); - break; - - case MIO_SVC_HTTS_RSRC_CGI_TYPE_PROC: - /* TODO: - waitpid with no wait - still alive kill - waitpid with no wait. - */ - break; - } -} - -static void* cgi_thr_func (void* ctx) -{ - rsrc_cgi_xtn_t* func = (rsrc_cgi_xtn_t*)ctx; - func->handler (func->peer.rfd, func->peer.wfd); - close (func->peer.rfd); func->peer.rfd = -1; - close (func->peer.wfd); func->peer.wfd = -1; - return MIO_NULL; -} - -int mio_svc_htts_sendcgi (mio_svc_htts_t* htts, mio_dev_sck_t* csck, mio_svc_htts_rsrc_cgi_t handler, mio_htre_t* req) -{ - mio_svc_htts_rsrc_t* rsrc = MIO_NULL; - rsrc_cgi_xtn_t* cgi = MIO_NULL; - int pfd[2]; - - rsrc = mio_svc_htts_rsrc_make(htts, csck, rsrc_cgi_on_kill, MIO_SIZEOF(*cgi)); - if (MIO_UNLIKELY(!rsrc)) goto oops; - - cgi = mio_svc_htts_rsrc_getxtn(rsrc); - cgi->type = MIO_SVC_HTTS_RSRC_CGI_TYPE_FUNC; - cgi->handler = handler; - cgi->rfd = -1; - cgi->wfd = -1; - cgi->peer.rfd = -1; - cgi->peer.wfd = -1; - - if (pipe(pfd) == -1) goto oops; - cgi->peer.rfd = pfd[0]; - cgi->wfd = pfd[1]; - - if (pipe(pfd) == -1) goto oops; - cgi->rfd = pfd[0]; - cgi->peer.wfd = pfd[1]; - - if (pthread_create(&cgi->thr, MIO_NULL, cgi_thr_func, cgi) != 0) goto oops; - return 0; - -oops: - if (cgi) - { - if (cgi->peer.rfd >= 0) { close (cgi->peer.rfd); cgi->peer.rfd = -1; } - if (cgi->peer.wfd >= 0) { close (cgi->peer.wfd); cgi->peer.wfd = -1; } - if (cgi->rfd >= 0) { close (cgi->rfd); cgi->rfd = -1; } - if (cgi->wfd >= 0) { close (cgi->wfd); cgi->wfd = -1; } - } - if (rsrc) mio_svc_htts_rsrc_kill (rsrc); - return -1; -} - /* ----------------------------------------------------------------- */ int mio_svc_htts_sendrsrc (mio_svc_htts_t* htts, mio_dev_sck_t* csck, mio_svc_htts_rsrc_t* rsrc, int status_code, mio_http_method_t method, const mio_http_version_t* version, int keepalive) diff --git a/mio/lib/mio-thr.h b/mio/lib/mio-thr.h index 9538917..303188f 100644 --- a/mio/lib/mio-thr.h +++ b/mio/lib/mio-thr.h @@ -57,23 +57,30 @@ typedef void (*mio_dev_thr_on_close_t) ( mio_dev_thr_sid_t sid ); -typedef int (*mio_dev_thr_func_t) ( - mio_dev_thr_t* dev, - mio_syshnd_t rfd, - mio_syshnd_t wfd, - void* ctx +struct mio_dev_thr_iopair_t +{ + mio_syshnd_t rfd; + mio_syshnd_t wfd; +}; +typedef struct mio_dev_thr_iopair_t mio_dev_thr_iopair_t; + +typedef void (*mio_dev_thr_func_t) ( + mio_t* mio, + mio_dev_thr_iopair_t* iop, + void* ctx ); +typedef struct mio_dev_thr_info_t mio_dev_thr_info_t; + struct mio_dev_thr_t { MIO_DEV_HEADER; - int flags; - mio_syshnd_t thr_fd[2]; mio_dev_thr_slave_t* slave[2]; int slave_count; - mio_dev_thr_func_t thr_func; + mio_dev_thr_info_t* thr_info; + mio_dev_thr_on_read_t on_read; mio_dev_thr_on_write_t on_write; mio_dev_thr_on_close_t on_close; @@ -87,18 +94,11 @@ struct mio_dev_thr_slave_t mio_dev_thr_t* master; /* parent device */ }; -enum mio_dev_thr_make_flag_t -{ - MIO_DEV_THR_WRITEIN = (1 << 0), - MIO_DEV_THR_READOUT = (1 << 1), -}; -typedef enum mio_dev_thr_make_flag_t mio_dev_thr_make_flag_t; - typedef struct mio_dev_thr_make_t mio_dev_thr_make_t; struct mio_dev_thr_make_t { - int flags; /**< bitwise-ORed of mio_dev_thr_make_flag_t enumerators */ mio_dev_thr_func_t thr_func; + void* thr_ctx; mio_dev_thr_on_write_t on_write; /* mandatory */ mio_dev_thr_on_read_t on_read; /* mandatory */ mio_dev_thr_on_close_t on_close; /* optional */ diff --git a/mio/lib/thr.c b/mio/lib/thr.c index 96d5fdd..125f556 100644 --- a/mio/lib/thr.c +++ b/mio/lib/thr.c @@ -48,23 +48,57 @@ static mio_dev_thr_slave_t* make_slave (mio_t* mio, slave_info_t* si); /* ========================================================================= */ -struct thr_info_t +struct mio_dev_thr_info_t { - mio_dev_thr_t* dev; + mio_t* mio; mio_dev_thr_func_t thr_func; - mio_syshnd_t rfd; - mio_syshnd_t wfd; + mio_dev_thr_iopair_t thr_iop; + void* thr_ctx; + pthread_t thr_hnd; + int thr_done; + mio_tmridx_t cleanup_tmridx; }; -typedef struct thr_info_t thr_info_t; +typedef struct mio_dev_thr_info_t mio_dev_thr_info_t; + + +static void free_thr_info (mio_t* mio, mio_dev_thr_info_t* ti) +{ + if (ti->thr_iop.rfd != MIO_SYSHND_INVALID) + { + close (ti->thr_iop.rfd); + ti->thr_iop.rfd = MIO_SYSHND_INVALID; + } + if (ti->thr_iop.wfd != MIO_SYSHND_INVALID) + { + close (ti->thr_iop.wfd); + ti->thr_iop.wfd = MIO_SYSHND_INVALID; + } + + mio_freemem (mio, ti); +} + +static void mark_thr_done (void* ctx) +{ + mio_dev_thr_info_t* ti = (mio_dev_thr_info_t*)ctx; + printf ("QQQQQQQQQQQQQQQQQQQQQQQQQQQQQQQQQ\n"); + ti->thr_done = 1; +} static void* run_thr_func (void* ctx) { - thr_info_t* ti = (thr_info_t*)ctx; - //ti->thr_func (ti->dev, ti->rfd, ti->wfd, ); - close (ti->rfd); - close (ti->wfd); - mio_freemem (ti->dev->mio, ti); + mio_dev_thr_info_t* ti = (mio_dev_thr_info_t*)ctx; + + /* i assume the thread is cancellable, and of the deferred cancellation type by default */ + /*int dummy; + pthread_setcancelstate (PTHREAD_CANCEL_ENABLE, &dummy); + pthread_setcanceltype (PTHREAD_CANCEL_DEFERRED, &dummy);*/ + + pthread_cleanup_push (mark_thr_done, ti); + + ti->thr_func (ti->mio, &ti->thr_iop, ti->thr_ctx); + + pthread_cleanup_pop (1); pthread_exit (MIO_NULL); return MIO_NULL; } @@ -75,129 +109,80 @@ static int dev_thr_make_master (mio_dev_t* dev, void* ctx) mio_dev_thr_t* rdev = (mio_dev_thr_t*)dev; mio_dev_thr_make_t* info = (mio_dev_thr_make_t*)ctx; mio_syshnd_t pfds[4] = { MIO_SYSHND_INVALID, MIO_SYSHND_INVALID, MIO_SYSHND_INVALID, MIO_SYSHND_INVALID }; - int i, minidx = -1, maxidx = -1; - pid_t pid; + slave_info_t si; + int i; - if (info->flags & MIO_DEV_THR_WRITEIN) + if (pipe(&pfds[0]) == -1 || pipe(&pfds[2]) == -1) { - if (pipe(&pfds[0]) == -1) - { - mio_seterrwithsyserr (mio, 0, errno); - goto oops; - } - minidx = 0; maxidx = 1; - } - - if (info->flags & MIO_DEV_THR_READOUT) - { - if (pipe(&pfds[2]) == -1) - { - mio_seterrwithsyserr (mio, 0, errno); - goto oops; - } - if (minidx == -1) minidx = 2; - maxidx = 3; - } - - if (maxidx == -1) - { - mio_seterrnum (mio, MIO_EINVAL); + mio_seterrwithsyserr (mio, 0, errno); goto oops; } + + if (mio_makesyshndasync(mio, pfds[1]) <= -1 || + mio_makesyshndasync(mio, pfds[2]) <= -1) goto oops; - if (info->flags & MIO_DEV_THR_WRITEIN) - { - /* - * 0123 - * rw-- - * X - * WRITE => 1 - */ - if (mio_makesyshndasync(mio, pfds[1]) <= -1) goto oops; - } + si.mi = info; + si.pfd = pfds[1]; + si.dev_cap = MIO_DEV_CAP_OUT | MIO_DEV_CAP_STREAM; + si.id = MIO_DEV_THR_IN; - if (info->flags & MIO_DEV_THR_READOUT) - { - /* - * 0123 - * --rw - * X - * READ => 2 - */ - if (mio_makesyshndasync(mio, pfds[2]) <= -1) goto oops; - } + rdev->slave[MIO_DEV_THR_IN] = make_slave(mio, &si); + if (!rdev->slave[MIO_DEV_THR_IN]) goto oops; - if (pfds[1] != MIO_SYSHND_INVALID) - { - slave_info_t si; + pfds[1] = MIO_SYSHND_INVALID; + rdev->slave_count++; - si.mi = info; - si.pfd = pfds[1]; - si.dev_cap = MIO_DEV_CAP_OUT | MIO_DEV_CAP_STREAM; - si.id = MIO_DEV_THR_IN; + si.mi = info; + si.pfd = pfds[2]; + si.dev_cap = MIO_DEV_CAP_IN | MIO_DEV_CAP_STREAM; + si.id = MIO_DEV_THR_OUT; - rdev->slave[MIO_DEV_THR_IN] = make_slave(mio, &si); - if (!rdev->slave[MIO_DEV_THR_IN]) goto oops; + rdev->slave[MIO_DEV_THR_OUT] = make_slave(mio, &si); + if (!rdev->slave[MIO_DEV_THR_OUT]) goto oops; - pfds[1] = MIO_SYSHND_INVALID; - rdev->slave_count++; - } - - if (pfds[2] != MIO_SYSHND_INVALID) - { - slave_info_t si; - - si.mi = info; - si.pfd = pfds[2]; - si.dev_cap = MIO_DEV_CAP_IN | MIO_DEV_CAP_STREAM; - si.id = MIO_DEV_THR_OUT; - - rdev->slave[MIO_DEV_THR_OUT] = make_slave(mio, &si); - if (!rdev->slave[MIO_DEV_THR_OUT]) goto oops; - - pfds[2] = MIO_SYSHND_INVALID; - rdev->slave_count++; - } + pfds[2] = MIO_SYSHND_INVALID; + rdev->slave_count++; for (i = 0; i < MIO_COUNTOF(rdev->slave); i++) { if (rdev->slave[i]) rdev->slave[i]->master = rdev; } - /* CREATE THREAD. DATA. CREATE THREAD .. */ -{ -pthread_t thr; -pthread_attr_t attr; -int n; - - thr_info_t* ti; - - ti = mio_callocmem(mio, MIO_SIZEOF(*ti)); - if (MIO_UNLIKELY(!ti)) goto oops; - - pthread_attr_init (&attr); - pthread_attr_setdetachstate (&attr, PTHREAD_CREATE_DETACHED); - n = pthread_create(&thr, &attr, info->thr_func, ti); - pthread_attr_destroy (&attr); - if (n != 0) - { - mio_freemem (mio, ti); - goto oops; - } -} - rdev->dev_cap = MIO_DEV_CAP_VIRTUAL; /* the master device doesn't perform I/O */ - rdev->flags = info->flags; - rdev->thr_func = info->thr_func; rdev->on_read = info->on_read; rdev->on_write = info->on_write; rdev->on_close = info->on_close; - rdev->thr_fd[0] = pfds[0]; - rdev->thr_fd[1] = pfds[3]; + + /* ---------------------------------------------------------- */ + { + int n; + mio_dev_thr_info_t* ti; + + ti = mio_callocmem(mio, MIO_SIZEOF(*ti)); + if (MIO_UNLIKELY(!ti)) goto oops; + + ti->mio = mio; + ti->thr_iop.rfd = pfds[0]; + ti->thr_iop.wfd = pfds[3]; + ti->thr_func = info->thr_func; + ti->thr_ctx = info->thr_ctx; + + rdev->thr_info = ti; + n = pthread_create(&ti->thr_hnd, MIO_NULL, run_thr_func, ti); + if (n != 0) + { + rdev->thr_info = MIO_NULL; + mio_freemem (mio, ti); + goto oops; + } + } + /* ---------------------------------------------------------- */ + + return 0; oops: - for (i = minidx; i < maxidx; i++) + for (i = 0; i < MIO_COUNTOF(pfds); i++) { if (pfds[i] != MIO_SYSHND_INVALID) close (pfds[i]); } @@ -230,12 +215,27 @@ static int dev_thr_make_slave (mio_dev_t* dev, void* ctx) return 0; } +static void check_and_free_thr_info (mio_t* mio, const mio_ntime_t* now, mio_tmrjob_t* job) +{ + mio_dev_thr_info_t* ti = (mio_dev_thr_info_t*)job->ctx; + if (ti->thr_done) + { + free_thr_info (mio, ti); + } + else + { + } +} + static int dev_thr_kill_master (mio_dev_t* dev, int force) { mio_t* mio = dev->mio; mio_dev_thr_t* rdev = (mio_dev_thr_t*)dev; - int i, status; - pid_t wpid; + mio_dev_thr_info_t* ti; + int i; + + ti = rdev->thr_info; + pthread_cancel (ti->thr_hnd); if (rdev->slave_count > 0) { @@ -256,7 +256,38 @@ static int dev_thr_kill_master (mio_dev_t* dev, int force) } } -/* TODO: may have to kill thread... */ + if (ti->thr_done) + { +printf ("THREAD DONE>...111\n"); + pthread_detach (ti->thr_hnd); /* pthread_join() may be blocking. */ + free_thr_info (mio, ti); + } + else + { + mio_tmrjob_t tmrjob; + MIO_MEMSET (&tmrjob, 0, MIO_SIZEOF(tmrjob)); + tmrjob.ctx = ti; + mio_gettime (mio, &tmrjob.when); + tmrjob.when.sec++; + tmrjob.handler = check_and_free_thr_info; + tmrjob.idxptr = &ti->cleanup_tmridx; + + if (ti->thr_done) + { +printf ("THREAD DONE>...222\n"); + pthread_detach (ti->thr_hnd); /* pthread_join() may be blocking. */ + free_thr_info (mio, ti); + } + else + { +printf ("THREAD NOT DONE>...222\n"); + //ti->cleanup_tmridx = mio_instmrjob(mio, &tmrjob); + pthread_join (ti->thr_hnd, MIO_NULL); /* pthread_join() may be blocking. */ + mio_freemem (mio, ti); + //mio_instmrjob (mio, + } + } + rdev->thr_info = MIO_NULL; if (rdev->on_close) rdev->on_close (rdev, MIO_DEV_THR_MASTER); return 0; @@ -633,8 +664,6 @@ void mio_dev_thr_halt (mio_dev_thr_t* dev) int mio_dev_thr_read (mio_dev_thr_t* dev, int enabled) { - mio_t* mio = dev->mio; - if (dev->slave[MIO_DEV_THR_OUT]) { return mio_dev_read((mio_dev_t*)dev->slave[MIO_DEV_THR_OUT], enabled); @@ -648,15 +677,13 @@ int mio_dev_thr_read (mio_dev_thr_t* dev, int enabled) int mio_dev_thr_timedread (mio_dev_thr_t* dev, int enabled, const mio_ntime_t* tmout) { - mio_t* mio = dev->mio; - if (dev->slave[MIO_DEV_THR_OUT]) { return mio_dev_timedread((mio_dev_t*)dev->slave[MIO_DEV_THR_OUT], enabled, tmout); } else { - mio_seterrnum (mio, MIO_ENOCAPA); /* TODO: is it the right error number? */ + mio_seterrnum (dev->mio, MIO_ENOCAPA); /* TODO: is it the right error number? */ return -1; } }