diff --git a/mio/bin/t04.c b/mio/bin/t04.c index ed54832..364e2f1 100644 --- a/mio/bin/t04.c +++ b/mio/bin/t04.c @@ -6,6 +6,7 @@ #include +#if 0 static void mar_on_disconnect (mio_dev_mar_t* dev) { } @@ -111,3 +112,79 @@ oops: if (mio) mio_close (mio); return 0; } +#endif + + + + + + + + + +int main (int argc, char* argv[]) +{ + + mio_t* mio = MIO_NULL; + mio_svc_marc_t* marc; + mio_svc_marc_connect_t ci; + + if (argc != 6) + { + fprintf (stderr, "Usage: %s ipaddr port username password dbname\n", argv[0]); + return -1; + } + + mio = mio_open(MIO_NULL, 0, MIO_NULL, 512, MIO_NULL); + if (!mio) + { + printf ("Cannot open mio\n"); + goto oops; + } + + + memset (&ci, 0, MIO_SIZEOF(ci)); + ci.host = argv[1]; + ci.port = 3306; /* TODO: argv[2]; */ + ci.username = argv[3]; + ci.password = argv[4]; + ci.dbname = argv[5]; + + marc = mio_svc_marc_start(mio, &ci); + if (!marc) + { + printf ("Cannot start a mariadb client service\n"); + goto oops; + } + + mio_svc_mar_querywithbchars (marc, 0, "SHOW STATUS", 11, MIO_NULL); + +#if 0 + memset (&mi, 0, MIO_SIZEOF(mi)); + /*mi.on_write = mar_on_write; + mi.on_read = mar_on_read;*/ + mi.on_connect = mar_on_connect; + mi.on_disconnect = mar_on_disconnect; + mi.on_query_started = mar_on_query_started; + mi.on_row_fetched = mar_on_row_fetched; + + mar = mio_dev_mar_make(mio, 0, &mi); + if (!mar) + { + printf ("Cannot make a mar db client device\n"); + goto oops; + } + + if (mio_dev_mar_connect(mar, &ci) <= -1) + { + printf ("Cannot connect to mar db server\n"); + goto oops; + } +#endif + + mio_loop (mio); + +oops: + if (mio) mio_close (mio); + return 0; +} diff --git a/mio/lib/mar-cli.c b/mio/lib/mar-cli.c index f29fa9a..67589e0 100644 --- a/mio/lib/mar-cli.c +++ b/mio/lib/mar-cli.c @@ -36,11 +36,16 @@ struct mio_svc_marc_t { MIO_SVC_HEADER; + mio_svc_marc_connect_t ci; + struct { sess_t* ptr; mio_oow_t capa; } sess; + + + }; struct sess_qry_t @@ -48,19 +53,32 @@ struct sess_qry_t mio_bch_t* qptr; mio_oow_t qlen; void* qctx; + int sent; sess_qry_t* sq_next; }; struct sess_t { + mio_oow_t sid; + mio_svc_marc_t* svc; mio_dev_mar_t* dev; + int connected; sess_qry_t* q_head; - sess_qry_t* q_tail + sess_qry_t* q_tail; }; -mio_svc_marc_t* mio_svc_marc_start (mio_t* mio) +typedef struct dev_xtn_t dev_xtn_t; + +struct dev_xtn_t +{ + sess_t* sess; +}; + + + +mio_svc_marc_t* mio_svc_marc_start (mio_t* mio, const mio_svc_marc_connect_t* ci) { mio_svc_marc_t* marc = MIO_NULL; @@ -69,6 +87,7 @@ mio_svc_marc_t* mio_svc_marc_start (mio_t* mio) marc->mio = mio; marc->svc_stop = mio_svc_marc_stop; + marc->ci = *ci; MIO_SVCL_APPEND_SVC (&mio->actsvc, (mio_svc_t*)marc); return marc; @@ -89,22 +108,119 @@ void mio_svc_marc_stop (mio_svc_marc_t* marc) mio_freemem (mio, marc); } + +/* ------------------------------------------------------------------- */ + +static sess_qry_t* make_session_query (mio_t* mio, const mio_bch_t* qptr, mio_oow_t qlen, void* qctx) +{ + sess_qry_t* sq; + + sq = mio_allocmem(mio, MIO_SIZEOF(*sq) + (MIO_SIZEOF(*qptr) * qlen)); + if (MIO_UNLIKELY(!sq)) return MIO_NULL; + + MIO_MEMCPY (sq + 1, qptr, (MIO_SIZEOF(*qptr) * qlen)); + + sq->sent = 0; + sq->qptr = (mio_bch_t*)(sq + 1); + sq->qlen = qlen; + sq->qctx = qctx; + sq->sq_next = MIO_NULL; + + return sq; +} + +static MIO_INLINE void free_session_query (mio_t* mio, sess_qry_t* sq) +{ + mio_freemem (mio, sq); +} + +static MIO_INLINE void enqueue_session_query (sess_t* sess, sess_qry_t* sq) +{ + /* the initialization creates a place holder. so no need to check if q_tail is NULL */ + sess->q_tail->sq_next = sq; + sess->q_tail = sq; +} + +static MIO_INLINE void dequeue_session_query (mio_t* mio, sess_t* sess) +{ + sess_qry_t* sq; + + sq = sess->q_head; + MIO_ASSERT (mio, sq->sq_next != MIO_NULL); /* must not be empty */ + sess->q_head = sq->sq_next; + free_session_query (mio, sq); +} + +static MIO_INLINE sess_qry_t* get_first_session_query (sess_t* sess) +{ + return sess->q_head->sq_next; +} + +/* ------------------------------------------------------------------- */ + +static int send_pending_query_if_any (sess_t* sess) +{ + sess_qry_t* sq; + + sq = get_first_session_query(sess); + if (sq) + { + sq->sent = 1; +printf ("sending... %.*s\n", (int)sq->qlen, sq->qptr); + if (mio_dev_mar_querywithbchars(sess->dev, sq->qptr, sq->qlen) <= -1) + { + sq->sent = 0; + return -1; /* failure */ + } + + return 1; /* sent */ + } + + + return 0; /* nothing to send */ +} + +/* ------------------------------------------------------------------- */ + static void mar_on_disconnect (mio_dev_mar_t* dev) { + dev_xtn_t* xtn = (dev_xtn_t*)mio_dev_mar_getxtn(dev); + sess_t* sess = xtn->sess; + +printf ("disconnected on sid %d\n", sess->sid); + sess->connected = 0; + sess->dev = MIO_NULL; + + + //if (there is a query which has been sent but not processed... ) <--- can this be handled by the caller? } static void mar_on_connect (mio_dev_mar_t* dev) { -/* - if (mio_dev_mar_querywithbchars(dev, "SHOW STATUS", 11) <= -1) + dev_xtn_t* xtn = (dev_xtn_t*)mio_dev_mar_getxtn(dev); + sess_t* sess = xtn->sess; + + sess->connected = 1; +printf ("connected on sid %d\n", sess->sid); + + if (send_pending_query_if_any (sess) <= -1) { - mio_dev_mar_halt (dev); + mio_dev_mar_halt (sess->dev); } -*/ } static void mar_on_query_started (mio_dev_mar_t* dev, int mar_ret) { + if (mar_ret != 0) + { + } + else + { + if (mio_dev_mar_fetchrows(dev) <= -1) + { + mio_dev_mar_halt (dev); + } + } #if 0 if (mar_ret != 0) { @@ -142,21 +258,31 @@ static void mar_on_row_fetched (mio_dev_mar_t* dev, void* data) //printf ("GOT ROW\n"); } #endif + + dev_xtn_t* xtn = (dev_xtn_t*)mio_dev_mar_getxtn(dev); + sess_t* sess = xtn->sess; + + if (!data) + { + /* no more rows */ + + //marc->on_row_fetched (marc, void* data, sess->sid, sess->qctx); +printf ("there is no more row...\n"); + } + else + { +printf ("there is row...\n"); + } + } -static mio_dev_mar_t* alloc_device (mio_svc_marc_t* marc) +static mio_dev_mar_t* alloc_device (mio_svc_marc_t* marc, sess_t* sess) { mio_t* mio = (mio_t*)marc->mio; mio_dev_mar_t* mar; mio_dev_mar_make_t mi; mio_dev_mar_connect_t ci; - - MIO_MEMSET (&ci, 0, MIO_SIZEOF(ci)); - ci.host = "localhost"; /* TOOD: use marc configuration */ - ci.port = 3306; /* TODO: use marc configuration */ - ci.username = ""; /* TODO: use marc configuration */ - ci.password = ""; /* TODO: use marc conifguration */ - ci.dbname = ""; /* TODO: use marc configuration */ + dev_xtn_t* xtn; MIO_MEMSET (&mi, 0, MIO_SIZEOF(mi)); mi.on_connect = mar_on_connect; @@ -164,38 +290,20 @@ static mio_dev_mar_t* alloc_device (mio_svc_marc_t* marc) mi.on_query_started = mar_on_query_started; mi.on_row_fetched = mar_on_row_fetched; - mar = mio_dev_mar_make(mio, 0, &mi); + mar = mio_dev_mar_make(mio, MIO_SIZEOF(*xtn), &mi); if (!mar) return MIO_NULL; - if (mio_dev_mar_connect(mar, &ci) <= -1) return MIO_NULL; - + xtn = (dev_xtn_t*)mio_dev_mar_getxtn(mar); + xtn->sess = sess; + + if (mio_dev_mar_connect(mar, &marc->ci) <= -1) return MIO_NULL; + return mar; } +/* ------------------------------------------------------------------- */ -static sess_qry_t* make_session_query (mio_t* mio, const mio_bch_t* qptr, mio_oow_t qlen, void* qctx) -{ - sess_qry_t* sq; - - sq = mio_allocmem(mio, MIO_SIZEOF(*sq) + (MIO_SIZEOF(*qptr) * qlen)); - if (MIO_UNLIKELY(!sq)) return MIO_NULL; - - MIO_MEMCPY (sq + 1, qptr, (MIO_SIZEOF(*qptr) * qlen)); - - sq->qptr = (mio_bch_t*)(sq + 1); - sq->qlen = qlen; - sq->qctx = qctx; - sq->sq_next = MIO_NULL; - - return sq; -} - -static MIO_INLINE void free_session_query (mio_t* mio, sess_qry_t* sq) -{ - mio_freemem (mio, sq); -} - -static sess_t* get_session (mio_svc_marc_t* marc, int sid) +static sess_t* get_session (mio_svc_marc_t* marc, mio_oow_t sid) { mio_t* mio = marc->mio; sess_t* sess; @@ -212,13 +320,22 @@ static sess_t* get_session (mio_svc_marc_t* marc, int sid) tmp = mio_reallocmem(mio, marc->sess.ptr, MIO_SIZEOF(sess_t) * newcapa); if (MIO_UNLIKELY(!tmp)) return MIO_NULL; - MIO_MEMSET (&marc->sess.ptr[marc->sess.capa], 0, MIO_SIZEOF(sess_t) * (newcapa - marc->sess.capa)); + MIO_MEMSET (&tmp[marc->sess.capa], 0, MIO_SIZEOF(sess_t) * (newcapa - marc->sess.capa)); marc->sess.ptr = tmp; marc->sess.capa = newcapa; + + sess = &marc->sess.ptr[sid]; + sess->svc = marc; + sess->sid = sid; + } + else + { + sess = &marc->sess.ptr[sid]; + MIO_ASSERT (mio, sess->sid == sid); + MIO_ASSERT (mio, sess->svc == marc); } - sess = &marc->sess.ptr[sid]; if (!sess->dev) { sess_qry_t* sq; @@ -226,13 +343,14 @@ static sess_t* get_session (mio_svc_marc_t* marc, int sid) sq = make_session_query(mio, "", 0, MIO_NULL); /* this is a place holder */ if (MIO_UNLIKELY(!sq)) return MIO_NULL; - sess->dev = alloc_device(marc); + sess->dev = alloc_device(marc, sess); if (MIO_UNLIKELY(!sess->dev)) { free_session_query (mio, sq); return MIO_NULL; } + /* queue initialization with a place holder */ sess->q_head = sess->q_tail = sq; } @@ -240,62 +358,47 @@ static sess_t* get_session (mio_svc_marc_t* marc, int sid) } -int mio_svc_mar_querywithbchars (mio_svc_marc_t* marc, int sid, const mio_bch_t* qptr, mio_oow_t qlen, void* qctx) +int mio_svc_mar_querywithbchars (mio_svc_marc_t* marc, mio_oow_t sid, const mio_bch_t* qptr, mio_oow_t qlen, void* qctx) { mio_t* mio = marc->mio; sess_t* sess; + sess_qry_t* sq; sess = get_session(marc, sid); if (MIO_UNLIKELY(!sess)) return -1; + sq = make_session_query(mio, qptr, qlen, qctx); + if (MIO_UNLIKELY(!sq)) return -1; - if (!sess->q_head) + if (get_first_session_query(sess)) { - /* the first query for the device */ - sess_qry_t* sq; - sq = make_session_query(mio, qptr, qlen, qctx); - if (MIO_UNLIKELY(!sq)) return -1; - - sess->q_head = sq; - sess->q_tail = sq; - -/* what if it's not connected??? */ - - if (mio_dev_mar_querywithbchars(sess->dev, qptr, qlen) <= -1) - { - sess->q_head = MIO_NULL; - sess->q_tail = MIO_NULL; - free_session_query (mio, sq); - return -1; /* TODO: need a context pointer */ - } +printf ("XXXXXXXXXx\n"); + /* there are other ongoing queries */ + enqueue_session_query (sess, sq); } else { - /* there is an ongoing query for the device */ - sess_qry_t* sq; - sq = make_session_query(mio, qptr, qlen, qctx); - if (MIO_UNLIKELY(!sq)) return -1; + /* this is the first query */ + sess_qry_t* old_q_tail = sess->q_tail; + enqueue_session_query (sess, sq); - /* push it at the back */ - sess->q_tail->sq_next = sq; - sess->q_tail = sq; +printf ("YYYYYYYYYYYYYY\n"); + if(sess->dev->connected && !sq->sent) + { + sq->sent = 1; + if (mio_dev_mar_querywithbchars(sess->dev, qptr, qlen) <= -1) + { + sq->sent = 0; + + /* ugly to unlink the the last item added */ + old_q_tail->sq_next = MIO_NULL; + sess->q_tail = old_q_tail; + + free_session_query (mio, sq); + return -1; + } + } } - -#if 0 - dev = get_session(marc, sid); - if (!dev) - { - } - if (mio_dev_mar_querywithbchars(dev, qptr, qlen) <= -1) return -1; /* TODO: need a context pointer */ -#endif + return 0; } - - -#if 0 -mio_svc_mar_querywithbchars (1, "select..."); -for (each row) -{ -mio_svc_mar_querywithbchars (2, "xxxxxx"); -} -#endif diff --git a/mio/lib/mio-mar.h b/mio/lib/mio-mar.h index 7fcb2ee..071c0aa 100644 --- a/mio/lib/mio-mar.h +++ b/mio/lib/mio-mar.h @@ -153,6 +153,16 @@ typedef enum mio_dev_mar_ioctl_cmd_t mio_dev_mar_ioctl_cmd_t; /* -------------------------------------------------------------- */ typedef struct mio_svc_marc_t mio_svc_marc_t; +typedef mio_dev_mar_connect_t mio_svc_marc_connect_t; + +typedef void (*mio_svc_marc_on_row_fetched) ( + mio_svc_marc_t* marc, + mio_oow_t sid, + void* data, + void* qctx +); + +/* -------------------------------------------------------------- */ #ifdef __cplusplus extern "C" { @@ -206,7 +216,8 @@ static MIO_INLINE void mio_dev_mar_halt (mio_dev_mar_t* mar) { mio_dev_halt ((mi /* ------------------------------------------------------------------------- */ MIO_EXPORT mio_svc_marc_t* mio_svc_marc_start ( - mio_t* mio + mio_t* mio, + const mio_svc_marc_connect_t* ci ); MIO_EXPORT void mio_svc_marc_stop ( @@ -219,6 +230,15 @@ static MIO_INLINE mio_t* mio_svc_marc_getmio(mio_svc_marc_t* svc) { return mio_s # define mio_svc_marc_getmio(svc) mio_svc_getmio(svc) #endif + +MIO_EXPORT int mio_svc_mar_querywithbchars ( + mio_svc_marc_t* marc, + mio_oow_t sid, + const mio_bch_t* qptr, + mio_oow_t qlen, + void* qctx +); + #ifdef __cplusplus } #endif