diff --git a/chan.c b/chan.c index 2d11fca..68f0e00 100644 --- a/chan.c +++ b/chan.c @@ -10,32 +10,32 @@ struct hip_chan_t int closed; hip_oow_t unit_size; - hip_oow_t buf_size; - + hip_oow_t unit_capa; + hip_oow_t unit_count; + hip_oow_t unit_rpos; + hip_oow_t unit_wpos; hip_uint8_t* buf; - hip_uint8_t* endptr; - hip_uint8_t* readptr; - hip_uint8_t* writeptr; hip_uctx_link_t sendq; // list of senders waiting on this channel hip_uctx_link_t recvq; // list of receivers waiting on this channel }; -hip_chan_t* hip_chan_open(hip_t* hip, hip_oow_t buf_size) +hip_chan_t* hip_chan_open(hip_t* hip, hip_oow_t unit_size, hip_oow_t unit_capa) { hip_chan_t* c; - c = (hip_chan_t*)malloc(HIP_SIZEOF(*c) + buf_size); + c = (hip_chan_t*)malloc(HIP_SIZEOF(*c) + (unit_size * unit_capa)); if (HIP_UNLIKELY(!c)) return HIP_NULL; memset(c, 0, HIP_SIZEOF(*c)); c->hip = hip; c->closed = 0; - c->buf_size = buf_size; + c->unit_size = unit_size; + c->unit_capa = unit_capa; + c->unit_count = 0; + c->unit_rpos = 0; + c->unit_wpos = 0; c->buf = (hip_uint8_t*)(c + 1); - c->endptr = 0; - c->readptr = 0; - c->writeptr = 0; HIP_LIST_INIT(&c->recvq); HIP_LIST_INIT(&c->sendq); @@ -57,20 +57,35 @@ int hip_chan_send(hip_chan_t* c, const void* ptr, hip_oow_t len) hip = c->hip; caller = UCTX_FROM_LINK(hip->running); -// TODO: IS BUFFER EMPTY + if (c->unit_count < c->unit_capa) + { + /* buffered */ + hip_uint8_t* dptr; + if (len > c->unit_size) len = c->unit_size; /* can't buffer longer than the specified unit size */ + dptr = &c->buf[c->unit_wpos * c->unit_size]; + memcpy(dptr, ptr, len); + if (len < c->unit_size) + { + /* zero out the unfilled if the writing length is smaller than the unit size */ + memset(dptr + len, 0, c->unit_size - len); + } + c->unit_wpos = (c->unit_wpos + 1) % c->unit_capa; + c->unit_count++; + return len; + } if (!HIP_LIST_IS_EMPTY(&c->recvq)) /* there is a receving routine */ { hip_uctx_link_t* x; - hip_uctx_t* xx; + hip_uctx_t* recver; x = HIP_LIST_HEAD(&c->recvq); - xx = UCTX_FROM_LINK(x); - if (len > xx->chan_data_len) len = xx->chan_data_len; - memcpy(xx->chan_data_ptr, ptr, len); -//printf ("USE SEND CHAT CALLER %p RECEIVER %p RET %p\n", caller, xx, xx->chan_ret_ptr); - *(xx->chan_ret_ptr) = len; /* manipulate the return value of the receiver */ + recver = UCTX_FROM_LINK(x); + if (len > recver->chan_data_len) len = recver->chan_data_len; + memcpy(recver->chan_data_ptr, ptr, len); +//printf ("USE SEND CHAT CALLER %p RECEIVER %p RET %p\n", caller, recver, recver->chan_ret_ptr); + *(recver->chan_ret_ptr) = len; /* set the return value of the receiver */ /* remove the receiving routine from the queue and make it runnable */ HIP_LIST_UNCHAIN(x); @@ -100,22 +115,29 @@ int hip_chan_recv(hip_chan_t* c, void* ptr, hip_oow_t len) hip = c->hip; caller = UCTX_FROM_LINK(hip->running); - /* TODO: IS THERE DATA IN BUFFER ? */ + if (c->unit_count > 0) + { + if (len > c->unit_size) len = c->unit_size; + memcpy(ptr, &c->buf[c->unit_rpos * c->unit_size], len); + c->unit_rpos = (c->unit_rpos + 1) % c->unit_capa; + c->unit_count--; + return len; + } if (!HIP_LIST_IS_EMPTY(&c->sendq)) /* there is a sending routine */ { hip_uctx_link_t* x; - hip_uctx_t* xx; + hip_uctx_t* sener; /* remove the sending routine from the queue and make it runnable */ x = HIP_LIST_HEAD(&c->sendq); /* copy the data from the sending routine's buffer */ - xx = UCTX_FROM_LINK(x); - if (len > xx->chan_data_len) len = xx->chan_data_len; /* TODO: do something else instead of simple truncation */ - memcpy(ptr, xx->chan_data_ptr, len); -//printf ("USE RECV CHAT caller %p sender %p RET %p\n",caller, xx, xx->chan_ret_ptr); - *(xx->chan_ret_ptr) = len; /* manipulate the return value of the sender */ + sener = UCTX_FROM_LINK(x); + if (len > sener->chan_data_len) len = sener->chan_data_len; /* TODO: do something else instead of simple truncation */ + memcpy(ptr, sener->chan_data_ptr, len); +//printf ("USE RECV CHAT caller %p sender %p RET %p\n",caller, sener, sener->chan_ret_ptr); + *(sener->chan_ret_ptr) = len; /* set the return value of the sender */ HIP_LIST_UNCHAIN(x); HIP_LIST_ADD_BACK(x, &hip->runnables); diff --git a/ctx.c b/ctx.c index ce5da4d..873ffe0 100644 --- a/ctx.c +++ b/ctx.c @@ -36,13 +36,13 @@ static void invoke_uf(unsigned int a, unsigned int b) #else uctx = (hip_uctx_t*)(((hip_oow_t)a << 32) | (hip_oow_t)b); #endif -printf ("invoke_uf START ...%p\n", uctx); +//printf ("invoke_uf START ...%p\n", uctx); uctx->uf(uctx, uctx->ctx); -printf ("invoke_uf DONE ...%p\n", uctx); +//printf ("invoke_uf DONE ...%p\n", uctx); //TODO: This part must not be interrupted by the timer handler??? hip = uctx->hip; - if (uctx == hip->uctx_sched) /* if not the scheduler itcaller */ + if (uctx == hip->uctx_sched) /* if not the scheduler itself */ { /* back to where the schedule got activated for the first time */ setcontext(&hip->uc_main); @@ -55,27 +55,17 @@ printf ("invoke_uf DONE ...%p\n", uctx); else { /* switch to the scheduler */ - HIP_LIST_UNCHAIN(&uctx->uctx); + assert (&uctx->uctx == hip->running); - if (uctx->flags & HIP_RTN_FLAG_AUTO_DESTROY) - { -//printf (">>>>>>>>>> *********************** AUTHO DESTROYING UCTX %p\n", uctx); - hip_uctx_close(uctx); -//printf (">>>>>>>>>> *********************** CLOSED TERMINATED %p\n", uctx); - } - else - { - HIP_LIST_ADD_BACK(&uctx->uctx, &hip->terminated); -//printf (">>>>>>>>>> ************************ ADDED TO TERMINATED\n"); - } + /* place the routine in the terminated routines list and switch to the scheduler */ + hip_switch(hip, &hip->terminated); - setcontext(&hip->uctx_sched->uc); /* switch to the scheduler */ + /* the control must never reach here for a terminated routine + * as the scheduler never reschdule to this function */ } //TODO: This part must not be interrupted? // TODO: schedule the runnable process? - // TODO: move to terminated list? - // TODO: destroy the object? } hip_uctx_t* hip_uctx_open(hip_t* hip, hip_oow_t stack_size, int flags, hip_ufun_t uf, void* ctx) @@ -85,7 +75,7 @@ hip_uctx_t* hip_uctx_open(hip_t* hip, hip_oow_t stack_size, int flags, hip_ufun_ uctx = (hip_uctx_t*)malloc(HIP_SIZEOF(*uctx) + stack_size); if (HIP_UNLIKELY(!uctx)) return HIP_NULL; -printf ("MALLOC UCTX %p\n", uctx); +//printf ("MALLOC UCTX %p\n", uctx); sp = (void*)(uctx + 1); @@ -115,7 +105,7 @@ printf ("MALLOC UCTX %p\n", uctx); #endif } -printf("NEW UCTX %p\n", uctx); +//printf("NEW UCTX %p\n", uctx); return uctx; } @@ -170,6 +160,15 @@ static void schedule(hip_uctx_t* uctx, void *ctx) hip_uctx_link_t* h; hip_uctx_t* u; + while (!HIP_LIST_IS_EMPTY(&hip->terminated)) + { + // clean up terminated routines + h = HIP_LIST_HEAD(&hip->terminated); + HIP_LIST_UNCHAIN(h); + u = UCTX_FROM_LINK(h); + hip_uctx_close(u); + } + if (HIP_LIST_IS_EMPTY(&hip->runnables)) { /* TODO: check if there are sleeping ... */ @@ -323,7 +322,7 @@ hip_t* hip_open(int flags) hip = (hip_t*)malloc(HIP_SIZEOF(*hip)); if (!hip) return HIP_NULL; -printf ("MALLOC HIP %p\n", hip); +//printf ("MALLOC HIP %p\n", hip); memset(hip, 0, HIP_SIZEOF(*hip)); hip->flags = flags; @@ -370,7 +369,7 @@ printf ("MALLOC HIP %p\n", hip); if (hip->mux_id <= -1) { timer_delete(tid); -printf("FREEING HIP %p\n", hip); +//printf("FREEING HIP %p\n", hip); free(hip); return HIP_NULL; } @@ -441,7 +440,7 @@ void hip_close(hip_t* hip) if (hip->uctx_mux) hip_uctx_close(hip->uctx_mux); if (hip->uctx_sched) hip_uctx_close(hip->uctx_sched); -printf("FREEING HIP2 %p\n", hip); +//printf("FREEING HIP2 %p\n", hip); free(hip); } @@ -520,13 +519,6 @@ hip->uctx_sched->csi++; void hip_yield(hip_t* hip) { hip_switch(hip, &hip->runnables); -#if 0 - hip_uctx_t* caller; - /* the caller must deal with hip->running before calling this function */ - caller = UCTX_FROM_LINK(hip->running); - hip->running = HIP_NULL; - swapcontext(&caller->uc, &hip->uctx_sched->uc); /* switch to the scheduler */ -#endif } void hip_suspend(hip_t* hip) @@ -738,12 +730,13 @@ printf (">> UF1 SEC [%d]\n", i); hip_sleep(uc->hip, 1000000000); } +for (i = 0; i < 10; i++) { int n; char buf[32]; printf (">> UF1 ALMOST DONE. WAITING FOR UF3\n"); n = hip_chan_recv(chan, buf, sizeof(buf)); - printf (">> UF1 RECEIVED [%.*s]\n", (int)i, buf); + printf (">> UF1 RECEIVED [%.*s]\n", (int)n, buf); n = hip_chan_send(chan, "holy cow", 8); printf (">> UF1 SENT [%d]\n", (int)n); } @@ -774,7 +767,7 @@ static void uf3(hip_uctx_t* uc, void* ctx) hip = uc->hip; chan = (hip_chan_t*)ctx; -#if 0 +#if 1 memset(&sa, 0, HIP_SIZEOF(sa)); sa.sin_family = AF_INET; sa.sin_addr.s_addr = inet_addr("142.250.206.196"); @@ -813,10 +806,11 @@ hip_sleep(uc->hip, 1000000000); printf (">> UF3 SEC [%d]\n", i); //hip_yield(uc->hip); hip_sleep(uc->hip, 1000000000); - if (i == 8) hip_newrtn(uc->hip, HIP_RTN_FLAG_AUTO_DESTROY, uf2, HIP_NULL); + if (i == 8) hip_newrtn(uc->hip, 0, uf2, HIP_NULL); } printf (">> UF3 DONE DONE\n"); +for (i = 0; i < 10; i++) { int n; char buf[64]; @@ -834,19 +828,20 @@ int main() setbuf(stdout, NULL); hip = hip_open(HIP_FLAG_LAZY); - chan = hip_chan_open(hip, 100); + chan = hip_chan_open(hip, 100, 0); - hip_newrtn(hip, HIP_RTN_FLAG_AUTO_DESTROY, uf1, chan); - hip_newrtn(hip, HIP_RTN_FLAG_AUTO_DESTROY, uf2, HIP_NULL); - hip_newrtn(hip, HIP_RTN_FLAG_AUTO_DESTROY, uf3, chan); + hip_newrtn(hip, 0, uf1, chan); + hip_newrtn(hip, 0, uf2, HIP_NULL); + hip_newrtn(hip, 0, uf3, chan); /* jump to the scheduler */ hip_schedule(hip, 0); -printf("XXXXXXXXXXXXXXXXX ABOUT TO CLOSE ********************\n"); +printf("END OF SCHEDULE\n"); hip_chan_close(chan); /* TODO: close all uctxes ? */ hip_close(hip); +printf("END OF APP\n"); return 0; diff --git a/hip.h b/hip.h index f55b648..7774fda 100644 --- a/hip.h +++ b/hip.h @@ -3,6 +3,8 @@ #include +#define HIP_DEBUG + /* TODO: define these properly */ #define HIP_LIKELY(x) x #define HIP_UNLIKELY(x) x @@ -65,7 +67,7 @@ typedef enum hip_flag_t hip_flag_t; enum hip_rtn_flag_t { - HIP_RTN_FLAG_AUTO_DESTROY = (1 << 0) + HIP_RTN_FLAG_DUMMY = (1 << 0) }; typedef enum hip_rtn_flag_t hip_rtn_flag_t; @@ -83,10 +85,19 @@ typedef enum hip_rtn_flag_t hip_rtn_flag_t; (_p)->next = _link; \ } while(0) +#if defined(HIP_DEBUG) +#define HIP_LIST_UNCHAIN(_link) do { \ + (_link)->next->prev = (_link)->prev; \ + (_link)->prev->next = (_link)->next; \ + (_link)->next = HIP_NULL; \ + (_link)->prev = HIP_NULL; \ +} while(0) +#else #define HIP_LIST_UNCHAIN(_link) do { \ (_link)->next->prev = (_link)->prev; \ (_link)->prev->next = (_link)->next; \ } while(0) +#endif #define HIP_LIST_HEAD(ll) ((ll)->next) #define HIP_LIST_TAIL(ll) ((ll)->prev) @@ -118,7 +129,7 @@ void hip_switch(hip_t* hip, hip_uctx_link_t* list_to_deposit); void hip_suspend(hip_t* hip); -hip_chan_t* hip_chan_open(hip_t* hip, hip_oow_t buf_size); +hip_chan_t* hip_chan_open(hip_t* hip, hip_oow_t unit_size, hip_oow_t unit_capa); void hip_chan_close(hip_chan_t* c); int hip_chan_send(hip_chan_t* c, const void* ptr, hip_oow_t len); int hip_chan_recv(hip_chan_t* c, void* ptr, hip_oow_t len);