added mio_cfmb_t, mio_addcfmb(), and related functions and macros to handle a memory block that can't be freed immediately

This commit is contained in:
hyung-hwan 2020-05-23 06:07:43 +00:00
parent c441abdbfb
commit 0623260ec4
7 changed files with 165 additions and 60 deletions

View File

@ -629,10 +629,12 @@ static int setup_ping4_tester (mio_t* mio)
static int pipe_on_read (mio_dev_pipe_t* dev, const void* data, mio_iolen_t dlen)
{
MIO_INFO3 (dev->mio, "PIPE READ %d bytes - [%.*s]\n", (int)dlen, (int)dlen, data);
return 0;
}
static int pipe_on_write (mio_dev_pipe_t* dev, mio_iolen_t wrlen, void* wrctx)
{
MIO_INFO1 (dev->mio, "PIPE WRITTEN %d bytes\n", (int)wrlen);
return 0;
}
static void pipe_on_close (mio_dev_pipe_t* dev, mio_dev_pipe_sid_t sid)
@ -644,30 +646,48 @@ static void pipe_on_close (mio_dev_pipe_t* dev, mio_dev_pipe_sid_t sid)
static int thr_on_read (mio_dev_thr_t* dev, const void* data, mio_iolen_t dlen)
{
MIO_INFO3 (dev->mio, "THR READ FROM THR - %d bytes - [%.*s]\n", (int)dlen, (int)dlen, data);
//if (dlen == 0) mio_dev_halt(dev); /* EOF on the input. treat this as end of whole thread transaction */
return 0;
}
static int thr_on_write (mio_dev_thr_t* dev, mio_iolen_t wrlen, void* wrctx)
{
MIO_INFO1 (dev->mio, "THR WRITTEN TO THR - %d bytes\n", (int)wrlen);
return 0;
}
static void thr_on_close (mio_dev_thr_t* dev, mio_dev_thr_sid_t sid)
{
if (sid == MIO_DEV_THR_OUT) mio_dev_thr_haltslave (dev, MIO_DEV_THR_IN);
MIO_INFO1 (dev->mio, "THR[%d] CLOSED \n", (int)sid);
}
static void thr_func (mio_dev_thr_t* dev, mio_dev_thr_iopair_t* iop, void* cx)
static void thr_func (mio_t* mio, mio_dev_thr_iopair_t* iop, void* cx)
{
mio_bch_t buf[5];
ssize_t n;
static int x = 0;
int y;
int z = 0;
//y = ++x;
y = __atomic_add_fetch (&x, 1, __ATOMIC_RELAXED);
while ((n = read(iop->rfd, buf, MIO_COUNTOF(buf)))> 0) write (iop->wfd, buf, n);
while (1)
{
sleep (1);
write (iop->wfd, "THR LOOPING", 11);
z++;
if ((y % 2) && (z >5))
{
write (iop->wfd, 0, MIO_NULL);
break;
}
}
}
/* ========================================================================= */
@ -1248,6 +1268,7 @@ if (!mio_svc_dnc_resolve(dnc, "google.com", MIO_DNS_RRT_SOA, MIO_SVC_DNC_RESOLVE
mio_dev_pipe_write (pp, "this is good", 12, MIO_NULL);
}
for (i = 0; i < 20; i++)
{
mio_dev_thr_t* tt;
mio_dev_thr_make_t mi;

View File

@ -107,7 +107,7 @@ const mio_ooch_t* mio_backuperrmsg (mio_t* mio)
void mio_seterrnum (mio_t* mio, mio_errnum_t errnum)
{
if (mio->shuterr) return;
if (mio->_shuterr) return;
mio->errnum = errnum;
mio->errmsg.len = 0;
}
@ -161,7 +161,7 @@ void mio_seterrbfmt (mio_t* mio, mio_errnum_t errnum, const mio_bch_t* fmt, ...)
va_list ap;
mio_fmtout_t fo;
if (mio->shuterr) return;
if (mio->_shuterr) return;
mio->errmsg.len = 0;
MIO_MEMSET (&fo, 0, MIO_SIZEOF(fo));
@ -181,7 +181,7 @@ void mio_seterrufmt (mio_t* mio, mio_errnum_t errnum, const mio_uch_t* fmt, ...)
va_list ap;
mio_fmtout_t fo;
if (mio->shuterr) return;
if (mio->_shuterr) return;
mio->errmsg.len = 0;
MIO_MEMSET (&fo, 0, MIO_SIZEOF(fo));
@ -201,7 +201,7 @@ void mio_seterrbfmtv (mio_t* mio, mio_errnum_t errnum, const mio_bch_t* fmt, va_
{
mio_fmtout_t fo;
if (mio->shuterr) return;
if (mio->_shuterr) return;
mio->errmsg.len = 0;
@ -218,7 +218,7 @@ void mio_seterrufmtv (mio_t* mio, mio_errnum_t errnum, const mio_uch_t* fmt, va_
{
mio_fmtout_t fo;
if (mio->shuterr) return;
if (mio->_shuterr) return;
mio->errmsg.len = 0;
@ -237,7 +237,7 @@ void mio_seterrwithsyserr (mio_t* mio, int syserr_type, int syserr_code)
{
mio_errnum_t errnum;
if (mio->shuterr) return;
if (mio->_shuterr) return;
/*if (mio->vmprim.syserrstrb)
{*/
@ -259,7 +259,7 @@ void mio_seterrbfmtwithsyserr (mio_t* mio, int syserr_type, int syserr_code, con
mio_oow_t ucslen, bcslen;
va_list ap;
if (mio->shuterr) return;
if (mio->_shuterr) return;
/*
if (mio->vmprim.syserrstrb)
@ -317,7 +317,7 @@ void mio_seterrufmtwithsyserr (mio_t* mio, int syserr_type, int syserr_code, con
mio_oow_t ucslen, bcslen;
va_list ap;
if (mio->shuterr) return;
if (mio->_shuterr) return;
/*if (mio->vmprim.syserrstrb)
{*/

View File

@ -99,10 +99,10 @@
/* i don't want an error raised inside the callback to override
* the existing error number and message. */
#define prim_write_log(mio,mask,ptr,len) do { \
int shuterr = (mio)->shuterr; \
(mio)->shuterr = 1; \
int __shuterr = (mio)->_shuterr; \
(mio)->_shuterr = 1; \
mio_sys_writelog (mio, mask, ptr, len); \
(mio)->shuterr = shuterr; \
(mio)->_shuterr = __shuterr; \
} while(0)
#ifdef __cplusplus

View File

@ -172,8 +172,9 @@ MIO_EXPORT int mio_dev_thr_close (
mio_dev_thr_sid_t sid
);
MIO_EXPORT int mio_dev_thr_killthr (
mio_dev_thr_t* thr
void mio_dev_thr_haltslave (
mio_dev_thr_t* dev,
mio_dev_thr_sid_t sid
);
#ifdef __cplusplus

View File

@ -30,6 +30,7 @@
#define DEV_CAP_ALL_WATCHED (MIO_DEV_CAP_IN_WATCHED | MIO_DEV_CAP_OUT_WATCHED | MIO_DEV_CAP_PRI_WATCHED)
static void clear_unneeded_cfmbs (mio_t* mio);
static int schedule_kill_zombie_job (mio_dev_t* dev);
static int kill_and_free_device (mio_dev_t* dev, int force);
@ -127,6 +128,7 @@ int mio_init (mio_t* mio, mio_mmgr_t* mmgr, mio_cmgr_t* cmgr, mio_oow_t tmrcapa)
mio->tmr.capa = tmrcapa;
MIO_CFMBL_INIT (&mio->cfmb);
MIO_DEVL_INIT (&mio->actdev);
MIO_DEVL_INIT (&mio->hltdev);
MIO_DEVL_INIT (&mio->zmbdev);
@ -152,6 +154,8 @@ void mio_fini (mio_t* mio)
mio_dev_t diehard;
mio_oow_t i;
mio->_fini_in_progress = 1;
/* clean up free cwq list */
for (i = 0; i < MIO_COUNTOF(mio->cwqfl); i++)
{
@ -230,6 +234,10 @@ void mio_fini (mio_t* mio)
mio_cleartmrjobs (mio);
mio_freemem (mio, mio->tmr.jobs);
printf ("BEGINNING CFMB CLEARANCE........\n");
/* clear unneeded cfmbs insistently - a misbehaving checker will make this cleaning step loop forever*/
while (!MIO_CFMBL_IS_EMPTY(&mio->cfmb)) clear_unneeded_cfmbs (mio);
mio_sys_fini (mio); /* finalize the system dependent data */
mio_freemem (mio, mio->log.ptr);
@ -674,10 +682,30 @@ skip_evcb:
}
}
static void clear_unneeded_cfmbs (mio_t* mio)
{
mio_cfmb_t* cur, * next;
cur = MIO_CFMBL_FIRST_CFMB(&mio->cfmb);
while (!MIO_CFMBL_IS_NIL_CFMB(&mio->cfmb, cur))
{
next = MIO_CFMBL_NEXT_CFMB(cur);
if (cur->cfmb_checker(mio, cur))
{
MIO_CFMBL_UNLINK_CFMB (cur);
mio_freemem (mio, cur);
}
cur = next;
}
}
int mio_exec (mio_t* mio)
{
int ret = 0;
/* clear unneeded cfmbs - i hate to do this. TODO: should i do this less frequently? if less frequent, would it accumulate too many blocks? */
if (!MIO_CFMBL_IS_EMPTY(&mio->cfmb)) clear_unneeded_cfmbs (mio);
/* execute callbacks for completed write operations */
fire_cwq_handlers (mio);
@ -1656,6 +1684,13 @@ void mio_freemem (mio_t* mio, void* ptr)
{
MIO_MMGR_FREE (mio->_mmgr, ptr);
}
/* ------------------------------------------------------------------------ */
void mio_addcfmb (mio_t* mio, mio_cfmb_t* cfmb, mio_cfmb_checker_t checker)
{
cfmb->cfmb_checker = checker;
MIO_CFMBL_APPEND_CFMB (&mio->cfmb, cfmb);
}
/* ------------------------------------------------------------------------ */

View File

@ -417,6 +417,55 @@ typedef enum mio_dev_event_t mio_dev_event_t;
#define MIO_CWQFL_ALIGN 16
/* =========================================================================
* CHECK-AND-FREE MEMORY BLOCK
* ========================================================================= */
#define MIO_CFMB_HEADER \
mio_t* mio; \
mio_cfmb_t* cfmb_next; \
mio_cfmb_t* cfmb_prev; \
mio_cfmb_checker_t cfmb_checker
typedef struct mio_cfmb_t mio_cfmb_t;
typedef int (*mio_cfmb_checker_t) (
mio_t* mio,
mio_cfmb_t* cfmb
);
struct mio_cfmb_t
{
MIO_CFMB_HEADER;
};
#define MIO_CFMBL_PREPEND_CFMB(lh,cfmb) do { \
(cfmb)->cfmb_prev = (lh); \
(cfmb)->cfmb_next = (lh)->cfmb_next; \
(cfmb)->cfmb_next->cfmb_prev = (cfmb); \
(lh)->cfmb_next = (cfmb); \
} while(0)
#define MIO_CFMBL_APPEND_CFMB(lh,cfmb) do { \
(cfmb)->cfmb_next = (lh); \
(cfmb)->cfmb_prev = (lh)->cfmb_prev; \
(cfmb)->cfmb_prev->cfmb_next = (cfmb); \
(lh)->cfmb_prev = (cfmb); \
} while(0)
#define MIO_CFMBL_UNLINK_CFMB(cfmb) do { \
(cfmb)->cfmb_prev->cfmb_next = (cfmb)->cfmb_next; \
(cfmb)->cfmb_next->cfmb_prev = (cfmb)->cfmb_prev; \
} while (0)
#define MIO_CFMBL_INIT(lh) ((lh)->cfmb_next = (lh)->cfmb_prev = lh)
#define MIO_CFMBL_FIRST_CFMB(lh) ((lh)->cfmb_next)
#define MIO_CFMBL_LAST_CFMB(lh) ((lh)->cfmb_prev)
#define MIO_CFMBL_IS_EMPTY(lh) (MIO_CFMBL_FIRST_CFMB(lh) == (lh))
#define MIO_CFMBL_IS_NIL_CFMB(lh,cfmb) ((cfmb) == (lh))
#define MIO_CFMBL_PREV_CFMB(cfmb) ((cfmb)->cfmb_prev)
#define MIO_CFMBL_NEXT_CFMB(cfmb) ((cfmb)->cfmb_next)
/* =========================================================================
* SERVICE
* ========================================================================= */
@ -465,6 +514,8 @@ struct mio_svc_t
#define MIO_SVCL_IS_EMPTY(lh) (MIO_SVCL_FIRST_SVC(lh) == (lh))
#define MIO_SVCL_IS_NIL_SVC(lh,svc) ((svc) == (lh))
#define MIO_SVCL_PREV_SVC(svc) ((svc)->svc_prev)
#define MIO_SVCL_NEXT_SVC(svc) ((svc)->svc_next)
/* =========================================================================
* MIO LOGGING
* ========================================================================= */
@ -568,7 +619,8 @@ struct mio_t
mio_oow_t len;
} errmsg;
int shuterr;
unsigned short int _shuterr;
unsigned short int _fini_in_progress;
struct
{
@ -598,6 +650,7 @@ struct mio_t
mio_stopreq_t stopreq; /* stop request to abort mio_loop() */
mio_cfmb_t cfmb; /* list head of cfmbs */
mio_dev_t actdev; /* list head of active devices */
mio_dev_t hltdev; /* list head of halted devices */
mio_dev_t zmbdev; /* list head of zombie devices */

View File

@ -50,19 +50,19 @@ static mio_dev_thr_slave_t* make_slave (mio_t* mio, slave_info_t* si);
struct mio_dev_thr_info_t
{
mio_t* mio;
MIO_CFMB_HEADER;
mio_dev_thr_func_t thr_func;
mio_dev_thr_iopair_t thr_iop;
void* thr_ctx;
pthread_t thr_hnd;
int thr_done;
mio_tmridx_t cleanup_tmridx;
};
typedef struct mio_dev_thr_info_t mio_dev_thr_info_t;
static void free_thr_info (mio_t* mio, mio_dev_thr_info_t* ti)
static void free_thr_info_resources (mio_t* mio, mio_dev_thr_info_t* ti)
{
if (ti->thr_iop.rfd != MIO_SYSHND_INVALID)
{
@ -74,14 +74,37 @@ static void free_thr_info (mio_t* mio, mio_dev_thr_info_t* ti)
close (ti->thr_iop.wfd);
ti->thr_iop.wfd = MIO_SYSHND_INVALID;
}
}
mio_freemem (mio, ti);
static int ready_to_free_thr_info (mio_t* mio, mio_cfmb_t* cfmb)
{
mio_dev_thr_info_t* ti = (mio_dev_thr_info_t*)cfmb;
#if 1
if (MIO_UNLIKELY(mio->_fini_in_progress))
{
pthread_join (ti->thr_hnd, MIO_NULL); /* BAD. blocking call in a non-blocking library. not useful to call pthread_tryjoin_np() here. */
free_thr_info_resources (mio, ti);
return 1; /* free me */
}
#endif
if (ti->thr_done)
{
free_thr_info_resources (mio, ti);
#if defined(HAVE_PTHREAD_TRYJOIN_NP)
if (pthread_tryjoin_np(ti->thr_hnd) != 0) /* not terminated yet - however, this isn't necessary. z*/
#endif
pthread_detach (ti->thr_hnd); /* just detach it */
return 1; /* free me */
}
return 0; /* not freeed */
}
static void mark_thr_done (void* ctx)
{
mio_dev_thr_info_t* ti = (mio_dev_thr_info_t*)ctx;
printf ("QQQQQQQQQQQQQQQQQQQQQQQQQQQQQQQQQ\n");
ti->thr_done = 1;
}
@ -98,6 +121,10 @@ static void* run_thr_func (void* ctx)
ti->thr_func (ti->mio, &ti->thr_iop, ti->thr_ctx);
/* This part may get partially executed or not executed if the thread is cancelled */
free_thr_info_resources (ti->mio, ti); /* TODO: check if the close() call inside this call completes when it becomes a cancellation point. if so, the code must get changed */
/* ---------------------------------------------------------- */
pthread_cleanup_pop (1);
pthread_exit (MIO_NULL);
return MIO_NULL;
@ -215,18 +242,6 @@ static int dev_thr_make_slave (mio_dev_t* dev, void* ctx)
return 0;
}
static void check_and_free_thr_info (mio_t* mio, const mio_ntime_t* now, mio_tmrjob_t* job)
{
mio_dev_thr_info_t* ti = (mio_dev_thr_info_t*)job->ctx;
if (ti->thr_done)
{
free_thr_info (mio, ti);
}
else
{
}
}
static int dev_thr_kill_master (mio_dev_t* dev, int force)
{
mio_t* mio = dev->mio;
@ -260,32 +275,13 @@ static int dev_thr_kill_master (mio_dev_t* dev, int force)
{
printf ("THREAD DONE>...111\n");
pthread_detach (ti->thr_hnd); /* pthread_join() may be blocking. */
free_thr_info (mio, ti);
}
else
{
mio_tmrjob_t tmrjob;
MIO_MEMSET (&tmrjob, 0, MIO_SIZEOF(tmrjob));
tmrjob.ctx = ti;
mio_gettime (mio, &tmrjob.when);
tmrjob.when.sec++;
tmrjob.handler = check_and_free_thr_info;
tmrjob.idxptr = &ti->cleanup_tmridx;
if (ti->thr_done)
{
printf ("THREAD DONE>...222\n");
pthread_detach (ti->thr_hnd); /* pthread_join() may be blocking. */
free_thr_info (mio, ti);
}
else
{
printf ("THREAD NOT DONE>...222\n");
//ti->cleanup_tmridx = mio_instmrjob(mio, &tmrjob);
pthread_join (ti->thr_hnd, MIO_NULL); /* pthread_join() may be blocking. */
free_thr_info_resources (mio, ti);
mio_freemem (mio, ti);
//mio_instmrjob (mio,
}
else
{
printf ("THREAD NOT DONE>...111\n");
mio_addcfmb (mio, ti, ready_to_free_thr_info);
}
rdev->thr_info = MIO_NULL;
@ -719,12 +715,11 @@ int mio_dev_thr_close (mio_dev_thr_t* dev, mio_dev_thr_sid_t sid)
return mio_dev_ioctl((mio_dev_t*)dev, MIO_DEV_THR_CLOSE, &sid);
}
#if 0
int mio_dev_thr_killchild (mio_dev_thr_t* dev)
void mio_dev_thr_haltslave (mio_dev_thr_t* dev, mio_dev_thr_sid_t sid)
{
return mio_dev_ioctl((mio_dev_t*)dev, MIO_DEV_THR_KILL_CHILD, MIO_NULL);
if (sid >= 0 && sid < MIO_COUNTOF(dev->slave) && dev->slave[sid])
mio_dev_halt((mio_dev_t*)dev->slave[sid]);
}
#endif
#if 0
mio_dev_thr_t* mio_dev_thr_getdev (mio_dev_thr_t* thr, mio_dev_thr_sid_t sid)