some code for delayed multiplexer operations
This commit is contained in:
parent
cf7bdd66fd
commit
c2d6ce0f8a
125
ctx.c
125
ctx.c
@ -37,6 +37,7 @@ typedef hip_int64_t hip_ooi_t;
|
|||||||
typedef hip_uint64_t hip_nsecdur_t;
|
typedef hip_uint64_t hip_nsecdur_t;
|
||||||
|
|
||||||
#define HIP_NULL ((void*)0)
|
#define HIP_NULL ((void*)0)
|
||||||
|
#define HIP_INVALID_FD (-1)
|
||||||
|
|
||||||
typedef struct hip_t hip_t;
|
typedef struct hip_t hip_t;
|
||||||
typedef struct hip_uctx_t hip_uctx_t;
|
typedef struct hip_uctx_t hip_uctx_t;
|
||||||
@ -70,6 +71,7 @@ struct hip_uctx_t
|
|||||||
unsigned int stid;
|
unsigned int stid;
|
||||||
|
|
||||||
hip_uctx_link_t uctx;
|
hip_uctx_link_t uctx;
|
||||||
|
hip_uctx_link_t io_done;
|
||||||
};
|
};
|
||||||
|
|
||||||
#define HIP_OFFSETOF(type, field) ((hip_oow_t)&(((type*)0)->field))
|
#define HIP_OFFSETOF(type, field) ((hip_oow_t)&(((type*)0)->field))
|
||||||
@ -98,8 +100,16 @@ struct hip_uctx_t
|
|||||||
|
|
||||||
#define UCTX_FROM_LINK(_link) HIP_CONTAINEROF(_link, hip_uctx_t, uctx)
|
#define UCTX_FROM_LINK(_link) HIP_CONTAINEROF(_link, hip_uctx_t, uctx)
|
||||||
|
|
||||||
|
enum hip_flag_t
|
||||||
|
{
|
||||||
|
HIP_FLAG_LAZY = (1 << 0)
|
||||||
|
};
|
||||||
|
typedef enum hip_flag_t hip_flag_t;
|
||||||
|
|
||||||
struct hip_t
|
struct hip_t
|
||||||
{
|
{
|
||||||
|
int flags;
|
||||||
|
|
||||||
struct sigaction oldsa;
|
struct sigaction oldsa;
|
||||||
timer_t tmr_id;
|
timer_t tmr_id;
|
||||||
int mux_id;
|
int mux_id;
|
||||||
@ -108,6 +118,7 @@ struct hip_t
|
|||||||
hip_uctx_link_t terminated;
|
hip_uctx_link_t terminated;
|
||||||
hip_uctx_link_t sleeping;
|
hip_uctx_link_t sleeping;
|
||||||
hip_uctx_link_t io_waiting;
|
hip_uctx_link_t io_waiting;
|
||||||
|
hip_uctx_link_t io_done;
|
||||||
|
|
||||||
hip_uctx_link_t* running;
|
hip_uctx_link_t* running;
|
||||||
|
|
||||||
@ -172,6 +183,7 @@ hip_uctx_t* hip_uctx_open(hip_t* hip, hip_oow_t stack_size, hip_ufun_t uf, void*
|
|||||||
uc->hip = hip;
|
uc->hip = hip;
|
||||||
uc->uf = uf;
|
uc->uf = uf;
|
||||||
uc->ctx = ctx;
|
uc->ctx = ctx;
|
||||||
|
uc->waiting_fd = HIP_INVALID_FD;
|
||||||
|
|
||||||
getcontext(&uc->uc);
|
getcontext(&uc->uc);
|
||||||
|
|
||||||
@ -273,6 +285,7 @@ static void multiplex(hip_uctx_t* uctx, void *ctx)
|
|||||||
hip = uctx->hip;
|
hip = uctx->hip;
|
||||||
while (1)
|
while (1)
|
||||||
{
|
{
|
||||||
|
printf ("sleeping empty[%d] io_waiting empty[%d]\n", IS_EMPTY(&hip->sleeping), IS_EMPTY(&hip->io_waiting));
|
||||||
if (IS_EMPTY(&hip->sleeping) && IS_EMPTY(&hip->io_waiting))
|
if (IS_EMPTY(&hip->sleeping) && IS_EMPTY(&hip->io_waiting))
|
||||||
{
|
{
|
||||||
// STILL NEED TO HANDLE IO FILE DESCRIPTORS...
|
// STILL NEED TO HANDLE IO FILE DESCRIPTORS...
|
||||||
@ -298,6 +311,14 @@ printf ("NO SLEEPING. BACK TO SCEDULER\n");
|
|||||||
* lazy removal of unneeded context here???
|
* lazy removal of unneeded context here???
|
||||||
* epoll_ctl(DELETE here
|
* epoll_ctl(DELETE here
|
||||||
*/
|
*/
|
||||||
|
// lazy deletion of file descriptors
|
||||||
|
while (!IS_EMPTY(&hip->io_done))
|
||||||
|
{
|
||||||
|
l = HEAD(&hip->io_done);
|
||||||
|
u = UCTX_FROM_LINK(l);
|
||||||
|
UNCHAIN(l);
|
||||||
|
epoll_ctl(hip->mux_id, EPOLL_CTL_DEL, u->waiting_fd, HIP_NULL);
|
||||||
|
}
|
||||||
|
|
||||||
printf ("WAITING %llu\n", (unsigned long long)wait);
|
printf ("WAITING %llu\n", (unsigned long long)wait);
|
||||||
/* TODO: support different io multiplexers...*/
|
/* TODO: support different io multiplexers...*/
|
||||||
@ -315,6 +336,7 @@ printf ("WAITING %llu\n", (unsigned long long)wait);
|
|||||||
*/
|
*/
|
||||||
n = epoll_pwait(hip->mux_id, ee, 100, (wait + 1000000 - 1) / 1000000, HIP_NULL);
|
n = epoll_pwait(hip->mux_id, ee, 100, (wait + 1000000 - 1) / 1000000, HIP_NULL);
|
||||||
#endif
|
#endif
|
||||||
|
printf ("Epoll returned [%d]\n", (int)n);
|
||||||
if (n <= -1)
|
if (n <= -1)
|
||||||
{
|
{
|
||||||
/* TODO: some error handling action is required */
|
/* TODO: some error handling action is required */
|
||||||
@ -328,12 +350,27 @@ printf ("WAITING %llu\n", (unsigned long long)wait);
|
|||||||
l = ee[--n].data.ptr;
|
l = ee[--n].data.ptr;
|
||||||
u = UCTX_FROM_LINK(l);
|
u = UCTX_FROM_LINK(l);
|
||||||
|
|
||||||
UNCHAIN(l);
|
UNCHAIN(l); /* unchain it from io_waiting */
|
||||||
ADD_BACK(l, &hip->runnables);
|
ADD_BACK(l, &hip->runnables);
|
||||||
|
|
||||||
printf ("PWAIT link MULTIPLEX DEL %p fd[%d]\n", l, u->waiting_fd);
|
if (hip->flags & HIP_FLAG_LAZY)
|
||||||
|
{
|
||||||
|
/* arrange to remove the file descriptor from
|
||||||
|
* the multiplexer just before actual waiting.
|
||||||
|
* add this to the io_done list for clean-up
|
||||||
|
* before waiting and keep waiting_fd untouched
|
||||||
|
* for adjustment later. */
|
||||||
|
ADD_BACK(&u->io_done, &hip->io_done);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
|
||||||
|
printf ("PWAIT link MULTIPLEX DEL %p fd[%d] io_waiting empty[%d]\n", l, u->waiting_fd, IS_EMPTY(&hip->io_waiting));
|
||||||
|
|
||||||
|
epoll_ctl(hip->mux_id, EPOLL_CTL_DEL, u->waiting_fd, HIP_NULL);
|
||||||
|
u->waiting_fd = HIP_INVALID_FD;
|
||||||
|
}
|
||||||
|
|
||||||
epoll_ctl(hip->mux_id, EPOLL_CTL_DEL, u->waiting_fd, HIP_NULL);
|
|
||||||
}
|
}
|
||||||
while(n > 0);
|
while(n > 0);
|
||||||
}
|
}
|
||||||
@ -359,7 +396,7 @@ printf ("BACK TO SCHEDULER \n");
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
hip_t* hip_open(void)
|
hip_t* hip_open(int flags)
|
||||||
{
|
{
|
||||||
hip_t* hip;
|
hip_t* hip;
|
||||||
struct sigaction sa;
|
struct sigaction sa;
|
||||||
@ -372,6 +409,7 @@ hip_t* hip_open(void)
|
|||||||
if (!hip) return HIP_NULL;
|
if (!hip) return HIP_NULL;
|
||||||
|
|
||||||
memset(hip, 0, sizeof(*hip));
|
memset(hip, 0, sizeof(*hip));
|
||||||
|
hip->flags = flags;
|
||||||
|
|
||||||
/* initialize to an empty list by making each pointer point to itself.*/
|
/* initialize to an empty list by making each pointer point to itself.*/
|
||||||
hip->runnables.next = &hip->runnables;
|
hip->runnables.next = &hip->runnables;
|
||||||
@ -382,6 +420,8 @@ hip_t* hip_open(void)
|
|||||||
hip->sleeping.prev = &hip->sleeping;
|
hip->sleeping.prev = &hip->sleeping;
|
||||||
hip->io_waiting.next = &hip->io_waiting;
|
hip->io_waiting.next = &hip->io_waiting;
|
||||||
hip->io_waiting.prev = &hip->io_waiting;
|
hip->io_waiting.prev = &hip->io_waiting;
|
||||||
|
hip->io_done.next = &hip->io_done;
|
||||||
|
hip->io_done.prev = &hip->io_done;
|
||||||
|
|
||||||
sigemptyset(&sm);
|
sigemptyset(&sm);
|
||||||
sigaddset(&sm, SIGRTMIN);
|
sigaddset(&sm, SIGRTMIN);
|
||||||
@ -471,10 +511,10 @@ hip_uctx_t* hip_newrtn(hip_t* hip, hip_ufun_t uf, void* ctx)
|
|||||||
|
|
||||||
int hip_schedule(hip_t* hip, int preempt)
|
int hip_schedule(hip_t* hip, int preempt)
|
||||||
{
|
{
|
||||||
hip->uctx_sched = hip_uctx_open(hip, 4096, schedule, hip);
|
hip->uctx_sched = hip_uctx_open(hip, 40960, schedule, hip);
|
||||||
if (!hip->uctx_sched) return -1;
|
if (!hip->uctx_sched) return -1;
|
||||||
|
|
||||||
hip->uctx_mux = hip_uctx_open(hip, 4096, multiplex, hip);
|
hip->uctx_mux = hip_uctx_open(hip, 40960, multiplex, hip);
|
||||||
if (!hip->uctx_mux)
|
if (!hip->uctx_mux)
|
||||||
{
|
{
|
||||||
hip_uctx_close (hip->uctx_sched);
|
hip_uctx_close (hip->uctx_sched);
|
||||||
@ -566,18 +606,51 @@ void hip_awaitio(hip_t* hip, int fd, int flags)
|
|||||||
struct epoll_event ev;
|
struct epoll_event ev;
|
||||||
hip_uctx_t* rr;
|
hip_uctx_t* rr;
|
||||||
|
|
||||||
memset(&ev, 0, sizeof(ev));
|
/* if waiting on multiple items are supported, the following code requires a whole rewrite */
|
||||||
if (flags & HIP_IO_READ) ev.events |= EPOLLIN;
|
|
||||||
if (flags & HIP_IO_WRITE) ev.events |= EPOLLOUT;
|
|
||||||
ev.data.ptr = hip->running; /* store running context link */
|
|
||||||
printf ("PWAIT link ADD %p fd[%d]\n", ev.data.ptr, fd);
|
|
||||||
epoll_ctl(hip->mux_id, EPOLL_CTL_ADD, fd, &ev);
|
|
||||||
/* TODO: error handling - probably panic? */
|
|
||||||
|
|
||||||
rr = UCTX_FROM_LINK(hip->running);
|
rr = UCTX_FROM_LINK(hip->running);
|
||||||
|
|
||||||
|
if (rr->waiting_fd != HIP_INVALID_FD)
|
||||||
|
{
|
||||||
|
UNCHAIN(&rr->io_done);
|
||||||
|
|
||||||
|
if (rr->waiting_fd == fd)
|
||||||
|
{
|
||||||
|
/* update */
|
||||||
|
memset(&ev, 0, sizeof(ev));
|
||||||
|
if (flags & HIP_IO_READ) ev.events |= EPOLLIN;
|
||||||
|
if (flags & HIP_IO_WRITE) ev.events |= EPOLLOUT;
|
||||||
|
|
||||||
|
/* - the event mask could have chagned.
|
||||||
|
* - the waiting context could have changed. */
|
||||||
|
/* TODO: more optimization based on above */
|
||||||
|
ev.data.ptr = hip->running; /* store running context link */
|
||||||
|
printf ("PWAIT link UPDATE %p fd[%d]\n", ev.data.ptr, fd);
|
||||||
|
epoll_ctl(hip->mux_id, EPOLL_CTL_MOD, fd, &ev);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
/* delete and add */
|
||||||
|
printf ("PWAIT link DELETE fd[%d]\n", fd);
|
||||||
|
epoll_ctl(hip->mux_id, EPOLL_CTL_DEL, fd, HIP_NULL);
|
||||||
|
goto add;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
add:
|
||||||
|
memset(&ev, 0, sizeof(ev));
|
||||||
|
if (flags & HIP_IO_READ) ev.events |= EPOLLIN;
|
||||||
|
if (flags & HIP_IO_WRITE) ev.events |= EPOLLOUT;
|
||||||
|
ev.data.ptr = hip->running; /* store running context link */
|
||||||
|
printf ("PWAIT link ADD %p fd[%d]\n", ev.data.ptr, fd);
|
||||||
|
epoll_ctl(hip->mux_id, EPOLL_CTL_ADD, fd, &ev);
|
||||||
|
/* TODO: error handling - probably panic? */
|
||||||
|
}
|
||||||
|
|
||||||
ADD_BACK(hip->running, &hip->io_waiting);
|
ADD_BACK(hip->running, &hip->io_waiting);
|
||||||
hip->running = HIP_NULL;
|
hip->running = HIP_NULL;
|
||||||
rr->waiting_fd = fd; /* remember the file descriptor being waited on */
|
rr->waiting_fd = fd; /* remember the file descriptor being waited on */
|
||||||
|
|
||||||
swapcontext(&rr->uc, &hip->uctx_sched->uc);
|
swapcontext(&rr->uc, &hip->uctx_sched->uc);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -663,6 +736,7 @@ static int socket_connect(hip_t* hip, int proto, const struct sockaddr* addr, so
|
|||||||
|
|
||||||
static void socket_close(hip_t* hip, int fd)
|
static void socket_close(hip_t* hip, int fd)
|
||||||
{
|
{
|
||||||
|
/* TODO: DEL only if fd is still in the multiplexer? */
|
||||||
epoll_ctl(hip->mux_id, EPOLL_CTL_DEL, fd, HIP_NULL); /* NOTE: this is not be necessary */
|
epoll_ctl(hip->mux_id, EPOLL_CTL_DEL, fd, HIP_NULL); /* NOTE: this is not be necessary */
|
||||||
close(fd);
|
close(fd);
|
||||||
}
|
}
|
||||||
@ -725,7 +799,28 @@ static void uf3(hip_uctx_t* uc, void* ctx)
|
|||||||
sa.sin_port = htons(80);
|
sa.sin_port = htons(80);
|
||||||
fd = socket_connect(hip, SOCK_STREAM, (struct sockaddr*)&sa, sizeof(sa));
|
fd = socket_connect(hip, SOCK_STREAM, (struct sockaddr*)&sa, sizeof(sa));
|
||||||
printf ("*********CONNECTED>>>>>>>>>>>>>>>\n");
|
printf ("*********CONNECTED>>>>>>>>>>>>>>>\n");
|
||||||
|
{
|
||||||
|
int n;
|
||||||
|
char buf[256];
|
||||||
|
// if the server side doesn't close the connection,
|
||||||
|
// the following loop will never end as this client side doesn't actively
|
||||||
|
// close the connection after having read the response chunk. it's not http aware...
|
||||||
|
// use 'Connection: close' for now
|
||||||
|
|
||||||
|
//const char x[] = "GET / HTTP/1.1\r\nHost: www.google.com\r\n\r\n";
|
||||||
|
const char x[] = "GET / HTTP/1.1\r\nHost: www.google.com\r\nConnection: close\r\n\r\n";
|
||||||
|
|
||||||
|
socket_write(hip, fd, x, sizeof(x) - 1);
|
||||||
|
do
|
||||||
|
{
|
||||||
|
n = socket_read(hip, fd, buf, sizeof(buf));
|
||||||
|
if (n <= 0) break;
|
||||||
|
printf("GOT [%.*s]\n", n, buf);
|
||||||
|
}
|
||||||
|
while(1);
|
||||||
|
}
|
||||||
socket_close(hip, fd);
|
socket_close(hip, fd);
|
||||||
|
printf ("*********DISCONNECTED>>>>>>>>>>>>>>>\n");
|
||||||
|
|
||||||
for (i = 0; i < 15; i++)
|
for (i = 0; i < 15; i++)
|
||||||
{
|
{
|
||||||
@ -743,7 +838,7 @@ int main()
|
|||||||
{
|
{
|
||||||
hip_t* hip;
|
hip_t* hip;
|
||||||
|
|
||||||
hip = hip_open();
|
hip = hip_open(HIP_FLAG_LAZY);
|
||||||
|
|
||||||
hip_newrtn(hip, uf1, HIP_NULL);
|
hip_newrtn(hip, uf1, HIP_NULL);
|
||||||
hip_newrtn(hip, uf2, HIP_NULL);
|
hip_newrtn(hip, uf2, HIP_NULL);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user