wrote mote mariadb client service code

This commit is contained in:
hyung-hwan 2020-06-19 09:17:42 +00:00
parent 364acf2b41
commit dedc6082d3
3 changed files with 287 additions and 87 deletions

View File

@ -6,6 +6,7 @@
#include <mariadb/mysql.h> #include <mariadb/mysql.h>
#if 0
static void mar_on_disconnect (mio_dev_mar_t* dev) static void mar_on_disconnect (mio_dev_mar_t* dev)
{ {
} }
@ -111,3 +112,79 @@ oops:
if (mio) mio_close (mio); if (mio) mio_close (mio);
return 0; return 0;
} }
#endif
int main (int argc, char* argv[])
{
mio_t* mio = MIO_NULL;
mio_svc_marc_t* marc;
mio_svc_marc_connect_t ci;
if (argc != 6)
{
fprintf (stderr, "Usage: %s ipaddr port username password dbname\n", argv[0]);
return -1;
}
mio = mio_open(MIO_NULL, 0, MIO_NULL, 512, MIO_NULL);
if (!mio)
{
printf ("Cannot open mio\n");
goto oops;
}
memset (&ci, 0, MIO_SIZEOF(ci));
ci.host = argv[1];
ci.port = 3306; /* TODO: argv[2]; */
ci.username = argv[3];
ci.password = argv[4];
ci.dbname = argv[5];
marc = mio_svc_marc_start(mio, &ci);
if (!marc)
{
printf ("Cannot start a mariadb client service\n");
goto oops;
}
mio_svc_mar_querywithbchars (marc, 0, "SHOW STATUS", 11, MIO_NULL);
#if 0
memset (&mi, 0, MIO_SIZEOF(mi));
/*mi.on_write = mar_on_write;
mi.on_read = mar_on_read;*/
mi.on_connect = mar_on_connect;
mi.on_disconnect = mar_on_disconnect;
mi.on_query_started = mar_on_query_started;
mi.on_row_fetched = mar_on_row_fetched;
mar = mio_dev_mar_make(mio, 0, &mi);
if (!mar)
{
printf ("Cannot make a mar db client device\n");
goto oops;
}
if (mio_dev_mar_connect(mar, &ci) <= -1)
{
printf ("Cannot connect to mar db server\n");
goto oops;
}
#endif
mio_loop (mio);
oops:
if (mio) mio_close (mio);
return 0;
}

View File

@ -36,11 +36,16 @@ struct mio_svc_marc_t
{ {
MIO_SVC_HEADER; MIO_SVC_HEADER;
mio_svc_marc_connect_t ci;
struct struct
{ {
sess_t* ptr; sess_t* ptr;
mio_oow_t capa; mio_oow_t capa;
} sess; } sess;
}; };
struct sess_qry_t struct sess_qry_t
@ -48,19 +53,32 @@ struct sess_qry_t
mio_bch_t* qptr; mio_bch_t* qptr;
mio_oow_t qlen; mio_oow_t qlen;
void* qctx; void* qctx;
int sent;
sess_qry_t* sq_next; sess_qry_t* sq_next;
}; };
struct sess_t struct sess_t
{ {
mio_oow_t sid;
mio_svc_marc_t* svc;
mio_dev_mar_t* dev; mio_dev_mar_t* dev;
int connected;
sess_qry_t* q_head; sess_qry_t* q_head;
sess_qry_t* q_tail sess_qry_t* q_tail;
}; };
mio_svc_marc_t* mio_svc_marc_start (mio_t* mio) typedef struct dev_xtn_t dev_xtn_t;
struct dev_xtn_t
{
sess_t* sess;
};
mio_svc_marc_t* mio_svc_marc_start (mio_t* mio, const mio_svc_marc_connect_t* ci)
{ {
mio_svc_marc_t* marc = MIO_NULL; mio_svc_marc_t* marc = MIO_NULL;
@ -69,6 +87,7 @@ mio_svc_marc_t* mio_svc_marc_start (mio_t* mio)
marc->mio = mio; marc->mio = mio;
marc->svc_stop = mio_svc_marc_stop; marc->svc_stop = mio_svc_marc_stop;
marc->ci = *ci;
MIO_SVCL_APPEND_SVC (&mio->actsvc, (mio_svc_t*)marc); MIO_SVCL_APPEND_SVC (&mio->actsvc, (mio_svc_t*)marc);
return marc; return marc;
@ -89,22 +108,119 @@ void mio_svc_marc_stop (mio_svc_marc_t* marc)
mio_freemem (mio, marc); mio_freemem (mio, marc);
} }
/* ------------------------------------------------------------------- */
static sess_qry_t* make_session_query (mio_t* mio, const mio_bch_t* qptr, mio_oow_t qlen, void* qctx)
{
sess_qry_t* sq;
sq = mio_allocmem(mio, MIO_SIZEOF(*sq) + (MIO_SIZEOF(*qptr) * qlen));
if (MIO_UNLIKELY(!sq)) return MIO_NULL;
MIO_MEMCPY (sq + 1, qptr, (MIO_SIZEOF(*qptr) * qlen));
sq->sent = 0;
sq->qptr = (mio_bch_t*)(sq + 1);
sq->qlen = qlen;
sq->qctx = qctx;
sq->sq_next = MIO_NULL;
return sq;
}
static MIO_INLINE void free_session_query (mio_t* mio, sess_qry_t* sq)
{
mio_freemem (mio, sq);
}
static MIO_INLINE void enqueue_session_query (sess_t* sess, sess_qry_t* sq)
{
/* the initialization creates a place holder. so no need to check if q_tail is NULL */
sess->q_tail->sq_next = sq;
sess->q_tail = sq;
}
static MIO_INLINE void dequeue_session_query (mio_t* mio, sess_t* sess)
{
sess_qry_t* sq;
sq = sess->q_head;
MIO_ASSERT (mio, sq->sq_next != MIO_NULL); /* must not be empty */
sess->q_head = sq->sq_next;
free_session_query (mio, sq);
}
static MIO_INLINE sess_qry_t* get_first_session_query (sess_t* sess)
{
return sess->q_head->sq_next;
}
/* ------------------------------------------------------------------- */
static int send_pending_query_if_any (sess_t* sess)
{
sess_qry_t* sq;
sq = get_first_session_query(sess);
if (sq)
{
sq->sent = 1;
printf ("sending... %.*s\n", (int)sq->qlen, sq->qptr);
if (mio_dev_mar_querywithbchars(sess->dev, sq->qptr, sq->qlen) <= -1)
{
sq->sent = 0;
return -1; /* failure */
}
return 1; /* sent */
}
return 0; /* nothing to send */
}
/* ------------------------------------------------------------------- */
static void mar_on_disconnect (mio_dev_mar_t* dev) static void mar_on_disconnect (mio_dev_mar_t* dev)
{ {
dev_xtn_t* xtn = (dev_xtn_t*)mio_dev_mar_getxtn(dev);
sess_t* sess = xtn->sess;
printf ("disconnected on sid %d\n", sess->sid);
sess->connected = 0;
sess->dev = MIO_NULL;
//if (there is a query which has been sent but not processed... ) <--- can this be handled by the caller?
} }
static void mar_on_connect (mio_dev_mar_t* dev) static void mar_on_connect (mio_dev_mar_t* dev)
{ {
/* dev_xtn_t* xtn = (dev_xtn_t*)mio_dev_mar_getxtn(dev);
if (mio_dev_mar_querywithbchars(dev, "SHOW STATUS", 11) <= -1) sess_t* sess = xtn->sess;
sess->connected = 1;
printf ("connected on sid %d\n", sess->sid);
if (send_pending_query_if_any (sess) <= -1)
{ {
mio_dev_mar_halt (dev); mio_dev_mar_halt (sess->dev);
} }
*/
} }
static void mar_on_query_started (mio_dev_mar_t* dev, int mar_ret) static void mar_on_query_started (mio_dev_mar_t* dev, int mar_ret)
{ {
if (mar_ret != 0)
{
}
else
{
if (mio_dev_mar_fetchrows(dev) <= -1)
{
mio_dev_mar_halt (dev);
}
}
#if 0 #if 0
if (mar_ret != 0) if (mar_ret != 0)
{ {
@ -142,21 +258,31 @@ static void mar_on_row_fetched (mio_dev_mar_t* dev, void* data)
//printf ("GOT ROW\n"); //printf ("GOT ROW\n");
} }
#endif #endif
dev_xtn_t* xtn = (dev_xtn_t*)mio_dev_mar_getxtn(dev);
sess_t* sess = xtn->sess;
if (!data)
{
/* no more rows */
//marc->on_row_fetched (marc, void* data, sess->sid, sess->qctx);
printf ("there is no more row...\n");
}
else
{
printf ("there is row...\n");
}
} }
static mio_dev_mar_t* alloc_device (mio_svc_marc_t* marc) static mio_dev_mar_t* alloc_device (mio_svc_marc_t* marc, sess_t* sess)
{ {
mio_t* mio = (mio_t*)marc->mio; mio_t* mio = (mio_t*)marc->mio;
mio_dev_mar_t* mar; mio_dev_mar_t* mar;
mio_dev_mar_make_t mi; mio_dev_mar_make_t mi;
mio_dev_mar_connect_t ci; mio_dev_mar_connect_t ci;
dev_xtn_t* xtn;
MIO_MEMSET (&ci, 0, MIO_SIZEOF(ci));
ci.host = "localhost"; /* TOOD: use marc configuration */
ci.port = 3306; /* TODO: use marc configuration */
ci.username = ""; /* TODO: use marc configuration */
ci.password = ""; /* TODO: use marc conifguration */
ci.dbname = ""; /* TODO: use marc configuration */
MIO_MEMSET (&mi, 0, MIO_SIZEOF(mi)); MIO_MEMSET (&mi, 0, MIO_SIZEOF(mi));
mi.on_connect = mar_on_connect; mi.on_connect = mar_on_connect;
@ -164,38 +290,20 @@ static mio_dev_mar_t* alloc_device (mio_svc_marc_t* marc)
mi.on_query_started = mar_on_query_started; mi.on_query_started = mar_on_query_started;
mi.on_row_fetched = mar_on_row_fetched; mi.on_row_fetched = mar_on_row_fetched;
mar = mio_dev_mar_make(mio, 0, &mi); mar = mio_dev_mar_make(mio, MIO_SIZEOF(*xtn), &mi);
if (!mar) return MIO_NULL; if (!mar) return MIO_NULL;
if (mio_dev_mar_connect(mar, &ci) <= -1) return MIO_NULL; xtn = (dev_xtn_t*)mio_dev_mar_getxtn(mar);
xtn->sess = sess;
if (mio_dev_mar_connect(mar, &marc->ci) <= -1) return MIO_NULL;
return mar; return mar;
} }
/* ------------------------------------------------------------------- */
static sess_qry_t* make_session_query (mio_t* mio, const mio_bch_t* qptr, mio_oow_t qlen, void* qctx) static sess_t* get_session (mio_svc_marc_t* marc, mio_oow_t sid)
{
sess_qry_t* sq;
sq = mio_allocmem(mio, MIO_SIZEOF(*sq) + (MIO_SIZEOF(*qptr) * qlen));
if (MIO_UNLIKELY(!sq)) return MIO_NULL;
MIO_MEMCPY (sq + 1, qptr, (MIO_SIZEOF(*qptr) * qlen));
sq->qptr = (mio_bch_t*)(sq + 1);
sq->qlen = qlen;
sq->qctx = qctx;
sq->sq_next = MIO_NULL;
return sq;
}
static MIO_INLINE void free_session_query (mio_t* mio, sess_qry_t* sq)
{
mio_freemem (mio, sq);
}
static sess_t* get_session (mio_svc_marc_t* marc, int sid)
{ {
mio_t* mio = marc->mio; mio_t* mio = marc->mio;
sess_t* sess; sess_t* sess;
@ -212,13 +320,22 @@ static sess_t* get_session (mio_svc_marc_t* marc, int sid)
tmp = mio_reallocmem(mio, marc->sess.ptr, MIO_SIZEOF(sess_t) * newcapa); tmp = mio_reallocmem(mio, marc->sess.ptr, MIO_SIZEOF(sess_t) * newcapa);
if (MIO_UNLIKELY(!tmp)) return MIO_NULL; if (MIO_UNLIKELY(!tmp)) return MIO_NULL;
MIO_MEMSET (&marc->sess.ptr[marc->sess.capa], 0, MIO_SIZEOF(sess_t) * (newcapa - marc->sess.capa)); MIO_MEMSET (&tmp[marc->sess.capa], 0, MIO_SIZEOF(sess_t) * (newcapa - marc->sess.capa));
marc->sess.ptr = tmp; marc->sess.ptr = tmp;
marc->sess.capa = newcapa; marc->sess.capa = newcapa;
sess = &marc->sess.ptr[sid];
sess->svc = marc;
sess->sid = sid;
}
else
{
sess = &marc->sess.ptr[sid];
MIO_ASSERT (mio, sess->sid == sid);
MIO_ASSERT (mio, sess->svc == marc);
} }
sess = &marc->sess.ptr[sid];
if (!sess->dev) if (!sess->dev)
{ {
sess_qry_t* sq; sess_qry_t* sq;
@ -226,13 +343,14 @@ static sess_t* get_session (mio_svc_marc_t* marc, int sid)
sq = make_session_query(mio, "", 0, MIO_NULL); /* this is a place holder */ sq = make_session_query(mio, "", 0, MIO_NULL); /* this is a place holder */
if (MIO_UNLIKELY(!sq)) return MIO_NULL; if (MIO_UNLIKELY(!sq)) return MIO_NULL;
sess->dev = alloc_device(marc); sess->dev = alloc_device(marc, sess);
if (MIO_UNLIKELY(!sess->dev)) if (MIO_UNLIKELY(!sess->dev))
{ {
free_session_query (mio, sq); free_session_query (mio, sq);
return MIO_NULL; return MIO_NULL;
} }
/* queue initialization with a place holder */
sess->q_head = sess->q_tail = sq; sess->q_head = sess->q_tail = sq;
} }
@ -240,62 +358,47 @@ static sess_t* get_session (mio_svc_marc_t* marc, int sid)
} }
int mio_svc_mar_querywithbchars (mio_svc_marc_t* marc, int sid, const mio_bch_t* qptr, mio_oow_t qlen, void* qctx) int mio_svc_mar_querywithbchars (mio_svc_marc_t* marc, mio_oow_t sid, const mio_bch_t* qptr, mio_oow_t qlen, void* qctx)
{ {
mio_t* mio = marc->mio; mio_t* mio = marc->mio;
sess_t* sess; sess_t* sess;
sess_qry_t* sq;
sess = get_session(marc, sid); sess = get_session(marc, sid);
if (MIO_UNLIKELY(!sess)) return -1; if (MIO_UNLIKELY(!sess)) return -1;
sq = make_session_query(mio, qptr, qlen, qctx);
if (MIO_UNLIKELY(!sq)) return -1;
if (!sess->q_head) if (get_first_session_query(sess))
{ {
/* the first query for the device */ printf ("XXXXXXXXXx\n");
sess_qry_t* sq; /* there are other ongoing queries */
sq = make_session_query(mio, qptr, qlen, qctx); enqueue_session_query (sess, sq);
if (MIO_UNLIKELY(!sq)) return -1;
sess->q_head = sq;
sess->q_tail = sq;
/* what if it's not connected??? */
if (mio_dev_mar_querywithbchars(sess->dev, qptr, qlen) <= -1)
{
sess->q_head = MIO_NULL;
sess->q_tail = MIO_NULL;
free_session_query (mio, sq);
return -1; /* TODO: need a context pointer */
}
} }
else else
{ {
/* there is an ongoing query for the device */ /* this is the first query */
sess_qry_t* sq; sess_qry_t* old_q_tail = sess->q_tail;
sq = make_session_query(mio, qptr, qlen, qctx); enqueue_session_query (sess, sq);
if (MIO_UNLIKELY(!sq)) return -1;
/* push it at the back */ printf ("YYYYYYYYYYYYYY\n");
sess->q_tail->sq_next = sq; if(sess->dev->connected && !sq->sent)
sess->q_tail = sq; {
sq->sent = 1;
if (mio_dev_mar_querywithbchars(sess->dev, qptr, qlen) <= -1)
{
sq->sent = 0;
/* ugly to unlink the the last item added */
old_q_tail->sq_next = MIO_NULL;
sess->q_tail = old_q_tail;
free_session_query (mio, sq);
return -1;
}
}
} }
return 0;
#if 0
dev = get_session(marc, sid);
if (!dev)
{
}
if (mio_dev_mar_querywithbchars(dev, qptr, qlen) <= -1) return -1; /* TODO: need a context pointer */
#endif
} }
#if 0
mio_svc_mar_querywithbchars (1, "select...");
for (each row)
{
mio_svc_mar_querywithbchars (2, "xxxxxx");
}
#endif

View File

@ -153,6 +153,16 @@ typedef enum mio_dev_mar_ioctl_cmd_t mio_dev_mar_ioctl_cmd_t;
/* -------------------------------------------------------------- */ /* -------------------------------------------------------------- */
typedef struct mio_svc_marc_t mio_svc_marc_t; typedef struct mio_svc_marc_t mio_svc_marc_t;
typedef mio_dev_mar_connect_t mio_svc_marc_connect_t;
typedef void (*mio_svc_marc_on_row_fetched) (
mio_svc_marc_t* marc,
mio_oow_t sid,
void* data,
void* qctx
);
/* -------------------------------------------------------------- */
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
@ -206,7 +216,8 @@ static MIO_INLINE void mio_dev_mar_halt (mio_dev_mar_t* mar) { mio_dev_halt ((mi
/* ------------------------------------------------------------------------- */ /* ------------------------------------------------------------------------- */
MIO_EXPORT mio_svc_marc_t* mio_svc_marc_start ( MIO_EXPORT mio_svc_marc_t* mio_svc_marc_start (
mio_t* mio mio_t* mio,
const mio_svc_marc_connect_t* ci
); );
MIO_EXPORT void mio_svc_marc_stop ( MIO_EXPORT void mio_svc_marc_stop (
@ -219,6 +230,15 @@ static MIO_INLINE mio_t* mio_svc_marc_getmio(mio_svc_marc_t* svc) { return mio_s
# define mio_svc_marc_getmio(svc) mio_svc_getmio(svc) # define mio_svc_marc_getmio(svc) mio_svc_getmio(svc)
#endif #endif
MIO_EXPORT int mio_svc_mar_querywithbchars (
mio_svc_marc_t* marc,
mio_oow_t sid,
const mio_bch_t* qptr,
mio_oow_t qlen,
void* qctx
);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif