fixed quite some issues mar.c and mar-cli.c regarding broken underlying sockets in libmariadbclient

This commit is contained in:
hyung-hwan 2020-06-24 03:00:46 +00:00
parent dce9353bc8
commit 9b429cfd3c
6 changed files with 316 additions and 119 deletions

View File

@ -3,6 +3,7 @@
#include <mio-mar.h> #include <mio-mar.h>
#include <stdio.h> #include <stdio.h>
#include <string.h> #include <string.h>
#include <signal.h>
#include <mariadb/mysql.h> #include <mariadb/mysql.h>
@ -146,6 +147,40 @@ printf ("[%lu] NO DATA..\n", sid);
} }
} }
static mio_t* g_mio = MIO_NULL;
static void handle_signal (int sig)
{
mio_stop (g_mio, MIO_STOPREQ_TERMINATION);
}
static void send_test_query (mio_t* mio, const mio_ntime_t* now, mio_tmrjob_t* job)
{
mio_svc_marc_t* marc = (mio_svc_marc_t*)job->ctx;
if (mio_svc_mar_querywithbchars(marc, 0, MIO_SVC_MARC_QTYPE_SELECT, "SHOW STATUS", 11, on_result, MIO_NULL) <= -1)
{
MIO_INFO1 (mio, "FAILED TO SEND QUERY - %js\n", mio_geterrmsg(mio));
}
}
static int schedule_timer_job_after (mio_t* mio, const mio_ntime_t* fire_after, mio_tmrjob_handler_t handler, void* ctx)
{
mio_tmrjob_t tmrjob;
memset (&tmrjob, 0, MIO_SIZEOF(tmrjob));
tmrjob.ctx = ctx;
mio_gettime (mio, &tmrjob.when);
MIO_ADD_NTIME (&tmrjob.when, &tmrjob.when, fire_after);
tmrjob.handler = handler;
tmrjob.idxptr = MIO_NULL;
return mio_instmrjob(mio, &tmrjob);
}
int main (int argc, char* argv[]) int main (int argc, char* argv[])
{ {
@ -166,7 +201,6 @@ int main (int argc, char* argv[])
goto oops; goto oops;
} }
memset (&ci, 0, MIO_SIZEOF(ci)); memset (&ci, 0, MIO_SIZEOF(ci));
ci.host = argv[1]; ci.host = argv[1];
ci.port = 3306; /* TODO: argv[2]; */ ci.port = 3306; /* TODO: argv[2]; */
@ -209,9 +243,23 @@ int main (int argc, char* argv[])
} }
#endif #endif
mio_loop (mio); g_mio = mio;
signal (SIGINT, handle_signal);
/* ---------------------------------------- */
{
mio_ntime_t x;
MIO_INIT_NTIME (&x, 32, 0);
schedule_timer_job_after (mio, &x, send_test_query, marc);
mio_loop (mio);
}
/* ---------------------------------------- */
signal (SIGINT, SIG_IGN);
g_mio = MIO_NULL;
oops: oops:
printf ("about to close mio...\n");
if (mio) mio_close (mio); if (mio) mio_close (mio);
return 0; return 0;
} }

View File

@ -28,6 +28,7 @@
#include "mio-prv.h" #include "mio-prv.h"
#include <mariadb/mysql.h> #include <mariadb/mysql.h>
#include <mariadb/errmsg.h>
typedef struct sess_t sess_t; typedef struct sess_t sess_t;
typedef struct sess_qry_t sess_qry_t; typedef struct sess_qry_t sess_qry_t;
@ -37,13 +38,13 @@ struct mio_svc_marc_t
MIO_SVC_HEADER; MIO_SVC_HEADER;
mio_svc_marc_connect_t ci; mio_svc_marc_connect_t ci;
int stopping;
struct struct
{ {
sess_t* ptr; sess_t* ptr;
mio_oow_t capa; mio_oow_t capa;
} sess; } sess;
}; };
struct sess_qry_t struct sess_qry_t
@ -77,7 +78,6 @@ struct dev_xtn_t
}; };
mio_svc_marc_t* mio_svc_marc_start (mio_t* mio, const mio_svc_marc_connect_t* ci) mio_svc_marc_t* mio_svc_marc_start (mio_t* mio, const mio_svc_marc_connect_t* ci)
{ {
mio_svc_marc_t* marc = MIO_NULL; mio_svc_marc_t* marc = MIO_NULL;
@ -103,6 +103,15 @@ oops:
void mio_svc_marc_stop (mio_svc_marc_t* marc) void mio_svc_marc_stop (mio_svc_marc_t* marc)
{ {
mio_t* mio = marc->mio; mio_t* mio = marc->mio;
mio_oow_t i;
marc->stopping = 1;
for (i = 0; i < marc->sess.capa; i++)
{
if (marc->sess.ptr[i].dev) mio_dev_mar_kill (marc->sess.ptr[i].dev);
}
mio_freemem (mio, marc->sess.ptr);
MIO_SVCL_UNLINK_SVC (marc); MIO_SVCL_UNLINK_SVC (marc);
mio_freemem (mio, marc); mio_freemem (mio, marc);
@ -171,8 +180,10 @@ static int send_pending_query_if_any (sess_t* sess)
printf ("sending... %.*s\n", (int)sq->qlen, sq->qptr); printf ("sending... %.*s\n", (int)sq->qlen, sq->qptr);
if (mio_dev_mar_querywithbchars(sess->dev, sq->qptr, sq->qlen) <= -1) if (mio_dev_mar_querywithbchars(sess->dev, sq->qptr, sq->qlen) <= -1)
{ {
MIO_DEBUG1 (sess->svc->mio, "QQQQQQQQQQQQQQQQQQQQQQQQQQQQQQQQQQQQQQ SEND FAIL %js\n", mio_geterrmsg(sess->svc->mio));
sq->sent = 0; sq->sent = 0;
return -1; /* failure */ mio_dev_mar_halt (sess->dev); /* this device can't carray on */
return -1; /* halted the device for failure */
} }
return 1; /* sent */ return 1; /* sent */
@ -183,32 +194,74 @@ printf ("sending... %.*s\n", (int)sq->qlen, sq->qptr);
} }
/* ------------------------------------------------------------------- */ /* ------------------------------------------------------------------- */
static mio_dev_mar_t* alloc_device (mio_svc_marc_t* marc, sess_t* sess);
static void mar_on_disconnect (mio_dev_mar_t* dev) static void mar_on_disconnect (mio_dev_mar_t* dev)
{ {
mio_t* mio = dev->mio;
dev_xtn_t* xtn = (dev_xtn_t*)mio_dev_mar_getxtn(dev); dev_xtn_t* xtn = (dev_xtn_t*)mio_dev_mar_getxtn(dev);
sess_t* sess = xtn->sess; sess_t* sess = xtn->sess;
printf ("disconnected on sid %d\n", sess->sid); MIO_DEBUG6 (mio, "MARC(%p) - device disconnected - sid %lu session %p session-connected %d device %p device-broken %d\n", sess->svc, (unsigned long int)sess->sid, sess, (int)sess->connected, dev, (int)dev->broken);
MIO_ASSERT (mio, dev == sess->dev);
if (MIO_UNLIKELY(!sess->svc->stopping && mio->stopreq == MIO_STOPREQ_NONE))
{
if (sess->connected && sess->dev->broken) /* risk of infinite cycle if the underlying db suffers never-ending 'broken' issue after getting connected */
{
/* restart the dead device */
mio_dev_mar_t* dev;
sess->connected = 0;
dev = alloc_device(sess->svc, sess);
if (MIO_LIKELY(dev))
{
sess->dev = dev;
/* the pending query will be sent in on_connect() */
return;
}
/* if device allocation fails, just carry on */
}
}
sess->connected = 0; sess->connected = 0;
while (1)
{
sess_qry_t* sq;
mio_svc_marc_dev_error_t err;
sq = get_first_session_query(sess);
if (!sq) break;
/* what is the best error code and message to use for this? */
err.mar_errcode = CR_SERVER_LOST;
err.mar_errmsg = "server lost";
sq->on_result (sess->svc, sess->sid, MIO_SVC_MARC_RCODE_ERROR, &err, sq->qctx);
dequeue_session_query (mio, sess);
}
/* it should point to a placeholder node(either the initial one or the transited one after dequeing */
MIO_ASSERT (mio, sess->q_head == sess->q_tail);
MIO_ASSERT (mio, sess->q_head->sq_next == MIO_NULL);
free_session_query (mio, sess->q_head);
sess->q_head = sess->q_tail = MIO_NULL;
sess->dev = MIO_NULL; sess->dev = MIO_NULL;
//if (there is a query which has been sent but not processed... ) <--- can this be handled by the caller?
} }
static void mar_on_connect (mio_dev_mar_t* dev) static void mar_on_connect (mio_dev_mar_t* dev)
{ {
mio_t* mio = dev->mio;
dev_xtn_t* xtn = (dev_xtn_t*)mio_dev_mar_getxtn(dev); dev_xtn_t* xtn = (dev_xtn_t*)mio_dev_mar_getxtn(dev);
sess_t* sess = xtn->sess; sess_t* sess = xtn->sess;
sess->connected = 1; MIO_DEBUG5 (mio, "MARC(%p) - device connected - sid %lu session %p device %p device-broken %d\n", sess->svc, (unsigned long int)sess->sid, sess, dev, dev->broken);
printf ("connected on sid %d\n", sess->sid);
if (send_pending_query_if_any (sess) <= -1) sess->connected = 1;
{ send_pending_query_if_any (sess);
mio_dev_mar_halt (sess->dev);
}
} }
static void mar_on_query_started (mio_dev_mar_t* dev, int mar_ret, const mio_bch_t* mar_errmsg) static void mar_on_query_started (mio_dev_mar_t* dev, int mar_ret, const mio_bch_t* mar_errmsg)
@ -221,10 +274,6 @@ static void mar_on_query_started (mio_dev_mar_t* dev, int mar_ret, const mio_bch
{ {
mio_svc_marc_dev_error_t err; mio_svc_marc_dev_error_t err;
printf ("QUERY FAILED...%d -> %s\n", mar_ret, mar_errmsg); printf ("QUERY FAILED...%d -> %s\n", mar_ret, mar_errmsg);
#if 0
if (mar_ret == CR_SERVER_GONE_ERROR || /* server gone away between queries */
mar_ret == CR_SERVER_LOST) /* server gone away during a query */
#endif
err.mar_errcode = mar_ret; err.mar_errcode = mar_ret;
err.mar_errmsg = mar_errmsg; err.mar_errmsg = mar_errmsg;
@ -246,9 +295,7 @@ printf ("QUERY STARTED\n");
} }
else else
{ {
if (sq->on_result) sq->on_result (sess->svc, sess->sid, MIO_SVC_MARC_RCODE_DONE, MIO_NULL, sq->qctx);
sq->on_result (sess->svc, sess->sid, MIO_SVC_MARC_RCODE_DONE, MIO_NULL, sq->qctx);
dequeue_session_query (sess->svc->mio, sess); dequeue_session_query (sess->svc->mio, sess);
send_pending_query_if_any (sess); send_pending_query_if_any (sess);
} }
@ -261,10 +308,7 @@ static void mar_on_row_fetched (mio_dev_mar_t* dev, void* data)
sess_t* sess = xtn->sess; sess_t* sess = xtn->sess;
sess_qry_t* sq = get_first_session_query(sess); sess_qry_t* sq = get_first_session_query(sess);
if (sq->on_result) sq->on_result (sess->svc, sess->sid, (data? MIO_SVC_MARC_RCODE_ROW: MIO_SVC_MARC_RCODE_DONE), data, sq->qctx);
{
sq->on_result (sess->svc, sess->sid, (data? MIO_SVC_MARC_RCODE_ROW: MIO_SVC_MARC_RCODE_DONE), data, sq->qctx);
}
if (!data) if (!data)
{ {
@ -278,7 +322,6 @@ static mio_dev_mar_t* alloc_device (mio_svc_marc_t* marc, sess_t* sess)
mio_t* mio = (mio_t*)marc->mio; mio_t* mio = (mio_t*)marc->mio;
mio_dev_mar_t* mar; mio_dev_mar_t* mar;
mio_dev_mar_make_t mi; mio_dev_mar_make_t mi;
mio_dev_mar_connect_t ci;
dev_xtn_t* xtn; dev_xtn_t* xtn;
MIO_MEMSET (&mi, 0, MIO_SIZEOF(mi)); MIO_MEMSET (&mi, 0, MIO_SIZEOF(mi));
@ -347,7 +390,26 @@ static sess_t* get_session (mio_svc_marc_t* marc, mio_oow_t sid)
return MIO_NULL; return MIO_NULL;
} }
/* queue initialization with a place holder */ /* queue initialization with a place holder. the queue maintains a placeholder node.
* the first actual data node enqueued is inserted at the back and becomes the second
* node in terms of the entire queue.
*
* PH -> DN1 -> DN2 -> ... -> DNX
* ^ ^
* q_head q_tail
*
* get_first_session_query() returns the data of DN1, not the data held in PH.
*
* the first dequeing operations kills PH.
*
* DN1 -> DN2 -> ... -> DNX
* ^ ^
* q_head q_tail
*
* get_first_session_query() at this point returns the data of DN2, not the data held in DN1.
* dequeing kills DN1, however.
*/
sess->q_head = sess->q_tail = sq; sess->q_head = sess->q_tail = sq;
} }
@ -367,33 +429,37 @@ int mio_svc_mar_querywithbchars (mio_svc_marc_t* marc, mio_oow_t sid, mio_svc_ma
sq = make_session_query(mio, qtype, qptr, qlen, qctx, on_result); sq = make_session_query(mio, qtype, qptr, qlen, qctx, on_result);
if (MIO_UNLIKELY(!sq)) return -1; if (MIO_UNLIKELY(!sq)) return -1;
if (get_first_session_query(sess)) if (get_first_session_query(sess) || !sess->connected)
{ {
printf ("XXXXXXXXXx\n");
/* there are other ongoing queries */ /* there are other ongoing queries */
enqueue_session_query (sess, sq); enqueue_session_query (sess, sq);
} }
else else
{ {
/* this is the first query */ /* this is the first query or the device is not connected yet */
sess_qry_t* old_q_tail = sess->q_tail; sess_qry_t* old_q_tail = sess->q_tail;
enqueue_session_query (sess, sq); enqueue_session_query (sess, sq);
printf ("YYYYYYYYYYYYYY\n"); MIO_ASSERT (mio, sq->sent == 0);
if(sess->dev->connected && !sq->sent)
{
sq->sent = 1;
if (mio_dev_mar_querywithbchars(sess->dev, qptr, qlen) <= -1)
{
sq->sent = 0;
/* ugly to unlink the the last item added */ sq->sent = 1;
if (mio_dev_mar_querywithbchars(sess->dev, sq->qptr, sq->qlen) <= -1)
{
sq->sent = 0;
if (!sess->dev->broken)
{
/* unlink the the last item added */
old_q_tail->sq_next = MIO_NULL; old_q_tail->sq_next = MIO_NULL;
sess->q_tail = old_q_tail; sess->q_tail = old_q_tail;
free_session_query (mio, sq); free_session_query (mio, sq);
return -1; return -1;
} }
/* the underlying socket of the device may get disconnected.
* in such a case, keep the enqueued query with sq->sent 0
* and defer actual sending and processing */
} }
} }

View File

@ -28,7 +28,7 @@
#include "mio-prv.h" #include "mio-prv.h"
#include <mariadb/mysql.h> #include <mariadb/mysql.h>
#include <mariadb/errmsg.h>
/* ========================================================================= */ /* ========================================================================= */
@ -47,14 +47,14 @@ static int dev_mar_make (mio_dev_t* dev, void* ctx)
if (mysql_options(rdev->hnd, MYSQL_OPT_NONBLOCK, 0) != 0) if (mysql_options(rdev->hnd, MYSQL_OPT_NONBLOCK, 0) != 0)
{ {
mio_seterrbfmt (mio, MIO_ESYSERR, "%s", mysql_error(rdev->hnd)); mio_seterrbfmt (mio, MIO_ESYSERR, "%hs", mysql_error(rdev->hnd));
mysql_close (rdev->hnd); mysql_close (rdev->hnd);
rdev->hnd = MIO_NULL; rdev->hnd = MIO_NULL;
return -1; return -1;
} }
{ {
my_bool x = 0; my_bool x = 0; /* no auto-reconnect */
mysql_options(rdev->hnd, MYSQL_OPT_RECONNECT, &x); mysql_options(rdev->hnd, MYSQL_OPT_RECONNECT, &x);
} }
@ -65,6 +65,9 @@ static int dev_mar_make (mio_dev_t* dev, void* ctx)
rdev->on_disconnect = mi->on_disconnect; rdev->on_disconnect = mi->on_disconnect;
rdev->on_query_started = mi->on_query_started; rdev->on_query_started = mi->on_query_started;
rdev->on_row_fetched = mi->on_row_fetched; rdev->on_row_fetched = mi->on_row_fetched;
rdev->progress = MIO_DEV_MAR_INITIAL;
return 0; return 0;
} }
@ -73,6 +76,8 @@ static int dev_mar_kill (mio_dev_t* dev, int force)
/*mio_t* mio = dev->mio;*/ /*mio_t* mio = dev->mio;*/
mio_dev_mar_t* rdev = (mio_dev_mar_t*)dev; mio_dev_mar_t* rdev = (mio_dev_mar_t*)dev;
/* if rdev->connected is 0 at this point,
* the underlying socket of this device is down */
if (rdev->on_disconnect) rdev->on_disconnect (rdev); if (rdev->on_disconnect) rdev->on_disconnect (rdev);
if (rdev->res) if (rdev->res)
@ -86,12 +91,16 @@ static int dev_mar_kill (mio_dev_t* dev, int force)
rdev->hnd = MIO_NULL; rdev->hnd = MIO_NULL;
} }
rdev->connected = 0;
rdev->broken = 0;
return 0; return 0;
} }
static mio_syshnd_t dev_mar_getsyshnd (mio_dev_t* dev) static mio_syshnd_t dev_mar_getsyshnd (mio_dev_t* dev)
{ {
mio_dev_mar_t* rdev = (mio_dev_mar_t*)dev; mio_dev_mar_t* rdev = (mio_dev_mar_t*)dev;
if (rdev->broken) return rdev->broken_syshnd; /* hack!! */
return (mio_syshnd_t)mysql_get_socket(rdev->hnd); return (mio_syshnd_t)mysql_get_socket(rdev->hnd);
} }
@ -123,7 +132,6 @@ static MIO_INLINE void watch_mysql (mio_dev_mar_t* rdev, int wstatus)
} }
} }
static void start_fetch_row (mio_dev_mar_t* rdev) static void start_fetch_row (mio_dev_mar_t* rdev)
{ {
MYSQL_ROW row; MYSQL_ROW row;
@ -147,7 +155,6 @@ static void start_fetch_row (mio_dev_mar_t* rdev)
} }
} }
static int dev_mar_ioctl (mio_dev_t* dev, int cmd, void* arg) static int dev_mar_ioctl (mio_dev_t* dev, int cmd, void* arg)
{ {
mio_t* mio = dev->mio; mio_t* mio = dev->mio;
@ -158,38 +165,41 @@ static int dev_mar_ioctl (mio_dev_t* dev, int cmd, void* arg)
case MIO_DEV_MAR_CONNECT: case MIO_DEV_MAR_CONNECT:
{ {
mio_dev_mar_connect_t* ci = (mio_dev_mar_connect_t*)arg; mio_dev_mar_connect_t* ci = (mio_dev_mar_connect_t*)arg;
MYSQL* ret; MYSQL* tmp;
int status; int status;
if (MIO_DEV_MAR_GET_PROGRESS(rdev) != MIO_DEV_MAR_INITIAL)
if (MIO_DEV_MAR_GET_PROGRESS(rdev))
{ {
/* can't connect again */ /* can't connect again */
mio_seterrbfmt (mio, MIO_EPERM, "operation in progress. disallowed to connect again"); mio_seterrbfmt (mio, MIO_EPERM, "operation in progress. disallowed to connect again");
return -1; return -1;
} }
status = mysql_real_connect_start(&ret, rdev->hnd, ci->host, ci->username, ci->password, ci->dbname, ci->port, MIO_NULL, 0); MIO_ASSERT (mio, rdev->connected_deferred == 0);
status = mysql_real_connect_start(&tmp, rdev->hnd, ci->host, ci->username, ci->password, ci->dbname, ci->port, MIO_NULL, 0);
rdev->dev_cap &= ~MIO_DEV_CAP_VIRTUAL; /* a socket is created in mysql_real_connect_start() */ rdev->dev_cap &= ~MIO_DEV_CAP_VIRTUAL; /* a socket is created in mysql_real_connect_start() */
if (status) if (status)
{ {
/* not connected */ /* not connected */
MIO_DEV_MAR_SET_PROGRESS (rdev, MIO_DEV_MAR_CONNECTING); MIO_DEV_MAR_SET_PROGRESS (rdev, MIO_DEV_MAR_CONNECTING);
rdev->connected = 0;
watch_mysql (rdev, status); watch_mysql (rdev, status);
} }
else else
{ {
/* connected immediately */ if (MIO_UNLIKELY(!tmp)) /* connection attempt failed immediately */
if (MIO_UNLIKELY(!ret))
{ {
mio_seterrbfmt (mio, MIO_ESYSERR, "%s", mysql_error(rdev->hnd)); /* immediate failure doesn't invoke on_discoonect().
* the caller must check the return code of this function. */
mio_seterrbfmt (mio, MIO_ESYSERR, "%hs", mysql_error(rdev->hnd));
return -1; return -1;
} }
/* connected_deferred immediately. postpone actual handling to the ready() callback */
MIO_DEV_MAR_SET_PROGRESS (rdev, MIO_DEV_MAR_CONNECTING);
rdev->connected_deferred = 1; /* to let the ready() handler to trigger on_connect() */
/* regiter it in the multiplexer so that the ready() handler is /* regiter it in the multiplexer so that the ready() handler is
* invoked to call the on_connect() callback */ * invoked to call the on_connect() callback */
rdev->connected = 1;
watch_mysql (rdev, MYSQL_WAIT_READ | MYSQL_WAIT_WRITE); /* TODO: verify this */ watch_mysql (rdev, MYSQL_WAIT_READ | MYSQL_WAIT_WRITE); /* TODO: verify this */
} }
return 0; return 0;
@ -199,6 +209,13 @@ static int dev_mar_ioctl (mio_dev_t* dev, int cmd, void* arg)
{ {
const mio_bcs_t* qstr = (const mio_bcs_t*)arg; const mio_bcs_t* qstr = (const mio_bcs_t*)arg;
int err, status; int err, status;
mio_syshnd_t syshnd;
if (!rdev->connected)
{
mio_seterrbfmt (mio, MIO_EPERM, "not connected. disallowed to query");
return -1;
}
if (rdev->res) /* TODO: more accurate check */ if (rdev->res) /* TODO: more accurate check */
{ {
@ -206,12 +223,13 @@ static int dev_mar_ioctl (mio_dev_t* dev, int cmd, void* arg)
return -1; return -1;
} }
syshnd = mysql_get_socket(rdev->hnd);
status = mysql_real_query_start(&err, rdev->hnd, qstr->ptr, qstr->len); status = mysql_real_query_start(&err, rdev->hnd, qstr->ptr, qstr->len);
MIO_DEV_MAR_SET_PROGRESS (rdev, MIO_DEV_MAR_QUERY_STARTING); MIO_DEV_MAR_SET_PROGRESS (rdev, MIO_DEV_MAR_QUERY_STARTING);
if (status) if (status)
{ {
/* not done */ /* not done */
rdev->query_started = 0;
watch_mysql (rdev, status); watch_mysql (rdev, status);
} }
else else
@ -219,11 +237,26 @@ static int dev_mar_ioctl (mio_dev_t* dev, int cmd, void* arg)
/* query sent immediately */ /* query sent immediately */
if (err) if (err)
{ {
/* but there is an error */
if (err == 1) err = mysql_errno(rdev->hnd); if (err == 1) err = mysql_errno(rdev->hnd);
mio_copy_bcstr (rdev->errbuf, MIO_COUNTOF(rdev->errbuf), mysql_error(rdev->hnd));
mio_seterrbfmt (mio, MIO_ESYSERR, "%hs", mysql_error(rdev->hnd));
if (err == CR_SERVER_LOST || err == CR_SERVER_GONE_ERROR)
{
/* the underlying socket must have gotten closed by mysql_real_query_start() */
const mio_ooch_t* prev_errmsg;
prev_errmsg = mio_backuperrmsg(mio);
rdev->broken = 1;
rdev->broken_syshnd = syshnd;
watch_mysql (rdev, 0);
mio_dev_mar_halt (rdev); /* i can't keep this device alive regardless of the caller's post-action */
mio_seterrbfmt (mio, MIO_ESYSERR, "%js", prev_errmsg);
}
return -1;
} }
rdev->query_started = 1;
rdev->query_ret = err; /* sent without an error */
rdev->query_started_deferred = 1;
watch_mysql (rdev, MYSQL_WAIT_READ | MYSQL_WAIT_WRITE); watch_mysql (rdev, MYSQL_WAIT_READ | MYSQL_WAIT_WRITE);
} }
return 0; return 0;
@ -236,7 +269,7 @@ static int dev_mar_ioctl (mio_dev_t* dev, int cmd, void* arg)
rdev->res = mysql_use_result(rdev->hnd); rdev->res = mysql_use_result(rdev->hnd);
if (MIO_UNLIKELY(!rdev->res)) if (MIO_UNLIKELY(!rdev->res))
{ {
mio_seterrbfmt (mio, MIO_ESYSERR, "%s", mysql_error(rdev->hnd)); mio_seterrbfmt (mio, MIO_ESYSERR, "%hs", mysql_error(rdev->hnd));
return -1; return -1;
} }
} }
@ -298,10 +331,11 @@ static int dev_evcb_mar_ready (mio_dev_t* dev, int events)
switch (MIO_DEV_MAR_GET_PROGRESS(rdev)) switch (MIO_DEV_MAR_GET_PROGRESS(rdev))
{ {
case MIO_DEV_MAR_CONNECTING: case MIO_DEV_MAR_CONNECTING:
if (rdev->connected_deferred)
if (rdev->connected)
{ {
rdev->connected = 0; /* connection esablished dev_mar_ioctl() but postponed to this function */
rdev->connected_deferred = 0;
rdev->connected = 1; /* really connected */
MIO_DEV_MAR_SET_PROGRESS (rdev, MIO_DEV_MAR_CONNECTED); MIO_DEV_MAR_SET_PROGRESS (rdev, MIO_DEV_MAR_CONNECTED);
if (rdev->on_connect) rdev->on_connect (rdev); if (rdev->on_connect) rdev->on_connect (rdev);
} }
@ -309,40 +343,100 @@ static int dev_evcb_mar_ready (mio_dev_t* dev, int events)
{ {
int status; int status;
MYSQL* tmp; MYSQL* tmp;
mio_syshnd_t syshnd;
syshnd = mysql_get_socket(rdev->hnd); /* ugly hack for handling a socket closed b y mysql_real_connect_cont() */
status = mysql_real_connect_cont(&tmp, rdev->hnd, events_to_mysql_wstatus(events)); status = mysql_real_connect_cont(&tmp, rdev->hnd, events_to_mysql_wstatus(events));
watch_mysql (rdev, status);
if (!status) if (status)
{ {
/* connected */ /* connection in progress */
MIO_DEV_MAR_SET_PROGRESS (rdev, MIO_DEV_MAR_CONNECTED); watch_mysql (rdev, status);
if (rdev->on_connect) rdev->on_connect (rdev); }
else
{
/* connection completed. */
if (tmp)
{
/* established ok */
watch_mysql (rdev, status);
rdev->connected = 1; /* really connected */
MIO_DEV_MAR_SET_PROGRESS (rdev, MIO_DEV_MAR_CONNECTED);
if (rdev->on_connect) rdev->on_connect (rdev);
}
else
{
/* connection attempt failed */
rdev->broken = 1; /* trick dev_mar_getsyshnd() to return rdev->broken_syshnd. */
rdev->broken_syshnd = syshnd; /* mysql_get_socket() over a failed mariadb handle ends up with segfault */
/* this attempts to trigger the low-level multiplxer to delete 'syshnd' closed by mysql_real_connect_cont().
* the underlying low-level operation may fail. but i don't care. the best is not to open
* new file descriptor between mysql_real_connect_cont() and watch_mysql(rdev, 0).
*
* close(6); <- mysql_real_connect_cont();
* epoll_ctl(4, EPOLL_CTL_DEL, 6, 0x7ffc785e7154) = -1 EBADF (Bad file descriptor) <- by mio_dev_watch() in watch_mysql
*/
watch_mysql (rdev, 0);
/* on_disconnect() will be called without on_connect().
* you may assume that the initial connectinon attempt failed.
* reconnectin doesn't apply in this context. */
mio_dev_mar_halt (rdev);
}
} }
} }
break; break;
case MIO_DEV_MAR_QUERY_STARTING: case MIO_DEV_MAR_QUERY_STARTING:
if (rdev->query_started) if (rdev->query_started_deferred)
{ {
rdev->query_started = 0; rdev->query_started_deferred = 0;
MIO_DEV_MAR_SET_PROGRESS (rdev, MIO_DEV_MAR_QUERY_STARTED); MIO_DEV_MAR_SET_PROGRESS (rdev, MIO_DEV_MAR_QUERY_STARTED);
if (rdev->on_query_started) rdev->on_query_started (rdev, rdev->query_ret, (rdev->query_ret? rdev->errbuf: MIO_NULL)); if (rdev->on_query_started) rdev->on_query_started (rdev, 0, MIO_NULL);
} }
else else
{ {
int status; int status, err;
int tmp; mio_syshnd_t syshnd;
status = mysql_real_query_cont(&tmp, rdev->hnd, events_to_mysql_wstatus(events)); syshnd = mysql_get_socket(rdev->hnd);
watch_mysql (rdev, status); status = mysql_real_query_cont(&err, rdev->hnd, events_to_mysql_wstatus(events));
if (!status) if (status)
{ {
/* query sent */ watch_mysql (rdev, status);
MIO_DEV_MAR_SET_PROGRESS (rdev, MIO_DEV_MAR_QUERY_STARTED); }
if (tmp == 1) tmp = mysql_errno(rdev->hnd); /* tmp is set to 1 by mariadb-connector-c 3.1 as of this writing. let me work around it by fetching the error code */ else
if (rdev->on_query_started) rdev->on_query_started (rdev, tmp, (tmp? mysql_error(rdev->hnd): MIO_NULL)); {
if (err)
{
/* query send failure */
if (err == 1) err = mysql_errno(rdev->hnd); /* err is set to 1 by mariadb-connector-c 3.1 as of this writing. let me work around it by fetching the error code */
if (err == CR_SERVER_LOST || err == CR_SERVER_GONE_ERROR)
{
rdev->broken = 1;
rdev->broken_syshnd = syshnd;
watch_mysql (rdev, 0);
mio_dev_mar_halt (rdev); /* i can't keep this device alive regardless of the caller's post-action */
/* don't invoke on_query_started(). in this case, on_disconnect() will be called later */
}
else
{
/* query not sent for other reasons. probably nothing to watch? */
watch_mysql (rdev, 0); /* TODO: use status instead of 0? is status reliable in this context? */
if (rdev->on_query_started) rdev->on_query_started (rdev, err, mysql_error(rdev->hnd));
}
}
else
{
/* query really sent */
MIO_DEV_MAR_SET_PROGRESS (rdev, MIO_DEV_MAR_QUERY_STARTED);
if (rdev->on_query_started) rdev->on_query_started (rdev, 0, MIO_NULL);
}
} }
} }
@ -404,7 +498,7 @@ static int dev_evcb_mar_ready (mio_dev_t* dev, int events)
} }
default: default:
mio_seterrbfmt (mio, MIO_EINTERN, "invalid progress state in mar"); mio_seterrbfmt (mio, MIO_EINTERN, "invalid progress value in mar");
return -1; return -1;
} }

View File

@ -31,39 +31,20 @@
typedef struct mio_dev_mar_t mio_dev_mar_t; typedef struct mio_dev_mar_t mio_dev_mar_t;
enum mio_dev_mar_state_t enum mio_dev_mar_progress_t
{ {
/* the following items(progress bits) are mutually exclusive */ MIO_DEV_MAR_INITIAL,
MIO_DEV_MAR_CONNECTING = (1 << 0), MIO_DEV_MAR_CONNECTING,
MIO_DEV_MAR_CONNECTED = (1 << 1), MIO_DEV_MAR_CONNECTED,
MIO_DEV_MAR_QUERY_STARTING = (1 << 2), MIO_DEV_MAR_QUERY_STARTING,
MIO_DEV_MAR_QUERY_STARTED = (1 << 3), MIO_DEV_MAR_QUERY_STARTED,
MIO_DEV_MAR_ROW_FETCHING = (1 << 4), MIO_DEV_MAR_ROW_FETCHING,
MIO_DEV_MAR_ROW_FETCHED = (1 << 5), MIO_DEV_MAR_ROW_FETCHED
#if 0
/* the following items can be bitwise-ORed with an exclusive item above */
MIO_DEV_MAR_LENIENT = (1 << 14),
MIO_DEV_MAR_INTERCEPTED = (1 << 15),
#endif
/* convenience bit masks */
MIO_DEV_MAR_ALL_PROGRESS_BITS = (MIO_DEV_MAR_CONNECTING |
MIO_DEV_MAR_CONNECTED |
MIO_DEV_MAR_QUERY_STARTING |
MIO_DEV_MAR_QUERY_STARTED |
MIO_DEV_MAR_ROW_FETCHING |
MIO_DEV_MAR_ROW_FETCHED)
}; };
typedef enum mio_dev_mar_state_t mio_dev_mar_state_t; typedef enum mio_dev_mar_progress_t mio_dev_mar_progress_t;
#define MIO_DEV_MAR_SET_PROGRESS(dev,bit) do { \ #define MIO_DEV_MAR_SET_PROGRESS(dev,value) ((dev)->progress = (value))
(dev)->state &= ~MIO_DEV_MAR_ALL_PROGRESS_BITS; \ #define MIO_DEV_MAR_GET_PROGRESS(dev) ((dev)->progress)
(dev)->state |= (bit); \
} while(0)
#define MIO_DEV_MAR_GET_PROGRESS(dev) ((dev)->state & MIO_DEV_MAR_ALL_PROGRESS_BITS)
typedef int (*mio_dev_mar_on_read_t) ( typedef int (*mio_dev_mar_on_read_t) (
@ -103,14 +84,16 @@ struct mio_dev_mar_t
void* hnd; void* hnd;
void* res; void* res;
int state; mio_dev_mar_progress_t progress;
unsigned int connected; unsigned int connected: 1;
unsigned int query_started; unsigned int connected_deferred: 1;
unsigned int row_fetched; unsigned int query_started_deferred: 1;
//unsigned int query_started: 1;
unsigned int row_fetched: 1;
unsigned int broken: 1;
mio_syshnd_t broken_syshnd;
char errbuf[256];
int query_ret;
int row_wstatus; int row_wstatus;
void* row; void* row;
@ -122,9 +105,16 @@ struct mio_dev_mar_t
mio_dev_mar_on_row_fetched_t on_row_fetched; mio_dev_mar_on_row_fetched_t on_row_fetched;
}; };
enum mio_dev_mar_make_flag_t
{
MIO_DEV_MAR_UNUSED_YET = (1 << 0)
};
typedef enum mio_dev_mar_make_flag_t mio_dev_mar_make_flag_t;
typedef struct mio_dev_mar_make_t mio_dev_mar_make_t; typedef struct mio_dev_mar_make_t mio_dev_mar_make_t;
struct mio_dev_mar_make_t struct mio_dev_mar_make_t
{ {
int flags;
mio_dev_mar_on_write_t on_write; /* mandatory */ mio_dev_mar_on_write_t on_write; /* mandatory */
mio_dev_mar_on_read_t on_read; /* mandatory */ mio_dev_mar_on_read_t on_read; /* mandatory */
mio_dev_mar_on_connect_t on_connect; /* optional */ mio_dev_mar_on_connect_t on_connect; /* optional */

View File

@ -259,7 +259,7 @@ static int schedule_timer_job_at (mio_dev_sck_t* dev, const mio_ntime_t* fire_at
tmrjob.idxptr = &dev->tmrjob_index; tmrjob.idxptr = &dev->tmrjob_index;
MIO_ASSERT (dev->mio, dev->tmrjob_index == MIO_TMRIDX_INVALID); MIO_ASSERT (dev->mio, dev->tmrjob_index == MIO_TMRIDX_INVALID);
dev->tmrjob_index = mio_instmrjob (dev->mio, &tmrjob); dev->tmrjob_index = mio_instmrjob(dev->mio, &tmrjob);
return dev->tmrjob_index == MIO_TMRIDX_INVALID? -1: 0; return dev->tmrjob_index == MIO_TMRIDX_INVALID? -1: 0;
} }
@ -1196,7 +1196,7 @@ static int harvest_outgoing_connection (mio_dev_sck_t* rdev)
int x; int x;
MIO_ASSERT (mio, !rdev->ssl); /* must not be SSL-connected yet */ MIO_ASSERT (mio, !rdev->ssl); /* must not be SSL-connected yet */
x = connect_ssl (rdev); x = connect_ssl(rdev);
if (x <= -1) return -1; if (x <= -1) return -1;
if (x == 0) if (x == 0)
{ {
@ -1540,7 +1540,6 @@ static int dev_evcb_sck_ready_stateful (mio_dev_t* dev, int events)
} }
MIO_DEV_SCK_SET_PROGRESS (rdev, MIO_DEV_SCK_ACCEPTED); MIO_DEV_SCK_SET_PROGRESS (rdev, MIO_DEV_SCK_ACCEPTED);
/*if (rdev->on_connect(rdev) <= -1) mio_dev_sck_halt (rdev);*/
if (rdev->on_connect) rdev->on_connect (rdev); if (rdev->on_connect) rdev->on_connect (rdev);
return 0; return 0;