#include "hip-prv.h"
#include <stdlib.h>
#include <string.h>

/*
 * A channel for passing fixed-size units between routines of one hip instance.
 *
 * The unit buffer is allocated together with this header (see hip_chan_open())
 * and used as a ring: unit_wpos is the next slot to write, unit_rpos is the
 * next slot to read, and unit_count is the number of occupied slots.
 */
struct hip_chan_t
{
	hip_t*          hip;
	int             closed;     /* set to 1 by hip_chan_close() just before the channel is freed */
	hip_oow_t       unit_size;  /* size of one buffered unit in bytes */
	hip_oow_t       unit_capa;  /* number of units the buffer can hold */
	hip_oow_t       unit_count; /* number of units currently buffered */
	hip_oow_t       unit_rpos;  /* index of the next unit to read */
	hip_oow_t       unit_wpos;  /* index of the next unit to write */
	hip_uint8_t*    buf;        /* points just past this header */
	hip_uctx_link_t sendq;      /* list of senders waiting on this channel */
	hip_uctx_link_t recvq;      /* list of receivers waiting on this channel */
};

/*
 * Copies one unit into the ring buffer at the write position.
 * Input longer than unit_size is truncated; shorter input is zero-padded.
 * The caller must have checked that a free slot exists.
 * Returns the number of bytes taken from src.
 */
static hip_oow_t chan_buffer_unit(hip_chan_t* c, const void* src, hip_oow_t len)
{
	hip_uint8_t* slot;

	if (len > c->unit_size) len = c->unit_size; /* can't buffer more than the unit size */

	slot = &c->buf[c->unit_wpos * c->unit_size];
	memcpy(slot, src, len);
	if (len < c->unit_size) memset(slot + len, 0, c->unit_size - len);

	c->unit_wpos = (c->unit_wpos + 1) % c->unit_capa;
	c->unit_count++;
	return len;
}

/*
 * Moves the routine linked at x from the wait queue it is on to the
 * runnable list after storing its pending send/recv return value.
 */
static void chan_wake(hip_t* hip, hip_uctx_link_t* x, int retval)
{
	hip_uctx_t* waiter;

	waiter = UCTX_FROM_LINK(x);
	*(waiter->chan_ret_ptr) = retval;

	HIP_LIST_UNCHAIN(x);
	HIP_LIST_ADD_BACK(x, &hip->runnables);
}

/*
 * Creates a channel able to buffer unit_capa units of unit_size bytes each.
 * A unit_capa of 0 gives a channel with no buffer; every send then waits
 * for a receiver and vice versa.
 *
 * Returns the new channel, or HIP_NULL if the requested size overflows or
 * the allocation fails. Release it with hip_chan_close().
 */
hip_chan_t* hip_chan_open(hip_t* hip, hip_oow_t unit_size, hip_oow_t unit_capa)
{
	hip_chan_t* c;

	/* refuse sizes whose total would wrap around and yield an undersized buffer */
	if (unit_size > 0 && unit_capa > ((size_t)-1 - HIP_SIZEOF(*c)) / unit_size) return HIP_NULL;

	c = (hip_chan_t*)malloc(HIP_SIZEOF(*c) + (unit_size * unit_capa));
	if (HIP_UNLIKELY(!c)) return HIP_NULL;

	memset(c, 0, HIP_SIZEOF(*c));
	c->hip = hip;
	c->closed = 0;
	c->unit_size = unit_size;
	c->unit_capa = unit_capa;
	c->unit_count = 0;
	c->unit_rpos = 0;
	c->unit_wpos = 0;
	c->buf = (hip_uint8_t*)(c + 1); /* the unit buffer follows the header */
	HIP_LIST_INIT(&c->recvq);
	HIP_LIST_INIT(&c->sendq);

	return c;
}

/*
 * Destroys a channel. Passing HIP_NULL is a no-op.
 *
 * Routines still waiting in hip_chan_send()/hip_chan_recv() are made runnable
 * again and their call returns -1. Without this they would stay linked to
 * list heads that live inside the freed channel.
 */
void hip_chan_close(hip_chan_t* c)
{
	if (!c) return;

	c->closed = 1;
	while (!HIP_LIST_IS_EMPTY(&c->sendq)) chan_wake(c->hip, HIP_LIST_HEAD(&c->sendq), -1);
	while (!HIP_LIST_IS_EMPTY(&c->recvq)) chan_wake(c->hip, HIP_LIST_HEAD(&c->recvq), -1);

	free(c);
}

/*
 * Sends len bytes at ptr over the channel.
 *
 * - If a receiver is waiting, the data is copied straight into its buffer
 *   (truncated to the receiver's length) and the receiver is made runnable.
 * - Otherwise, if the buffer has a free slot, the data is buffered
 *   (truncated to unit_size, zero-padded if shorter).
 * - Otherwise the calling routine waits in the send queue until a receiver
 *   takes the data.
 *
 * Returns the number of bytes actually transferred, or -1 if the channel
 * was closed while the caller was waiting.
 */
int hip_chan_send(hip_chan_t* c, const void* ptr, hip_oow_t len)
{
	hip_t* hip;
	hip_uctx_t* caller;
	int ret;

	hip = c->hip;

	if (!HIP_LIST_IS_EMPTY(&c->recvq))
	{
		/* there is a receiving routine waiting. hand the data over directly */
		hip_uctx_link_t* x;
		hip_uctx_t* recver;

		x = HIP_LIST_HEAD(&c->recvq);
		recver = UCTX_FROM_LINK(x);
		if (len > recver->chan_data_len) len = recver->chan_data_len;
		memcpy(recver->chan_data_ptr, ptr, len);

		/* set the receiver's return value, take it off the queue and make it runnable */
		chan_wake(hip, x, (int)len);

		hip_yield(hip); /* let other routines go first */
		return (int)len;
	}

	if (c->unit_count < c->unit_capa)
	{
		/* the buffer is not full. write to the buffer */
		len = chan_buffer_unit(c, ptr, len);

		/* give another routine a chance to read.
		 * does this cause too many context switches?
		 * without this, there is a high chance that the same routine
		 * starves the channel by reading and writing in succession
		 * if it reads and writes on the same channel */
		hip_yield(hip);
		return (int)len;
	}

	/* no receiver and no room: remember the data holder and place the
	 * calling routine in the send queue */
	caller = UCTX_FROM_LINK(hip->running);
	ret = -1; /* overwritten by whoever wakes this routine up */
	caller->chan_data_ptr = (hip_uint8_t*)ptr;
	caller->chan_data_len = len;
	caller->chan_ret_ptr = &ret; /* the return value holder to be set by the receiver */
	hip_switch(hip, &c->sendq); /* switch to the scheduler */
	return ret;
}

/*
 * Receives up to len bytes into ptr.
 *
 * - If the buffer holds data, the oldest unit is returned. If a sender is
 *   waiting for room, its data is then moved into the freed slot and the
 *   sender is made runnable, so units are delivered in the order sent.
 * - Otherwise, if a sender is waiting (channel without buffer), the data is
 *   copied straight from the sender (truncated to the shorter length).
 * - Otherwise the calling routine waits in the receive queue until a sender
 *   delivers data.
 *
 * Returns the number of bytes stored at ptr, or -1 if the channel was
 * closed while the caller was waiting.
 */
int hip_chan_recv(hip_chan_t* c, void* ptr, hip_oow_t len)
{
	hip_t* hip;
	hip_uctx_t* caller;
	int ret;

	hip = c->hip;

	if (c->unit_count > 0)
	{
		/* the buffer is not empty. buffered units are older than anything
		 * a waiting sender holds, so they must be served first */
		if (len > c->unit_size) len = c->unit_size;
		memcpy(ptr, &c->buf[c->unit_rpos * c->unit_size], len);
		c->unit_rpos = (c->unit_rpos + 1) % c->unit_capa;
		c->unit_count--;

		if (!HIP_LIST_IS_EMPTY(&c->sendq))
		{
			/* a slot has just been freed. move the first waiting sender's
			 * data into it and let that sender continue */
			hip_uctx_link_t* x;
			hip_uctx_t* sender;
			hip_oow_t stored;

			x = HIP_LIST_HEAD(&c->sendq);
			sender = UCTX_FROM_LINK(x);
			stored = chan_buffer_unit(c, sender->chan_data_ptr, sender->chan_data_len);
			chan_wake(hip, x, (int)stored);

			hip_yield(hip); /* let other routines go first */
		}

		/* without a waiting sender, not yielding here is unlikely to cause
		 * starvation, so the scheduler is not entered */
		return (int)len;
	}

	if (!HIP_LIST_IS_EMPTY(&c->sendq))
	{
		/* nothing buffered but a sending routine is waiting.
		 * copy the data from the sending routine's buffer */
		hip_uctx_link_t* x;
		hip_uctx_t* sender;

		x = HIP_LIST_HEAD(&c->sendq);
		sender = UCTX_FROM_LINK(x);
		if (len > sender->chan_data_len) len = sender->chan_data_len; /* TODO: do something else instead of simple truncation */
		memcpy(ptr, sender->chan_data_ptr, len);

		/* set the sender's return value, take it off the queue and make it runnable */
		chan_wake(hip, x, (int)len);

		hip_yield(hip); /* let other routines go first */
		return (int)len;
	}

	/* no data and no sender: remember the data holder and place the
	 * calling routine in the receive queue */
	caller = UCTX_FROM_LINK(hip->running);
	ret = -1; /* overwritten by whoever wakes this routine up */
	caller->chan_data_ptr = ptr;
	caller->chan_data_len = len;
	caller->chan_ret_ptr = &ret; /* the return value holder to be set by the sender */
	hip_switch(hip, &c->recvq); /* switch to the scheduler */
	return ret;
}

#if 0
/* not implemented yet */
int hip_chan_select(hip_chan_t* chans, hip_oow_t count, int nonblock)
{
	/* returns the index to the chan that has data to read */
	return -1;
}
#endif