From a8eec75ce32b868203ad86b07a5cce0cbdac15d7 Mon Sep 17 00:00:00 2001 From: hyung-hwan Date: Sat, 3 May 2025 13:37:50 +0900 Subject: [PATCH] adding code for io waiting --- ctx.c | 224 +++++++++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 198 insertions(+), 26 deletions(-) diff --git a/ctx.c b/ctx.c index 15583ee..765bd39 100644 --- a/ctx.c +++ b/ctx.c @@ -5,14 +5,19 @@ #include #include #include +#include #include +#include +#include +#include #if defined(__NetBSD__) #include #include #else #include +#include #include #endif @@ -38,6 +43,13 @@ typedef struct hip_uctx_t hip_uctx_t; typedef struct hip_uctx_link_t hip_uctx_link_t; typedef void (*hip_ufun_t) (hip_uctx_t* uc, void* ctx); +enum hip_io_flag_t +{ + HIP_IO_READ = (1 << 0), + HIP_IO_WRITE = (1 << 1) +}; +typedef enum hip_io_flag_t hip_io_flag_t; + struct hip_uctx_link_t { hip_uctx_link_t* next; @@ -93,7 +105,8 @@ struct hip_t hip_uctx_link_t runnables; hip_uctx_link_t terminated; - hip_uctx_link_t pending; + hip_uctx_link_t sleeping; + hip_uctx_link_t io_waiting; hip_uctx_link_t* running; @@ -118,7 +131,7 @@ printf ("invoke_uf XXXXXXXXXXXXXXXXXXXXX...%p\n", uctx); if (uctx == hip->uctx_sched) /* if not the scheduler itself */ { /* back to where the schedule got activated for the first time */ - setcontext(&hip->uc_main); + setcontext(&hip->uc_main); } else if (uctx == hip->uctx_mux) { @@ -132,7 +145,7 @@ printf ("invoke_uf XXXXXXXXXXXXXXXXXXXXX...%p\n", uctx); /* TODO: if auto-destroy is on, delete it */ //if (uctx->flags & HIP_UCTX_FLAG_AUTO_DESTROY) hip_freertn(hip, uctx); - //else + //else ADD_BACK(&uctx->uctx, &hip->terminated); printf ("ADDED TO TERMINATED\n"); @@ -223,14 +236,14 @@ static void schedule(hip_uctx_t* uctx, void *ctx) { hip_t* hip; hip = uctx->hip; - while(1) + while(1) { hip_uctx_link_t* h; hip_uctx_t* u; if (IS_EMPTY(&hip->runnables)) { -/* TODO: check if there are pending ... */ +/* TODO: check if there are sleeping ... */ printf (" NO TASK\n"); swapcontext(&hip->uctx_sched->uc, &hip->uctx_mux->uc); if (IS_EMPTY(&hip->runnables)) break; /* no task? */ @@ -259,7 +272,7 @@ static void multiplex(hip_uctx_t* uctx, void *ctx) hip = uctx->hip; while (1) { - if (IS_EMPTY(&hip->pending)) + if (IS_EMPTY(&hip->sleeping)) { // STILL NEED TO HANDLE IO FILE DESCRIPTORS... /* go back to the scheduler */ @@ -271,9 +284,10 @@ static void multiplex(hip_uctx_t* uctx, void *ctx) hip_nsecdur_t wait; struct timespec tmout; struct epoll_event ee[100]; /* TODO: hold it in hip_t struct... sizing must be handled in the main struct.. */ + int n; hip_uctx_t* u; - l = HEAD(&hip->pending); + l = HEAD(&hip->sleeping); u = UCTX_FROM_LINK(l); now = monotime(); wait = now >= u->wakeup_time? 0: u->wakeup_time - now; @@ -282,13 +296,37 @@ printf ("WAITING %llu\n", (unsigned long long)wait); // TODO: support different io multiplexers... tmout.tv_sec = wait / 1000000000; tmout.tv_nsec = wait % 1000000000; - epoll_pwait2(hip->mux_id, ee, 100, &tmout, HIP_NULL); - -// TODO: check other io file descriptors... - now = monotime(); - while (!IS_EMPTY(&hip->pending)) +/* TODO: + * lazy removal of unneeded context here??? + * epoll_ctl(DELETE here + */ + n = epoll_pwait2(hip->mux_id, ee, 100, &tmout, HIP_NULL); + if (n <= -1) { - l = HEAD(&hip->pending); + /* TODO: some error handling action is required */ + /* scheduler panic? */ + } + + if (n > 0) + { + do + { + l = ee[--n].data.ptr; +printf ("PWAIT link MULTIPLEX %p\n", l); + /*u = UCTX_FROM_LINK(l);*/ + + UNCHAIN(l); + ADD_BACK(l, &hip->runnables); + + //epoll_ctl(hip->mux_id, EPOLL_CTL_DEL, + } + while(n > 0); + } + + now = monotime(); + while (!IS_EMPTY(&hip->sleeping)) + { + l = HEAD(&hip->sleeping); u = UCTX_FROM_LINK(l); if (now < u->wakeup_time) break; @@ -300,7 +338,7 @@ printf ("WAITING %llu\n", (unsigned long long)wait); if (!IS_EMPTY(&hip->runnables)) swapcontext(&hip->uctx_mux->uc, &hip->uctx_sched->uc); } - } + } } hip_t* hip_open(void) @@ -322,8 +360,10 @@ hip_t* hip_open(void) hip->runnables.prev = &hip->runnables; hip->terminated.next = &hip->terminated; hip->terminated.prev = &hip->terminated; - hip->pending.next = &hip->pending; - hip->pending.prev = &hip->pending; + hip->sleeping.next = &hip->sleeping; + hip->sleeping.prev = &hip->sleeping; + hip->io_waiting.next = &hip->io_waiting; + hip->io_waiting.prev = &hip->io_waiting; sigemptyset(&sm); sigaddset(&sm, SIGRTMIN); @@ -344,7 +384,7 @@ hip_t* hip_open(void) se.sigev_notify = SIGEV_SIGNAL; se.sigev_signo = SIGRTMIN; /*se.sigev_value.sival_ptr = hip; TOOD: possible to use sigev_notify_function and use this field? - * what about sigwaitinfo() or sigtimedwait() in a dedicated thread? + * what about sigwaitinfo() or sigtimedwait() in a dedicated thread? * */ timer_create(CLOCK_MONOTONIC, &se, &tid); hip->tmr_id = tid; @@ -417,7 +457,7 @@ int hip_schedule(hip_t* hip, int preempt) if (!hip->uctx_sched) return -1; hip->uctx_mux = hip_uctx_open(hip, 4096, multiplex, hip); - if (!hip->uctx_mux) + if (!hip->uctx_mux) { hip_uctx_close (hip->uctx_sched); hip->uctx_sched = HIP_NULL; @@ -434,7 +474,7 @@ int hip_schedule(hip_t* hip, int preempt) //its.it_interval.tv_nsec = 10000000; /* 10 milliseconds */ its.it_interval.tv_nsec = 100000000; /* 100 milliseconds */ its.it_value = its.it_interval; - timer_settime(hip->tmr_id, 0, &its, NULL); + timer_settime(hip->tmr_id, 0, &its, NULL); } /* jump to the scheduler */ @@ -448,7 +488,7 @@ int hip_schedule(hip_t* hip, int preempt) its.it_interval.tv_sec = 0; its.it_interval.tv_nsec = 0; its.it_value = its.it_interval; - timer_settime(hip->tmr_id, 0, &its, NULL); + timer_settime(hip->tmr_id, 0, &its, NULL); } return 0; @@ -478,15 +518,15 @@ void hip_sleep(hip_t* hip, hip_nsecdur_t nsecs) rr->wakeup_time = monotime() + nsecs; /* TODO: switch to HEAP. for now simply linear search to keep this list sorted. */ - if (IS_EMPTY(&hip->pending)) + if (IS_EMPTY(&hip->sleeping)) { - ADD_BACK(hip->running, &hip->pending); + ADD_BACK(hip->running, &hip->sleeping); } else { hip_uctx_link_t* l; hip_uctx_t* r; - for (l = HEAD(&hip->pending); l != &hip->pending; l = l->next) + for (l = HEAD(&hip->sleeping); l != &hip->sleeping; l = l->next) { r = UCTX_FROM_LINK(l); if (rr->wakeup_time <= r->wakeup_time) @@ -495,7 +535,7 @@ void hip_sleep(hip_t* hip, hip_nsecdur_t nsecs) goto do_sched; } } - ADD_BACK(hip->running, &hip->pending); + ADD_BACK(hip->running, &hip->sleeping); } do_sched: @@ -503,6 +543,123 @@ do_sched: swapcontext(&rr->uc, &hip->uctx_sched->uc); } +void hip_awaitio(hip_t* hip, int fd, int flags) +{ + struct epoll_event ev; + hip_uctx_t* rr; + + memset(&ev, 0, sizeof(ev)); + if (flags & HIP_IO_READ) ev.events |= EPOLLIN; + if (flags & HIP_IO_WRITE) ev.events |= EPOLLOUT; + ev.data.ptr = hip->running; /* store running context link */ +printf ("PWAIT link ADD %p\n", ev.data.ptr); + epoll_ctl(hip->mux_id, EPOLL_CTL_ADD, fd, &ev); + /* TODO: error handling - probably panic? */ + + rr = UCTX_FROM_LINK(hip->running); + ADD_BACK(hip->running, &hip->io_waiting); + hip->running = HIP_NULL; + swapcontext(&rr->uc, &hip->uctx_sched->uc); +} + +/* ---------------------------------------------------- */ +static int fd_set_nonblock(int fd) +{ + int flags; + flags = fcntl(fd, F_GETFL); + if (flags <= -1) return -1; + return fcntl(fd, F_SETFL, flags | O_NONBLOCK); +} + +static int fd_set_cloexec(int fd) +{ + int flags; + flags = fcntl(fd, F_GETFD); + if (flags <= -1) return -1; + return fcntl(fd, F_SETFD, flags | FD_CLOEXEC); +} + +static int socket_connect(hip_t* hip, int proto, const struct sockaddr* addr, socklen_t addrlen) /* TODO: timeout? */ +{ + int fd; + int x; + + x = SOCK_STREAM; +#if defined(SOCK_CLOEXEC) + x |= SOCK_CLOEXEC; +#endif +#if defined(SOCK_NONBLOCK) + x |= SOCK_NONBLOCK; +#endif + + fd = socket(addr->sa_family, x, 0); /* TODO make SOCK_STREAM selectable */ + if (fd <= -1) return -1; + +#if !defined(SOCK_CLOEXEC) + if (fd_set_cloexec(fd) <= -1) + { + close(fd); + return -1; + } +#endif +#if !defined(SOCK_NONBLOCK) + if (fd_set_nonblock(fd) <= -1) + { + close(fd); + return -1; + } +#endif + + x = connect(fd, addr, addrlen); + if (x <= -1) + { + socklen_t sl; + int ss; + + if (errno != EINPROGRESS) /* TODO: cater for EINTR? */ + { + close(fd); + return -1; + } + + hip_awaitio(hip, fd, HIP_IO_WRITE); + + sl = sizeof(ss); + if (getsockopt(fd, SOL_SOCKET, SO_ERROR, (void*)&ss, &sl) <= -1) + { + close(fd); + return -1; + } + + if (ss != 0) + { + close(fd); + errno = ss; + return -1; + } + } + + return fd; +} + +static void socket_close(hip_t* hip, int fd) +{ + epoll_ctl(hip->mux_id, EPOLL_CTL_DEL, fd, HIP_NULL); /* NOTE: this is not be necessary */ + close(fd); +} + +static hip_ooi_t socket_read(hip_t* hip, int fd, void* buf, hip_oow_t len) +{ + hip_awaitio(hip, fd, HIP_IO_READ); + return read(fd, buf, len); +} + +static hip_ooi_t socket_write(hip_t* hip, int fd, const void* buf, hip_oow_t len) +{ + hip_awaitio(hip, fd, HIP_IO_WRITE); + return write(fd, buf, len); +} + /* ---------------------------------------------------- */ static void uf1(hip_uctx_t* uc, void* ctx) @@ -512,6 +669,8 @@ for (i =0; i < 5; i++) { printf ("************************** uf1 \n"); //hip_yield(uc->hip); + + hip_sleep(uc->hip, 500000000); printf ("************************* uf1 1111\n"); //hip_yield(uc->hip); @@ -535,8 +694,21 @@ hip_sleep(uc->hip, 500000000); static void uf3(hip_uctx_t* uc, void* ctx) { - int i; - for (i =0; i < 15; i++) + hip_t* hip; + struct sockaddr_in sa; + int i, fd; + + hip = uc->hip; + + memset(&sa, 0, sizeof(sa)); + sa.sin_family = AF_INET; + sa.sin_addr.s_addr = inet_addr("142.250.206.196"); + sa.sin_port = htons(80); + fd = socket_connect(hip, SOCK_STREAM, (struct sockaddr*)&sa, sizeof(sa)); +printf ("*********CONNECTED>>>>>>>>>>>>>>>\n"); + socket_close(hip, fd); + + for (i = 0; i < 15; i++) { printf ("************************** uf3 TOP\n"); //hip_yield(uc->hip);