adding code for io waiting

This commit is contained in:
hyung-hwan 2025-05-03 13:37:50 +09:00
parent 19d2e97577
commit a8eec75ce3

206
ctx.c
View File

@ -5,14 +5,19 @@
#include <stdlib.h> #include <stdlib.h>
#include <stdio.h> #include <stdio.h>
#include <unistd.h> #include <unistd.h>
#include <errno.h>
#include <assert.h> #include <assert.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#if defined(__NetBSD__) #if defined(__NetBSD__)
#include <sys/event.h> #include <sys/event.h>
#include <fcntl.h> #include <fcntl.h>
#else #else
#include <sys/epoll.h> #include <sys/epoll.h>
#include <fcntl.h>
#include <valgrind/valgrind.h> #include <valgrind/valgrind.h>
#endif #endif
@ -38,6 +43,13 @@ typedef struct hip_uctx_t hip_uctx_t;
typedef struct hip_uctx_link_t hip_uctx_link_t; typedef struct hip_uctx_link_t hip_uctx_link_t;
typedef void (*hip_ufun_t) (hip_uctx_t* uc, void* ctx); typedef void (*hip_ufun_t) (hip_uctx_t* uc, void* ctx);
enum hip_io_flag_t
{
HIP_IO_READ = (1 << 0),
HIP_IO_WRITE = (1 << 1)
};
typedef enum hip_io_flag_t hip_io_flag_t;
struct hip_uctx_link_t struct hip_uctx_link_t
{ {
hip_uctx_link_t* next; hip_uctx_link_t* next;
@ -93,7 +105,8 @@ struct hip_t
hip_uctx_link_t runnables; hip_uctx_link_t runnables;
hip_uctx_link_t terminated; hip_uctx_link_t terminated;
hip_uctx_link_t pending; hip_uctx_link_t sleeping;
hip_uctx_link_t io_waiting;
hip_uctx_link_t* running; hip_uctx_link_t* running;
@ -230,7 +243,7 @@ static void schedule(hip_uctx_t* uctx, void *ctx)
if (IS_EMPTY(&hip->runnables)) if (IS_EMPTY(&hip->runnables))
{ {
/* TODO: check if there are pending ... */ /* TODO: check if there are sleeping ... */
printf ("<scheduler> NO TASK\n"); printf ("<scheduler> NO TASK\n");
swapcontext(&hip->uctx_sched->uc, &hip->uctx_mux->uc); swapcontext(&hip->uctx_sched->uc, &hip->uctx_mux->uc);
if (IS_EMPTY(&hip->runnables)) break; /* no task? */ if (IS_EMPTY(&hip->runnables)) break; /* no task? */
@ -259,7 +272,7 @@ static void multiplex(hip_uctx_t* uctx, void *ctx)
hip = uctx->hip; hip = uctx->hip;
while (1) while (1)
{ {
if (IS_EMPTY(&hip->pending)) if (IS_EMPTY(&hip->sleeping))
{ {
// STILL NEED TO HANDLE IO FILE DESCRIPTORS... // STILL NEED TO HANDLE IO FILE DESCRIPTORS...
/* go back to the scheduler */ /* go back to the scheduler */
@ -271,9 +284,10 @@ static void multiplex(hip_uctx_t* uctx, void *ctx)
hip_nsecdur_t wait; hip_nsecdur_t wait;
struct timespec tmout; struct timespec tmout;
struct epoll_event ee[100]; /* TODO: hold it in hip_t struct... sizing must be handled in the main struct.. */ struct epoll_event ee[100]; /* TODO: hold it in hip_t struct... sizing must be handled in the main struct.. */
int n;
hip_uctx_t* u; hip_uctx_t* u;
l = HEAD(&hip->pending); l = HEAD(&hip->sleeping);
u = UCTX_FROM_LINK(l); u = UCTX_FROM_LINK(l);
now = monotime(); now = monotime();
wait = now >= u->wakeup_time? 0: u->wakeup_time - now; wait = now >= u->wakeup_time? 0: u->wakeup_time - now;
@ -282,13 +296,37 @@ printf ("WAITING %llu\n", (unsigned long long)wait);
// TODO: support different io multiplexers... // TODO: support different io multiplexers...
tmout.tv_sec = wait / 1000000000; tmout.tv_sec = wait / 1000000000;
tmout.tv_nsec = wait % 1000000000; tmout.tv_nsec = wait % 1000000000;
epoll_pwait2(hip->mux_id, ee, 100, &tmout, HIP_NULL); /* TODO:
* lazy removal of unneeded context here???
// TODO: check other io file descriptors... * epoll_ctl(DELETE here
now = monotime(); */
while (!IS_EMPTY(&hip->pending)) n = epoll_pwait2(hip->mux_id, ee, 100, &tmout, HIP_NULL);
if (n <= -1)
{ {
l = HEAD(&hip->pending); /* TODO: some error handling action is required */
/* scheduler panic? */
}
if (n > 0)
{
do
{
l = ee[--n].data.ptr;
printf ("PWAIT link MULTIPLEX %p\n", l);
/*u = UCTX_FROM_LINK(l);*/
UNCHAIN(l);
ADD_BACK(l, &hip->runnables);
//epoll_ctl(hip->mux_id, EPOLL_CTL_DEL,
}
while(n > 0);
}
now = monotime();
while (!IS_EMPTY(&hip->sleeping))
{
l = HEAD(&hip->sleeping);
u = UCTX_FROM_LINK(l); u = UCTX_FROM_LINK(l);
if (now < u->wakeup_time) break; if (now < u->wakeup_time) break;
@ -322,8 +360,10 @@ hip_t* hip_open(void)
hip->runnables.prev = &hip->runnables; hip->runnables.prev = &hip->runnables;
hip->terminated.next = &hip->terminated; hip->terminated.next = &hip->terminated;
hip->terminated.prev = &hip->terminated; hip->terminated.prev = &hip->terminated;
hip->pending.next = &hip->pending; hip->sleeping.next = &hip->sleeping;
hip->pending.prev = &hip->pending; hip->sleeping.prev = &hip->sleeping;
hip->io_waiting.next = &hip->io_waiting;
hip->io_waiting.prev = &hip->io_waiting;
sigemptyset(&sm); sigemptyset(&sm);
sigaddset(&sm, SIGRTMIN); sigaddset(&sm, SIGRTMIN);
@ -478,15 +518,15 @@ void hip_sleep(hip_t* hip, hip_nsecdur_t nsecs)
rr->wakeup_time = monotime() + nsecs; rr->wakeup_time = monotime() + nsecs;
/* TODO: switch to HEAP. for now simply linear search to keep this list sorted. */ /* TODO: switch to HEAP. for now simply linear search to keep this list sorted. */
if (IS_EMPTY(&hip->pending)) if (IS_EMPTY(&hip->sleeping))
{ {
ADD_BACK(hip->running, &hip->pending); ADD_BACK(hip->running, &hip->sleeping);
} }
else else
{ {
hip_uctx_link_t* l; hip_uctx_link_t* l;
hip_uctx_t* r; hip_uctx_t* r;
for (l = HEAD(&hip->pending); l != &hip->pending; l = l->next) for (l = HEAD(&hip->sleeping); l != &hip->sleeping; l = l->next)
{ {
r = UCTX_FROM_LINK(l); r = UCTX_FROM_LINK(l);
if (rr->wakeup_time <= r->wakeup_time) if (rr->wakeup_time <= r->wakeup_time)
@ -495,7 +535,7 @@ void hip_sleep(hip_t* hip, hip_nsecdur_t nsecs)
goto do_sched; goto do_sched;
} }
} }
ADD_BACK(hip->running, &hip->pending); ADD_BACK(hip->running, &hip->sleeping);
} }
do_sched: do_sched:
@ -503,6 +543,123 @@ do_sched:
swapcontext(&rr->uc, &hip->uctx_sched->uc); swapcontext(&rr->uc, &hip->uctx_sched->uc);
} }
void hip_awaitio(hip_t* hip, int fd, int flags)
{
struct epoll_event ev;
hip_uctx_t* rr;
memset(&ev, 0, sizeof(ev));
if (flags & HIP_IO_READ) ev.events |= EPOLLIN;
if (flags & HIP_IO_WRITE) ev.events |= EPOLLOUT;
ev.data.ptr = hip->running; /* store running context link */
printf ("PWAIT link ADD %p\n", ev.data.ptr);
epoll_ctl(hip->mux_id, EPOLL_CTL_ADD, fd, &ev);
/* TODO: error handling - probably panic? */
rr = UCTX_FROM_LINK(hip->running);
ADD_BACK(hip->running, &hip->io_waiting);
hip->running = HIP_NULL;
swapcontext(&rr->uc, &hip->uctx_sched->uc);
}
/* ---------------------------------------------------- */
static int fd_set_nonblock(int fd)
{
int flags;
flags = fcntl(fd, F_GETFL);
if (flags <= -1) return -1;
return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}
static int fd_set_cloexec(int fd)
{
int flags;
flags = fcntl(fd, F_GETFD);
if (flags <= -1) return -1;
return fcntl(fd, F_SETFD, flags | FD_CLOEXEC);
}
static int socket_connect(hip_t* hip, int proto, const struct sockaddr* addr, socklen_t addrlen) /* TODO: timeout? */
{
int fd;
int x;
x = SOCK_STREAM;
#if defined(SOCK_CLOEXEC)
x |= SOCK_CLOEXEC;
#endif
#if defined(SOCK_NONBLOCK)
x |= SOCK_NONBLOCK;
#endif
fd = socket(addr->sa_family, x, 0); /* TODO make SOCK_STREAM selectable */
if (fd <= -1) return -1;
#if !defined(SOCK_CLOEXEC)
if (fd_set_cloexec(fd) <= -1)
{
close(fd);
return -1;
}
#endif
#if !defined(SOCK_NONBLOCK)
if (fd_set_nonblock(fd) <= -1)
{
close(fd);
return -1;
}
#endif
x = connect(fd, addr, addrlen);
if (x <= -1)
{
socklen_t sl;
int ss;
if (errno != EINPROGRESS) /* TODO: cater for EINTR? */
{
close(fd);
return -1;
}
hip_awaitio(hip, fd, HIP_IO_WRITE);
sl = sizeof(ss);
if (getsockopt(fd, SOL_SOCKET, SO_ERROR, (void*)&ss, &sl) <= -1)
{
close(fd);
return -1;
}
if (ss != 0)
{
close(fd);
errno = ss;
return -1;
}
}
return fd;
}
static void socket_close(hip_t* hip, int fd)
{
epoll_ctl(hip->mux_id, EPOLL_CTL_DEL, fd, HIP_NULL); /* NOTE: this is not be necessary */
close(fd);
}
static hip_ooi_t socket_read(hip_t* hip, int fd, void* buf, hip_oow_t len)
{
hip_awaitio(hip, fd, HIP_IO_READ);
return read(fd, buf, len);
}
static hip_ooi_t socket_write(hip_t* hip, int fd, const void* buf, hip_oow_t len)
{
hip_awaitio(hip, fd, HIP_IO_WRITE);
return write(fd, buf, len);
}
/* ---------------------------------------------------- */ /* ---------------------------------------------------- */
static void uf1(hip_uctx_t* uc, void* ctx) static void uf1(hip_uctx_t* uc, void* ctx)
@ -512,6 +669,8 @@ for (i =0; i < 5; i++)
{ {
printf ("************************** uf1 \n"); printf ("************************** uf1 \n");
//hip_yield(uc->hip); //hip_yield(uc->hip);
hip_sleep(uc->hip, 500000000); hip_sleep(uc->hip, 500000000);
printf ("************************* uf1 1111\n"); printf ("************************* uf1 1111\n");
//hip_yield(uc->hip); //hip_yield(uc->hip);
@ -535,7 +694,20 @@ hip_sleep(uc->hip, 500000000);
static void uf3(hip_uctx_t* uc, void* ctx) static void uf3(hip_uctx_t* uc, void* ctx)
{ {
int i; hip_t* hip;
struct sockaddr_in sa;
int i, fd;
hip = uc->hip;
memset(&sa, 0, sizeof(sa));
sa.sin_family = AF_INET;
sa.sin_addr.s_addr = inet_addr("142.250.206.196");
sa.sin_port = htons(80);
fd = socket_connect(hip, SOCK_STREAM, (struct sockaddr*)&sa, sizeof(sa));
printf ("*********CONNECTED>>>>>>>>>>>>>>>\n");
socket_close(hip, fd);
for (i = 0; i < 15; i++) for (i = 0; i < 15; i++)
{ {
printf ("************************** uf3 TOP\n"); printf ("************************** uf3 TOP\n");