added mio_schedtmrjobat() and mio_schedtmrjobafter()

touched up some log messages in sck.c
This commit is contained in:
hyung-hwan 2020-08-01 15:56:53 +00:00
parent c230c92249
commit e77391da1d
7 changed files with 85 additions and 47 deletions

View File

@ -988,23 +988,6 @@ static void handle_signal (int sig)
if (g_mio) mio_stop (g_mio, MIO_STOPREQ_TERMINATION); if (g_mio) mio_stop (g_mio, MIO_STOPREQ_TERMINATION);
} }
static int schedule_timer_job_after (mio_t* mio, const mio_ntime_t* fire_after, mio_tmrjob_handler_t handler, void* ctx)
{
mio_tmrjob_t tmrjob;
memset (&tmrjob, 0, MIO_SIZEOF(tmrjob));
tmrjob.ctx = ctx;
mio_gettime (mio, &tmrjob.when);
MIO_ADD_NTIME (&tmrjob.when, &tmrjob.when, fire_after);
tmrjob.handler = handler;
tmrjob.idxptr = MIO_NULL;
return mio_instmrjob(mio, &tmrjob);
}
static void send_test_query (mio_t* mio, const mio_ntime_t* now, mio_tmrjob_t* job) static void send_test_query (mio_t* mio, const mio_ntime_t* now, mio_tmrjob_t* job)
{ {
//if (!mio_svc_dnc_resolve((mio_svc_dnc_t*)job->ctx, "www.microsoft.com", MIO_DNS_RRT_CNAME, MIO_SVC_DNC_RESOLVE_FLAG_COOKIE, on_dnc_resolve, 0)) //if (!mio_svc_dnc_resolve((mio_svc_dnc_t*)job->ctx, "www.microsoft.com", MIO_DNS_RRT_CNAME, MIO_SVC_DNC_RESOLVE_FLAG_COOKIE, on_dnc_resolve, 0))
@ -1340,7 +1323,7 @@ for (i = 0; i < 5; i++)
{ {
mio_ntime_t x; mio_ntime_t x;
MIO_INIT_NTIME (&x, 5, 0); MIO_INIT_NTIME (&x, 5, 0);
schedule_timer_job_after (mio, &x, send_test_query, dnc); mio_schedtmrjobafter (mio, &x, send_test_query, MIO_NULL, dnc);
if (!mio_svc_dnc_resolve(dnc, "b.wild.com", MIO_DNS_RRT_A, MIO_SVC_DNC_RESOLVE_FLAG_PREFER_TCP, on_dnc_resolve, 0)) if (!mio_svc_dnc_resolve(dnc, "b.wild.com", MIO_DNS_RRT_A, MIO_SVC_DNC_RESOLVE_FLAG_PREFER_TCP, on_dnc_resolve, 0))
{ {

View File

@ -342,6 +342,7 @@ struct xx_mq_t
static xx_mq_t xx_mq; static xx_mq_t xx_mq;
#if 0
static int schedule_timer_job_at (mio_dev_sck_t* dev, const mio_ntime_t* fire_at, mio_tmrjob_handler_t handler, mio_tmridx_t* tmridx) static int schedule_timer_job_at (mio_dev_sck_t* dev, const mio_ntime_t* fire_at, mio_tmrjob_handler_t handler, mio_tmridx_t* tmridx)
{ {
mio_tmrjob_t tmrjob; mio_tmrjob_t tmrjob;
@ -355,6 +356,7 @@ static int schedule_timer_job_at (mio_dev_sck_t* dev, const mio_ntime_t* fire_at
return mio_instmrjob(dev->mio, &tmrjob); return mio_instmrjob(dev->mio, &tmrjob);
} }
#endif
static void enable_accept (mio_t* mio, const mio_ntime_t* now, mio_tmrjob_t* job) static void enable_accept (mio_t* mio, const mio_ntime_t* now, mio_tmrjob_t* job)
{ {
@ -408,7 +410,7 @@ static int try_to_accept (mio_dev_sck_t* sck, mio_dev_sck_qxmsg_t* qxmsg, int in
} }
if (xx_tmridx == MIO_TMRIDX_INVALID) if (xx_tmridx == MIO_TMRIDX_INVALID)
schedule_timer_job_at (sck, MIO_NULL, enable_accept, &xx_tmridx); mio_schedtmrjobat (mio, MIO_NULL, enable_accept, &xx_tmridx, sck);
return 0; /* enqueued for later writing */ return 0; /* enqueued for later writing */
} }
@ -488,6 +490,7 @@ static int add_listener (mio_t* mio, mio_bch_t* addrstr)
memset (&mi, 0, MIO_SIZEOF(mi)); memset (&mi, 0, MIO_SIZEOF(mi));
mi.type = (mio_skad_family(&bi.localaddr) == MIO_AF_INET? MIO_DEV_SCK_TCP4: MIO_DEV_SCK_TCP6); mi.type = (mio_skad_family(&bi.localaddr) == MIO_AF_INET? MIO_DEV_SCK_TCP4: MIO_DEV_SCK_TCP6);
mi.options = MIO_DEV_SCK_MAKE_LENIENT;
mi.on_write = tcp_sck_on_write; mi.on_write = tcp_sck_on_write;
mi.on_read = tcp_sck_on_read; mi.on_read = tcp_sck_on_read;
mi.on_connect = tcp_sck_on_connect; /* this is invoked on a client accept as well */ mi.on_connect = tcp_sck_on_connect; /* this is invoked on a client accept as well */

View File

@ -274,6 +274,7 @@ printf ("listener socket disconnect..................sck %p %d\n", sck, sck->hnd
} }
} }
/* ------------------------------------------------------------------------ */
/* ------------------------------------------------------------------------ */ /* ------------------------------------------------------------------------ */
@ -334,7 +335,7 @@ mio_svc_htts_t* mio_svc_htts_start (mio_t* mio, mio_dev_sck_bind_t* sck_bind, mi
MIO_MEMSET (&info, 0, MIO_SIZEOF(info)); MIO_MEMSET (&info, 0, MIO_SIZEOF(info));
info.l.backlogs = 4096; info.l.backlogs = 4096;
MIO_INIT_NTIME (&info.l.accept_tmout, 5, 1); MIO_INIT_NTIME (&info.l.accept_tmout, 5, 1); /* usedd for ssl accept */
if (mio_dev_sck_listen(htts->lsck, &info.l) <= -1) goto oops; if (mio_dev_sck_listen(htts->lsck, &info.l) <= -1) goto oops;
} }

View File

@ -348,7 +348,7 @@ static void fire_cwq_handlers (mio_t* mio)
if (dev_to_halt) if (dev_to_halt)
{ {
MIO_DEBUG1 (mio, "DEV(%p) - halting a device for on_write error upon write completion[1]\n", dev_to_halt); MIO_DEBUG2 (mio, "DEV(%p) - halting a device for on_write error upon write completion[1] - %js\n", dev_to_halt, mio_geterrmsg(mio));
mio_dev_halt (dev_to_halt); mio_dev_halt (dev_to_halt);
} }
} }
@ -396,7 +396,7 @@ static void fire_cwq_handlers_for_dev (mio_t* mio, mio_dev_t* dev, int for_kill)
if (!for_kill && dev_to_halt) if (!for_kill && dev_to_halt)
{ {
MIO_DEBUG1 (mio, "DEV(%p) - halting a device for on_write error upon write completion[2]\n", dev_to_halt); MIO_DEBUG2 (mio, "DEV(%p) - halting a device for on_write error upon write completion[2] - %js\n", dev_to_halt, mio_geterrmsg(mio));
mio_dev_halt (dev_to_halt); mio_dev_halt (dev_to_halt);
} }
} }
@ -427,7 +427,7 @@ static MIO_INLINE void handle_event (mio_t* mio, mio_dev_t* dev, int events, int
x = dev->dev_evcb->ready(dev, xevents); x = dev->dev_evcb->ready(dev, xevents);
if (x <= -1) if (x <= -1)
{ {
MIO_DEBUG1 (mio, "DEV(%p) - halting a device for ready callback error\n", dev); MIO_DEBUG2 (mio, "DEV(%p) - halting a device for ready callback error - %js\n", dev, mio_geterrmsg(mio));
mio_dev_halt (dev); mio_dev_halt (dev);
return; return;
} }
@ -468,7 +468,7 @@ static MIO_INLINE void handle_event (mio_t* mio, mio_dev_t* dev, int events, int
} }
if (x <= -1) if (x <= -1)
{ {
MIO_DEBUG1 (mio, "DEV(%p) - halting a device for write failure\n", dev); MIO_DEBUG2 (mio, "DEV(%p) - halting a device for write failure - %js\n", dev, mio_geterrmsg(mio));
mio_dev_halt (dev); mio_dev_halt (dev);
dev = MIO_NULL; dev = MIO_NULL;
break; break;
@ -512,7 +512,7 @@ static MIO_INLINE void handle_event (mio_t* mio, mio_dev_t* dev, int events, int
if (y <= -1) if (y <= -1)
{ {
MIO_DEBUG1 (mio, "DEV(%p) - halting a device for on_write error\n", dev); MIO_DEBUG2 (mio, "DEV(%p) - halting a device for on_write error - %js\n", dev, mio_geterrmsg(mio));
mio_dev_halt (dev); mio_dev_halt (dev);
dev = MIO_NULL; dev = MIO_NULL;
break; break;
@ -566,7 +566,7 @@ static MIO_INLINE void handle_event (mio_t* mio, mio_dev_t* dev, int events, int
x = dev->dev_mth->read(dev, mio->bigbuf, &len, &srcaddr); x = dev->dev_mth->read(dev, mio->bigbuf, &len, &srcaddr);
if (x <= -1) if (x <= -1)
{ {
MIO_DEBUG1 (mio, "DEV(%p) - halting a device for read failure\n", dev); MIO_DEBUG2 (mio, "DEV(%p) - halting a device for read failure - %js\n", dev, mio_geterrmsg(mio));
mio_dev_halt (dev); mio_dev_halt (dev);
dev = MIO_NULL; dev = MIO_NULL;
break; break;
@ -630,7 +630,7 @@ static MIO_INLINE void handle_event (mio_t* mio, mio_dev_t* dev, int events, int
{ {
/* 1. input ended and its reporting failed or /* 1. input ended and its reporting failed or
* 2. input ended and no writing is possible */ * 2. input ended and no writing is possible */
MIO_DEBUG1 (mio, "DEV(%p) - halting a stream device for on_read failure while output is closed\n", dev); MIO_DEBUG2 (mio, "DEV(%p) - halting a stream device for on_read failure while output is closed - %js\n", dev, mio_geterrmsg(mio));
mio_dev_halt (dev); mio_dev_halt (dev);
dev = MIO_NULL; dev = MIO_NULL;
} }
@ -648,7 +648,7 @@ static MIO_INLINE void handle_event (mio_t* mio, mio_dev_t* dev, int events, int
y = dev->dev_evcb->on_read(dev, mio->bigbuf, len, &srcaddr); y = dev->dev_evcb->on_read(dev, mio->bigbuf, len, &srcaddr);
if (y <= -1) if (y <= -1)
{ {
MIO_DEBUG1 (mio, "DEV(%p) - halting a non-stream device for on_read failure while output is closed\n", dev); MIO_DEBUG2 (mio, "DEV(%p) - halting a non-stream device for on_read failure while output is closed - %js\n", dev, mio_geterrmsg(mio));
mio_dev_halt (dev); mio_dev_halt (dev);
dev = MIO_NULL; dev = MIO_NULL;
break; break;
@ -1177,7 +1177,7 @@ static void on_read_timeout (mio_t* mio, const mio_ntime_t* now, mio_tmrjob_t* j
if (x <= -1) if (x <= -1)
{ {
MIO_DEBUG1 (mio, "DEV(%p) - halting a device for on_read error upon timeout\n", dev); MIO_DEBUG2 (mio, "DEV(%p) - halting a device for on_read error upon timeout - %js\n", dev, mio_geterrmsg(mio));
mio_dev_halt (dev); mio_dev_halt (dev);
} }
} }
@ -1268,7 +1268,7 @@ static void on_write_timeout (mio_t* mio, const mio_ntime_t* now, mio_tmrjob_t*
if (x <= -1) if (x <= -1)
{ {
MIO_DEBUG1 (mio, "DEV(%p) - halting a device for on_write error upon timeout\n", dev); MIO_DEBUG2 (mio, "DEV(%p) - halting a device for on_write error upon timeout - %js\n", dev, mio_geterrmsg(mio));
mio_dev_halt (dev); mio_dev_halt (dev);
} }
} }

View File

@ -989,6 +989,24 @@ MIO_EXPORT int mio_gettmrjobdeadline (
mio_ntime_t* deadline mio_ntime_t* deadline
); );
MIO_EXPORT int mio_schedtmrjobat (
mio_t* mio,
const mio_ntime_t* fire_at,
mio_tmrjob_handler_t handler,
mio_tmridx_t* tmridx,
void* ctx
);
MIO_EXPORT int mio_schedtmrjobafter (
mio_t* mio,
const mio_ntime_t* fire_after,
mio_tmrjob_handler_t handler,
mio_tmridx_t* tmridx,
void* ctx
);
/* ========================================================================= /* =========================================================================
* TIME * TIME
* ========================================================================= */ * ========================================================================= */

View File

@ -295,8 +295,11 @@ static void ssl_connect_timedout (mio_t* mio, const mio_ntime_t* now, mio_tmrjob
} }
} }
static int schedule_timer_job_at (mio_dev_sck_t* dev, const mio_ntime_t* fire_at, mio_tmrjob_handler_t handler) static MIO_INLINE int schedule_timer_job_at (mio_dev_sck_t* dev, const mio_ntime_t* fire_at, mio_tmrjob_handler_t handler)
{ {
#if 1
return mio_schedtmrjobat(dev->mio, fire_at, handler, &dev->tmrjob_index, dev);
#else
mio_tmrjob_t tmrjob; mio_tmrjob_t tmrjob;
MIO_MEMSET (&tmrjob, 0, MIO_SIZEOF(tmrjob)); MIO_MEMSET (&tmrjob, 0, MIO_SIZEOF(tmrjob));
@ -309,10 +312,14 @@ static int schedule_timer_job_at (mio_dev_sck_t* dev, const mio_ntime_t* fire_at
MIO_ASSERT (dev->mio, dev->tmrjob_index == MIO_TMRIDX_INVALID); MIO_ASSERT (dev->mio, dev->tmrjob_index == MIO_TMRIDX_INVALID);
dev->tmrjob_index = mio_instmrjob(dev->mio, &tmrjob); dev->tmrjob_index = mio_instmrjob(dev->mio, &tmrjob);
return dev->tmrjob_index == MIO_TMRIDX_INVALID? -1: 0; return dev->tmrjob_index == MIO_TMRIDX_INVALID? -1: 0;
#endif
} }
static int schedule_timer_job_after (mio_dev_sck_t* dev, const mio_ntime_t* fire_after, mio_tmrjob_handler_t handler) static MIO_INLINE int schedule_timer_job_after (mio_dev_sck_t* dev, const mio_ntime_t* fire_after, mio_tmrjob_handler_t handler)
{ {
#if 1
return mio_schedtmrjobafter(dev->mio, fire_after, handler, &dev->tmrjob_index, dev);
#else
mio_t* mio = dev->mio; mio_t* mio = dev->mio;
mio_ntime_t fire_at; mio_ntime_t fire_at;
@ -322,6 +329,7 @@ static int schedule_timer_job_after (mio_dev_sck_t* dev, const mio_ntime_t* fire
MIO_ADD_NTIME (&fire_at, &fire_at, fire_after); MIO_ADD_NTIME (&fire_at, &fire_at, fire_after);
return schedule_timer_job_at(dev, &fire_at, handler); return schedule_timer_job_at(dev, &fire_at, handler);
#endif
} }
/* ======================================================================== */ /* ======================================================================== */
@ -1577,27 +1585,22 @@ static int accept_incoming_connection (mio_dev_sck_t* rdev)
clisck = accept4(rdev->hnd, (struct sockaddr*)&remoteaddr, &addrlen, flags); clisck = accept4(rdev->hnd, (struct sockaddr*)&remoteaddr, &addrlen, flags);
if (clisck <= -1) if (clisck <= -1)
{ {
if (errno != ENOSYS) if (errno != ENOSYS) goto accept_error;
{
if (errno == EINPROGRESS || errno == EWOULDBLOCK || errno == EAGAIN) return 0;
if (errno == EINTR) return 0; /* if interrupted by a signal, treat it as if it's EINPROGRESS */
mio_seterrwithsyserr (mio, 0, errno);
return -1;
}
/* go on for the normal 3-parameter accept */ /* go on for the normal 3-parameter accept */
} }
else else
{ {
goto accept_done; goto accept_done;
} }
#endif #endif
addrlen = MIO_SIZEOF(remoteaddr); addrlen = MIO_SIZEOF(remoteaddr);
clisck = accept(rdev->hnd, (struct sockaddr*)&remoteaddr, &addrlen); clisck = accept(rdev->hnd, (struct sockaddr*)&remoteaddr, &addrlen);
if (clisck <= -1) if (clisck <= -1)
{ {
#if defined(SOCK_NONBLOCK) && defined(SOCK_CLOEXEC) && defined(HAVE_ACCEPT4)
accept_error:
#endif
if (errno == EINPROGRESS || errno == EWOULDBLOCK || errno == EAGAIN) return 0; if (errno == EINPROGRESS || errno == EWOULDBLOCK || errno == EAGAIN) return 0;
if (errno == EINTR) return 0; /* if interrupted by a signal, treat it as if it's EINPROGRESS */ if (errno == EINTR) return 0; /* if interrupted by a signal, treat it as if it's EINPROGRESS */
@ -1605,7 +1608,9 @@ static int accept_incoming_connection (mio_dev_sck_t* rdev)
return -1; return -1;
} }
#if defined(SOCK_NONBLOCK) && defined(SOCK_CLOEXEC) && defined(HAVE_ACCEPT4)
accept_done: accept_done:
#endif
return make_accepted_client_connection(rdev, clisck, &remoteaddr, rdev->type); return make_accepted_client_connection(rdev, clisck, &remoteaddr, rdev->type);
} }

View File

@ -38,7 +38,7 @@ void mio_cleartmrjobs (mio_t* mio)
while (mio->tmr.size > 0) mio_deltmrjob (mio, 0); while (mio->tmr.size > 0) mio_deltmrjob (mio, 0);
} }
static mio_tmridx_t sift_up (mio_t* mio, mio_tmridx_t index, int notify) static mio_tmridx_t sift_up (mio_t* mio, mio_tmridx_t index)
{ {
mio_tmridx_t parent; mio_tmridx_t parent;
@ -68,7 +68,7 @@ static mio_tmridx_t sift_up (mio_t* mio, mio_tmridx_t index, int notify)
return index; return index;
} }
static mio_tmridx_t sift_down (mio_t* mio, mio_tmridx_t index, int notify) static mio_tmridx_t sift_down (mio_t* mio, mio_tmridx_t index)
{ {
mio_oow_t base = mio->tmr.size / 2; mio_oow_t base = mio->tmr.size / 2;
@ -124,7 +124,7 @@ void mio_deltmrjob (mio_t* mio, mio_tmridx_t index)
{ {
mio->tmr.jobs[index] = mio->tmr.jobs[mio->tmr.size]; mio->tmr.jobs[index] = mio->tmr.jobs[mio->tmr.size];
if (mio->tmr.jobs[index].idxptr) *mio->tmr.jobs[index].idxptr = index; if (mio->tmr.jobs[index].idxptr) *mio->tmr.jobs[index].idxptr = index;
YOUNGER_THAN(&mio->tmr.jobs[index], &item)? sift_up(mio, index, 1): sift_down(mio, index, 1); YOUNGER_THAN(&mio->tmr.jobs[index], &item)? sift_up(mio, index): sift_down(mio, index);
} }
} }
@ -149,7 +149,7 @@ mio_tmridx_t mio_instmrjob (mio_t* mio, const mio_tmrjob_t* job)
mio->tmr.size = mio->tmr.size + 1; mio->tmr.size = mio->tmr.size + 1;
mio->tmr.jobs[index] = *job; mio->tmr.jobs[index] = *job;
if (mio->tmr.jobs[index].idxptr) *mio->tmr.jobs[index].idxptr = index; if (mio->tmr.jobs[index].idxptr) *mio->tmr.jobs[index].idxptr = index;
return sift_up(mio, index, 0); return sift_up(mio, index);
} }
mio_tmridx_t mio_updtmrjob (mio_t* mio, mio_tmridx_t index, const mio_tmrjob_t* job) mio_tmridx_t mio_updtmrjob (mio_t* mio, mio_tmridx_t index, const mio_tmrjob_t* job)
@ -158,7 +158,7 @@ mio_tmridx_t mio_updtmrjob (mio_t* mio, mio_tmridx_t index, const mio_tmrjob_t*
item = mio->tmr.jobs[index]; item = mio->tmr.jobs[index];
mio->tmr.jobs[index] = *job; mio->tmr.jobs[index] = *job;
if (mio->tmr.jobs[index].idxptr) *mio->tmr.jobs[index].idxptr = index; if (mio->tmr.jobs[index].idxptr) *mio->tmr.jobs[index].idxptr = index;
return YOUNGER_THAN(job, &item)? sift_up (mio, index, 0): sift_down (mio, index, 0); return YOUNGER_THAN(job, &item)? sift_up (mio, index): sift_down (mio, index);
} }
void mio_firetmrjobs (mio_t* mio, const mio_ntime_t* tm, mio_oow_t* firecnt) void mio_firetmrjobs (mio_t* mio, const mio_ntime_t* tm, mio_oow_t* firecnt)
@ -228,3 +228,31 @@ int mio_gettmrjobdeadline (mio_t* mio, mio_tmridx_t index, mio_ntime_t* deadline
*deadline = mio->tmr.jobs[index].when; *deadline = mio->tmr.jobs[index].when;
return 0; return 0;
} }
int mio_schedtmrjobat (mio_t* mio, const mio_ntime_t* fire_at, mio_tmrjob_handler_t handler, mio_tmridx_t* tmridx, void* ctx)
{
mio_tmrjob_t tmrjob;
MIO_MEMSET (&tmrjob, 0, MIO_SIZEOF(tmrjob));
tmrjob.ctx = ctx;
if (fire_at) tmrjob.when = *fire_at;
tmrjob.handler = handler;
tmrjob.idxptr = tmridx;
return mio_instmrjob(mio, &tmrjob) == MIO_TMRIDX_INVALID? -1: 0;
}
int mio_schedtmrjobafter (mio_t* mio, const mio_ntime_t* fire_after, mio_tmrjob_handler_t handler, mio_tmridx_t* tmridx, void* ctx)
{
mio_ntime_t fire_at;
MIO_ASSERT (mio, MIO_IS_POS_NTIME(fire_after));
mio_gettime (mio, &fire_at);
MIO_ADD_NTIME (&fire_at, &fire_at, fire_after);
return mio_schedtmrjobat(mio, &fire_at, handler, tmridx, ctx);
}