diff --git a/moo/kernel/Process.moo b/moo/kernel/Process.moo index 5ff13fa..ef5f1f7 100644 --- a/moo/kernel/Process.moo +++ b/moo/kernel/Process.moo @@ -104,6 +104,15 @@ class Semaphore(Object) ## ================================================================== + method(#primitive) signalAfterSecs: secs. + method(#primitive) signalAfterSecs: secs nanosecs: nanosecs. + method(#primitive) signalOnInput: io_handle. + method(#primitive) signalOnOutput: io_handle. + method(#primitive) signalOnGCFin. + method(#primitive) unsignal. + + ## ================================================================== + method heapIndex: index { self.heapIndex := index. @@ -214,7 +223,7 @@ method(#class,#abstract) xxx. => method(#class) xxx { self subclassResponsibilit [ ## arrange the processor to notify upon timeout. - System signal: s afterSecs: seconds. + s signalAfterSecs: seconds. ## wait on the semaphore group. r := self wait. @@ -227,9 +236,9 @@ method(#class,#abstract) xxx. => method(#class) xxx { self subclassResponsibilit ## System<= 0) { ^n }. - - if (self.tmoutsecs notNil) { System signal: self.tmoutsem afterSecs: self.tmoutsecs nanosecs: self.tmoutnsecs }. - System signal: self.iosem onInput: self.handle. - s := self.sg wait. - System unsignal: self.iosem. - if (self.tmoutsecs notNil) { System unsignal: self.tmoutsem }. - if (s == self.tmoutsem) { Exception signal: 'timed out' }. + self __wait_for_input. } } @@ -339,10 +353,7 @@ class SyncSocket(CoreSocket) { n := super readBytes: bytes offset: offset length: length. if (n >= 0) { ^n }. - - System signal: self.iosem onInput: self.handle. - self.sg wait. - System unsignal: self.iosem. + self __wait_for_input. } } @@ -353,10 +364,7 @@ class SyncSocket(CoreSocket) { n := super _writeBytes: bytes. if (n >= 0) { ^n }. - - System signal: self.iosem onOutput: self.handle. - self.sg wait. - System unsignal: self.iosem. + self __wait_for_output. } } @@ -367,10 +375,7 @@ class SyncSocket(CoreSocket) { n := super _writeBytes: bytes offset: offset length: length. if (n >= 0) { ^n }. - - System signal: self.iosem onOutput: self.handle. - self.sg wait. - System unsignal: self.iosem. + self __wait_for_output. } } } @@ -434,7 +439,7 @@ extend Socket self.outdonesem signalAction: [ :sem | self onSocketDataOut. - System unsignal: self.outreadysem. + self.outreadysem unsignal. ]. self.outreadysem signalAction: [ :sem | @@ -478,21 +483,21 @@ extend Socket { if (self.outdonesem notNil) { - System unsignal: self.outdonesem. + self.outdonesem unsignal. if (self.outdonesem _group notNil) { thisProcess removeAsyncSemaphore: self.outdonesem }. self.outdonesem := nil. }. if (self.outreadysem notNil) { - System unsignal: self.outreadysem. + self.outreadysem unsignal. if (self.outreadysem _group notNil) { thisProcess removeAsyncSemaphore: self.outreadysem }. self.outreadysem := nil. }. if (self.inreadysem notNil) { - System unsignal: self.inreadysem. + self.inreadysem unsignal. if (self.inreadysem _group notNil) { thisProcess removeAsyncSemaphore: self.inreadysem }. self.inreadysem := nil. }. @@ -503,7 +508,7 @@ extend Socket method beWatched { thisProcess addAsyncSemaphore: self.inreadysem. - System signal: self.inreadysem onInput: self.handle. + self.inreadysem signalOnInput: self.handle. thisProcess addAsyncSemaphore: self.outdonesem. } @@ -542,7 +547,7 @@ extend Socket self.pending_length := rem. thisProcess addAsyncSemaphore: self.outreadysem. - System signal: self.outreadysem onOutput: self.handle. + self.outreadysem signalOnOutput: self.handle. } method writeBytes: bytes @@ -581,7 +586,7 @@ class ClientSocket(Socket) if (soerr >= 0) { ## finalize connection if not in progress - System unsignal: sem. + sem unsignal. thisProcess removeAsyncSemaphore: sem. self onSocketConnected: (soerr == 0). @@ -595,7 +600,7 @@ class ClientSocket(Socket) { if (self.connsem notNil) { - System unsignal: self.connsem. + self.connsem unsignal. if (self.connsem _group notNil) { thisProcess removeAsyncSemaphore: self.connsem }. self.connsem := nil. }. @@ -608,7 +613,7 @@ class ClientSocket(Socket) if ((self _connect: target) <= -1) { thisProcess addAsyncSemaphore: self.connsem. - System signal: self.connsem onOutput: self.handle. + self.connsem signalOnOutput: self.handle. } else { @@ -617,7 +622,7 @@ class ClientSocket(Socket) self onSocketConnected: true. thisProcess addAsyncSemaphore: self.inreadysem. - System signal: self.inreadysem onInput: self.handle. + self.inreadysem signalOnInput: self.handle. thisProcess addAsyncSemaphore: self.outdonesem. } } @@ -655,7 +660,7 @@ class ServerSocket(Socket) 'CLOSING SERVER SOCEKT.... ' dump. if (self.inreadysem notNil) { - System unsignal: self.inreadysem. + self.inreadysem unsignal. if (self.inreadysem _group notNil) { thisProcess removeAsyncSemaphore: self.inreadysem }. self.inreadysem := nil. }. @@ -671,12 +676,12 @@ class ServerSocket(Socket) ## added to the multiplexer, a spurious hangup event might ## be generated. At least, such behavior was observed ## in linux with epoll in the level trigger mode. - ## System signal: self.inreadysem onInput: self.handle. + ## self.inreadysem signalOnInput: self.handle. ## thisProcess addAsyncSemaphore: self.inreadysem. ## self _listen: backlog. n := self _listen: backlog. - System signal: self.inreadysem onInput: self.handle. + self.inreadysem signalOnInput: self.handle. thisProcess addAsyncSemaphore: self.inreadysem. ^n. } diff --git a/moo/kernel/System.moo b/moo/kernel/System.moo index 4f638f9..c863fee 100644 --- a/moo/kernel/System.moo +++ b/moo/kernel/System.moo @@ -51,12 +51,13 @@ class System(Apex) method(#class) __gc_finalizer { - | tmp gc fin_sem | + | tmp gc gcfin_sem | gc := false. - fin_sem := Semaphore new. + gcfin_sem := Semaphore new. + + gcfin_sem signalOnGCFin. - self signalOnGCFin: fin_sem. [ while (true) { @@ -87,11 +88,11 @@ class System(Apex) ##System logNl: '^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^gc_waiting....'. ##System sleepForSecs: 1. ## TODO: wait on semaphore instead.. - fin_sem wait. + gcfin_sem wait. ##System logNl: 'XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX gc_waitED....'. } ] ensure: [ - System unsignal: fin_sem. + gcfin_sem unsignal. System logNl: 'End of GC finalization process ' & (thisProcess id) asString. ]. } @@ -101,15 +102,6 @@ class System(Apex) method(#class,#primitive) gc. method(#class,#primitive) return: object to: context. - ## ======================================================================================= - - method(#class,#primitive) signal: semaphore afterSecs: secs. - method(#class,#primitive) signal: semaphore afterSecs: secs nanosecs: nanosecs. - method(#class,#primitive) signal: semaphore onInput: file. - method(#class,#primitive) signal: semaphore onOutput: file. - method(#class,#primitive) signalOnGCFin: semaphore. - method(#class,#primitive) unsignal: semaphore. - ## ======================================================================================= method(#class) sleepForSecs: secs { diff --git a/moo/kernel/X11.moo b/moo/kernel/X11.moo index 94a9563..99bdba3 100644 --- a/moo/kernel/X11.moo +++ b/moo/kernel/X11.moo @@ -589,7 +589,7 @@ extend X11 if (self.event_loop_sem isNil) { self.event_loop_sem := Semaphore new. - System signal: self.event_loop_sem onInput: (self _get_fd). + self.event_loop_sem signalOnInput: (self _get_fd). self.event_loop_proc := [ | llevtbuf llevent ongoing | @@ -618,7 +618,7 @@ extend X11 'CLOSING X11 EVENT LOOP' dump. - System unsignal: self.event_loop_sem. + self.event_loop_sem unsignal. ## TODO: LOOK HERE FOR RACE CONDITION self.event_loop_sem := nil. self.event_loop_proc := nil. diff --git a/moo/kernel/test-004.moo b/moo/kernel/test-004.moo index 283a61e..74e6b8f 100644 --- a/moo/kernel/test-004.moo +++ b/moo/kernel/test-004.moo @@ -38,9 +38,9 @@ class MyObject(Object) sg addSemaphore: s2. sg addSemaphore: s3. - System signal: s1 onInput: 0. - ##System signal: s2 onInput: 0. ## this should raise an exception. - ##System signal: s3 onInput: 0. + s1 signalOnInput: 0. + s2 signalOnInput: 0. ## this should raise an exception. + s3 signalOnInput: 0. [ sg wait. ] fork. [ sg wait. ] fork. diff --git a/moo/lib/exec.c b/moo/lib/exec.c index 4091342..aa83912 100644 --- a/moo/lib/exec.c +++ b/moo/lib/exec.c @@ -2365,6 +2365,208 @@ static moo_pfrc_t pf_semaphore_wait (moo_t* moo, moo_mod_t* mod, moo_ooi_t nargs return MOO_PF_SUCCESS; } +static moo_pfrc_t pf_semaphore_signal_on_gcfin (moo_t* moo, moo_mod_t* mod, moo_ooi_t nargs) +{ + moo_oop_semaphore_t sem; + + sem = (moo_oop_semaphore_t)MOO_STACK_GETRCV(moo, nargs); + MOO_PF_CHECK_RCV (moo, moo_iskindof(moo, (moo_oop_t)sem, moo->_semaphore)); + +/* TODO: no overwriting.. */ + moo->sem_gcfin = sem; + + MOO_STACK_SETRETTORCV (moo, nargs); /* ^self */ + return MOO_PF_SUCCESS; +} + +static moo_pfrc_t pf_semaphore_signal_timed (moo_t* moo, moo_mod_t* mod, moo_ooi_t nargs) +{ + moo_oop_semaphore_t sem; + moo_oop_t sec, nsec; + moo_ntime_t now, ft; + + sem = (moo_oop_semaphore_t)MOO_STACK_GETRCV(moo, nargs); + MOO_PF_CHECK_RCV (moo, moo_iskindof(moo, (moo_oop_t)sem, moo->_semaphore)); + + sec = MOO_STACK_GETARG(moo, nargs, 0); + nsec = (nargs == 2? MOO_STACK_GETARG(moo, nargs, 1): MOO_SMOOI_TO_OOP(0)); + + if (!MOO_OOP_IS_SMOOI(sec)) + { + moo_seterrbfmt (moo, MOO_EINVAL, "invalid second - %O", sec); + return MOO_PF_FAILURE; + } + + if (!MOO_OOP_IS_SMOOI(sec)) + { + moo_seterrbfmt (moo, MOO_EINVAL, "invalid nanosecond - %O", nsec); + return MOO_PF_FAILURE; + } + +#if 0 + if (sem->subtype == MOO_SMOOI_TO_OOP(MOO_SEMAPHORE_SUBTYPE_TIMED)) + { + MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->u.timed.index) && MOO_OOP_TO_SMOOI(sem->u.timed.index) >= 0); + + /* if the semaphore is already been added. remove it first */ + delete_from_sem_heap (moo, MOO_OOP_TO_SMOOI(sem->u.timed.index)); + MOO_ASSERT (moo, sem->subtype == moo->_nil && sem->u.timed.index == moo->_nil); + + /* + Is this more desired??? + MOO_STACK_SETRET (moo, nargs, moo->_false); + return MOO_PF_SUCCESS; + */ + } +#else + if (sem->subtype != moo->_nil) + { + if (sem->subtype == MOO_SMOOI_TO_OOP(MOO_SEMAPHORE_SUBTYPE_IO)) + { + MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->u.io.index) && MOO_OOP_TO_SMOOI(sem->u.io.index) >= 0); + MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->u.io.handle) && MOO_OOP_TO_SMOOI(sem->u.io.handle) >= 0); + MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->u.io.type)); + moo_seterrbfmt (moo, MOO_EINVAL, "semaphore already linked with a handle %zd", MOO_OOP_TO_SMOOI(sem->u.io.handle)); + } + else + { + MOO_ASSERT (moo, sem->subtype == MOO_SMOOI_TO_OOP(MOO_SEMAPHORE_SUBTYPE_TIMED)); + MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->u.timed.index) && MOO_OOP_TO_SMOOI(sem->u.timed.index) >= 0); + MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->u.timed.ftime_sec)); + MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->u.timed.ftime_nsec)); + moo_seterrbfmt (moo, MOO_EINVAL, "semaphore already activated for timer"); + } + + return MOO_PF_FAILURE; + } +#endif + /* this code assumes that the monotonic clock returns a small value + * that can fit into a SmallInteger, even after some additions. */ + vm_gettime (moo, &now); + MOO_ADDNTIMESNS (&ft, &now, MOO_OOP_TO_SMOOI(sec), MOO_OOP_TO_SMOOI(nsec)); + if (ft.sec < 0 || ft.sec > MOO_SMOOI_MAX) + { + /* soft error - cannot represent the expiry time in a small integer. */ + MOO_LOG3 (moo, MOO_LOG_PRIMITIVE | MOO_LOG_ERROR, + "Error(%hs) - time (%ld) out of range(0 - %zd) when adding a timed semaphore\n", + __PRIMITIVE_NAME__, (unsigned long int)ft.sec, (moo_ooi_t)MOO_SMOOI_MAX); + + moo_seterrnum (moo, MOO_ERANGE); + return MOO_PF_FAILURE; + } + + sem->u.timed.ftime_sec = MOO_SMOOI_TO_OOP(ft.sec); + sem->u.timed.ftime_nsec = MOO_SMOOI_TO_OOP(ft.nsec); + + if (add_to_sem_heap(moo, sem) <= -1) return MOO_PF_HARD_FAILURE; + MOO_ASSERT (moo, sem->subtype == MOO_SMOOI_TO_OOP(MOO_SEMAPHORE_SUBTYPE_TIMED)); + + MOO_STACK_SETRETTORCV (moo, nargs); /* ^self */ + return MOO_PF_SUCCESS; +} + +static moo_pfrc_t __semaphore_signal_on_io (moo_t* moo, moo_ooi_t nargs, moo_semaphore_io_type_t io_type) +{ + moo_oop_semaphore_t sem; + moo_oop_t fd; + + sem = (moo_oop_semaphore_t)MOO_STACK_GETRCV(moo, nargs); + MOO_PF_CHECK_RCV (moo, moo_iskindof(moo, (moo_oop_t)sem, moo->_semaphore)); + + fd = MOO_STACK_GETARG(moo, nargs, 0); + + if (!MOO_OOP_IS_SMOOI(fd)) + { + moo_seterrbfmt (moo, MOO_EINVAL, "handle not a small integer - %O", fd); + return MOO_PF_FAILURE; + } + + if (sem->subtype != moo->_nil) + { + if (sem->subtype == MOO_SMOOI_TO_OOP(MOO_SEMAPHORE_SUBTYPE_IO)) + { + MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->u.io.index) && MOO_OOP_TO_SMOOI(sem->u.io.index) >= 0); + MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->u.io.handle) && MOO_OOP_TO_SMOOI(sem->u.io.handle) >= 0); + MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->u.io.type)); + moo_seterrbfmt (moo, MOO_EINVAL, "semaphore already linked with a handle %zd", MOO_OOP_TO_SMOOI(sem->u.io.handle)); + } + else + { + MOO_ASSERT (moo, sem->subtype == MOO_SMOOI_TO_OOP(MOO_SEMAPHORE_SUBTYPE_TIMED)); + MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->u.timed.index) && MOO_OOP_TO_SMOOI(sem->u.timed.index) >= 0); + MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->u.timed.ftime_sec)); + MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->u.timed.ftime_nsec)); + moo_seterrbfmt (moo, MOO_EINVAL, "semaphore already activated for timer"); + } + + return MOO_PF_FAILURE; + } + + if (add_sem_to_sem_io_tuple(moo, sem, MOO_OOP_TO_SMOOI(fd), io_type) <= -1) + { + const moo_ooch_t* oldmsg = moo_backuperrmsg(moo); + moo_seterrbfmt (moo, moo->errnum, "unable to add the handle %zd to the multiplexer for %hs - %js", MOO_OOP_TO_SMOOI(fd), io_type_str[io_type], oldmsg); + return MOO_PF_FAILURE; + } + + MOO_STACK_SETRETTORCV (moo, nargs); /* ^self */ + return MOO_PF_SUCCESS; +} + +static moo_pfrc_t pf_semaphore_signal_on_input (moo_t* moo, moo_mod_t* mod, moo_ooi_t nargs) +{ + return __semaphore_signal_on_io(moo, nargs, MOO_SEMAPHORE_IO_TYPE_INPUT); +} + +static moo_pfrc_t pf_semaphore_signal_on_output (moo_t* moo, moo_mod_t* mod, moo_ooi_t nargs) +{ + return __semaphore_signal_on_io(moo, nargs, MOO_SEMAPHORE_IO_TYPE_OUTPUT); +} + +static moo_pfrc_t pf_semaphore_unsignal (moo_t* moo, moo_mod_t* mod, moo_ooi_t nargs) +{ + /* remove a semaphore from processor's signal scheduling */ + moo_oop_semaphore_t sem; + + sem = (moo_oop_semaphore_t)MOO_STACK_GETRCV(moo, nargs); + MOO_PF_CHECK_RCV (moo, moo_iskindof(moo, (moo_oop_t)sem, moo->_semaphore)); + + if (sem == moo->sem_gcfin) + { + moo->sem_gcfin = (moo_oop_semaphore_t)moo->_nil; + } + + if (sem->subtype == MOO_SMOOI_TO_OOP(MOO_SEMAPHORE_SUBTYPE_TIMED)) + { + /* the semaphore is in the timed semaphore heap */ + MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->u.timed.index) && MOO_OOP_TO_SMOOI(sem->u.timed.index) >= 0); + delete_from_sem_heap (moo, MOO_OOP_TO_SMOOI(sem->u.timed.index)); + MOO_ASSERT (moo, sem->u.timed.index == moo->_nil); + } + else if (sem->subtype == MOO_SMOOI_TO_OOP(MOO_SEMAPHORE_SUBTYPE_IO)) + { + /* the semaphore is associated with IO */ + MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->u.io.index) && MOO_OOP_TO_SMOOI(sem->u.io.index) >= 0); + MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->u.io.type)); + MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->u.io.handle) && MOO_OOP_TO_SMOOI(sem->u.io.handle) >= 0); + + if (delete_sem_from_sem_io_tuple(moo, sem, 0) <= -1) + { + const moo_ooch_t* oldmsg = moo_backuperrmsg(moo); + moo_seterrbfmt (moo, moo->errnum, "cannot delete the handle %zd from the multiplexer - %js", MOO_OOP_TO_SMOOI(sem->u.io.handle), oldmsg); + return MOO_PF_FAILURE; + } + + MOO_ASSERT (moo, (moo_oop_t)sem->u.io.index == moo->_nil); + } + MOO_ASSERT (moo, sem->subtype == moo->_nil); + + MOO_STACK_SETRETTORCV (moo, nargs); /* ^self */ + return MOO_PF_SUCCESS; +} + +/* ------------------------------------------------------------------ */ + static moo_pfrc_t pf_semaphore_group_add_semaphore (moo_t* moo, moo_mod_t* mod, moo_ooi_t nargs) { moo_oop_semaphore_group_t sg; @@ -2553,245 +2755,6 @@ static moo_pfrc_t pf_semaphore_group_wait (moo_t* moo, moo_mod_t* mod, moo_ooi_t /* ------------------------------------------------------------------ */ -static moo_pfrc_t pf_system_add_gcfin_semaphore (moo_t* moo, moo_mod_t* mod, moo_ooi_t nargs) -{ - moo_oop_semaphore_t sem; - - /*MOO_PF_CHECK_RCV (moo, MOO_STACK_GETRCV(moo, nargs) == (moo_oop_t)moo->processor);*/ - - MOO_ASSERT (moo, nargs == 1); - sem = (moo_oop_semaphore_t)MOO_STACK_GETARG(moo, nargs, 0); - - if (!moo_iskindof(moo, (moo_oop_t)sem, moo->_semaphore)) - { - moo_seterrbfmt (moo, MOO_EINVAL, "parameter not a kind of semaphore - %O", sem); - return MOO_PF_FAILURE; - } - -/* TODO: no overwriting.. */ - moo->sem_gcfin = sem; - - MOO_STACK_SETRETTORCV (moo, nargs); /* ^self */ - return MOO_PF_SUCCESS; -} - -static moo_pfrc_t pf_system_add_timed_semaphore (moo_t* moo, moo_mod_t* mod, moo_ooi_t nargs) -{ - moo_oop_t sec, nsec; - moo_oop_semaphore_t sem; - moo_ntime_t now, ft; - - /* don't care about the receiver much as the receiver is not used at all. - * however, it's intended to be called from the System class. */ - - MOO_ASSERT (moo, nargs >= 2 || nargs <= 3); - - sem = (moo_oop_semaphore_t)MOO_STACK_GETARG(moo, nargs, 0); - sec = MOO_STACK_GETARG(moo, nargs, 1); - nsec = (nargs == 3? MOO_STACK_GETARG(moo, nargs, 2): MOO_SMOOI_TO_OOP(0)); - - if (!moo_iskindof(moo, (moo_oop_t)sem, moo->_semaphore)) - { - moo_seterrbfmt (moo, MOO_EINVAL, "parameter not a kind of semaphore - %O", sem); - return MOO_PF_FAILURE; - } - - if (!MOO_OOP_IS_SMOOI(sec)) - { - moo_seterrbfmt (moo, MOO_EINVAL, "invalid second - %O", sec); - return MOO_PF_FAILURE; - } - - if (!MOO_OOP_IS_SMOOI(sec)) - { - moo_seterrbfmt (moo, MOO_EINVAL, "invalid nanosecond - %O", nsec); - return MOO_PF_FAILURE; - } - -#if 0 - if (sem->subtype == MOO_SMOOI_TO_OOP(MOO_SEMAPHORE_SUBTYPE_TIMED)) - { - MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->u.timed.index) && MOO_OOP_TO_SMOOI(sem->u.timed.index) >= 0); - - /* if the semaphore is already been added. remove it first */ - delete_from_sem_heap (moo, MOO_OOP_TO_SMOOI(sem->u.timed.index)); - MOO_ASSERT (moo, sem->subtype == moo->_nil && sem->u.timed.index == moo->_nil); - - /* - Is this more desired??? - MOO_STACK_SETRET (moo, nargs, moo->_false); - return MOO_PF_SUCCESS; - */ - } -#else - if (sem->subtype != moo->_nil) - { - if (sem->subtype == MOO_SMOOI_TO_OOP(MOO_SEMAPHORE_SUBTYPE_IO)) - { - MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->u.io.index) && MOO_OOP_TO_SMOOI(sem->u.io.index) >= 0); - MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->u.io.handle) && MOO_OOP_TO_SMOOI(sem->u.io.handle) >= 0); - MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->u.io.type)); - moo_seterrbfmt (moo, MOO_EINVAL, "semaphore already linked with a handle %zd", MOO_OOP_TO_SMOOI(sem->u.io.handle)); - } - else - { - MOO_ASSERT (moo, sem->subtype == MOO_SMOOI_TO_OOP(MOO_SEMAPHORE_SUBTYPE_TIMED)); - MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->u.timed.index) && MOO_OOP_TO_SMOOI(sem->u.timed.index) >= 0); - MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->u.timed.ftime_sec)); - MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->u.timed.ftime_nsec)); - moo_seterrbfmt (moo, MOO_EINVAL, "semaphore already activated for timer"); - } - - return MOO_PF_FAILURE; - } -#endif - /* this code assumes that the monotonic clock returns a small value - * that can fit into a SmallInteger, even after some additions. */ - vm_gettime (moo, &now); - MOO_ADDNTIMESNS (&ft, &now, MOO_OOP_TO_SMOOI(sec), MOO_OOP_TO_SMOOI(nsec)); - if (ft.sec < 0 || ft.sec > MOO_SMOOI_MAX) - { - /* soft error - cannot represent the expiry time in a small integer. */ - MOO_LOG3 (moo, MOO_LOG_PRIMITIVE | MOO_LOG_ERROR, - "Error(%hs) - time (%ld) out of range(0 - %zd) when adding a timed semaphore\n", - __PRIMITIVE_NAME__, (unsigned long int)ft.sec, (moo_ooi_t)MOO_SMOOI_MAX); - - moo_seterrnum (moo, MOO_ERANGE); - return MOO_PF_FAILURE; - } - - sem->u.timed.ftime_sec = MOO_SMOOI_TO_OOP(ft.sec); - sem->u.timed.ftime_nsec = MOO_SMOOI_TO_OOP(ft.nsec); - - if (add_to_sem_heap(moo, sem) <= -1) return MOO_PF_HARD_FAILURE; - MOO_ASSERT (moo, sem->subtype == MOO_SMOOI_TO_OOP(MOO_SEMAPHORE_SUBTYPE_TIMED)); - - MOO_STACK_SETRETTORCV (moo, nargs); /* ^self */ - return MOO_PF_SUCCESS; -} - -static moo_pfrc_t __system_add_io_semaphore (moo_t* moo, moo_ooi_t nargs, moo_semaphore_io_type_t io_type) -{ - moo_oop_t fd; - moo_oop_semaphore_t sem; - - /*MOO_PF_CHECK_RCV (moo, MOO_STACK_GETRCV(moo, nargs) == (moo_oop_t)moo->system);*/ - - MOO_ASSERT (moo, nargs == 2); - - fd = MOO_STACK_GETARG(moo, nargs, 1); - sem = (moo_oop_semaphore_t)MOO_STACK_GETARG(moo, nargs, 0); - - if (!moo_iskindof(moo, (moo_oop_t)sem, moo->_semaphore)) - { - moo_seterrbfmt (moo, MOO_EINVAL, "parameter not a kind of semaphore - %O", sem); - return MOO_PF_FAILURE; - } - - if (!MOO_OOP_IS_SMOOI(fd)) - { - moo_seterrbfmt (moo, MOO_EINVAL, "handle not a small integer - %O", fd); - return MOO_PF_FAILURE; - } - - if (sem->subtype != moo->_nil) - { - if (sem->subtype == MOO_SMOOI_TO_OOP(MOO_SEMAPHORE_SUBTYPE_IO)) - { - MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->u.io.index) && MOO_OOP_TO_SMOOI(sem->u.io.index) >= 0); - MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->u.io.handle) && MOO_OOP_TO_SMOOI(sem->u.io.handle) >= 0); - MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->u.io.type)); - moo_seterrbfmt (moo, MOO_EINVAL, "semaphore already linked with a handle %zd", MOO_OOP_TO_SMOOI(sem->u.io.handle)); - } - else - { - MOO_ASSERT (moo, sem->subtype == MOO_SMOOI_TO_OOP(MOO_SEMAPHORE_SUBTYPE_TIMED)); - MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->u.timed.index) && MOO_OOP_TO_SMOOI(sem->u.timed.index) >= 0); - MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->u.timed.ftime_sec)); - MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->u.timed.ftime_nsec)); - moo_seterrbfmt (moo, MOO_EINVAL, "semaphore already activated for timer"); - } - - return MOO_PF_FAILURE; - } - - if (add_sem_to_sem_io_tuple(moo, sem, MOO_OOP_TO_SMOOI(fd), io_type) <= -1) - { - const moo_ooch_t* oldmsg = moo_backuperrmsg(moo); - moo_seterrbfmt (moo, moo->errnum, "unable to add the handle %zd to the multiplexer for %hs - %js", MOO_OOP_TO_SMOOI(fd), io_type_str[io_type], oldmsg); - return MOO_PF_FAILURE; - } - - MOO_STACK_SETRETTORCV (moo, nargs); /* ^self */ - return MOO_PF_SUCCESS; -} - -static moo_pfrc_t pf_system_add_input_semaphore (moo_t* moo, moo_mod_t* mod, moo_ooi_t nargs) -{ - return __system_add_io_semaphore (moo, nargs, MOO_SEMAPHORE_IO_TYPE_INPUT); -} - -static moo_pfrc_t pf_system_add_output_semaphore (moo_t* moo, moo_mod_t* mod, moo_ooi_t nargs) -{ - return __system_add_io_semaphore (moo, nargs, MOO_SEMAPHORE_IO_TYPE_OUTPUT); -} - -static moo_pfrc_t pf_system_remove_semaphore (moo_t* moo, moo_mod_t* mod, moo_ooi_t nargs) -{ - /* remove a semaphore from processor's signal scheduling */ - - moo_oop_semaphore_t sem; - - /*MOO_PF_CHECK_RCV (moo, MOO_STACK_GETRCV(moo, nargs) == (moo_oop_t)moo->system);*/ - - MOO_ASSERT (moo, nargs == 1); - - sem = (moo_oop_semaphore_t)MOO_STACK_GETARG(moo, nargs, 0); - if (!moo_iskindof(moo, (moo_oop_t)sem, moo->_semaphore)) - { - moo_seterrbfmt (moo, MOO_EINVAL, "parameter not a kind of semaphore - %O", sem); - return MOO_PF_FAILURE; - } - -/* TODO: remove a semaphore from IO handler if it's registered... - * remove a semaphore from elsewhere registered too */ - - if (sem == moo->sem_gcfin) - { - moo->sem_gcfin = (moo_oop_semaphore_t)moo->_nil; - } - - if (sem->subtype == MOO_SMOOI_TO_OOP(MOO_SEMAPHORE_SUBTYPE_TIMED)) - { - /* the semaphore is in the timed semaphore heap */ - MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->u.timed.index) && MOO_OOP_TO_SMOOI(sem->u.timed.index) >= 0); - delete_from_sem_heap (moo, MOO_OOP_TO_SMOOI(sem->u.timed.index)); - MOO_ASSERT (moo, sem->u.timed.index == moo->_nil); - } - else if (sem->subtype == MOO_SMOOI_TO_OOP(MOO_SEMAPHORE_SUBTYPE_IO)) - { - /* the semaphore is associated with IO */ - MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->u.io.index) && MOO_OOP_TO_SMOOI(sem->u.io.index) >= 0); - MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->u.io.type)); - MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->u.io.handle) && MOO_OOP_TO_SMOOI(sem->u.io.handle) >= 0); - - if (delete_sem_from_sem_io_tuple(moo, sem, 0) <= -1) - { - const moo_ooch_t* oldmsg = moo_backuperrmsg(moo); - moo_seterrbfmt (moo, moo->errnum, "cannot delete the handle %zd from the multiplexer - %js", MOO_OOP_TO_SMOOI(sem->u.io.handle), oldmsg); - return MOO_PF_FAILURE; - } - - MOO_ASSERT (moo, (moo_oop_t)sem->u.io.index == moo->_nil); - } - MOO_ASSERT (moo, sem->subtype == moo->_nil); - - MOO_STACK_SETRETTORCV (moo, nargs); /* ^self */ - return MOO_PF_SUCCESS; -} - -/* ------------------------------------------------------------------ */ - static moo_pfrc_t pf_system_return_value_to_context (moo_t* moo, moo_mod_t* mod, moo_ooi_t nargs) { moo_oop_t ret, ctx; @@ -3543,6 +3506,12 @@ static pf_t pftab[] = { "SemaphoreGroup_wait", { pf_semaphore_group_wait, 0, 0 } }, { "Semaphore_signal", { pf_semaphore_signal, 0, 0 } }, + { "Semaphore_signalAfterSecs:", { pf_semaphore_signal_timed, 1, 1 } }, + { "Semaphore_signalAfterSecs:nanosecs:", { pf_semaphore_signal_timed, 2, 2 } }, + { "Semaphore_signalOnGCFin", { pf_semaphore_signal_on_gcfin, 0, 0 } }, + { "Semaphore_signalOnInput:", { pf_semaphore_signal_on_input, 1, 1 } }, + { "Semaphore_signalOnOutput:", { pf_semaphore_signal_on_output, 1, 1 } }, + { "Semaphore_unsignal", { pf_semaphore_unsignal, 0, 0 } }, { "Semaphore_wait", { pf_semaphore_wait, 0, 0 } }, { "SmallInteger_asCharacter", { pf_smooi_as_character, 0, 0 } }, @@ -3600,12 +3569,6 @@ static pf_t pftab[] = { "System_putUint64", { moo_pf_system_put_uint64, 3, 3 } }, { "System_putUint8", { moo_pf_system_put_uint8, 3, 3 } }, { "System_return:to:", { pf_system_return_value_to_context, 2, 2 } }, - { "System_signal:afterSecs:", { pf_system_add_timed_semaphore, 2, 2 } }, - { "System_signal:afterSecs:nanosecs:", { pf_system_add_timed_semaphore, 3, 3 } }, - { "System_signal:onInput:", { pf_system_add_input_semaphore, 2, 2 } }, - { "System_signal:onOutput:", { pf_system_add_output_semaphore, 2, 2 } }, - { "System_signalOnGCFin:", { pf_system_add_gcfin_semaphore, 1, 1 } }, - { "System_unsignal:", { pf_system_remove_semaphore, 1, 1 } }, { "_dump", { pf_dump, 0, MA } }, @@ -4202,7 +4165,6 @@ static MOO_INLINE int switch_process_if_needed (moo_t* moo) do { - MOO_INITNTIME (&ft, 3, 0); /* TODO: use a configured time */ vm_muxwait (moo, &ft); } @@ -4215,9 +4177,9 @@ static MOO_INLINE int switch_process_if_needed (moo_t* moo) * before proceeding to handle normal process scheduling */ /* NOTE: the check with the multiplexer may happen too frequently - * because this is called everytime process switching is requested - * the actual callback implementation may avoid using actual - * system calls for the check too frequently */ + * because this is called everytime process switching is requested. + * the actual callback implementation should try to avoid invoking + * actual system calls too frequently for less overhead. */ vm_muxwait (moo, MOO_NULL); } } diff --git a/moo/lib/main.c b/moo/lib/main.c index 5357254..1f40670 100644 --- a/moo/lib/main.c +++ b/moo/lib/main.c @@ -1832,7 +1832,7 @@ static void vm_muxwait (moo_t* moo, const moo_ntime_t* dur, moo_vmprim_muxwait_c pthread_mutex_unlock (&xtn->ev.mtx); } -#else +#else /* USE_THREAD */ int tmout = 0, n; #if defined(USE_DEVPOLL) struct dvpoll dvp; @@ -1882,7 +1882,7 @@ static void vm_muxwait (moo_t* moo, const moo_ntime_t* dur, moo_vmprim_muxwait_c maxfd = xtn->ev.reg.maxfd; MOO_MEMCPY (&rfds, &xtn->ev.reg.rfds, MOO_SIZEOF(rfds)); MOO_MEMCPY (&wfds, &xtn->ev.reg.wfds, MOO_SIZEOF(wfds)); - n = select (maxfd + 1, &rfds, &wfds, NULL, &tv); + n = select(maxfd + 1, &rfds, &wfds, NULL, &tv); if (n > 0) { int fd, count = 0; @@ -1956,7 +1956,7 @@ static void vm_muxwait (moo_t* moo, const moo_ntime_t* dur, moo_vmprim_muxwait_c } xtn->ev.len = 0; -#endif +#endif /* USE_THREAD */ } static void vm_sleep (moo_t* moo, const moo_ntime_t* dur)