#include #include #include #include #include #include #include #include #if defined(__NetBSD__) #include #include #else #include #include #endif typedef unsigned char hip_uint8_t; typedef unsigned short hip_uint16_t; typedef unsigned int hip_uint32_t; typedef unsigned long long hip_uint64_t; typedef signed char hip_int8_t; typedef signed short hip_int16_t; typedef signed int hip_int32_t; typedef signed long long hip_int64_t; typedef hip_uint64_t hip_oow_t; typedef hip_int64_t hip_ooi_t; typedef hip_uint64_t hip_nsecdur_t; #define HIP_NULL ((void*)0) typedef struct hip_t hip_t; typedef struct hip_uctx_t hip_uctx_t; typedef struct hip_uctx_link_t hip_uctx_link_t; typedef void (*hip_ufun_t) (hip_uctx_t* uc, void* ctx); struct hip_uctx_link_t { hip_uctx_link_t* next; hip_uctx_link_t* prev; }; struct hip_uctx_t { hip_t* hip; ucontext_t uc; hip_ufun_t uf; void* ctx; hip_oow_t csi; /* number of context switches in */ hip_oow_t cso; /* number of context switches out */ hip_nsecdur_t wakeup_time; unsigned int stid; hip_uctx_link_t uctx; }; #define HIP_OFFSETOF(type, field) ((hip_oow_t)&(((type*)0)->field)) #define HIP_CONTAINEROF(ptr, type, field) ((type*)((hip_uint8_t*)ptr - HIP_OFFSETOF(type, field))) #define CHAIN(_link, _prev, _next) do { \ hip_uctx_link_t* _p = _prev; \ hip_uctx_link_t* _n = _next; \ (_n)->prev = _link; \ (_link)->prev = _p; \ (_link)->next = _n; \ (_p)->next = _link; \ } while(0) #define UNCHAIN(_link) do { \ (_link)->next->prev = (_link)->prev; \ (_link)->prev->next = (_link)->next; \ } while(0) #define HEAD(ll) ((ll)->next) #define TAIL(ll) ((ll)->prev) #define ADD_FRONT(_link, ll) CHAIN(_link, (ll), HEAD(ll)) #define ADD_BACK(_link, ll) CHAIN(_link, TAIL(ll), (ll)) #define IS_EMPTY(ll) (HEAD(ll) == ll) #define UCTX_FROM_LINK(_link) HIP_CONTAINEROF(_link, hip_uctx_t, uctx) struct hip_t { struct sigaction oldsa; timer_t tmr_id; int mux_id; hip_uctx_link_t runnables; hip_uctx_link_t terminated; hip_uctx_link_t pending; hip_uctx_link_t* running; hip_uctx_t* uctx_sched; hip_uctx_t* uctx_mux; ucontext_t uc_main; }; /* ---------------------------------------------------- */ static void invoke_uf(unsigned int a, unsigned int b) { hip_t* hip; hip_uctx_t* uctx; uctx = (hip_uctx_t*)(((hip_oow_t)a << 32) | (hip_oow_t)b); uctx->uf(uctx, uctx->ctx); printf ("invoke_uf XXXXXXXXXXXXXXXXXXXXX...%p\n", uctx); //TODO: This part must not be interrupted by the timer handler??? hip = uctx->hip; if (uctx == hip->uctx_sched) /* if not the scheduler itself */ { /* back to where the schedule got activated for the first time */ setcontext(&hip->uc_main); } else if (uctx == hip->uctx_mux) { /* TDOO: is this correct? */ setcontext(&hip->uctx_sched->uc); } else { /* switch to the scheduler */ UNCHAIN(&uctx->uctx); /* TODO: if auto-destroy is on, delete it */ //if (uctx->flags & HIP_UCTX_FLAG_AUTO_DESTROY) hip_freertn(hip, uctx); //else ADD_BACK(&uctx->uctx, &hip->terminated); printf ("ADDED TO TERMINATED\n"); setcontext(&hip->uctx_sched->uc); } //TODO: This part must not be interrupted? // TODO: schedule the runnable process? // TODO: move to terminated list? // TODO: destroy the object? } hip_uctx_t* hip_uctx_open(hip_t* hip, hip_oow_t stack_size, hip_ufun_t uf, void* ctx) { hip_uctx_t* uc; void* sp; uc = (hip_uctx_t*)malloc(sizeof(*uc) + stack_size); if (!uc) return HIP_NULL; sp = (void*)(uc + 1); memset(uc, 0, sizeof(*uc) + stack_size); uc->hip = hip; uc->uf = uf; uc->ctx = ctx; getcontext(&uc->uc); if (uf) { sigemptyset (&uc->uc.uc_sigmask); // sigaddset (&uc->uc.uc_sigmask, SIGRTMIN); /* block SIGRTMIN. TODO: take this value from the outer scheduler object? */ uc->stid = VALGRIND_STACK_REGISTER(sp, (hip_uint8_t*)sp + stack_size); uc->uc.uc_stack.ss_sp = sp; uc->uc.uc_stack.ss_size = stack_size; uc->uc.uc_stack.ss_flags = 0; uc->uc.uc_link = HIP_NULL; makecontext(&uc->uc, (void(*)(void))invoke_uf, 2, (unsigned int)((hip_oow_t)uc >> 32), (unsigned int)((hip_oow_t)uc & 0xFFFFFFFFu)); } printf("NEW UCTX %p\n", uc); return uc; } void hip_uctx_close(hip_uctx_t* uctx) { if (uctx->uf) { VALGRIND_STACK_DEREGISTER(uctx->stid); } free(uctx); } void hip_uctx_swap(hip_uctx_t* uc, hip_uctx_t* new_uc) { uc->cso++; new_uc->csi++; swapcontext(&uc->uc, &new_uc->uc); } /* ---------------------------------------------------- */ static hip_nsecdur_t monotime(void) { struct timespec ts; clock_gettime(CLOCK_MONOTONIC, &ts); return (hip_nsecdur_t)ts.tv_sec * 1000000000 + ts.tv_nsec; } /* ---------------------------------------------------- */ static hip_uint8_t signal_stack[8012]; static hip_t* g_hip; static void on_timer_signal(int sig, siginfo_t* si, void* ctx) { /* inform all running threads that time slice expired */ //pthread_kill(....); #if 0 hip_uctx_t* uctx; uctx = UCTX_FROM_LINK(g_hip->uctx_cur); swapcontext(&uctx->uc, &g_hip->uctx_sched->uc); #endif } static void schedule(hip_uctx_t* uctx, void *ctx) { hip_t* hip; hip = uctx->hip; while(1) { hip_uctx_link_t* h; hip_uctx_t* u; if (IS_EMPTY(&hip->runnables)) { /* TODO: check if there are pending ... */ printf (" NO TASK\n"); swapcontext(&hip->uctx_sched->uc, &hip->uctx_mux->uc); if (IS_EMPTY(&hip->runnables)) break; /* no task? */ } h = HEAD(&hip->runnables); UNCHAIN(h); hip->running = h; u = UCTX_FROM_LINK(h); /* TODO: extract these lines to a macro or something*/ hip->uctx_sched->cso++; u->csi++; swapcontext(&hip->uctx_sched->uc, &u->uc); hip->running = HIP_NULL; } } static void poll(hip_uctx_t* uctx, void *ctx) { /* this is a builtin task to handle multiplexing and sleep */ hip_t* hip; hip_uctx_link_t* l; hip = uctx->hip; while (1) { if (IS_EMPTY(&hip->pending)) { // STILL NEED TO HANDLE IO FILE DESCRIPTORS... /* go back to the scheduler */ swapcontext(&hip->uctx_mux->uc, &hip->uctx_sched->uc); } else { hip_nsecdur_t now; hip_nsecdur_t wait; struct timespec tmout; struct epoll_event ee[100]; /* TODO: hold it in hip_t struct... */ l = HEAD(&hip->pending); uctx = UCTX_FROM_LINK(l); now = monotime(); wait = now >= uctx->wakeup_time? 0: uctx->wakeup_time - now; // TODO: support different io multiplexers... tmout.tv_sec = wait / 1000000000; tmout.tv_nsec = wait % 1000000000; epoll_pwait2(hip->mux_id, ee, 100, &tmout, HIP_NULL); } } } hip_t* hip_open(void) { hip_t* hip; struct sigaction sa; struct sigevent se; stack_t ss; sigset_t sm, oldsm; timer_t tid; hip = (hip_t*)malloc(sizeof(*hip)); if (!hip) return HIP_NULL; memset(hip, 0, sizeof(*hip)); /* initialize to an empty list by making each pointer point to itself.*/ hip->runnables.next = &hip->runnables; hip->runnables.prev = &hip->runnables; hip->terminated.next = &hip->terminated; hip->terminated.prev = &hip->terminated; hip->pending.next = &hip->pending; hip->pending.prev = &hip->pending; sigemptyset(&sm); sigaddset(&sm, SIGRTMIN); sigprocmask(SIG_BLOCK, &sm, &oldsm); memset(&sa, 0, sizeof(sa)); sa.sa_flags = SA_SIGINFO /*| SA_RESTART*/; sa.sa_sigaction = on_timer_signal; sigemptyset(&sa.sa_mask); sigaction(SIGRTMIN, &sa, &hip->oldsa); ss.ss_sp = signal_stack; /* TODO: allocate this dynamically */ ss.ss_size = sizeof(signal_stack); ss.ss_flags = 0; sigaltstack(&ss, HIP_NULL); memset(&se, 0, sizeof(se)); se.sigev_notify = SIGEV_SIGNAL; se.sigev_signo = SIGRTMIN; /*se.sigev_value.sival_ptr = hip; TOOD: possible to use sigev_notify_function and use this field? * what about sigwaitinfo() or sigtimedwait() in a dedicated thread? * */ timer_create(CLOCK_MONOTONIC, &se, &tid); hip->tmr_id = tid; sigprocmask(SIG_SETMASK, &oldsm, HIP_NULL); #if defined(__NetBSD__) hip->mux_id = kqueue1(O_CLOEXEC); #else hip->mux_id = epoll_create1(EPOLL_CLOEXEC); #endif if (hip->mux_id <= -1) { timer_delete(tid); free(hip); return HIP_NULL; } g_hip = hip; /* TODO: avoid using a global hip */ return hip; } void hip_close(hip_t* hip) { hip_uctx_link_t* l; hip_uctx_t* uctx; sigaction(SIGRTMIN, &hip->oldsa, NULL); close(hip->mux_id); timer_delete(hip->tmr_id); while (!IS_EMPTY(&hip->terminated)) { l = HEAD(&hip->terminated); UNCHAIN(l); uctx = UCTX_FROM_LINK(l); hip_uctx_close(uctx); } while (!IS_EMPTY(&hip->runnables)) { l = HEAD(&hip->runnables); UNCHAIN(l); uctx = UCTX_FROM_LINK(l); hip_uctx_close(uctx); } if (hip->uctx_mux) hip_uctx_close(hip->uctx_mux); if (hip->uctx_sched) hip_uctx_close(hip->uctx_sched); free(hip); } hip_uctx_t* hip_newrtn(hip_t* hip, hip_ufun_t uf, void* ctx) { hip_uctx_t* uctx; uctx = hip_uctx_open(hip, 8192, uf, ctx); /* TODO: stack size */ if (!uctx) return HIP_NULL; /* append to the list */ ADD_BACK(&uctx->uctx, &hip->runnables); return uctx; } int hip_schedule(hip_t* hip, int preempt) { hip->uctx_sched = hip_uctx_open(hip, 4096, schedule, hip); if (!hip->uctx_sched) return -1; hip->uctx_mux = hip_uctx_open(hip, 4096, poll, hip); if (!hip->uctx_mux) { hip_uctx_close (hip->uctx_sched); hip->uctx_sched = HIP_NULL; return -1; } /* fire the schduler tick for preemptive scheduling */ if (preempt) { /* start the timer tick */ struct itimerspec its; memset(&its, 0, sizeof(its)); its.it_interval.tv_sec = 0; //its.it_interval.tv_nsec = 10000000; /* 10 milliseconds */ its.it_interval.tv_nsec = 100000000; /* 100 milliseconds */ its.it_value = its.it_interval; timer_settime(hip->tmr_id, 0, &its, NULL); } /* jump to the scheduler */ swapcontext(&hip->uc_main, &hip->uctx_sched->uc); if (preempt) { /* stop the timer tick */ struct itimerspec its; memset(&its, 0, sizeof(its)); its.it_interval.tv_sec = 0; its.it_interval.tv_nsec = 0; its.it_value = its.it_interval; timer_settime(hip->tmr_id, 0, &its, NULL); } return 0; } void hip_yield(hip_t* hip) { hip_uctx_t* r; assert (hip->running != HIP_NULL); /* switch from the running context to the scheduler context */ ADD_BACK(hip->running, &hip->runnables); r = UCTX_FROM_LINK(hip->running); /* TODO: extract these lines to a macro or something */ r->cso++; hip->uctx_sched->csi++; swapcontext(&r->uc, &hip->uctx_sched->uc); } void hip_sleep(hip_t* hip, hip_nsecdur_t nsecs) { hip_uctx_t* rr; assert (hip->running != HIP_NULL); rr = UCTX_FROM_LINK(hip->running); rr->wakeup_time = monotime() + nsecs; /* TODO: switch to HEAP. for now simply linear search to keep this list sorted. */ if (IS_EMPTY(&hip->pending)) { ADD_BACK(hip->running, &hip->pending); } else { hip_uctx_link_t* l; hip_uctx_t* r; for (l = HEAD(&hip->pending); l != &hip->pending; l = l->next) { r = UCTX_FROM_LINK(l); if (rr->wakeup_time <= r->wakeup_time) { CHAIN(hip->running, l->prev, l->next); goto do_sched; } } ADD_BACK(hip->running, &hip->pending); } do_sched: hip->running = HIP_NULL; swapcontext(&rr->uc, &hip->uctx_sched->uc); } /* ---------------------------------------------------- */ static void uf1(hip_uctx_t* uc, void* ctx) { int i; for (i =0; i < 5; i++) { printf ("************************** uf1 \n"); hip_yield(uc->hip); //sleep(1); printf ("************************* uf1 1111\n"); hip_yield(uc->hip); //sleep(1); } } static void uf2(hip_uctx_t* uc, void* ctx) { int i; for (i =0; i < 10; i++) { printf ("************************** uf2 \n"); hip_yield(uc->hip); //sleep(1); printf ("************************* uf2 2222 XXXXXXXXXXXXXXxxxxxxxxxxxxxxxxxxxxxx\n"); hip_yield(uc->hip); //sleep(1); } } static void uf3(hip_uctx_t* uc, void* ctx) { int i; for (i =0; i < 15; i++) { printf ("************************** uf3 TOP\n"); hip_yield(uc->hip); //sleep(1); printf ("************************* uf3 3333 xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\n"); hip_yield(uc->hip); //sleep(1); if (i == 8) hip_newrtn(uc->hip, uf2, HIP_NULL); } } int main() { hip_t* hip; hip = hip_open(); hip_newrtn(hip, uf1, HIP_NULL); hip_newrtn(hip, uf2, HIP_NULL); hip_newrtn(hip, uf3, HIP_NULL); /* jump to the scheduler */ hip_schedule(hip, 0); printf("XXXXXXXXXXXXXXXXX ABOUT TO CLOSE ********************\n"); /* TODO: close all uctxes ? */ hip_close(hip); return 0; }