diff --git a/ctx.c b/ctx.c index 2bfd111..f7e9800 100644 --- a/ctx.c +++ b/ctx.c @@ -37,6 +37,7 @@ typedef hip_int64_t hip_ooi_t; typedef hip_uint64_t hip_nsecdur_t; #define HIP_NULL ((void*)0) +#define HIP_INVALID_FD (-1) typedef struct hip_t hip_t; typedef struct hip_uctx_t hip_uctx_t; @@ -70,6 +71,7 @@ struct hip_uctx_t unsigned int stid; hip_uctx_link_t uctx; + hip_uctx_link_t io_done; }; #define HIP_OFFSETOF(type, field) ((hip_oow_t)&(((type*)0)->field)) @@ -98,8 +100,16 @@ struct hip_uctx_t #define UCTX_FROM_LINK(_link) HIP_CONTAINEROF(_link, hip_uctx_t, uctx) +enum hip_flag_t +{ + HIP_FLAG_LAZY = (1 << 0) +}; +typedef enum hip_flag_t hip_flag_t; + struct hip_t { + int flags; + struct sigaction oldsa; timer_t tmr_id; int mux_id; @@ -108,6 +118,7 @@ struct hip_t hip_uctx_link_t terminated; hip_uctx_link_t sleeping; hip_uctx_link_t io_waiting; + hip_uctx_link_t io_done; hip_uctx_link_t* running; @@ -172,6 +183,7 @@ hip_uctx_t* hip_uctx_open(hip_t* hip, hip_oow_t stack_size, hip_ufun_t uf, void* uc->hip = hip; uc->uf = uf; uc->ctx = ctx; + uc->waiting_fd = HIP_INVALID_FD; getcontext(&uc->uc); @@ -273,6 +285,7 @@ static void multiplex(hip_uctx_t* uctx, void *ctx) hip = uctx->hip; while (1) { +printf ("sleeping empty[%d] io_waiting empty[%d]\n", IS_EMPTY(&hip->sleeping), IS_EMPTY(&hip->io_waiting)); if (IS_EMPTY(&hip->sleeping) && IS_EMPTY(&hip->io_waiting)) { // STILL NEED TO HANDLE IO FILE DESCRIPTORS... @@ -298,6 +311,14 @@ printf ("NO SLEEPING. BACK TO SCEDULER\n"); * lazy removal of unneeded context here??? * epoll_ctl(DELETE here */ + // lazy deletion of file descriptors + while (!IS_EMPTY(&hip->io_done)) + { + l = HEAD(&hip->io_done); + u = UCTX_FROM_LINK(l); + UNCHAIN(l); + epoll_ctl(hip->mux_id, EPOLL_CTL_DEL, u->waiting_fd, HIP_NULL); + } printf ("WAITING %llu\n", (unsigned long long)wait); /* TODO: support different io multiplexers...*/ @@ -315,6 +336,7 @@ printf ("WAITING %llu\n", (unsigned long long)wait); */ n = epoll_pwait(hip->mux_id, ee, 100, (wait + 1000000 - 1) / 1000000, HIP_NULL); #endif +printf ("Epoll returned [%d]\n", (int)n); if (n <= -1) { /* TODO: some error handling action is required */ @@ -328,12 +350,27 @@ printf ("WAITING %llu\n", (unsigned long long)wait); l = ee[--n].data.ptr; u = UCTX_FROM_LINK(l); - UNCHAIN(l); + UNCHAIN(l); /* unchain it from io_waiting */ ADD_BACK(l, &hip->runnables); -printf ("PWAIT link MULTIPLEX DEL %p fd[%d]\n", l, u->waiting_fd); + if (hip->flags & HIP_FLAG_LAZY) + { + /* arrange to remove the file descriptor from + * the multiplexer just before actual waiting. + * add this to the io_done list for clean-up + * before waiting and keep waiting_fd untouched + * for adjustment later. */ + ADD_BACK(&u->io_done, &hip->io_done); + } + else + { + +printf ("PWAIT link MULTIPLEX DEL %p fd[%d] io_waiting empty[%d]\n", l, u->waiting_fd, IS_EMPTY(&hip->io_waiting)); + + epoll_ctl(hip->mux_id, EPOLL_CTL_DEL, u->waiting_fd, HIP_NULL); + u->waiting_fd = HIP_INVALID_FD; + } -epoll_ctl(hip->mux_id, EPOLL_CTL_DEL, u->waiting_fd, HIP_NULL); } while(n > 0); } @@ -359,7 +396,7 @@ printf ("BACK TO SCHEDULER \n"); } } -hip_t* hip_open(void) +hip_t* hip_open(int flags) { hip_t* hip; struct sigaction sa; @@ -372,6 +409,7 @@ hip_t* hip_open(void) if (!hip) return HIP_NULL; memset(hip, 0, sizeof(*hip)); + hip->flags = flags; /* initialize to an empty list by making each pointer point to itself.*/ hip->runnables.next = &hip->runnables; @@ -382,6 +420,8 @@ hip_t* hip_open(void) hip->sleeping.prev = &hip->sleeping; hip->io_waiting.next = &hip->io_waiting; hip->io_waiting.prev = &hip->io_waiting; + hip->io_done.next = &hip->io_done; + hip->io_done.prev = &hip->io_done; sigemptyset(&sm); sigaddset(&sm, SIGRTMIN); @@ -471,10 +511,10 @@ hip_uctx_t* hip_newrtn(hip_t* hip, hip_ufun_t uf, void* ctx) int hip_schedule(hip_t* hip, int preempt) { - hip->uctx_sched = hip_uctx_open(hip, 4096, schedule, hip); + hip->uctx_sched = hip_uctx_open(hip, 40960, schedule, hip); if (!hip->uctx_sched) return -1; - hip->uctx_mux = hip_uctx_open(hip, 4096, multiplex, hip); + hip->uctx_mux = hip_uctx_open(hip, 40960, multiplex, hip); if (!hip->uctx_mux) { hip_uctx_close (hip->uctx_sched); @@ -566,18 +606,51 @@ void hip_awaitio(hip_t* hip, int fd, int flags) struct epoll_event ev; hip_uctx_t* rr; - memset(&ev, 0, sizeof(ev)); - if (flags & HIP_IO_READ) ev.events |= EPOLLIN; - if (flags & HIP_IO_WRITE) ev.events |= EPOLLOUT; - ev.data.ptr = hip->running; /* store running context link */ -printf ("PWAIT link ADD %p fd[%d]\n", ev.data.ptr, fd); - epoll_ctl(hip->mux_id, EPOLL_CTL_ADD, fd, &ev); - /* TODO: error handling - probably panic? */ - +/* if waiting on multiple items are supported, the following code requires a whole rewrite */ rr = UCTX_FROM_LINK(hip->running); + + if (rr->waiting_fd != HIP_INVALID_FD) + { + UNCHAIN(&rr->io_done); + + if (rr->waiting_fd == fd) + { + /* update */ + memset(&ev, 0, sizeof(ev)); + if (flags & HIP_IO_READ) ev.events |= EPOLLIN; + if (flags & HIP_IO_WRITE) ev.events |= EPOLLOUT; + + /* - the event mask could have chagned. + * - the waiting context could have changed. */ + /* TODO: more optimization based on above */ + ev.data.ptr = hip->running; /* store running context link */ +printf ("PWAIT link UPDATE %p fd[%d]\n", ev.data.ptr, fd); + epoll_ctl(hip->mux_id, EPOLL_CTL_MOD, fd, &ev); + } + else + { + /* delete and add */ +printf ("PWAIT link DELETE fd[%d]\n", fd); + epoll_ctl(hip->mux_id, EPOLL_CTL_DEL, fd, HIP_NULL); + goto add; + } + } + else + { + add: + memset(&ev, 0, sizeof(ev)); + if (flags & HIP_IO_READ) ev.events |= EPOLLIN; + if (flags & HIP_IO_WRITE) ev.events |= EPOLLOUT; + ev.data.ptr = hip->running; /* store running context link */ +printf ("PWAIT link ADD %p fd[%d]\n", ev.data.ptr, fd); + epoll_ctl(hip->mux_id, EPOLL_CTL_ADD, fd, &ev); + /* TODO: error handling - probably panic? */ + } + ADD_BACK(hip->running, &hip->io_waiting); hip->running = HIP_NULL; rr->waiting_fd = fd; /* remember the file descriptor being waited on */ + swapcontext(&rr->uc, &hip->uctx_sched->uc); } @@ -663,6 +736,7 @@ static int socket_connect(hip_t* hip, int proto, const struct sockaddr* addr, so static void socket_close(hip_t* hip, int fd) { +/* TODO: DEL only if fd is still in the multiplexer? */ epoll_ctl(hip->mux_id, EPOLL_CTL_DEL, fd, HIP_NULL); /* NOTE: this is not be necessary */ close(fd); } @@ -725,7 +799,28 @@ static void uf3(hip_uctx_t* uc, void* ctx) sa.sin_port = htons(80); fd = socket_connect(hip, SOCK_STREAM, (struct sockaddr*)&sa, sizeof(sa)); printf ("*********CONNECTED>>>>>>>>>>>>>>>\n"); + { + int n; + char buf[256]; +// if the server side doesn't close the connection, +// the following loop will never end as this client side doesn't actively +// close the connection after having read the response chunk. it's not http aware... +// use 'Connection: close' for now + + //const char x[] = "GET / HTTP/1.1\r\nHost: www.google.com\r\n\r\n"; + const char x[] = "GET / HTTP/1.1\r\nHost: www.google.com\r\nConnection: close\r\n\r\n"; + + socket_write(hip, fd, x, sizeof(x) - 1); + do + { + n = socket_read(hip, fd, buf, sizeof(buf)); + if (n <= 0) break; + printf("GOT [%.*s]\n", n, buf); + } + while(1); + } socket_close(hip, fd); +printf ("*********DISCONNECTED>>>>>>>>>>>>>>>\n"); for (i = 0; i < 15; i++) { @@ -743,7 +838,7 @@ int main() { hip_t* hip; - hip = hip_open(); + hip = hip_open(HIP_FLAG_LAZY); hip_newrtn(hip, uf1, HIP_NULL); hip_newrtn(hip, uf2, HIP_NULL);