diff --git a/mio/bin/t04.c b/mio/bin/t04.c index c6c4db3..894c1bc 100644 --- a/mio/bin/t04.c +++ b/mio/bin/t04.c @@ -115,12 +115,9 @@ oops: } #endif - - - - static void on_result (mio_svc_marc_t* svc, mio_oow_t sid, mio_svc_marc_rcode_t rcode, void* data, void* qctx) { +static int x = 0; switch (rcode) { case MIO_SVC_MARC_RCODE_ROW: @@ -131,6 +128,15 @@ static void on_result (mio_svc_marc_t* svc, mio_oow_t sid, mio_svc_marc_rcode_t // else if (x == 1) // printf ("%s %s %s %s %s\n", row[0], row[1], row[2], row[3], row[4]); //printf ("GOT ROW\n"); +#if 0 +x++; +if (x == 1) +{ +printf ("BLOCKING PACKET...........................\n"); +system ("/sbin/iptables -I OUTPUT -p tcp --dport 3306 -j REJECT"); +system ("/sbin/iptables -I INPUT -p tcp --sport 3306 -j REJECT"); +} +#endif break; } @@ -157,11 +163,21 @@ static void handle_signal (int sig) static void send_test_query (mio_t* mio, const mio_ntime_t* now, mio_tmrjob_t* job) { mio_svc_marc_t* marc = (mio_svc_marc_t*)job->ctx; + mio_bch_t buf[256]; + mio_bch_t tmp[256]; + int len; if (mio_svc_mar_querywithbchars(marc, 0, MIO_SVC_MARC_QTYPE_SELECT, "SHOW STATUS", 11, on_result, MIO_NULL) <= -1) { MIO_INFO1 (mio, "FAILED TO SEND QUERY - %js\n", mio_geterrmsg(mio)); } + + mio_svc_mar_escapebchars (marc, "wild", 4, tmp); + len = snprintf(buf, MIO_COUNTOF(buf), "SELECT name, content FROM records WHERE name like '%%%s%%'", tmp); + if (mio_svc_mar_querywithbchars(marc, 1, MIO_SVC_MARC_QTYPE_SELECT, buf, len, on_result, MIO_NULL) <= -1) + { + MIO_INFO1 (mio, "FAILED TO SEND QUERY - %js\n", mio_geterrmsg(mio)); + } } static int schedule_timer_job_after (mio_t* mio, const mio_ntime_t* fire_after, mio_tmrjob_handler_t handler, void* ctx) @@ -187,6 +203,7 @@ int main (int argc, char* argv[]) mio_t* mio = MIO_NULL; mio_svc_marc_t* marc; mio_svc_marc_connect_t ci; +/* mio_svc_marc_tmout_t tmout;*/ if (argc != 6) { @@ -208,7 +225,13 @@ int main (int argc, char* argv[]) ci.password = argv[4]; ci.dbname = argv[5]; - marc = mio_svc_marc_start(mio, &ci); +/* timeout not implemented yet in the mardiab device and services + MIO_INIT_NTIME (&tmout.c, 2, 0); + MIO_INIT_NTIME (&tmout.r, -1, 0); + MIO_INIT_NTIME (&tmout.w, -1, 0); +*/ + + marc = mio_svc_marc_start(mio, &ci, MIO_NULL); if (!marc) { printf ("Cannot start a mariadb client service\n"); diff --git a/mio/lib/mar-cli.c b/mio/lib/mar-cli.c index 2ea85fa..806f24c 100644 --- a/mio/lib/mar-cli.c +++ b/mio/lib/mar-cli.c @@ -37,8 +37,13 @@ struct mio_svc_marc_t { MIO_SVC_HEADER; - mio_svc_marc_connect_t ci; int stopping; + int tmout_set; + + mio_svc_marc_connect_t ci; + mio_svc_marc_tmout_t tmout; + + MYSQL* edev; struct { @@ -78,25 +83,31 @@ struct dev_xtn_t }; -mio_svc_marc_t* mio_svc_marc_start (mio_t* mio, const mio_svc_marc_connect_t* ci) +mio_svc_marc_t* mio_svc_marc_start (mio_t* mio, const mio_svc_marc_connect_t* ci, const mio_svc_marc_tmout_t* tmout) { mio_svc_marc_t* marc = MIO_NULL; marc = (mio_svc_marc_t*)mio_callocmem(mio, MIO_SIZEOF(*marc)); if (MIO_UNLIKELY(!marc)) goto oops; + marc->edev = mysql_init(MIO_NULL); + if (MIO_UNLIKELY(!marc->edev)) goto oops; + marc->mio = mio; marc->svc_stop = mio_svc_marc_stop; marc->ci = *ci; + if (tmout) + { + marc->tmout = *tmout; + marc->tmout_set = 1; + } MIO_SVCL_APPEND_SVC (&mio->actsvc, (mio_svc_t*)marc); return marc; oops: - if (marc) - { - mio_freemem (mio, marc); - } + if (marc->edev) mysql_close (marc->edev); + if (marc) mio_freemem (mio, marc); return MIO_NULL; } @@ -114,10 +125,11 @@ void mio_svc_marc_stop (mio_svc_marc_t* marc) mio_freemem (mio, marc->sess.ptr); MIO_SVCL_UNLINK_SVC (marc); + + mysql_close (marc->edev); mio_freemem (mio, marc); } - /* ------------------------------------------------------------------- */ static sess_qry_t* make_session_query (mio_t* mio, mio_svc_marc_qtype_t qtype, const mio_bch_t* qptr, mio_oow_t qlen, void* qctx, mio_svc_marc_on_result_t on_result) @@ -325,6 +337,12 @@ static mio_dev_mar_t* alloc_device (mio_svc_marc_t* marc, sess_t* sess) dev_xtn_t* xtn; MIO_MEMSET (&mi, 0, MIO_SIZEOF(mi)); + if (marc->tmout_set) + { + mi.flags = MIO_DEV_MAR_USE_TMOUT; + mi.tmout = marc->tmout; + } + mi.on_connect = mar_on_connect; mi.on_disconnect = mar_on_disconnect; mi.on_query_started = mar_on_query_started; @@ -351,7 +369,7 @@ static sess_t* get_session (mio_svc_marc_t* marc, mio_oow_t sid) if (sid >= marc->sess.capa) { sess_t* tmp; - mio_oow_t newcapa; + mio_oow_t newcapa, i; newcapa = marc->sess.capa + 16; if (newcapa <= sid) newcapa = sid + 1; @@ -361,20 +379,19 @@ static sess_t* get_session (mio_svc_marc_t* marc, mio_oow_t sid) if (MIO_UNLIKELY(!tmp)) return MIO_NULL; MIO_MEMSET (&tmp[marc->sess.capa], 0, MIO_SIZEOF(sess_t) * (newcapa - marc->sess.capa)); + for (i = marc->sess.capa; i < newcapa; i++) + { + tmp[i].svc = marc; + tmp[i].sid = i; + } marc->sess.ptr = tmp; marc->sess.capa = newcapa; + } - sess = &marc->sess.ptr[sid]; - sess->svc = marc; - sess->sid = sid; - } - else - { - sess = &marc->sess.ptr[sid]; - MIO_ASSERT (mio, sess->sid == sid); - MIO_ASSERT (mio, sess->svc == marc); - } + sess = &marc->sess.ptr[sid]; + MIO_ASSERT (mio, sess->sid == sid); + MIO_ASSERT (mio, sess->svc == marc); if (!sess->dev) { @@ -465,3 +482,8 @@ int mio_svc_mar_querywithbchars (mio_svc_marc_t* marc, mio_oow_t sid, mio_svc_ma return 0; } + +mio_oow_t mio_svc_mar_escapebchars (mio_svc_marc_t* marc, const mio_bch_t* qptr, mio_oow_t qlen, mio_bch_t* buf) +{ + return mysql_real_escape_string(marc->edev, buf, qptr, qlen); +} diff --git a/mio/lib/mar.c b/mio/lib/mar.c index 685a2f9..4897866 100644 --- a/mio/lib/mar.c +++ b/mio/lib/mar.c @@ -29,6 +29,7 @@ #include #include +#include /* ========================================================================= */ @@ -58,6 +59,25 @@ static int dev_mar_make (mio_dev_t* dev, void* ctx) mysql_options(rdev->hnd, MYSQL_OPT_RECONNECT, &x); } +#if 0 +/* TOOD: timeout not implemented... + * timeout can't be implemented using the mysql timeout options in the nonblocking mode. + * i must create a timeer jobs for these */ + if (mi->flags & MIO_DEV_MAR_USE_TMOUT) + { + unsigned int tmout; + + tmout = mi->tmout.c.sec; /* mysql supports the granularity of seconds only */ + if (tmout >= 0) mysql_options(rdev->hnd, MYSQL_OPT_CONNECT_TIMEOUT, &tmout); + + tmout = mi->tmout.r.sec; + if (tmout >= 0) mysql_options(rdev->hnd, MYSQL_OPT_READ_TIMEOUT, &tmout); + + tmout = mi->tmout.w.sec; + if (tmout >= 0) mysql_options(rdev->hnd, MYSQL_OPT_WRITE_TIMEOUT, &tmout); + } +#endif + rdev->dev_cap = MIO_DEV_CAP_IN | MIO_DEV_CAP_OUT | MIO_DEV_CAP_VIRTUAL; /* mysql_init() doesn't create a socket. so no IO is possible at this point */ rdev->on_read = mi->on_read; rdev->on_write = mi->on_write; @@ -78,16 +98,31 @@ static int dev_mar_kill (mio_dev_t* dev, int force) /* if rdev->connected is 0 at this point, * the underlying socket of this device is down */ - if (rdev->on_disconnect) rdev->on_disconnect (rdev); + if (MIO_LIKELY(rdev->on_disconnect)) rdev->on_disconnect (rdev); + + /* hack */ + if (!rdev->broken) + { + /* mysql_free_result() blocks if not all rows have been read. + * mysql_close() also blocks to transmit COM_QUIT, + * in this context, it is not appropriate to call + * mysql_free_result_start()/mysql_free_result_cont() and + * mysql_close_start()/mysql_close_cont(). + * let me just call shutdown on the underlying socket to work around this issue. + * as a result, mysql_close() will be unable to send COM_QUIT but will return fast + */ + shutdown (mysql_get_socket(rdev->hnd), SHUT_RDWR); + } if (rdev->res) { mysql_free_result (rdev->res); rdev->res = MIO_NULL; } + if (rdev->hnd) { - mysql_close (rdev->hnd); + mysql_close (rdev->hnd); rdev->hnd = MIO_NULL; } @@ -138,17 +173,20 @@ static void start_fetch_row (mio_dev_mar_t* rdev) int status; status = mysql_fetch_row_start(&row, rdev->res); + MIO_DEV_MAR_SET_PROGRESS (rdev, MIO_DEV_MAR_ROW_FETCHING); if (status) { - /* row fetched */ - rdev->row_fetched = 0; +printf ("fetch_row_start not fetched %d\n", status); + /* row not fetched */ + rdev->row_fetched_deferred = 0; watch_mysql (rdev, status); } else { /* row fetched - don't handle it immediately here */ - rdev->row_fetched = 1; +printf ("fetch_row_start returning %d %p \n", status, row); + rdev->row_fetched_deferred = 1; rdev->row_wstatus = status; rdev->row = row; watch_mysql (rdev, MYSQL_WAIT_READ | MYSQL_WAIT_WRITE); @@ -447,10 +485,10 @@ static int dev_evcb_mar_ready (mio_dev_t* dev, int events) int status; MYSQL_ROW row; - if (rdev->row_fetched) + if (rdev->row_fetched_deferred) { row = (MYSQL_ROW)rdev->row; - rdev->row_fetched = 0; + rdev->row_fetched_deferred = 0; if (!row) { @@ -462,8 +500,9 @@ static int dev_evcb_mar_ready (mio_dev_t* dev, int events) } MIO_DEV_MAR_SET_PROGRESS (rdev, MIO_DEV_MAR_ROW_FETCHED); - if (rdev->on_row_fetched) rdev->on_row_fetched (rdev, row); + if (MIO_LIKELY(rdev->on_row_fetched)) rdev->on_row_fetched (rdev, row); +printf ("CALLING sTARTING FFETCH ROW %p \n", row); if (row) start_fetch_row (rdev); } else @@ -471,8 +510,10 @@ static int dev_evcb_mar_ready (mio_dev_t* dev, int events) /* TODO: if rdev->res is MIO_NULL, error.. */ status = mysql_fetch_row_cont(&row, rdev->res, events_to_mysql_wstatus(events)); +printf ("FETCH_ROW -> %d %p\n", status, row); if (!status) { + /* row is available */ if (!row) { /* the last row has been received - cleanup before invoking the callback */ @@ -484,12 +525,13 @@ static int dev_evcb_mar_ready (mio_dev_t* dev, int events) } MIO_DEV_MAR_SET_PROGRESS (rdev, MIO_DEV_MAR_ROW_FETCHED); - if (rdev->on_row_fetched) rdev->on_row_fetched (rdev, row); + if (MIO_LIKELY(rdev->on_row_fetched)) rdev->on_row_fetched (rdev, row); if (row) start_fetch_row (rdev); /* arrange to fetch the next row */ } else { + /* no row is available */ watch_mysql (rdev, status); } } @@ -539,3 +581,8 @@ int mio_dev_mar_fetchrows (mio_dev_mar_t* dev) return mio_dev_ioctl((mio_dev_t*)dev, MIO_DEV_MAR_FETCH_ROW, MIO_NULL); } +mio_oow_t mio_dev_mar_escapebchars (mio_dev_mar_t* dev, const mio_bch_t* qstr, mio_oow_t qlen, mio_bch_t* buf) +{ + mio_dev_mar_t* rdev = (mio_dev_mar_t*)dev; + return mysql_real_escape_string (rdev->hnd, buf, qstr, qlen); +} diff --git a/mio/lib/mio-mar.h b/mio/lib/mio-mar.h index 4b1ff2d..72d7690 100644 --- a/mio/lib/mio-mar.h +++ b/mio/lib/mio-mar.h @@ -89,8 +89,8 @@ struct mio_dev_mar_t unsigned int connected: 1; unsigned int connected_deferred: 1; unsigned int query_started_deferred: 1; - //unsigned int query_started: 1; - unsigned int row_fetched: 1; + /*unsigned int query_started: 1;*/ + unsigned int row_fetched_deferred: 1; unsigned int broken: 1; mio_syshnd_t broken_syshnd; @@ -107,14 +107,24 @@ struct mio_dev_mar_t enum mio_dev_mar_make_flag_t { - MIO_DEV_MAR_UNUSED_YET = (1 << 0) + MIO_DEV_MAR_USE_TMOUT = (1 << 0) }; typedef enum mio_dev_mar_make_flag_t mio_dev_mar_make_flag_t; + +typedef struct mio_dev_mar_tmout_t mio_dev_mar_tmout_t; +struct mio_dev_mar_tmout_t +{ + mio_ntime_t c; + mio_ntime_t r; + mio_ntime_t w; +}; + typedef struct mio_dev_mar_make_t mio_dev_mar_make_t; struct mio_dev_mar_make_t { int flags; + mio_dev_mar_tmout_t tmout; mio_dev_mar_on_write_t on_write; /* mandatory */ mio_dev_mar_on_read_t on_read; /* mandatory */ mio_dev_mar_on_connect_t on_connect; /* optional */ @@ -146,6 +156,7 @@ typedef enum mio_dev_mar_ioctl_cmd_t mio_dev_mar_ioctl_cmd_t; typedef struct mio_svc_marc_t mio_svc_marc_t; typedef mio_dev_mar_connect_t mio_svc_marc_connect_t; +typedef mio_dev_mar_tmout_t mio_svc_marc_tmout_t; enum mio_svc_marc_qtype_t { @@ -162,7 +173,6 @@ enum mio_svc_marc_rcode_t }; typedef enum mio_svc_marc_rcode_t mio_svc_marc_rcode_t; - struct mio_svc_marc_dev_error_t { int mar_errcode; @@ -178,6 +188,7 @@ typedef void (*mio_svc_marc_on_result_t) ( void* qctx ); + /* -------------------------------------------------------------- */ #ifdef __cplusplus @@ -233,7 +244,8 @@ static MIO_INLINE void mio_dev_mar_halt (mio_dev_mar_t* mar) { mio_dev_halt ((mi MIO_EXPORT mio_svc_marc_t* mio_svc_marc_start ( mio_t* mio, - const mio_svc_marc_connect_t* ci + const mio_svc_marc_connect_t* ci, + const mio_svc_marc_tmout_t* tmout ); MIO_EXPORT void mio_svc_marc_stop ( @@ -246,7 +258,6 @@ static MIO_INLINE mio_t* mio_svc_marc_getmio(mio_svc_marc_t* svc) { return mio_s # define mio_svc_marc_getmio(svc) mio_svc_getmio(svc) #endif - MIO_EXPORT int mio_svc_mar_querywithbchars ( mio_svc_marc_t* marc, mio_oow_t sid, @@ -257,6 +268,13 @@ MIO_EXPORT int mio_svc_mar_querywithbchars ( void* qctx ); +MIO_EXPORT mio_oow_t mio_dev_mar_escapebchars ( + mio_dev_mar_t* dev, + const mio_bch_t* qstr, + mio_oow_t qlen, + mio_bch_t* buf +); + #ifdef __cplusplus } #endif