/* NOTE(review): the header names are missing from every #include directive in
 * this copy of the file (only the bare directive survives). Restore them from
 * the original source before building -- the code below uses ucontext, signal,
 * POSIX timer, epoll, socket and valgrind stack-registration APIs. */
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include

#if defined(__NetBSD__)
#include
#include
#else
#include
#include
#include
#endif

/* fixed-width style integer aliases used throughout the file */
typedef unsigned char hip_uint8_t;
typedef unsigned short hip_uint16_t;
typedef unsigned int hip_uint32_t;
typedef unsigned long long hip_uint64_t;
typedef signed char hip_int8_t;
typedef signed short hip_int16_t;
typedef signed int hip_int32_t;
typedef signed long long hip_int64_t;

/* unsigned/signed word types; also used to carry pointer values and byte counts */
typedef hip_uint64_t hip_oow_t;
typedef hip_int64_t hip_ooi_t;

/* a point in time or a duration expressed in nanoseconds */
typedef hip_uint64_t hip_nsecdur_t;

#define HIP_NULL ((void*)0)

typedef struct hip_t hip_t;
typedef struct hip_uctx_t hip_uctx_t;
typedef struct hip_uctx_link_t hip_uctx_link_t;

/* entry point of a routine: receives its own context and the user pointer
 * given at creation time */
typedef void (*hip_ufun_t) (hip_uctx_t* uc, void* ctx);

/* bit flags selecting the readiness to wait for in hip_awaitio() */
enum hip_io_flag_t
{
	HIP_IO_READ = (1 << 0),
	HIP_IO_WRITE = (1 << 1)
};
typedef enum hip_io_flag_t hip_io_flag_t;

/* node of a circular doubly-linked list; an empty list is a lone node whose
 * next and prev point to itself */
struct hip_uctx_link_t
{
	hip_uctx_link_t* next;
	hip_uctx_link_t* prev;
};

/* one execution context; hip_uctx_open() allocates its stack in the same
 * memory block, right after this structure */
struct hip_uctx_t
{
	hip_t* hip; /* owning scheduler object */
	ucontext_t uc; /* saved machine context */
	hip_ufun_t uf; /* function to run in this context */
	void* ctx; /* user pointer passed to uf */
	hip_oow_t csi; /* number of context switches in */
	hip_oow_t cso; /* number of context switches out */
	hip_nsecdur_t wakeup_time; /* monotonic time at which a sleeping context becomes runnable */
	int waiting_fd; /* descriptor registered by hip_awaitio() */
	unsigned int stid; /* value returned by VALGRIND_STACK_REGISTER */
	hip_uctx_link_t uctx; /* link into one of the lists held in hip_t */
};

/* byte offset of a field within a structure type */
#define HIP_OFFSETOF(type, field) ((hip_oow_t)&(((type*)0)->field))
/* recover the address of the enclosing structure from the address of a field */
#define HIP_CONTAINEROF(ptr, type, field) ((type*)((hip_uint8_t*)ptr - HIP_OFFSETOF(type, field)))

/* insert _link between the nodes _prev and _next.
 * _prev and _next are evaluated once; _link is evaluated several times. */
#define CHAIN(_link, _prev, _next) do { \
	hip_uctx_link_t* _p = _prev; \
	hip_uctx_link_t* _n = _next; \
	(_n)->prev = _link; \
	(_link)->prev = _p; \
	(_link)->next = _n; \
	(_p)->next = _link; \
} while(0)

/* detach _link from its neighbours; _link's own next/prev are left untouched */
#define UNCHAIN(_link) do { \
	(_link)->next->prev = (_link)->prev; \
	(_link)->prev->next = (_link)->next; \
} while(0)

/* list helpers; ll is the address of the list head (sentinel) node */
#define HEAD(ll) ((ll)->next)
#define TAIL(ll) ((ll)->prev)
#define ADD_FRONT(_link, ll) CHAIN(_link, (ll), HEAD(ll))
#define ADD_BACK(_link, ll) CHAIN(_link, TAIL(ll), (ll))
#define IS_EMPTY(ll) (HEAD(ll) == ll)
/* map a list link back to the hip_uctx_t that embeds it */
#define UCTX_FROM_LINK(_link) HIP_CONTAINEROF(_link, hip_uctx_t, uctx)
struct hip_t { struct sigaction oldsa; timer_t tmr_id; int mux_id; hip_uctx_link_t runnables; hip_uctx_link_t terminated; hip_uctx_link_t sleeping; hip_uctx_link_t io_waiting; hip_uctx_link_t* running; hip_uctx_t* uctx_sched; hip_uctx_t* uctx_mux; ucontext_t uc_main; }; /* ---------------------------------------------------- */ static void invoke_uf(unsigned int a, unsigned int b) { hip_t* hip; hip_uctx_t* uctx; uctx = (hip_uctx_t*)(((hip_oow_t)a << 32) | (hip_oow_t)b); uctx->uf(uctx, uctx->ctx); printf ("invoke_uf XXXXXXXXXXXXXXXXXXXXX...%p\n", uctx); //TODO: This part must not be interrupted by the timer handler??? hip = uctx->hip; if (uctx == hip->uctx_sched) /* if not the scheduler itself */ { /* back to where the schedule got activated for the first time */ setcontext(&hip->uc_main); } else if (uctx == hip->uctx_mux) { /* TDOO: is this correct? */ setcontext(&hip->uctx_sched->uc); } else { /* switch to the scheduler */ UNCHAIN(&uctx->uctx); /* TODO: if auto-destroy is on, delete it */ //if (uctx->flags & HIP_UCTX_FLAG_AUTO_DESTROY) hip_freertn(hip, uctx); //else ADD_BACK(&uctx->uctx, &hip->terminated); printf ("ADDED TO TERMINATED\n"); setcontext(&hip->uctx_sched->uc); } //TODO: This part must not be interrupted? // TODO: schedule the runnable process? // TODO: move to terminated list? // TODO: destroy the object? } hip_uctx_t* hip_uctx_open(hip_t* hip, hip_oow_t stack_size, hip_ufun_t uf, void* ctx) { hip_uctx_t* uc; void* sp; uc = (hip_uctx_t*)malloc(sizeof(*uc) + stack_size); if (!uc) return HIP_NULL; sp = (void*)(uc + 1); memset(uc, 0, sizeof(*uc) + stack_size); uc->hip = hip; uc->uf = uf; uc->ctx = ctx; getcontext(&uc->uc); if (uf) { sigemptyset (&uc->uc.uc_sigmask); // sigaddset (&uc->uc.uc_sigmask, SIGRTMIN); /* block SIGRTMIN. TODO: take this value from the outer scheduler object? 
*/ uc->stid = VALGRIND_STACK_REGISTER(sp, (hip_uint8_t*)sp + stack_size); uc->uc.uc_stack.ss_sp = sp; uc->uc.uc_stack.ss_size = stack_size; uc->uc.uc_stack.ss_flags = 0; uc->uc.uc_link = HIP_NULL; makecontext(&uc->uc, (void(*)(void))invoke_uf, 2, (unsigned int)((hip_oow_t)uc >> 32), (unsigned int)((hip_oow_t)uc & 0xFFFFFFFFu)); } printf("NEW UCTX %p\n", uc); return uc; } void hip_uctx_close(hip_uctx_t* uctx) { if (uctx->uf) { VALGRIND_STACK_DEREGISTER(uctx->stid); } free(uctx); } void hip_uctx_swap(hip_uctx_t* uc, hip_uctx_t* new_uc) { uc->cso++; new_uc->csi++; swapcontext(&uc->uc, &new_uc->uc); } /* ---------------------------------------------------- */ static hip_nsecdur_t monotime(void) { struct timespec ts; clock_gettime(CLOCK_MONOTONIC, &ts); return (hip_nsecdur_t)ts.tv_sec * 1000000000 + ts.tv_nsec; } /* ---------------------------------------------------- */ static hip_uint8_t signal_stack[8012]; static hip_t* g_hip; static void on_timer_signal(int sig, siginfo_t* si, void* ctx) { /* inform all running threads that time slice expired */ //pthread_kill(....); #if 0 hip_uctx_t* uctx; uctx = UCTX_FROM_LINK(g_hip->uctx_cur); swapcontext(&uctx->uc, &g_hip->uctx_sched->uc); #endif } static void schedule(hip_uctx_t* uctx, void *ctx) { hip_t* hip; hip = uctx->hip; while(1) { hip_uctx_link_t* h; hip_uctx_t* u; if (IS_EMPTY(&hip->runnables)) { /* TODO: check if there are sleeping ... */ printf (" NO TASK\n"); swapcontext(&hip->uctx_sched->uc, &hip->uctx_mux->uc); if (IS_EMPTY(&hip->runnables)) break; /* no task? 
*/ } h = HEAD(&hip->runnables); UNCHAIN(h); hip->running = h; u = UCTX_FROM_LINK(h); /* TODO: extract these lines to a macro or something*/ hip->uctx_sched->cso++; u->csi++; swapcontext(&hip->uctx_sched->uc, &u->uc); hip->running = HIP_NULL; } } static void multiplex(hip_uctx_t* uctx, void *ctx) { /* this is a builtin task to handle multiplexing and sleep */ hip_t* hip; hip_uctx_link_t* l; hip = uctx->hip; while (1) { if (IS_EMPTY(&hip->sleeping) && IS_EMPTY(&hip->io_waiting)) { // STILL NEED TO HANDLE IO FILE DESCRIPTORS... /* go back to the scheduler */ printf ("NO SLEEPING. BACK TO SCEDULER\n"); swapcontext(&hip->uctx_mux->uc, &hip->uctx_sched->uc); } else { hip_nsecdur_t now; hip_nsecdur_t wait; struct timespec tmout; struct epoll_event ee[100]; /* TODO: hold it in hip_t struct... sizing must be handled in the main struct.. */ int n; hip_uctx_t* u; l = HEAD(&hip->sleeping); u = UCTX_FROM_LINK(l); now = monotime(); wait = now >= u->wakeup_time? 0: u->wakeup_time - now; /* TODO: the io_waiting process that is time-bound must be catered for */ /* TODO: * lazy removal of unneeded context here??? * epoll_ctl(DELETE here */ printf ("WAITING %llu\n", (unsigned long long)wait); /* TODO: support different io multiplexers...*/ #if defined(HAVE_EPOLL_PWAIT2) tmout.tv_sec = wait / 1000000000; tmout.tv_nsec = wait % 1000000000; n = epoll_pwait2(hip->mux_id, ee, 100, &tmout, HIP_NULL); #else /* epoll_pwait()'s timeout is at the resolution of a millisecond. * add 1 millisecond as long as there is a fraction of nanoseconds * less than 1 millisecond. * ceil(wait / 1000000.0) * (wait / 1000000) + !!(wait % 1000000) * (wait + 1000000 - 1) / 1000000 */ n = epoll_pwait(hip->mux_id, ee, 100, (wait + 1000000 - 1) / 1000000, HIP_NULL); #endif if (n <= -1) { /* TODO: some error handling action is required */ /* scheduler panic? 
*/ } if (n > 0) { do { l = ee[--n].data.ptr; u = UCTX_FROM_LINK(l); UNCHAIN(l); ADD_BACK(l, &hip->runnables); printf ("PWAIT link MULTIPLEX DEL %p fd[%d]\n", l, u->waiting_fd); epoll_ctl(hip->mux_id, EPOLL_CTL_DEL, u->waiting_fd, HIP_NULL); } while(n > 0); } now = monotime(); while (!IS_EMPTY(&hip->sleeping)) { l = HEAD(&hip->sleeping); u = UCTX_FROM_LINK(l); if (now < u->wakeup_time) break; UNCHAIN(l); ADD_BACK(l, &hip->runnables); } /* go back to the scheduler */ if (!IS_EMPTY(&hip->runnables)) { printf ("BACK TO SCHEDULER \n"); swapcontext(&hip->uctx_mux->uc, &hip->uctx_sched->uc); } } } } hip_t* hip_open(void) { hip_t* hip; struct sigaction sa; struct sigevent se; stack_t ss; sigset_t sm, oldsm; timer_t tid; hip = (hip_t*)malloc(sizeof(*hip)); if (!hip) return HIP_NULL; memset(hip, 0, sizeof(*hip)); /* initialize to an empty list by making each pointer point to itself.*/ hip->runnables.next = &hip->runnables; hip->runnables.prev = &hip->runnables; hip->terminated.next = &hip->terminated; hip->terminated.prev = &hip->terminated; hip->sleeping.next = &hip->sleeping; hip->sleeping.prev = &hip->sleeping; hip->io_waiting.next = &hip->io_waiting; hip->io_waiting.prev = &hip->io_waiting; sigemptyset(&sm); sigaddset(&sm, SIGRTMIN); sigprocmask(SIG_BLOCK, &sm, &oldsm); memset(&sa, 0, sizeof(sa)); sa.sa_flags = SA_SIGINFO /*| SA_RESTART*/; sa.sa_sigaction = on_timer_signal; sigemptyset(&sa.sa_mask); sigaction(SIGRTMIN, &sa, &hip->oldsa); ss.ss_sp = signal_stack; /* TODO: allocate this dynamically */ ss.ss_size = sizeof(signal_stack); ss.ss_flags = 0; sigaltstack(&ss, HIP_NULL); memset(&se, 0, sizeof(se)); se.sigev_notify = SIGEV_SIGNAL; se.sigev_signo = SIGRTMIN; /*se.sigev_value.sival_ptr = hip; TOOD: possible to use sigev_notify_function and use this field? * what about sigwaitinfo() or sigtimedwait() in a dedicated thread? 
* */ timer_create(CLOCK_MONOTONIC, &se, &tid); hip->tmr_id = tid; sigprocmask(SIG_SETMASK, &oldsm, HIP_NULL); #if defined(__NetBSD__) hip->mux_id = kqueue1(O_CLOEXEC); #else hip->mux_id = epoll_create1(EPOLL_CLOEXEC); #endif if (hip->mux_id <= -1) { timer_delete(tid); free(hip); return HIP_NULL; } g_hip = hip; /* TODO: avoid using a global hip */ return hip; } void hip_close(hip_t* hip) { hip_uctx_link_t* l; hip_uctx_t* uctx; sigaction(SIGRTMIN, &hip->oldsa, NULL); close(hip->mux_id); timer_delete(hip->tmr_id); while (!IS_EMPTY(&hip->terminated)) { l = HEAD(&hip->terminated); UNCHAIN(l); uctx = UCTX_FROM_LINK(l); hip_uctx_close(uctx); } while (!IS_EMPTY(&hip->runnables)) { l = HEAD(&hip->runnables); UNCHAIN(l); uctx = UCTX_FROM_LINK(l); hip_uctx_close(uctx); } if (hip->uctx_mux) hip_uctx_close(hip->uctx_mux); if (hip->uctx_sched) hip_uctx_close(hip->uctx_sched); free(hip); } hip_uctx_t* hip_newrtn(hip_t* hip, hip_ufun_t uf, void* ctx) { hip_uctx_t* uctx; uctx = hip_uctx_open(hip, 8192, uf, ctx); /* TODO: stack size */ if (!uctx) return HIP_NULL; /* append to the list */ ADD_BACK(&uctx->uctx, &hip->runnables); return uctx; } int hip_schedule(hip_t* hip, int preempt) { hip->uctx_sched = hip_uctx_open(hip, 4096, schedule, hip); if (!hip->uctx_sched) return -1; hip->uctx_mux = hip_uctx_open(hip, 4096, multiplex, hip); if (!hip->uctx_mux) { hip_uctx_close (hip->uctx_sched); hip->uctx_sched = HIP_NULL; return -1; } /* fire the schduler tick for preemptive scheduling */ if (preempt) { /* start the timer tick */ struct itimerspec its; memset(&its, 0, sizeof(its)); its.it_interval.tv_sec = 0; //its.it_interval.tv_nsec = 10000000; /* 10 milliseconds */ its.it_interval.tv_nsec = 100000000; /* 100 milliseconds */ its.it_value = its.it_interval; timer_settime(hip->tmr_id, 0, &its, NULL); } /* jump to the scheduler */ swapcontext(&hip->uc_main, &hip->uctx_sched->uc); if (preempt) { /* stop the timer tick */ struct itimerspec its; memset(&its, 0, sizeof(its)); 
its.it_interval.tv_sec = 0; its.it_interval.tv_nsec = 0; its.it_value = its.it_interval; timer_settime(hip->tmr_id, 0, &its, NULL); } return 0; } void hip_yield(hip_t* hip) { hip_uctx_t* r; assert (hip->running != HIP_NULL); /* switch from the running context to the scheduler context */ ADD_BACK(hip->running, &hip->runnables); r = UCTX_FROM_LINK(hip->running); /* TODO: extract these lines to a macro or something */ r->cso++; hip->uctx_sched->csi++; swapcontext(&r->uc, &hip->uctx_sched->uc); } void hip_sleep(hip_t* hip, hip_nsecdur_t nsecs) { hip_uctx_t* rr; assert (hip->running != HIP_NULL); rr = UCTX_FROM_LINK(hip->running); rr->wakeup_time = monotime() + nsecs; /* TODO: switch to HEAP. for now simply linear search to keep this list sorted. */ if (IS_EMPTY(&hip->sleeping)) { ADD_BACK(hip->running, &hip->sleeping); } else { hip_uctx_link_t* l; hip_uctx_t* r; for (l = HEAD(&hip->sleeping); l != &hip->sleeping; l = l->next) { r = UCTX_FROM_LINK(l); if (rr->wakeup_time <= r->wakeup_time) { CHAIN(hip->running, l->prev, l->next); goto do_sched; } } ADD_BACK(hip->running, &hip->sleeping); } do_sched: hip->running = HIP_NULL; swapcontext(&rr->uc, &hip->uctx_sched->uc); } void hip_awaitio(hip_t* hip, int fd, int flags) { struct epoll_event ev; hip_uctx_t* rr; memset(&ev, 0, sizeof(ev)); if (flags & HIP_IO_READ) ev.events |= EPOLLIN; if (flags & HIP_IO_WRITE) ev.events |= EPOLLOUT; ev.data.ptr = hip->running; /* store running context link */ printf ("PWAIT link ADD %p fd[%d]\n", ev.data.ptr, fd); epoll_ctl(hip->mux_id, EPOLL_CTL_ADD, fd, &ev); /* TODO: error handling - probably panic? 
*/ rr = UCTX_FROM_LINK(hip->running); ADD_BACK(hip->running, &hip->io_waiting); hip->running = HIP_NULL; rr->waiting_fd = fd; /* remember the file descriptor being waited on */ swapcontext(&rr->uc, &hip->uctx_sched->uc); } /* ---------------------------------------------------- */ static int fd_set_nonblock(int fd) { int flags; flags = fcntl(fd, F_GETFL); if (flags <= -1) return -1; return fcntl(fd, F_SETFL, flags | O_NONBLOCK); } static int fd_set_cloexec(int fd) { int flags; flags = fcntl(fd, F_GETFD); if (flags <= -1) return -1; return fcntl(fd, F_SETFD, flags | FD_CLOEXEC); } static int socket_connect(hip_t* hip, int proto, const struct sockaddr* addr, socklen_t addrlen) /* TODO: timeout? */ { int fd; int x; x = SOCK_STREAM; #if defined(SOCK_CLOEXEC) x |= SOCK_CLOEXEC; #endif #if defined(SOCK_NONBLOCK) x |= SOCK_NONBLOCK; #endif fd = socket(addr->sa_family, x, 0); /* TODO make SOCK_STREAM selectable */ if (fd <= -1) return -1; #if !defined(SOCK_CLOEXEC) if (fd_set_cloexec(fd) <= -1) { close(fd); return -1; } #endif #if !defined(SOCK_NONBLOCK) if (fd_set_nonblock(fd) <= -1) { close(fd); return -1; } #endif x = connect(fd, addr, addrlen); if (x <= -1) { socklen_t sl; int ss; if (errno != EINPROGRESS) /* TODO: cater for EINTR? 
*/ { close(fd); return -1; } hip_awaitio(hip, fd, HIP_IO_WRITE); sl = sizeof(ss); if (getsockopt(fd, SOL_SOCKET, SO_ERROR, (void*)&ss, &sl) <= -1) { close(fd); return -1; } if (ss != 0) { close(fd); errno = ss; return -1; } } return fd; } static void socket_close(hip_t* hip, int fd) { epoll_ctl(hip->mux_id, EPOLL_CTL_DEL, fd, HIP_NULL); /* NOTE: this is not be necessary */ close(fd); } static hip_ooi_t socket_read(hip_t* hip, int fd, void* buf, hip_oow_t len) { hip_awaitio(hip, fd, HIP_IO_READ); return read(fd, buf, len); } static hip_ooi_t socket_write(hip_t* hip, int fd, const void* buf, hip_oow_t len) { hip_awaitio(hip, fd, HIP_IO_WRITE); return write(fd, buf, len); } /* ---------------------------------------------------- */ static void uf1(hip_uctx_t* uc, void* ctx) { int i; for (i =0; i < 5; i++) { printf ("************************** uf1 \n"); //hip_yield(uc->hip); hip_sleep(uc->hip, 500000000); printf ("************************* uf1 1111\n"); //hip_yield(uc->hip); hip_sleep(uc->hip, 1000000000); } } static void uf2(hip_uctx_t* uc, void* ctx) { int i; for (i =0; i < 10; i++) { printf ("************************** uf2 \n"); //hip_yield(uc->hip); hip_sleep(uc->hip, 1000000000); printf ("************************* uf2 2222 XXXXXXXXXXXXXXxxxxxxxxxxxxxxxxxxxxxx\n"); //hip_yield(uc->hip); hip_sleep(uc->hip, 500000000); } } static void uf3(hip_uctx_t* uc, void* ctx) { hip_t* hip; struct sockaddr_in sa; int i, fd; hip = uc->hip; memset(&sa, 0, sizeof(sa)); sa.sin_family = AF_INET; sa.sin_addr.s_addr = inet_addr("142.250.206.196"); sa.sin_port = htons(80); fd = socket_connect(hip, SOCK_STREAM, (struct sockaddr*)&sa, sizeof(sa)); printf ("*********CONNECTED>>>>>>>>>>>>>>>\n"); socket_close(hip, fd); for (i = 0; i < 15; i++) { printf ("************************** uf3 TOP\n"); //hip_yield(uc->hip); hip_sleep(uc->hip, 1000000000); printf ("************************* uf3 3333 xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\n"); //hip_yield(uc->hip); hip_sleep(uc->hip, 1000000000); if 
(i == 8) hip_newrtn(uc->hip, uf2, HIP_NULL); } } int main() { hip_t* hip; hip = hip_open(); hip_newrtn(hip, uf1, HIP_NULL); hip_newrtn(hip, uf2, HIP_NULL); hip_newrtn(hip, uf3, HIP_NULL); /* jump to the scheduler */ hip_schedule(hip, 0); printf("XXXXXXXXXXXXXXXXX ABOUT TO CLOSE ********************\n"); /* TODO: close all uctxes ? */ hip_close(hip); return 0; }