diff --git a/moo/kernel/Process.moo b/moo/kernel/Process.moo index 0e36366..3031053 100644 --- a/moo/kernel/Process.moo +++ b/moo/kernel/Process.moo @@ -79,8 +79,12 @@ class Semaphore(Object) TODO: timed wait... method waitWithTimeout: seconds { - - self primitiveFailed + | s | + s := Semaphore new. + Processor signal: s after: seconds. + self waitWithTimedSemaphore: s. + + if (self. } method waitWithTimeout: seconds and: nanoSeconds @@ -89,6 +93,7 @@ TODO: timed wait... self primitiveFailed } *) + method critical: aBlock { self wait. @@ -194,7 +199,6 @@ class SemaphoreGroup(Object) size := 0, pos := 0, semarr := nil. - var(#get,#set) signaledSemaphore := nil. (* TODO: good idea to a shortcut way to prohibit a certain method in the heirarchy chain? @@ -250,6 +254,36 @@ method(#class,#abstract) xxx. => method(#class) xxx { self subclassResponsibilit } method(#primitive) wait. + + method waitWithTimeout: seconds + { + | s r | + + ## create an internal semaphore for timeout notification. + s := Semaphore new. + + ## grant the partial membership to the internal semaphore. + ## it's partial because it's not added to self.semarr. + s _group: self. + + ## arrange the processor to notify upon timeout. + Processor signal: s after: seconds. + + ## wait on the semaphore group. + r := self wait. + + ## if the internal semaphore has been signaled, + ## arrange to return nil to indicate timeout. + if (r == s) { r := nil }. + + ## nullify the membership + s _group: nil. + + ## cancel the notification arrangement in case it didn't time out. + Processor unsignal: s. + + ^r. + } } class SemaphoreHeap(Object) diff --git a/moo/lib/exec.c b/moo/lib/exec.c index fe67503..a84b4f2 100644 --- a/moo/lib/exec.c +++ b/moo/lib/exec.c @@ -214,7 +214,6 @@ static MOO_INLINE int prepare_to_alloc_pid (moo_t* moo) moo->proc_map = tmp; moo->proc_map_capa = new_capa; - return 0; } @@ -745,8 +744,15 @@ static moo_oop_process_t signal_semaphore (moo_t* moo, moo_oop_semaphore_t sem) unchain_from_semaphore (moo, proc); resume_process (moo, proc); - /* store the last signaled semaphore into the semaphore group */ - semgrp->sigsem = sem; + /* the waiting process has been suspended after a waiting + * primitive function in Semaphore or SemaphoreGroup. + * the top of the stack of the process must hold the temporary + * return value set by await_semaphore() or await_semaphore_group(). + * change the return value forcibly to the actual signaled + * semaphore */ + MOO_ASSERT (moo, MOO_OOP_TO_SMOOI(proc->sp) < (moo_ooi_t)(MOO_OBJ_GET_SIZE(proc) - MOO_PROCESS_NAMED_INSTVARS)); + proc->slot[MOO_OOP_TO_SMOOI(proc->sp)] = (moo_oop_t)sem; + return proc; } } @@ -790,7 +796,7 @@ static moo_oop_process_t signal_semaphore (moo_t* moo, moo_oop_semaphore_t sem) } } -static MOO_INLINE int await_semaphore (moo_t* moo, moo_oop_semaphore_t sem) +static MOO_INLINE void await_semaphore (moo_t* moo, moo_oop_semaphore_t sem) { /* TODO: support timeout */ moo_oop_process_t proc; @@ -831,10 +837,12 @@ static MOO_INLINE int await_semaphore (moo_t* moo, moo_oop_semaphore_t sem) MOO_ASSERT (moo, moo->processor->active != proc); } +#if 0 return 0; +#endif } -static MOO_INLINE moo_oop_t await_semaphore_group (moo_t* moo, moo_oop_semaphore_group_t semgrp) +static MOO_INLINE moo_oop_t await_semaphore_group (moo_t* moo, moo_oop_semaphore_group_t semgrp, const moo_ntime_t* tmout) { /* TODO: support timeout and wait all */ /* wait for one of semaphores in the group to be signaled */ @@ -859,13 +867,22 @@ static MOO_INLINE moo_oop_t await_semaphore_group (moo_t* moo, moo_oop_semaphore count--; sem->count = MOO_SMOOI_TO_OOP(count); semgrp->pos = MOO_SMOOI_TO_OOP(sempos); /* position of the last inspected semaphore */ - semgrp->sigsem = sem; /* remember the last signaled semaphore */ return (moo_oop_t)sem; } } /* no semaphores have been signaled. suspend the current process * until the at least one of them is signaled */ + +#if 0 + if (tmout) + { + /* create an internal semaphore for timeout signaling */ +/* TODO: */ + if (add_to_sem_heap (moo, tmout_sem) <= -1) return MOO_PF_HARD_FAILURE; + } +#endif + proc = moo->processor->active; /* suspend the active process */ @@ -2471,22 +2488,25 @@ static moo_pfrc_t pf_semaphore_wait (moo_t* moo, moo_ooi_t nargs) * await_semaphore() may switch the active process and the stack * manipulation macros target at the active process. i'm not supposed * to change the return value of a new active process. */ - MOO_STACK_SETRETTORCV (moo, nargs); +#if 0 if (await_semaphore (moo, (moo_oop_semaphore_t)rcv) <= -1) { /* i must switch the top because the return value has been set already */ MOO_STACK_SETTOP (moo, MOO_ERROR_TO_OOP(moo->errnum)); return MOO_PF_SUCCESS; } +#else + await_semaphore (moo, (moo_oop_semaphore_t)rcv); +#endif return MOO_PF_SUCCESS; } static moo_pfrc_t pf_semaphore_group_wait (moo_t* moo, moo_ooi_t nargs) { - moo_oop_t rcv; + moo_oop_t rcv, sem; rcv = MOO_STACK_GETRCV(moo, nargs); MOO_PF_CHECK_RCV (moo, MOO_CLASSOF(moo,rcv) == moo->_semaphore_group); @@ -2500,7 +2520,17 @@ static moo_pfrc_t pf_semaphore_group_wait (moo_t* moo, moo_ooi_t nargs) * the stack from this moment on. */ MOO_STACK_SETRETTORCV (moo, nargs); - await_semaphore_group (moo, (moo_oop_semaphore_group_t)rcv); + sem = await_semaphore_group (moo, (moo_oop_semaphore_group_t)rcv, MOO_NULL); + if (sem != moo->_nil) + { + /* there was a singaled semaphore. the active process won't get + * suspended. change the return value of the current process + * forcibly to the signaled semaphore */ + MOO_STACK_SETTOP (moo, sem); + } + + /* the return value will get changed to an actual semaphore signaled + * when the semaphore is signaled. see signal_semaphore() */ return MOO_PF_SUCCESS; } @@ -4763,7 +4793,7 @@ static MOO_INLINE int switch_process_if_needed (moo_t* moo) do { vm_gettime (moo, &now); - now.sec += 3; + now.sec += 3; /* TODO: use a configured value? */ vm_muxwait (moo, &now); } while (moo->processor->active == moo->nil_process && !moo->abort_req); diff --git a/moo/lib/moo.h b/moo/lib/moo.h index 3950b89..43cec1b 100644 --- a/moo/lib/moo.h +++ b/moo/lib/moo.h @@ -752,7 +752,7 @@ typedef struct moo_process_t* moo_oop_process_t; typedef struct moo_semaphore_t moo_semaphore_t; typedef struct moo_semaphore_t* moo_oop_semaphore_t; -#define MOO_SEMAPHORE_GROUP_NAMED_INSTVARS 6 +#define MOO_SEMAPHORE_GROUP_NAMED_INSTVARS 5 typedef struct moo_semaphore_group_t moo_semaphore_group_t; typedef struct moo_semaphore_group_t* moo_oop_semaphore_group_t; @@ -833,7 +833,6 @@ struct moo_semaphore_group_t moo_oop_t size; /* SmallInteger */ moo_oop_t pos; /* current processing position */ moo_oop_oop_t semarr; /* Array of Semaphores */ - moo_oop_semaphore_t sigsem; /* Last signaled semaphore */ }; #define MOO_PROCESS_SCHEDULER_NAMED_INSTVARS 9