diff --git a/mio/bin/t01.c b/mio/bin/t01.c index b390bff..262e127 100644 --- a/mio/bin/t01.c +++ b/mio/bin/t01.c @@ -629,10 +629,12 @@ static int setup_ping4_tester (mio_t* mio) static int pipe_on_read (mio_dev_pipe_t* dev, const void* data, mio_iolen_t dlen) { MIO_INFO3 (dev->mio, "PIPE READ %d bytes - [%.*s]\n", (int)dlen, (int)dlen, data); + return 0; } static int pipe_on_write (mio_dev_pipe_t* dev, mio_iolen_t wrlen, void* wrctx) { MIO_INFO1 (dev->mio, "PIPE WRITTEN %d bytes\n", (int)wrlen); + return 0; } static void pipe_on_close (mio_dev_pipe_t* dev, mio_dev_pipe_sid_t sid) @@ -644,30 +646,48 @@ static void pipe_on_close (mio_dev_pipe_t* dev, mio_dev_pipe_sid_t sid) static int thr_on_read (mio_dev_thr_t* dev, const void* data, mio_iolen_t dlen) { MIO_INFO3 (dev->mio, "THR READ FROM THR - %d bytes - [%.*s]\n", (int)dlen, (int)dlen, data); + //if (dlen == 0) mio_dev_halt(dev); /* EOF on the input. treat this as end of whole thread transaction */ + return 0; } static int thr_on_write (mio_dev_thr_t* dev, mio_iolen_t wrlen, void* wrctx) { MIO_INFO1 (dev->mio, "THR WRITTEN TO THR - %d bytes\n", (int)wrlen); + return 0; } static void thr_on_close (mio_dev_thr_t* dev, mio_dev_thr_sid_t sid) { + if (sid == MIO_DEV_THR_OUT) mio_dev_thr_haltslave (dev, MIO_DEV_THR_IN); MIO_INFO1 (dev->mio, "THR[%d] CLOSED \n", (int)sid); } -static void thr_func (mio_dev_thr_t* dev, mio_dev_thr_iopair_t* iop, void* cx) +static void thr_func (mio_t* mio, mio_dev_thr_iopair_t* iop, void* cx) { mio_bch_t buf[5]; ssize_t n; +static int x = 0; +int y; +int z = 0; + + //y = ++x; + y = __atomic_add_fetch (&x, 1, __ATOMIC_RELAXED); + while ((n = read(iop->rfd, buf, MIO_COUNTOF(buf)))> 0) write (iop->wfd, buf, n); while (1) { sleep (1); write (iop->wfd, "THR LOOPING", 11); + z++; + if ((y % 2) && (z >5)) + { + write (iop->wfd, 0, MIO_NULL); + break; + } } + } /* ========================================================================= */ @@ -1248,6 +1268,7 @@ if (!mio_svc_dnc_resolve(dnc, "google.com", MIO_DNS_RRT_SOA, MIO_SVC_DNC_RESOLVE mio_dev_pipe_write (pp, "this is good", 12, MIO_NULL); } +for (i = 0; i < 20; i++) { mio_dev_thr_t* tt; mio_dev_thr_make_t mi; diff --git a/mio/lib/err.c b/mio/lib/err.c index 4df7cbc..5587f51 100644 --- a/mio/lib/err.c +++ b/mio/lib/err.c @@ -107,7 +107,7 @@ const mio_ooch_t* mio_backuperrmsg (mio_t* mio) void mio_seterrnum (mio_t* mio, mio_errnum_t errnum) { - if (mio->shuterr) return; + if (mio->_shuterr) return; mio->errnum = errnum; mio->errmsg.len = 0; } @@ -161,7 +161,7 @@ void mio_seterrbfmt (mio_t* mio, mio_errnum_t errnum, const mio_bch_t* fmt, ...) va_list ap; mio_fmtout_t fo; - if (mio->shuterr) return; + if (mio->_shuterr) return; mio->errmsg.len = 0; MIO_MEMSET (&fo, 0, MIO_SIZEOF(fo)); @@ -181,7 +181,7 @@ void mio_seterrufmt (mio_t* mio, mio_errnum_t errnum, const mio_uch_t* fmt, ...) va_list ap; mio_fmtout_t fo; - if (mio->shuterr) return; + if (mio->_shuterr) return; mio->errmsg.len = 0; MIO_MEMSET (&fo, 0, MIO_SIZEOF(fo)); @@ -201,7 +201,7 @@ void mio_seterrbfmtv (mio_t* mio, mio_errnum_t errnum, const mio_bch_t* fmt, va_ { mio_fmtout_t fo; - if (mio->shuterr) return; + if (mio->_shuterr) return; mio->errmsg.len = 0; @@ -218,7 +218,7 @@ void mio_seterrufmtv (mio_t* mio, mio_errnum_t errnum, const mio_uch_t* fmt, va_ { mio_fmtout_t fo; - if (mio->shuterr) return; + if (mio->_shuterr) return; mio->errmsg.len = 0; @@ -237,7 +237,7 @@ void mio_seterrwithsyserr (mio_t* mio, int syserr_type, int syserr_code) { mio_errnum_t errnum; - if (mio->shuterr) return; + if (mio->_shuterr) return; /*if (mio->vmprim.syserrstrb) {*/ @@ -259,7 +259,7 @@ void mio_seterrbfmtwithsyserr (mio_t* mio, int syserr_type, int syserr_code, con mio_oow_t ucslen, bcslen; va_list ap; - if (mio->shuterr) return; + if (mio->_shuterr) return; /* if (mio->vmprim.syserrstrb) @@ -317,7 +317,7 @@ void mio_seterrufmtwithsyserr (mio_t* mio, int syserr_type, int syserr_code, con mio_oow_t ucslen, bcslen; va_list ap; - if (mio->shuterr) return; + if (mio->_shuterr) return; /*if (mio->vmprim.syserrstrb) {*/ diff --git a/mio/lib/mio-prv.h b/mio/lib/mio-prv.h index d452742..19983e0 100644 --- a/mio/lib/mio-prv.h +++ b/mio/lib/mio-prv.h @@ -99,10 +99,10 @@ /* i don't want an error raised inside the callback to override * the existing error number and message. */ #define prim_write_log(mio,mask,ptr,len) do { \ - int shuterr = (mio)->shuterr; \ - (mio)->shuterr = 1; \ + int __shuterr = (mio)->_shuterr; \ + (mio)->_shuterr = 1; \ mio_sys_writelog (mio, mask, ptr, len); \ - (mio)->shuterr = shuterr; \ + (mio)->_shuterr = __shuterr; \ } while(0) #ifdef __cplusplus diff --git a/mio/lib/mio-thr.h b/mio/lib/mio-thr.h index 303188f..3321e75 100644 --- a/mio/lib/mio-thr.h +++ b/mio/lib/mio-thr.h @@ -172,8 +172,9 @@ MIO_EXPORT int mio_dev_thr_close ( mio_dev_thr_sid_t sid ); -MIO_EXPORT int mio_dev_thr_killthr ( - mio_dev_thr_t* thr +void mio_dev_thr_haltslave ( + mio_dev_thr_t* dev, + mio_dev_thr_sid_t sid ); #ifdef __cplusplus diff --git a/mio/lib/mio.c b/mio/lib/mio.c index e2cd919..d95628d 100644 --- a/mio/lib/mio.c +++ b/mio/lib/mio.c @@ -30,6 +30,7 @@ #define DEV_CAP_ALL_WATCHED (MIO_DEV_CAP_IN_WATCHED | MIO_DEV_CAP_OUT_WATCHED | MIO_DEV_CAP_PRI_WATCHED) +static void clear_unneeded_cfmbs (mio_t* mio); static int schedule_kill_zombie_job (mio_dev_t* dev); static int kill_and_free_device (mio_dev_t* dev, int force); @@ -127,6 +128,7 @@ int mio_init (mio_t* mio, mio_mmgr_t* mmgr, mio_cmgr_t* cmgr, mio_oow_t tmrcapa) mio->tmr.capa = tmrcapa; + MIO_CFMBL_INIT (&mio->cfmb); MIO_DEVL_INIT (&mio->actdev); MIO_DEVL_INIT (&mio->hltdev); MIO_DEVL_INIT (&mio->zmbdev); @@ -152,6 +154,8 @@ void mio_fini (mio_t* mio) mio_dev_t diehard; mio_oow_t i; + mio->_fini_in_progress = 1; + /* clean up free cwq list */ for (i = 0; i < MIO_COUNTOF(mio->cwqfl); i++) { @@ -230,6 +234,10 @@ void mio_fini (mio_t* mio) mio_cleartmrjobs (mio); mio_freemem (mio, mio->tmr.jobs); +printf ("BEGINNING CFMB CLEARANCE........\n"); + /* clear unneeded cfmbs insistently - a misbehaving checker will make this cleaning step loop forever*/ + while (!MIO_CFMBL_IS_EMPTY(&mio->cfmb)) clear_unneeded_cfmbs (mio); + mio_sys_fini (mio); /* finalize the system dependent data */ mio_freemem (mio, mio->log.ptr); @@ -674,10 +682,30 @@ skip_evcb: } } +static void clear_unneeded_cfmbs (mio_t* mio) +{ + mio_cfmb_t* cur, * next; + + cur = MIO_CFMBL_FIRST_CFMB(&mio->cfmb); + while (!MIO_CFMBL_IS_NIL_CFMB(&mio->cfmb, cur)) + { + next = MIO_CFMBL_NEXT_CFMB(cur); + if (cur->cfmb_checker(mio, cur)) + { + MIO_CFMBL_UNLINK_CFMB (cur); + mio_freemem (mio, cur); + } + cur = next; + } +} + int mio_exec (mio_t* mio) { int ret = 0; + /* clear unneeded cfmbs - i hate to do this. TODO: should i do this less frequently? if less frequent, would it accumulate too many blocks? */ + if (!MIO_CFMBL_IS_EMPTY(&mio->cfmb)) clear_unneeded_cfmbs (mio); + /* execute callbacks for completed write operations */ fire_cwq_handlers (mio); @@ -1656,6 +1684,13 @@ void mio_freemem (mio_t* mio, void* ptr) { MIO_MMGR_FREE (mio->_mmgr, ptr); } +/* ------------------------------------------------------------------------ */ + +void mio_addcfmb (mio_t* mio, mio_cfmb_t* cfmb, mio_cfmb_checker_t checker) +{ + cfmb->cfmb_checker = checker; + MIO_CFMBL_APPEND_CFMB (&mio->cfmb, cfmb); +} /* ------------------------------------------------------------------------ */ diff --git a/mio/lib/mio.h b/mio/lib/mio.h index 830edf0..027dcda 100644 --- a/mio/lib/mio.h +++ b/mio/lib/mio.h @@ -417,6 +417,55 @@ typedef enum mio_dev_event_t mio_dev_event_t; #define MIO_CWQFL_ALIGN 16 +/* ========================================================================= + * CHECK-AND-FREE MEMORY BLOCK + * ========================================================================= */ + +#define MIO_CFMB_HEADER \ + mio_t* mio; \ + mio_cfmb_t* cfmb_next; \ + mio_cfmb_t* cfmb_prev; \ + mio_cfmb_checker_t cfmb_checker + +typedef struct mio_cfmb_t mio_cfmb_t; + +typedef int (*mio_cfmb_checker_t) ( + mio_t* mio, + mio_cfmb_t* cfmb +); + +struct mio_cfmb_t +{ + MIO_CFMB_HEADER; +}; + +#define MIO_CFMBL_PREPEND_CFMB(lh,cfmb) do { \ + (cfmb)->cfmb_prev = (lh); \ + (cfmb)->cfmb_next = (lh)->cfmb_next; \ + (cfmb)->cfmb_next->cfmb_prev = (cfmb); \ + (lh)->cfmb_next = (cfmb); \ +} while(0) + +#define MIO_CFMBL_APPEND_CFMB(lh,cfmb) do { \ + (cfmb)->cfmb_next = (lh); \ + (cfmb)->cfmb_prev = (lh)->cfmb_prev; \ + (cfmb)->cfmb_prev->cfmb_next = (cfmb); \ + (lh)->cfmb_prev = (cfmb); \ +} while(0) + +#define MIO_CFMBL_UNLINK_CFMB(cfmb) do { \ + (cfmb)->cfmb_prev->cfmb_next = (cfmb)->cfmb_next; \ + (cfmb)->cfmb_next->cfmb_prev = (cfmb)->cfmb_prev; \ +} while (0) + +#define MIO_CFMBL_INIT(lh) ((lh)->cfmb_next = (lh)->cfmb_prev = lh) +#define MIO_CFMBL_FIRST_CFMB(lh) ((lh)->cfmb_next) +#define MIO_CFMBL_LAST_CFMB(lh) ((lh)->cfmb_prev) +#define MIO_CFMBL_IS_EMPTY(lh) (MIO_CFMBL_FIRST_CFMB(lh) == (lh)) +#define MIO_CFMBL_IS_NIL_CFMB(lh,cfmb) ((cfmb) == (lh)) + +#define MIO_CFMBL_PREV_CFMB(cfmb) ((cfmb)->cfmb_prev) +#define MIO_CFMBL_NEXT_CFMB(cfmb) ((cfmb)->cfmb_next) /* ========================================================================= * SERVICE * ========================================================================= */ @@ -465,6 +514,8 @@ struct mio_svc_t #define MIO_SVCL_IS_EMPTY(lh) (MIO_SVCL_FIRST_SVC(lh) == (lh)) #define MIO_SVCL_IS_NIL_SVC(lh,svc) ((svc) == (lh)) +#define MIO_SVCL_PREV_SVC(svc) ((svc)->svc_prev) +#define MIO_SVCL_NEXT_SVC(svc) ((svc)->svc_next) /* ========================================================================= * MIO LOGGING * ========================================================================= */ @@ -568,7 +619,8 @@ struct mio_t mio_oow_t len; } errmsg; - int shuterr; + unsigned short int _shuterr; + unsigned short int _fini_in_progress; struct { @@ -598,6 +650,7 @@ struct mio_t mio_stopreq_t stopreq; /* stop request to abort mio_loop() */ + mio_cfmb_t cfmb; /* list head of cfmbs */ mio_dev_t actdev; /* list head of active devices */ mio_dev_t hltdev; /* list head of halted devices */ mio_dev_t zmbdev; /* list head of zombie devices */ diff --git a/mio/lib/thr.c b/mio/lib/thr.c index 125f556..0facd23 100644 --- a/mio/lib/thr.c +++ b/mio/lib/thr.c @@ -50,19 +50,19 @@ static mio_dev_thr_slave_t* make_slave (mio_t* mio, slave_info_t* si); struct mio_dev_thr_info_t { - mio_t* mio; + MIO_CFMB_HEADER; + mio_dev_thr_func_t thr_func; mio_dev_thr_iopair_t thr_iop; void* thr_ctx; pthread_t thr_hnd; int thr_done; - mio_tmridx_t cleanup_tmridx; }; typedef struct mio_dev_thr_info_t mio_dev_thr_info_t; -static void free_thr_info (mio_t* mio, mio_dev_thr_info_t* ti) +static void free_thr_info_resources (mio_t* mio, mio_dev_thr_info_t* ti) { if (ti->thr_iop.rfd != MIO_SYSHND_INVALID) { @@ -74,14 +74,37 @@ static void free_thr_info (mio_t* mio, mio_dev_thr_info_t* ti) close (ti->thr_iop.wfd); ti->thr_iop.wfd = MIO_SYSHND_INVALID; } +} - mio_freemem (mio, ti); +static int ready_to_free_thr_info (mio_t* mio, mio_cfmb_t* cfmb) +{ + mio_dev_thr_info_t* ti = (mio_dev_thr_info_t*)cfmb; + +#if 1 + if (MIO_UNLIKELY(mio->_fini_in_progress)) + { + pthread_join (ti->thr_hnd, MIO_NULL); /* BAD. blocking call in a non-blocking library. not useful to call pthread_tryjoin_np() here. */ + free_thr_info_resources (mio, ti); + return 1; /* free me */ + } +#endif + + if (ti->thr_done) + { + free_thr_info_resources (mio, ti); +#if defined(HAVE_PTHREAD_TRYJOIN_NP) + if (pthread_tryjoin_np(ti->thr_hnd) != 0) /* not terminated yet - however, this isn't necessary. z*/ +#endif + pthread_detach (ti->thr_hnd); /* just detach it */ + return 1; /* free me */ + } + + return 0; /* not freeed */ } static void mark_thr_done (void* ctx) { mio_dev_thr_info_t* ti = (mio_dev_thr_info_t*)ctx; - printf ("QQQQQQQQQQQQQQQQQQQQQQQQQQQQQQQQQ\n"); ti->thr_done = 1; } @@ -98,6 +121,10 @@ static void* run_thr_func (void* ctx) ti->thr_func (ti->mio, &ti->thr_iop, ti->thr_ctx); + /* This part may get partially executed or not executed if the thread is cancelled */ + free_thr_info_resources (ti->mio, ti); /* TODO: check if the close() call inside this call completes when it becomes a cancellation point. if so, the code must get changed */ + /* ---------------------------------------------------------- */ + pthread_cleanup_pop (1); pthread_exit (MIO_NULL); return MIO_NULL; @@ -117,7 +144,7 @@ static int dev_thr_make_master (mio_dev_t* dev, void* ctx) mio_seterrwithsyserr (mio, 0, errno); goto oops; } - + if (mio_makesyshndasync(mio, pfds[1]) <= -1 || mio_makesyshndasync(mio, pfds[2]) <= -1) goto oops; @@ -215,18 +242,6 @@ static int dev_thr_make_slave (mio_dev_t* dev, void* ctx) return 0; } -static void check_and_free_thr_info (mio_t* mio, const mio_ntime_t* now, mio_tmrjob_t* job) -{ - mio_dev_thr_info_t* ti = (mio_dev_thr_info_t*)job->ctx; - if (ti->thr_done) - { - free_thr_info (mio, ti); - } - else - { - } -} - static int dev_thr_kill_master (mio_dev_t* dev, int force) { mio_t* mio = dev->mio; @@ -260,32 +275,13 @@ static int dev_thr_kill_master (mio_dev_t* dev, int force) { printf ("THREAD DONE>...111\n"); pthread_detach (ti->thr_hnd); /* pthread_join() may be blocking. */ - free_thr_info (mio, ti); + free_thr_info_resources (mio, ti); + mio_freemem (mio, ti); } else { - mio_tmrjob_t tmrjob; - MIO_MEMSET (&tmrjob, 0, MIO_SIZEOF(tmrjob)); - tmrjob.ctx = ti; - mio_gettime (mio, &tmrjob.when); - tmrjob.when.sec++; - tmrjob.handler = check_and_free_thr_info; - tmrjob.idxptr = &ti->cleanup_tmridx; - - if (ti->thr_done) - { -printf ("THREAD DONE>...222\n"); - pthread_detach (ti->thr_hnd); /* pthread_join() may be blocking. */ - free_thr_info (mio, ti); - } - else - { -printf ("THREAD NOT DONE>...222\n"); - //ti->cleanup_tmridx = mio_instmrjob(mio, &tmrjob); - pthread_join (ti->thr_hnd, MIO_NULL); /* pthread_join() may be blocking. */ - mio_freemem (mio, ti); - //mio_instmrjob (mio, - } +printf ("THREAD NOT DONE>...111\n"); + mio_addcfmb (mio, ti, ready_to_free_thr_info); } rdev->thr_info = MIO_NULL; @@ -719,12 +715,11 @@ int mio_dev_thr_close (mio_dev_thr_t* dev, mio_dev_thr_sid_t sid) return mio_dev_ioctl((mio_dev_t*)dev, MIO_DEV_THR_CLOSE, &sid); } -#if 0 -int mio_dev_thr_killchild (mio_dev_thr_t* dev) +void mio_dev_thr_haltslave (mio_dev_thr_t* dev, mio_dev_thr_sid_t sid) { - return mio_dev_ioctl((mio_dev_t*)dev, MIO_DEV_THR_KILL_CHILD, MIO_NULL); + if (sid >= 0 && sid < MIO_COUNTOF(dev->slave) && dev->slave[sid]) + mio_dev_halt((mio_dev_t*)dev->slave[sid]); } -#endif #if 0 mio_dev_thr_t* mio_dev_thr_getdev (mio_dev_thr_t* thr, mio_dev_thr_sid_t sid)