diff --git a/mio/bin/t04.c b/mio/bin/t04.c index 7173eb0..c6c4db3 100644 --- a/mio/bin/t04.c +++ b/mio/bin/t04.c @@ -3,6 +3,7 @@ #include #include #include +#include #include @@ -146,6 +147,40 @@ printf ("[%lu] NO DATA..\n", sid); } } +static mio_t* g_mio = MIO_NULL; + +static void handle_signal (int sig) +{ + mio_stop (g_mio, MIO_STOPREQ_TERMINATION); +} + +static void send_test_query (mio_t* mio, const mio_ntime_t* now, mio_tmrjob_t* job) +{ + mio_svc_marc_t* marc = (mio_svc_marc_t*)job->ctx; + + if (mio_svc_mar_querywithbchars(marc, 0, MIO_SVC_MARC_QTYPE_SELECT, "SHOW STATUS", 11, on_result, MIO_NULL) <= -1) + { + MIO_INFO1 (mio, "FAILED TO SEND QUERY - %js\n", mio_geterrmsg(mio)); + } +} + +static int schedule_timer_job_after (mio_t* mio, const mio_ntime_t* fire_after, mio_tmrjob_handler_t handler, void* ctx) +{ + mio_tmrjob_t tmrjob; + + memset (&tmrjob, 0, MIO_SIZEOF(tmrjob)); + tmrjob.ctx = ctx; + + mio_gettime (mio, &tmrjob.when); + MIO_ADD_NTIME (&tmrjob.when, &tmrjob.when, fire_after); + + tmrjob.handler = handler; + tmrjob.idxptr = MIO_NULL; + + return mio_instmrjob(mio, &tmrjob); +} + + int main (int argc, char* argv[]) { @@ -166,7 +201,6 @@ int main (int argc, char* argv[]) goto oops; } - memset (&ci, 0, MIO_SIZEOF(ci)); ci.host = argv[1]; ci.port = 3306; /* TODO: argv[2]; */ @@ -209,9 +243,23 @@ int main (int argc, char* argv[]) } #endif - mio_loop (mio); + g_mio = mio; + signal (SIGINT, handle_signal); + + /* ---------------------------------------- */ + { + mio_ntime_t x; + MIO_INIT_NTIME (&x, 32, 0); + schedule_timer_job_after (mio, &x, send_test_query, marc); + mio_loop (mio); + } + /* ---------------------------------------- */ + + signal (SIGINT, SIG_IGN); + g_mio = MIO_NULL; oops: +printf ("about to close mio...\n"); if (mio) mio_close (mio); return 0; } diff --git a/mio/lib/mar-cli.c b/mio/lib/mar-cli.c index c1cd260..2ea85fa 100644 --- a/mio/lib/mar-cli.c +++ b/mio/lib/mar-cli.c @@ -28,6 +28,7 @@ #include "mio-prv.h" #include +#include typedef struct sess_t sess_t; typedef struct sess_qry_t sess_qry_t; @@ -37,13 +38,13 @@ struct mio_svc_marc_t MIO_SVC_HEADER; mio_svc_marc_connect_t ci; + int stopping; struct { sess_t* ptr; mio_oow_t capa; } sess; - }; struct sess_qry_t @@ -77,7 +78,6 @@ struct dev_xtn_t }; - mio_svc_marc_t* mio_svc_marc_start (mio_t* mio, const mio_svc_marc_connect_t* ci) { mio_svc_marc_t* marc = MIO_NULL; @@ -103,6 +103,15 @@ oops: void mio_svc_marc_stop (mio_svc_marc_t* marc) { mio_t* mio = marc->mio; + mio_oow_t i; + + marc->stopping = 1; + + for (i = 0; i < marc->sess.capa; i++) + { + if (marc->sess.ptr[i].dev) mio_dev_mar_kill (marc->sess.ptr[i].dev); + } + mio_freemem (mio, marc->sess.ptr); MIO_SVCL_UNLINK_SVC (marc); mio_freemem (mio, marc); @@ -171,8 +180,10 @@ static int send_pending_query_if_any (sess_t* sess) printf ("sending... %.*s\n", (int)sq->qlen, sq->qptr); if (mio_dev_mar_querywithbchars(sess->dev, sq->qptr, sq->qlen) <= -1) { +MIO_DEBUG1 (sess->svc->mio, "QQQQQQQQQQQQQQQQQQQQQQQQQQQQQQQQQQQQQQ SEND FAIL %js\n", mio_geterrmsg(sess->svc->mio)); sq->sent = 0; - return -1; /* failure */ + mio_dev_mar_halt (sess->dev); /* this device can't carray on */ + return -1; /* halted the device for failure */ } return 1; /* sent */ @@ -183,32 +194,74 @@ printf ("sending... %.*s\n", (int)sq->qlen, sq->qptr); } /* ------------------------------------------------------------------- */ +static mio_dev_mar_t* alloc_device (mio_svc_marc_t* marc, sess_t* sess); static void mar_on_disconnect (mio_dev_mar_t* dev) { + mio_t* mio = dev->mio; dev_xtn_t* xtn = (dev_xtn_t*)mio_dev_mar_getxtn(dev); sess_t* sess = xtn->sess; -printf ("disconnected on sid %d\n", sess->sid); + MIO_DEBUG6 (mio, "MARC(%p) - device disconnected - sid %lu session %p session-connected %d device %p device-broken %d\n", sess->svc, (unsigned long int)sess->sid, sess, (int)sess->connected, dev, (int)dev->broken); + MIO_ASSERT (mio, dev == sess->dev); + + if (MIO_UNLIKELY(!sess->svc->stopping && mio->stopreq == MIO_STOPREQ_NONE)) + { + if (sess->connected && sess->dev->broken) /* risk of infinite cycle if the underlying db suffers never-ending 'broken' issue after getting connected */ + { + /* restart the dead device */ + mio_dev_mar_t* dev; + + sess->connected = 0; + + dev = alloc_device(sess->svc, sess); + if (MIO_LIKELY(dev)) + { + sess->dev = dev; + /* the pending query will be sent in on_connect() */ + return; + } + + /* if device allocation fails, just carry on */ + } + } + sess->connected = 0; + + while (1) + { + sess_qry_t* sq; + mio_svc_marc_dev_error_t err; + + sq = get_first_session_query(sess); + if (!sq) break; + + /* what is the best error code and message to use for this? */ + err.mar_errcode = CR_SERVER_LOST; + err.mar_errmsg = "server lost"; + sq->on_result (sess->svc, sess->sid, MIO_SVC_MARC_RCODE_ERROR, &err, sq->qctx); + dequeue_session_query (mio, sess); + } + + /* it should point to a placeholder node(either the initial one or the transited one after dequeing */ + MIO_ASSERT (mio, sess->q_head == sess->q_tail); + MIO_ASSERT (mio, sess->q_head->sq_next == MIO_NULL); + free_session_query (mio, sess->q_head); + sess->q_head = sess->q_tail = MIO_NULL; + sess->dev = MIO_NULL; - - - //if (there is a query which has been sent but not processed... ) <--- can this be handled by the caller? } static void mar_on_connect (mio_dev_mar_t* dev) { + mio_t* mio = dev->mio; dev_xtn_t* xtn = (dev_xtn_t*)mio_dev_mar_getxtn(dev); sess_t* sess = xtn->sess; - sess->connected = 1; -printf ("connected on sid %d\n", sess->sid); + MIO_DEBUG5 (mio, "MARC(%p) - device connected - sid %lu session %p device %p device-broken %d\n", sess->svc, (unsigned long int)sess->sid, sess, dev, dev->broken); - if (send_pending_query_if_any (sess) <= -1) - { - mio_dev_mar_halt (sess->dev); - } + sess->connected = 1; + send_pending_query_if_any (sess); } static void mar_on_query_started (mio_dev_mar_t* dev, int mar_ret, const mio_bch_t* mar_errmsg) @@ -221,10 +274,6 @@ static void mar_on_query_started (mio_dev_mar_t* dev, int mar_ret, const mio_bch { mio_svc_marc_dev_error_t err; printf ("QUERY FAILED...%d -> %s\n", mar_ret, mar_errmsg); -#if 0 - if (mar_ret == CR_SERVER_GONE_ERROR || /* server gone away between queries */ - mar_ret == CR_SERVER_LOST) /* server gone away during a query */ -#endif err.mar_errcode = mar_ret; err.mar_errmsg = mar_errmsg; @@ -246,9 +295,7 @@ printf ("QUERY STARTED\n"); } else { - if (sq->on_result) - sq->on_result (sess->svc, sess->sid, MIO_SVC_MARC_RCODE_DONE, MIO_NULL, sq->qctx); - + sq->on_result (sess->svc, sess->sid, MIO_SVC_MARC_RCODE_DONE, MIO_NULL, sq->qctx); dequeue_session_query (sess->svc->mio, sess); send_pending_query_if_any (sess); } @@ -261,10 +308,7 @@ static void mar_on_row_fetched (mio_dev_mar_t* dev, void* data) sess_t* sess = xtn->sess; sess_qry_t* sq = get_first_session_query(sess); - if (sq->on_result) - { - sq->on_result (sess->svc, sess->sid, (data? MIO_SVC_MARC_RCODE_ROW: MIO_SVC_MARC_RCODE_DONE), data, sq->qctx); - } + sq->on_result (sess->svc, sess->sid, (data? MIO_SVC_MARC_RCODE_ROW: MIO_SVC_MARC_RCODE_DONE), data, sq->qctx); if (!data) { @@ -278,7 +322,6 @@ static mio_dev_mar_t* alloc_device (mio_svc_marc_t* marc, sess_t* sess) mio_t* mio = (mio_t*)marc->mio; mio_dev_mar_t* mar; mio_dev_mar_make_t mi; - mio_dev_mar_connect_t ci; dev_xtn_t* xtn; MIO_MEMSET (&mi, 0, MIO_SIZEOF(mi)); @@ -347,7 +390,26 @@ static sess_t* get_session (mio_svc_marc_t* marc, mio_oow_t sid) return MIO_NULL; } - /* queue initialization with a place holder */ + /* queue initialization with a place holder. the queue maintains a placeholder node. + * the first actual data node enqueued is inserted at the back and becomes the second + * node in terms of the entire queue. + * + * PH -> DN1 -> DN2 -> ... -> DNX + * ^ ^ + * q_head q_tail + * + * get_first_session_query() returns the data of DN1, not the data held in PH. + * + * the first dequeing operations kills PH. + * + * DN1 -> DN2 -> ... -> DNX + * ^ ^ + * q_head q_tail + * + * get_first_session_query() at this point returns the data of DN2, not the data held in DN1. + * dequeing kills DN1, however. + */ + sess->q_head = sess->q_tail = sq; } @@ -367,33 +429,37 @@ int mio_svc_mar_querywithbchars (mio_svc_marc_t* marc, mio_oow_t sid, mio_svc_ma sq = make_session_query(mio, qtype, qptr, qlen, qctx, on_result); if (MIO_UNLIKELY(!sq)) return -1; - if (get_first_session_query(sess)) + if (get_first_session_query(sess) || !sess->connected) { -printf ("XXXXXXXXXx\n"); /* there are other ongoing queries */ enqueue_session_query (sess, sq); } else { - /* this is the first query */ + /* this is the first query or the device is not connected yet */ sess_qry_t* old_q_tail = sess->q_tail; + enqueue_session_query (sess, sq); -printf ("YYYYYYYYYYYYYY\n"); - if(sess->dev->connected && !sq->sent) - { - sq->sent = 1; - if (mio_dev_mar_querywithbchars(sess->dev, qptr, qlen) <= -1) - { - sq->sent = 0; + MIO_ASSERT (mio, sq->sent == 0); - /* ugly to unlink the the last item added */ + sq->sent = 1; + if (mio_dev_mar_querywithbchars(sess->dev, sq->qptr, sq->qlen) <= -1) + { + sq->sent = 0; + if (!sess->dev->broken) + { + /* unlink the the last item added */ old_q_tail->sq_next = MIO_NULL; sess->q_tail = old_q_tail; free_session_query (mio, sq); return -1; } + + /* the underlying socket of the device may get disconnected. + * in such a case, keep the enqueued query with sq->sent 0 + * and defer actual sending and processing */ } } diff --git a/mio/lib/mar.c b/mio/lib/mar.c index 9c4357a..685a2f9 100644 --- a/mio/lib/mar.c +++ b/mio/lib/mar.c @@ -28,7 +28,7 @@ #include "mio-prv.h" #include - +#include /* ========================================================================= */ @@ -47,14 +47,14 @@ static int dev_mar_make (mio_dev_t* dev, void* ctx) if (mysql_options(rdev->hnd, MYSQL_OPT_NONBLOCK, 0) != 0) { - mio_seterrbfmt (mio, MIO_ESYSERR, "%s", mysql_error(rdev->hnd)); + mio_seterrbfmt (mio, MIO_ESYSERR, "%hs", mysql_error(rdev->hnd)); mysql_close (rdev->hnd); rdev->hnd = MIO_NULL; return -1; } { - my_bool x = 0; + my_bool x = 0; /* no auto-reconnect */ mysql_options(rdev->hnd, MYSQL_OPT_RECONNECT, &x); } @@ -65,6 +65,9 @@ static int dev_mar_make (mio_dev_t* dev, void* ctx) rdev->on_disconnect = mi->on_disconnect; rdev->on_query_started = mi->on_query_started; rdev->on_row_fetched = mi->on_row_fetched; + + rdev->progress = MIO_DEV_MAR_INITIAL; + return 0; } @@ -73,6 +76,8 @@ static int dev_mar_kill (mio_dev_t* dev, int force) /*mio_t* mio = dev->mio;*/ mio_dev_mar_t* rdev = (mio_dev_mar_t*)dev; + /* if rdev->connected is 0 at this point, + * the underlying socket of this device is down */ if (rdev->on_disconnect) rdev->on_disconnect (rdev); if (rdev->res) @@ -86,12 +91,16 @@ static int dev_mar_kill (mio_dev_t* dev, int force) rdev->hnd = MIO_NULL; } + rdev->connected = 0; + rdev->broken = 0; + return 0; } static mio_syshnd_t dev_mar_getsyshnd (mio_dev_t* dev) { mio_dev_mar_t* rdev = (mio_dev_mar_t*)dev; + if (rdev->broken) return rdev->broken_syshnd; /* hack!! */ return (mio_syshnd_t)mysql_get_socket(rdev->hnd); } @@ -123,7 +132,6 @@ static MIO_INLINE void watch_mysql (mio_dev_mar_t* rdev, int wstatus) } } - static void start_fetch_row (mio_dev_mar_t* rdev) { MYSQL_ROW row; @@ -147,7 +155,6 @@ static void start_fetch_row (mio_dev_mar_t* rdev) } } - static int dev_mar_ioctl (mio_dev_t* dev, int cmd, void* arg) { mio_t* mio = dev->mio; @@ -158,38 +165,41 @@ static int dev_mar_ioctl (mio_dev_t* dev, int cmd, void* arg) case MIO_DEV_MAR_CONNECT: { mio_dev_mar_connect_t* ci = (mio_dev_mar_connect_t*)arg; - MYSQL* ret; + MYSQL* tmp; int status; - - if (MIO_DEV_MAR_GET_PROGRESS(rdev)) + if (MIO_DEV_MAR_GET_PROGRESS(rdev) != MIO_DEV_MAR_INITIAL) { /* can't connect again */ mio_seterrbfmt (mio, MIO_EPERM, "operation in progress. disallowed to connect again"); return -1; } - status = mysql_real_connect_start(&ret, rdev->hnd, ci->host, ci->username, ci->password, ci->dbname, ci->port, MIO_NULL, 0); + MIO_ASSERT (mio, rdev->connected_deferred == 0); + + status = mysql_real_connect_start(&tmp, rdev->hnd, ci->host, ci->username, ci->password, ci->dbname, ci->port, MIO_NULL, 0); rdev->dev_cap &= ~MIO_DEV_CAP_VIRTUAL; /* a socket is created in mysql_real_connect_start() */ if (status) { /* not connected */ MIO_DEV_MAR_SET_PROGRESS (rdev, MIO_DEV_MAR_CONNECTING); - rdev->connected = 0; watch_mysql (rdev, status); } else { - /* connected immediately */ - if (MIO_UNLIKELY(!ret)) + if (MIO_UNLIKELY(!tmp)) /* connection attempt failed immediately */ { - mio_seterrbfmt (mio, MIO_ESYSERR, "%s", mysql_error(rdev->hnd)); + /* immediate failure doesn't invoke on_discoonect(). + * the caller must check the return code of this function. */ + mio_seterrbfmt (mio, MIO_ESYSERR, "%hs", mysql_error(rdev->hnd)); return -1; } + /* connected_deferred immediately. postpone actual handling to the ready() callback */ + MIO_DEV_MAR_SET_PROGRESS (rdev, MIO_DEV_MAR_CONNECTING); + rdev->connected_deferred = 1; /* to let the ready() handler to trigger on_connect() */ /* regiter it in the multiplexer so that the ready() handler is * invoked to call the on_connect() callback */ - rdev->connected = 1; watch_mysql (rdev, MYSQL_WAIT_READ | MYSQL_WAIT_WRITE); /* TODO: verify this */ } return 0; @@ -199,6 +209,13 @@ static int dev_mar_ioctl (mio_dev_t* dev, int cmd, void* arg) { const mio_bcs_t* qstr = (const mio_bcs_t*)arg; int err, status; + mio_syshnd_t syshnd; + + if (!rdev->connected) + { + mio_seterrbfmt (mio, MIO_EPERM, "not connected. disallowed to query"); + return -1; + } if (rdev->res) /* TODO: more accurate check */ { @@ -206,12 +223,13 @@ static int dev_mar_ioctl (mio_dev_t* dev, int cmd, void* arg) return -1; } + + syshnd = mysql_get_socket(rdev->hnd); status = mysql_real_query_start(&err, rdev->hnd, qstr->ptr, qstr->len); MIO_DEV_MAR_SET_PROGRESS (rdev, MIO_DEV_MAR_QUERY_STARTING); if (status) { /* not done */ - rdev->query_started = 0; watch_mysql (rdev, status); } else @@ -219,11 +237,26 @@ static int dev_mar_ioctl (mio_dev_t* dev, int cmd, void* arg) /* query sent immediately */ if (err) { + /* but there is an error */ if (err == 1) err = mysql_errno(rdev->hnd); - mio_copy_bcstr (rdev->errbuf, MIO_COUNTOF(rdev->errbuf), mysql_error(rdev->hnd)); + + mio_seterrbfmt (mio, MIO_ESYSERR, "%hs", mysql_error(rdev->hnd)); + if (err == CR_SERVER_LOST || err == CR_SERVER_GONE_ERROR) + { + /* the underlying socket must have gotten closed by mysql_real_query_start() */ + const mio_ooch_t* prev_errmsg; + prev_errmsg = mio_backuperrmsg(mio); + rdev->broken = 1; + rdev->broken_syshnd = syshnd; + watch_mysql (rdev, 0); + mio_dev_mar_halt (rdev); /* i can't keep this device alive regardless of the caller's post-action */ + mio_seterrbfmt (mio, MIO_ESYSERR, "%js", prev_errmsg); + } + return -1; } - rdev->query_started = 1; - rdev->query_ret = err; + + /* sent without an error */ + rdev->query_started_deferred = 1; watch_mysql (rdev, MYSQL_WAIT_READ | MYSQL_WAIT_WRITE); } return 0; @@ -236,7 +269,7 @@ static int dev_mar_ioctl (mio_dev_t* dev, int cmd, void* arg) rdev->res = mysql_use_result(rdev->hnd); if (MIO_UNLIKELY(!rdev->res)) { - mio_seterrbfmt (mio, MIO_ESYSERR, "%s", mysql_error(rdev->hnd)); + mio_seterrbfmt (mio, MIO_ESYSERR, "%hs", mysql_error(rdev->hnd)); return -1; } } @@ -298,10 +331,11 @@ static int dev_evcb_mar_ready (mio_dev_t* dev, int events) switch (MIO_DEV_MAR_GET_PROGRESS(rdev)) { case MIO_DEV_MAR_CONNECTING: - - if (rdev->connected) + if (rdev->connected_deferred) { - rdev->connected = 0; + /* connection esablished dev_mar_ioctl() but postponed to this function */ + rdev->connected_deferred = 0; + rdev->connected = 1; /* really connected */ MIO_DEV_MAR_SET_PROGRESS (rdev, MIO_DEV_MAR_CONNECTED); if (rdev->on_connect) rdev->on_connect (rdev); } @@ -309,40 +343,100 @@ static int dev_evcb_mar_ready (mio_dev_t* dev, int events) { int status; MYSQL* tmp; + mio_syshnd_t syshnd; + syshnd = mysql_get_socket(rdev->hnd); /* ugly hack for handling a socket closed b y mysql_real_connect_cont() */ status = mysql_real_connect_cont(&tmp, rdev->hnd, events_to_mysql_wstatus(events)); - watch_mysql (rdev, status); - if (!status) + if (status) { - /* connected */ - MIO_DEV_MAR_SET_PROGRESS (rdev, MIO_DEV_MAR_CONNECTED); - if (rdev->on_connect) rdev->on_connect (rdev); + /* connection in progress */ + watch_mysql (rdev, status); + } + else + { + /* connection completed. */ + if (tmp) + { + /* established ok */ + watch_mysql (rdev, status); + + rdev->connected = 1; /* really connected */ + MIO_DEV_MAR_SET_PROGRESS (rdev, MIO_DEV_MAR_CONNECTED); + if (rdev->on_connect) rdev->on_connect (rdev); + } + else + { + /* connection attempt failed */ + + rdev->broken = 1; /* trick dev_mar_getsyshnd() to return rdev->broken_syshnd. */ + rdev->broken_syshnd = syshnd; /* mysql_get_socket() over a failed mariadb handle ends up with segfault */ + + /* this attempts to trigger the low-level multiplxer to delete 'syshnd' closed by mysql_real_connect_cont(). + * the underlying low-level operation may fail. but i don't care. the best is not to open + * new file descriptor between mysql_real_connect_cont() and watch_mysql(rdev, 0). + * + * close(6); <- mysql_real_connect_cont(); + * epoll_ctl(4, EPOLL_CTL_DEL, 6, 0x7ffc785e7154) = -1 EBADF (Bad file descriptor) <- by mio_dev_watch() in watch_mysql + */ + watch_mysql (rdev, 0); + + /* on_disconnect() will be called without on_connect(). + * you may assume that the initial connectinon attempt failed. + * reconnectin doesn't apply in this context. */ + mio_dev_mar_halt (rdev); + } } } break; case MIO_DEV_MAR_QUERY_STARTING: - if (rdev->query_started) + if (rdev->query_started_deferred) { - rdev->query_started = 0; + rdev->query_started_deferred = 0; MIO_DEV_MAR_SET_PROGRESS (rdev, MIO_DEV_MAR_QUERY_STARTED); - if (rdev->on_query_started) rdev->on_query_started (rdev, rdev->query_ret, (rdev->query_ret? rdev->errbuf: MIO_NULL)); + if (rdev->on_query_started) rdev->on_query_started (rdev, 0, MIO_NULL); } else { - int status; - int tmp; + int status, err; + mio_syshnd_t syshnd; - status = mysql_real_query_cont(&tmp, rdev->hnd, events_to_mysql_wstatus(events)); - watch_mysql (rdev, status); + syshnd = mysql_get_socket(rdev->hnd); + status = mysql_real_query_cont(&err, rdev->hnd, events_to_mysql_wstatus(events)); - if (!status) + if (status) { - /* query sent */ - MIO_DEV_MAR_SET_PROGRESS (rdev, MIO_DEV_MAR_QUERY_STARTED); - if (tmp == 1) tmp = mysql_errno(rdev->hnd); /* tmp is set to 1 by mariadb-connector-c 3.1 as of this writing. let me work around it by fetching the error code */ - if (rdev->on_query_started) rdev->on_query_started (rdev, tmp, (tmp? mysql_error(rdev->hnd): MIO_NULL)); + watch_mysql (rdev, status); + } + else + { + if (err) + { + /* query send failure */ + if (err == 1) err = mysql_errno(rdev->hnd); /* err is set to 1 by mariadb-connector-c 3.1 as of this writing. let me work around it by fetching the error code */ + + if (err == CR_SERVER_LOST || err == CR_SERVER_GONE_ERROR) + { + rdev->broken = 1; + rdev->broken_syshnd = syshnd; + watch_mysql (rdev, 0); + mio_dev_mar_halt (rdev); /* i can't keep this device alive regardless of the caller's post-action */ + /* don't invoke on_query_started(). in this case, on_disconnect() will be called later */ + } + else + { + /* query not sent for other reasons. probably nothing to watch? */ + watch_mysql (rdev, 0); /* TODO: use status instead of 0? is status reliable in this context? */ + if (rdev->on_query_started) rdev->on_query_started (rdev, err, mysql_error(rdev->hnd)); + } + } + else + { + /* query really sent */ + MIO_DEV_MAR_SET_PROGRESS (rdev, MIO_DEV_MAR_QUERY_STARTED); + if (rdev->on_query_started) rdev->on_query_started (rdev, 0, MIO_NULL); + } } } @@ -404,7 +498,7 @@ static int dev_evcb_mar_ready (mio_dev_t* dev, int events) } default: - mio_seterrbfmt (mio, MIO_EINTERN, "invalid progress state in mar"); + mio_seterrbfmt (mio, MIO_EINTERN, "invalid progress value in mar"); return -1; } diff --git a/mio/lib/mio-json.h b/mio/lib/mio-json.h index 26007da..06a2b74 100644 --- a/mio/lib/mio-json.h +++ b/mio/lib/mio-json.h @@ -167,7 +167,7 @@ struct mio_jsonwr_t mio_jsonwr_state_node_t* state_stack; int flags; - void* wctx; + void* wctx; mio_bch_t wbuf[8192]; mio_oow_t wbuf_len; }; diff --git a/mio/lib/mio-mar.h b/mio/lib/mio-mar.h index a51b22f..4b1ff2d 100644 --- a/mio/lib/mio-mar.h +++ b/mio/lib/mio-mar.h @@ -31,39 +31,20 @@ typedef struct mio_dev_mar_t mio_dev_mar_t; -enum mio_dev_mar_state_t +enum mio_dev_mar_progress_t { - /* the following items(progress bits) are mutually exclusive */ - MIO_DEV_MAR_CONNECTING = (1 << 0), - MIO_DEV_MAR_CONNECTED = (1 << 1), - MIO_DEV_MAR_QUERY_STARTING = (1 << 2), - MIO_DEV_MAR_QUERY_STARTED = (1 << 3), - MIO_DEV_MAR_ROW_FETCHING = (1 << 4), - MIO_DEV_MAR_ROW_FETCHED = (1 << 5), - - -#if 0 - /* the following items can be bitwise-ORed with an exclusive item above */ - MIO_DEV_MAR_LENIENT = (1 << 14), - MIO_DEV_MAR_INTERCEPTED = (1 << 15), -#endif - - /* convenience bit masks */ - MIO_DEV_MAR_ALL_PROGRESS_BITS = (MIO_DEV_MAR_CONNECTING | - MIO_DEV_MAR_CONNECTED | - MIO_DEV_MAR_QUERY_STARTING | - MIO_DEV_MAR_QUERY_STARTED | - MIO_DEV_MAR_ROW_FETCHING | - MIO_DEV_MAR_ROW_FETCHED) + MIO_DEV_MAR_INITIAL, + MIO_DEV_MAR_CONNECTING, + MIO_DEV_MAR_CONNECTED, + MIO_DEV_MAR_QUERY_STARTING, + MIO_DEV_MAR_QUERY_STARTED, + MIO_DEV_MAR_ROW_FETCHING, + MIO_DEV_MAR_ROW_FETCHED }; -typedef enum mio_dev_mar_state_t mio_dev_mar_state_t; +typedef enum mio_dev_mar_progress_t mio_dev_mar_progress_t; -#define MIO_DEV_MAR_SET_PROGRESS(dev,bit) do { \ - (dev)->state &= ~MIO_DEV_MAR_ALL_PROGRESS_BITS; \ - (dev)->state |= (bit); \ -} while(0) - -#define MIO_DEV_MAR_GET_PROGRESS(dev) ((dev)->state & MIO_DEV_MAR_ALL_PROGRESS_BITS) +#define MIO_DEV_MAR_SET_PROGRESS(dev,value) ((dev)->progress = (value)) +#define MIO_DEV_MAR_GET_PROGRESS(dev) ((dev)->progress) typedef int (*mio_dev_mar_on_read_t) ( @@ -103,14 +84,16 @@ struct mio_dev_mar_t void* hnd; void* res; - int state; + mio_dev_mar_progress_t progress; - unsigned int connected; - unsigned int query_started; - unsigned int row_fetched; + unsigned int connected: 1; + unsigned int connected_deferred: 1; + unsigned int query_started_deferred: 1; + //unsigned int query_started: 1; + unsigned int row_fetched: 1; + unsigned int broken: 1; + mio_syshnd_t broken_syshnd; - char errbuf[256]; - int query_ret; int row_wstatus; void* row; @@ -122,9 +105,16 @@ struct mio_dev_mar_t mio_dev_mar_on_row_fetched_t on_row_fetched; }; +enum mio_dev_mar_make_flag_t +{ + MIO_DEV_MAR_UNUSED_YET = (1 << 0) +}; +typedef enum mio_dev_mar_make_flag_t mio_dev_mar_make_flag_t; + typedef struct mio_dev_mar_make_t mio_dev_mar_make_t; struct mio_dev_mar_make_t { + int flags; mio_dev_mar_on_write_t on_write; /* mandatory */ mio_dev_mar_on_read_t on_read; /* mandatory */ mio_dev_mar_on_connect_t on_connect; /* optional */ diff --git a/mio/lib/sck.c b/mio/lib/sck.c index 1d00145..2c99e96 100644 --- a/mio/lib/sck.c +++ b/mio/lib/sck.c @@ -259,7 +259,7 @@ static int schedule_timer_job_at (mio_dev_sck_t* dev, const mio_ntime_t* fire_at tmrjob.idxptr = &dev->tmrjob_index; MIO_ASSERT (dev->mio, dev->tmrjob_index == MIO_TMRIDX_INVALID); - dev->tmrjob_index = mio_instmrjob (dev->mio, &tmrjob); + dev->tmrjob_index = mio_instmrjob(dev->mio, &tmrjob); return dev->tmrjob_index == MIO_TMRIDX_INVALID? -1: 0; } @@ -1196,7 +1196,7 @@ static int harvest_outgoing_connection (mio_dev_sck_t* rdev) int x; MIO_ASSERT (mio, !rdev->ssl); /* must not be SSL-connected yet */ - x = connect_ssl (rdev); + x = connect_ssl(rdev); if (x <= -1) return -1; if (x == 0) { @@ -1540,7 +1540,6 @@ static int dev_evcb_sck_ready_stateful (mio_dev_t* dev, int events) } MIO_DEV_SCK_SET_PROGRESS (rdev, MIO_DEV_SCK_ACCEPTED); - /*if (rdev->on_connect(rdev) <= -1) mio_dev_sck_halt (rdev);*/ if (rdev->on_connect) rdev->on_connect (rdev); return 0;