migrated semaphore signal/unsignal methods from System to Semaphore

This commit is contained in:
hyunghwan.chung 2018-05-18 08:10:16 +00:00
parent faef93e7d7
commit 4da0731312
7 changed files with 277 additions and 309 deletions

View File

@ -104,6 +104,15 @@ class Semaphore(Object)
## ==================================================================
method(#primitive) signalAfterSecs: secs.
method(#primitive) signalAfterSecs: secs nanosecs: nanosecs.
method(#primitive) signalOnInput: io_handle.
method(#primitive) signalOnOutput: io_handle.
method(#primitive) signalOnGCFin.
method(#primitive) unsignal.
## ==================================================================
method heapIndex: index
{
self.heapIndex := index.
@ -214,7 +223,7 @@ method(#class,#abstract) xxx. => method(#class) xxx { self subclassResponsibilit
[
## arrange the processor to notify upon timeout.
System signal: s afterSecs: seconds.
s signalAfterSecs: seconds.
## wait on the semaphore group.
r := self wait.
@ -227,9 +236,9 @@ method(#class,#abstract) xxx. => method(#class) xxx { self subclassResponsibilit
## System<<unsignal: doesn't thrown an exception even if the semaphore s is not
## register with System<<signal:afterXXX:. otherwise, i would do like this line
## commented out.
## [ System unsignal: s ] ensure: [ self removeSemaphore: s ].
## [ s unsignal ] ensure: [ self removeSemaphore: s ].
System unsignal: s.
s unsignal.
self removeSemaphore: s
].

View File

@ -302,8 +302,6 @@ class SyncSocket(CoreSocket)
self.sg addSemaphore: self.tmoutsem.
}
method beWatched
{
## do nothing. i don't want to be watched.
@ -315,20 +313,36 @@ class SyncSocket(CoreSocket)
self.tmoutnsecs := 0.
}
method __wait_for_input
{
| s |
if (self.tmoutsecs notNil) { self.tmoutsem signalAfterSecs: self.tmoutsecs nanosecs: self.tmoutnsecs }.
self.iosem signalOnInput: self.handle.
s := self.sg wait.
self.iosem unsignal.
if (self.tmoutsecs notNil) { self.tmoutsem unsignal }.
if (s == self.tmoutsem) { Exception signal: 'timed out' }.
}
method __wait_for_output
{
| s |
if (self.tmoutsecs notNil) { self.tmoutsem signalAfterSecs: self.tmoutsecs nanosecs: self.tmoutnsecs }.
self.iosem signalOnOutput: self.handle.
s := self.sg wait.
self.iosem unsignal.
if (self.tmoutsecs notNil) { self.tmoutsem unsignal }.
if (s == self.tmoutsem) { Exception signal: 'timed out' }.
}
method readBytes: bytes
{
| n s |
| n |
while (true)
{
n := super readBytes: bytes.
if (n >= 0) { ^n }.
if (self.tmoutsecs notNil) { System signal: self.tmoutsem afterSecs: self.tmoutsecs nanosecs: self.tmoutnsecs }.
System signal: self.iosem onInput: self.handle.
s := self.sg wait.
System unsignal: self.iosem.
if (self.tmoutsecs notNil) { System unsignal: self.tmoutsem }.
if (s == self.tmoutsem) { Exception signal: 'timed out' }.
self __wait_for_input.
}
}
@ -339,10 +353,7 @@ class SyncSocket(CoreSocket)
{
n := super readBytes: bytes offset: offset length: length.
if (n >= 0) { ^n }.
System signal: self.iosem onInput: self.handle.
self.sg wait.
System unsignal: self.iosem.
self __wait_for_input.
}
}
@ -353,10 +364,7 @@ class SyncSocket(CoreSocket)
{
n := super _writeBytes: bytes.
if (n >= 0) { ^n }.
System signal: self.iosem onOutput: self.handle.
self.sg wait.
System unsignal: self.iosem.
self __wait_for_output.
}
}
@ -367,10 +375,7 @@ class SyncSocket(CoreSocket)
{
n := super _writeBytes: bytes offset: offset length: length.
if (n >= 0) { ^n }.
System signal: self.iosem onOutput: self.handle.
self.sg wait.
System unsignal: self.iosem.
self __wait_for_output.
}
}
}
@ -434,7 +439,7 @@ extend Socket
self.outdonesem signalAction: [ :sem |
self onSocketDataOut.
System unsignal: self.outreadysem.
self.outreadysem unsignal.
].
self.outreadysem signalAction: [ :sem |
@ -478,21 +483,21 @@ extend Socket
{
if (self.outdonesem notNil)
{
System unsignal: self.outdonesem.
self.outdonesem unsignal.
if (self.outdonesem _group notNil) { thisProcess removeAsyncSemaphore: self.outdonesem }.
self.outdonesem := nil.
}.
if (self.outreadysem notNil)
{
System unsignal: self.outreadysem.
self.outreadysem unsignal.
if (self.outreadysem _group notNil) { thisProcess removeAsyncSemaphore: self.outreadysem }.
self.outreadysem := nil.
}.
if (self.inreadysem notNil)
{
System unsignal: self.inreadysem.
self.inreadysem unsignal.
if (self.inreadysem _group notNil) { thisProcess removeAsyncSemaphore: self.inreadysem }.
self.inreadysem := nil.
}.
@ -503,7 +508,7 @@ extend Socket
method beWatched
{
thisProcess addAsyncSemaphore: self.inreadysem.
System signal: self.inreadysem onInput: self.handle.
self.inreadysem signalOnInput: self.handle.
thisProcess addAsyncSemaphore: self.outdonesem.
}
@ -542,7 +547,7 @@ extend Socket
self.pending_length := rem.
thisProcess addAsyncSemaphore: self.outreadysem.
System signal: self.outreadysem onOutput: self.handle.
self.outreadysem signalOnOutput: self.handle.
}
method writeBytes: bytes
@ -581,7 +586,7 @@ class ClientSocket(Socket)
if (soerr >= 0)
{
## finalize connection if not in progress
System unsignal: sem.
sem unsignal.
thisProcess removeAsyncSemaphore: sem.
self onSocketConnected: (soerr == 0).
@ -595,7 +600,7 @@ class ClientSocket(Socket)
{
if (self.connsem notNil)
{
System unsignal: self.connsem.
self.connsem unsignal.
if (self.connsem _group notNil) { thisProcess removeAsyncSemaphore: self.connsem }.
self.connsem := nil.
}.
@ -608,7 +613,7 @@ class ClientSocket(Socket)
if ((self _connect: target) <= -1)
{
thisProcess addAsyncSemaphore: self.connsem.
System signal: self.connsem onOutput: self.handle.
self.connsem signalOnOutput: self.handle.
}
else
{
@ -617,7 +622,7 @@ class ClientSocket(Socket)
self onSocketConnected: true.
thisProcess addAsyncSemaphore: self.inreadysem.
System signal: self.inreadysem onInput: self.handle.
self.inreadysem signalOnInput: self.handle.
thisProcess addAsyncSemaphore: self.outdonesem.
}
}
@ -655,7 +660,7 @@ class ServerSocket(Socket)
'CLOSING SERVER SOCEKT.... ' dump.
if (self.inreadysem notNil)
{
System unsignal: self.inreadysem.
self.inreadysem unsignal.
if (self.inreadysem _group notNil) { thisProcess removeAsyncSemaphore: self.inreadysem }.
self.inreadysem := nil.
}.
@ -671,12 +676,12 @@ class ServerSocket(Socket)
## added to the multiplexer, a spurious hangup event might
## be generated. At least, such behavior was observed
## in linux with epoll in the level trigger mode.
## System signal: self.inreadysem onInput: self.handle.
## self.inreadysem signalOnInput: self.handle.
## thisProcess addAsyncSemaphore: self.inreadysem.
## self _listen: backlog.
n := self _listen: backlog.
System signal: self.inreadysem onInput: self.handle.
self.inreadysem signalOnInput: self.handle.
thisProcess addAsyncSemaphore: self.inreadysem.
^n.
}

View File

@ -51,12 +51,13 @@ class System(Apex)
method(#class) __gc_finalizer
{
| tmp gc fin_sem |
| tmp gc gcfin_sem |
gc := false.
fin_sem := Semaphore new.
gcfin_sem := Semaphore new.
gcfin_sem signalOnGCFin.
self signalOnGCFin: fin_sem.
[
while (true)
{
@ -87,11 +88,11 @@ class System(Apex)
##System logNl: '^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^gc_waiting....'.
##System sleepForSecs: 1. ## TODO: wait on semaphore instead..
fin_sem wait.
gcfin_sem wait.
##System logNl: 'XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX gc_waitED....'.
}
] ensure: [
System unsignal: fin_sem.
gcfin_sem unsignal.
System logNl: 'End of GC finalization process ' & (thisProcess id) asString.
].
}
@ -101,15 +102,6 @@ class System(Apex)
method(#class,#primitive) gc.
method(#class,#primitive) return: object to: context.
## =======================================================================================
method(#class,#primitive) signal: semaphore afterSecs: secs.
method(#class,#primitive) signal: semaphore afterSecs: secs nanosecs: nanosecs.
method(#class,#primitive) signal: semaphore onInput: file.
method(#class,#primitive) signal: semaphore onOutput: file.
method(#class,#primitive) signalOnGCFin: semaphore.
method(#class,#primitive) unsignal: semaphore.
## =======================================================================================
method(#class) sleepForSecs: secs
{

View File

@ -589,7 +589,7 @@ extend X11
if (self.event_loop_sem isNil)
{
self.event_loop_sem := Semaphore new.
System signal: self.event_loop_sem onInput: (self _get_fd).
self.event_loop_sem signalOnInput: (self _get_fd).
self.event_loop_proc := [
| llevtbuf llevent ongoing |
@ -618,7 +618,7 @@ extend X11
'CLOSING X11 EVENT LOOP' dump.
System unsignal: self.event_loop_sem.
self.event_loop_sem unsignal.
## TODO: LOOK HERE FOR RACE CONDITION
self.event_loop_sem := nil.
self.event_loop_proc := nil.

View File

@ -38,9 +38,9 @@ class MyObject(Object)
sg addSemaphore: s2.
sg addSemaphore: s3.
System signal: s1 onInput: 0.
##System signal: s2 onInput: 0. ## this should raise an exception.
##System signal: s3 onInput: 0.
s1 signalOnInput: 0.
s2 signalOnInput: 0. ## this should raise an exception.
s3 signalOnInput: 0.
[ sg wait. ] fork.
[ sg wait. ] fork.

View File

@ -2365,6 +2365,208 @@ static moo_pfrc_t pf_semaphore_wait (moo_t* moo, moo_mod_t* mod, moo_ooi_t nargs
return MOO_PF_SUCCESS;
}
static moo_pfrc_t pf_semaphore_signal_on_gcfin (moo_t* moo, moo_mod_t* mod, moo_ooi_t nargs)
{
moo_oop_semaphore_t sem;
sem = (moo_oop_semaphore_t)MOO_STACK_GETRCV(moo, nargs);
MOO_PF_CHECK_RCV (moo, moo_iskindof(moo, (moo_oop_t)sem, moo->_semaphore));
/* TODO: no overwriting.. */
moo->sem_gcfin = sem;
MOO_STACK_SETRETTORCV (moo, nargs); /* ^self */
return MOO_PF_SUCCESS;
}
static moo_pfrc_t pf_semaphore_signal_timed (moo_t* moo, moo_mod_t* mod, moo_ooi_t nargs)
{
moo_oop_semaphore_t sem;
moo_oop_t sec, nsec;
moo_ntime_t now, ft;
sem = (moo_oop_semaphore_t)MOO_STACK_GETRCV(moo, nargs);
MOO_PF_CHECK_RCV (moo, moo_iskindof(moo, (moo_oop_t)sem, moo->_semaphore));
sec = MOO_STACK_GETARG(moo, nargs, 0);
nsec = (nargs == 2? MOO_STACK_GETARG(moo, nargs, 1): MOO_SMOOI_TO_OOP(0));
if (!MOO_OOP_IS_SMOOI(sec))
{
moo_seterrbfmt (moo, MOO_EINVAL, "invalid second - %O", sec);
return MOO_PF_FAILURE;
}
if (!MOO_OOP_IS_SMOOI(sec))
{
moo_seterrbfmt (moo, MOO_EINVAL, "invalid nanosecond - %O", nsec);
return MOO_PF_FAILURE;
}
#if 0
if (sem->subtype == MOO_SMOOI_TO_OOP(MOO_SEMAPHORE_SUBTYPE_TIMED))
{
MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->u.timed.index) && MOO_OOP_TO_SMOOI(sem->u.timed.index) >= 0);
/* if the semaphore is already been added. remove it first */
delete_from_sem_heap (moo, MOO_OOP_TO_SMOOI(sem->u.timed.index));
MOO_ASSERT (moo, sem->subtype == moo->_nil && sem->u.timed.index == moo->_nil);
/*
Is this more desired???
MOO_STACK_SETRET (moo, nargs, moo->_false);
return MOO_PF_SUCCESS;
*/
}
#else
if (sem->subtype != moo->_nil)
{
if (sem->subtype == MOO_SMOOI_TO_OOP(MOO_SEMAPHORE_SUBTYPE_IO))
{
MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->u.io.index) && MOO_OOP_TO_SMOOI(sem->u.io.index) >= 0);
MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->u.io.handle) && MOO_OOP_TO_SMOOI(sem->u.io.handle) >= 0);
MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->u.io.type));
moo_seterrbfmt (moo, MOO_EINVAL, "semaphore already linked with a handle %zd", MOO_OOP_TO_SMOOI(sem->u.io.handle));
}
else
{
MOO_ASSERT (moo, sem->subtype == MOO_SMOOI_TO_OOP(MOO_SEMAPHORE_SUBTYPE_TIMED));
MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->u.timed.index) && MOO_OOP_TO_SMOOI(sem->u.timed.index) >= 0);
MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->u.timed.ftime_sec));
MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->u.timed.ftime_nsec));
moo_seterrbfmt (moo, MOO_EINVAL, "semaphore already activated for timer");
}
return MOO_PF_FAILURE;
}
#endif
/* this code assumes that the monotonic clock returns a small value
* that can fit into a SmallInteger, even after some additions. */
vm_gettime (moo, &now);
MOO_ADDNTIMESNS (&ft, &now, MOO_OOP_TO_SMOOI(sec), MOO_OOP_TO_SMOOI(nsec));
if (ft.sec < 0 || ft.sec > MOO_SMOOI_MAX)
{
/* soft error - cannot represent the expiry time in a small integer. */
MOO_LOG3 (moo, MOO_LOG_PRIMITIVE | MOO_LOG_ERROR,
"Error(%hs) - time (%ld) out of range(0 - %zd) when adding a timed semaphore\n",
__PRIMITIVE_NAME__, (unsigned long int)ft.sec, (moo_ooi_t)MOO_SMOOI_MAX);
moo_seterrnum (moo, MOO_ERANGE);
return MOO_PF_FAILURE;
}
sem->u.timed.ftime_sec = MOO_SMOOI_TO_OOP(ft.sec);
sem->u.timed.ftime_nsec = MOO_SMOOI_TO_OOP(ft.nsec);
if (add_to_sem_heap(moo, sem) <= -1) return MOO_PF_HARD_FAILURE;
MOO_ASSERT (moo, sem->subtype == MOO_SMOOI_TO_OOP(MOO_SEMAPHORE_SUBTYPE_TIMED));
MOO_STACK_SETRETTORCV (moo, nargs); /* ^self */
return MOO_PF_SUCCESS;
}
static moo_pfrc_t __semaphore_signal_on_io (moo_t* moo, moo_ooi_t nargs, moo_semaphore_io_type_t io_type)
{
moo_oop_semaphore_t sem;
moo_oop_t fd;
sem = (moo_oop_semaphore_t)MOO_STACK_GETRCV(moo, nargs);
MOO_PF_CHECK_RCV (moo, moo_iskindof(moo, (moo_oop_t)sem, moo->_semaphore));
fd = MOO_STACK_GETARG(moo, nargs, 0);
if (!MOO_OOP_IS_SMOOI(fd))
{
moo_seterrbfmt (moo, MOO_EINVAL, "handle not a small integer - %O", fd);
return MOO_PF_FAILURE;
}
if (sem->subtype != moo->_nil)
{
if (sem->subtype == MOO_SMOOI_TO_OOP(MOO_SEMAPHORE_SUBTYPE_IO))
{
MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->u.io.index) && MOO_OOP_TO_SMOOI(sem->u.io.index) >= 0);
MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->u.io.handle) && MOO_OOP_TO_SMOOI(sem->u.io.handle) >= 0);
MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->u.io.type));
moo_seterrbfmt (moo, MOO_EINVAL, "semaphore already linked with a handle %zd", MOO_OOP_TO_SMOOI(sem->u.io.handle));
}
else
{
MOO_ASSERT (moo, sem->subtype == MOO_SMOOI_TO_OOP(MOO_SEMAPHORE_SUBTYPE_TIMED));
MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->u.timed.index) && MOO_OOP_TO_SMOOI(sem->u.timed.index) >= 0);
MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->u.timed.ftime_sec));
MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->u.timed.ftime_nsec));
moo_seterrbfmt (moo, MOO_EINVAL, "semaphore already activated for timer");
}
return MOO_PF_FAILURE;
}
if (add_sem_to_sem_io_tuple(moo, sem, MOO_OOP_TO_SMOOI(fd), io_type) <= -1)
{
const moo_ooch_t* oldmsg = moo_backuperrmsg(moo);
moo_seterrbfmt (moo, moo->errnum, "unable to add the handle %zd to the multiplexer for %hs - %js", MOO_OOP_TO_SMOOI(fd), io_type_str[io_type], oldmsg);
return MOO_PF_FAILURE;
}
MOO_STACK_SETRETTORCV (moo, nargs); /* ^self */
return MOO_PF_SUCCESS;
}
static moo_pfrc_t pf_semaphore_signal_on_input (moo_t* moo, moo_mod_t* mod, moo_ooi_t nargs)
{
return __semaphore_signal_on_io(moo, nargs, MOO_SEMAPHORE_IO_TYPE_INPUT);
}
static moo_pfrc_t pf_semaphore_signal_on_output (moo_t* moo, moo_mod_t* mod, moo_ooi_t nargs)
{
return __semaphore_signal_on_io(moo, nargs, MOO_SEMAPHORE_IO_TYPE_OUTPUT);
}
static moo_pfrc_t pf_semaphore_unsignal (moo_t* moo, moo_mod_t* mod, moo_ooi_t nargs)
{
/* remove a semaphore from processor's signal scheduling */
moo_oop_semaphore_t sem;
sem = (moo_oop_semaphore_t)MOO_STACK_GETRCV(moo, nargs);
MOO_PF_CHECK_RCV (moo, moo_iskindof(moo, (moo_oop_t)sem, moo->_semaphore));
if (sem == moo->sem_gcfin)
{
moo->sem_gcfin = (moo_oop_semaphore_t)moo->_nil;
}
if (sem->subtype == MOO_SMOOI_TO_OOP(MOO_SEMAPHORE_SUBTYPE_TIMED))
{
/* the semaphore is in the timed semaphore heap */
MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->u.timed.index) && MOO_OOP_TO_SMOOI(sem->u.timed.index) >= 0);
delete_from_sem_heap (moo, MOO_OOP_TO_SMOOI(sem->u.timed.index));
MOO_ASSERT (moo, sem->u.timed.index == moo->_nil);
}
else if (sem->subtype == MOO_SMOOI_TO_OOP(MOO_SEMAPHORE_SUBTYPE_IO))
{
/* the semaphore is associated with IO */
MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->u.io.index) && MOO_OOP_TO_SMOOI(sem->u.io.index) >= 0);
MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->u.io.type));
MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->u.io.handle) && MOO_OOP_TO_SMOOI(sem->u.io.handle) >= 0);
if (delete_sem_from_sem_io_tuple(moo, sem, 0) <= -1)
{
const moo_ooch_t* oldmsg = moo_backuperrmsg(moo);
moo_seterrbfmt (moo, moo->errnum, "cannot delete the handle %zd from the multiplexer - %js", MOO_OOP_TO_SMOOI(sem->u.io.handle), oldmsg);
return MOO_PF_FAILURE;
}
MOO_ASSERT (moo, (moo_oop_t)sem->u.io.index == moo->_nil);
}
MOO_ASSERT (moo, sem->subtype == moo->_nil);
MOO_STACK_SETRETTORCV (moo, nargs); /* ^self */
return MOO_PF_SUCCESS;
}
/* ------------------------------------------------------------------ */
static moo_pfrc_t pf_semaphore_group_add_semaphore (moo_t* moo, moo_mod_t* mod, moo_ooi_t nargs)
{
moo_oop_semaphore_group_t sg;
@ -2553,245 +2755,6 @@ static moo_pfrc_t pf_semaphore_group_wait (moo_t* moo, moo_mod_t* mod, moo_ooi_t
/* ------------------------------------------------------------------ */
static moo_pfrc_t pf_system_add_gcfin_semaphore (moo_t* moo, moo_mod_t* mod, moo_ooi_t nargs)
{
moo_oop_semaphore_t sem;
/*MOO_PF_CHECK_RCV (moo, MOO_STACK_GETRCV(moo, nargs) == (moo_oop_t)moo->processor);*/
MOO_ASSERT (moo, nargs == 1);
sem = (moo_oop_semaphore_t)MOO_STACK_GETARG(moo, nargs, 0);
if (!moo_iskindof(moo, (moo_oop_t)sem, moo->_semaphore))
{
moo_seterrbfmt (moo, MOO_EINVAL, "parameter not a kind of semaphore - %O", sem);
return MOO_PF_FAILURE;
}
/* TODO: no overwriting.. */
moo->sem_gcfin = sem;
MOO_STACK_SETRETTORCV (moo, nargs); /* ^self */
return MOO_PF_SUCCESS;
}
static moo_pfrc_t pf_system_add_timed_semaphore (moo_t* moo, moo_mod_t* mod, moo_ooi_t nargs)
{
moo_oop_t sec, nsec;
moo_oop_semaphore_t sem;
moo_ntime_t now, ft;
/* don't care about the receiver much as the receiver is not used at all.
* however, it's intended to be called from the System class. */
MOO_ASSERT (moo, nargs >= 2 || nargs <= 3);
sem = (moo_oop_semaphore_t)MOO_STACK_GETARG(moo, nargs, 0);
sec = MOO_STACK_GETARG(moo, nargs, 1);
nsec = (nargs == 3? MOO_STACK_GETARG(moo, nargs, 2): MOO_SMOOI_TO_OOP(0));
if (!moo_iskindof(moo, (moo_oop_t)sem, moo->_semaphore))
{
moo_seterrbfmt (moo, MOO_EINVAL, "parameter not a kind of semaphore - %O", sem);
return MOO_PF_FAILURE;
}
if (!MOO_OOP_IS_SMOOI(sec))
{
moo_seterrbfmt (moo, MOO_EINVAL, "invalid second - %O", sec);
return MOO_PF_FAILURE;
}
if (!MOO_OOP_IS_SMOOI(sec))
{
moo_seterrbfmt (moo, MOO_EINVAL, "invalid nanosecond - %O", nsec);
return MOO_PF_FAILURE;
}
#if 0
if (sem->subtype == MOO_SMOOI_TO_OOP(MOO_SEMAPHORE_SUBTYPE_TIMED))
{
MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->u.timed.index) && MOO_OOP_TO_SMOOI(sem->u.timed.index) >= 0);
/* if the semaphore is already been added. remove it first */
delete_from_sem_heap (moo, MOO_OOP_TO_SMOOI(sem->u.timed.index));
MOO_ASSERT (moo, sem->subtype == moo->_nil && sem->u.timed.index == moo->_nil);
/*
Is this more desired???
MOO_STACK_SETRET (moo, nargs, moo->_false);
return MOO_PF_SUCCESS;
*/
}
#else
if (sem->subtype != moo->_nil)
{
if (sem->subtype == MOO_SMOOI_TO_OOP(MOO_SEMAPHORE_SUBTYPE_IO))
{
MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->u.io.index) && MOO_OOP_TO_SMOOI(sem->u.io.index) >= 0);
MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->u.io.handle) && MOO_OOP_TO_SMOOI(sem->u.io.handle) >= 0);
MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->u.io.type));
moo_seterrbfmt (moo, MOO_EINVAL, "semaphore already linked with a handle %zd", MOO_OOP_TO_SMOOI(sem->u.io.handle));
}
else
{
MOO_ASSERT (moo, sem->subtype == MOO_SMOOI_TO_OOP(MOO_SEMAPHORE_SUBTYPE_TIMED));
MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->u.timed.index) && MOO_OOP_TO_SMOOI(sem->u.timed.index) >= 0);
MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->u.timed.ftime_sec));
MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->u.timed.ftime_nsec));
moo_seterrbfmt (moo, MOO_EINVAL, "semaphore already activated for timer");
}
return MOO_PF_FAILURE;
}
#endif
/* this code assumes that the monotonic clock returns a small value
* that can fit into a SmallInteger, even after some additions. */
vm_gettime (moo, &now);
MOO_ADDNTIMESNS (&ft, &now, MOO_OOP_TO_SMOOI(sec), MOO_OOP_TO_SMOOI(nsec));
if (ft.sec < 0 || ft.sec > MOO_SMOOI_MAX)
{
/* soft error - cannot represent the expiry time in a small integer. */
MOO_LOG3 (moo, MOO_LOG_PRIMITIVE | MOO_LOG_ERROR,
"Error(%hs) - time (%ld) out of range(0 - %zd) when adding a timed semaphore\n",
__PRIMITIVE_NAME__, (unsigned long int)ft.sec, (moo_ooi_t)MOO_SMOOI_MAX);
moo_seterrnum (moo, MOO_ERANGE);
return MOO_PF_FAILURE;
}
sem->u.timed.ftime_sec = MOO_SMOOI_TO_OOP(ft.sec);
sem->u.timed.ftime_nsec = MOO_SMOOI_TO_OOP(ft.nsec);
if (add_to_sem_heap(moo, sem) <= -1) return MOO_PF_HARD_FAILURE;
MOO_ASSERT (moo, sem->subtype == MOO_SMOOI_TO_OOP(MOO_SEMAPHORE_SUBTYPE_TIMED));
MOO_STACK_SETRETTORCV (moo, nargs); /* ^self */
return MOO_PF_SUCCESS;
}
static moo_pfrc_t __system_add_io_semaphore (moo_t* moo, moo_ooi_t nargs, moo_semaphore_io_type_t io_type)
{
moo_oop_t fd;
moo_oop_semaphore_t sem;
/*MOO_PF_CHECK_RCV (moo, MOO_STACK_GETRCV(moo, nargs) == (moo_oop_t)moo->system);*/
MOO_ASSERT (moo, nargs == 2);
fd = MOO_STACK_GETARG(moo, nargs, 1);
sem = (moo_oop_semaphore_t)MOO_STACK_GETARG(moo, nargs, 0);
if (!moo_iskindof(moo, (moo_oop_t)sem, moo->_semaphore))
{
moo_seterrbfmt (moo, MOO_EINVAL, "parameter not a kind of semaphore - %O", sem);
return MOO_PF_FAILURE;
}
if (!MOO_OOP_IS_SMOOI(fd))
{
moo_seterrbfmt (moo, MOO_EINVAL, "handle not a small integer - %O", fd);
return MOO_PF_FAILURE;
}
if (sem->subtype != moo->_nil)
{
if (sem->subtype == MOO_SMOOI_TO_OOP(MOO_SEMAPHORE_SUBTYPE_IO))
{
MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->u.io.index) && MOO_OOP_TO_SMOOI(sem->u.io.index) >= 0);
MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->u.io.handle) && MOO_OOP_TO_SMOOI(sem->u.io.handle) >= 0);
MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->u.io.type));
moo_seterrbfmt (moo, MOO_EINVAL, "semaphore already linked with a handle %zd", MOO_OOP_TO_SMOOI(sem->u.io.handle));
}
else
{
MOO_ASSERT (moo, sem->subtype == MOO_SMOOI_TO_OOP(MOO_SEMAPHORE_SUBTYPE_TIMED));
MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->u.timed.index) && MOO_OOP_TO_SMOOI(sem->u.timed.index) >= 0);
MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->u.timed.ftime_sec));
MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->u.timed.ftime_nsec));
moo_seterrbfmt (moo, MOO_EINVAL, "semaphore already activated for timer");
}
return MOO_PF_FAILURE;
}
if (add_sem_to_sem_io_tuple(moo, sem, MOO_OOP_TO_SMOOI(fd), io_type) <= -1)
{
const moo_ooch_t* oldmsg = moo_backuperrmsg(moo);
moo_seterrbfmt (moo, moo->errnum, "unable to add the handle %zd to the multiplexer for %hs - %js", MOO_OOP_TO_SMOOI(fd), io_type_str[io_type], oldmsg);
return MOO_PF_FAILURE;
}
MOO_STACK_SETRETTORCV (moo, nargs); /* ^self */
return MOO_PF_SUCCESS;
}
static moo_pfrc_t pf_system_add_input_semaphore (moo_t* moo, moo_mod_t* mod, moo_ooi_t nargs)
{
return __system_add_io_semaphore (moo, nargs, MOO_SEMAPHORE_IO_TYPE_INPUT);
}
static moo_pfrc_t pf_system_add_output_semaphore (moo_t* moo, moo_mod_t* mod, moo_ooi_t nargs)
{
return __system_add_io_semaphore (moo, nargs, MOO_SEMAPHORE_IO_TYPE_OUTPUT);
}
static moo_pfrc_t pf_system_remove_semaphore (moo_t* moo, moo_mod_t* mod, moo_ooi_t nargs)
{
/* remove a semaphore from processor's signal scheduling */
moo_oop_semaphore_t sem;
/*MOO_PF_CHECK_RCV (moo, MOO_STACK_GETRCV(moo, nargs) == (moo_oop_t)moo->system);*/
MOO_ASSERT (moo, nargs == 1);
sem = (moo_oop_semaphore_t)MOO_STACK_GETARG(moo, nargs, 0);
if (!moo_iskindof(moo, (moo_oop_t)sem, moo->_semaphore))
{
moo_seterrbfmt (moo, MOO_EINVAL, "parameter not a kind of semaphore - %O", sem);
return MOO_PF_FAILURE;
}
/* TODO: remove a semaphore from IO handler if it's registered...
* remove a semaphore from elsewhere registered too */
if (sem == moo->sem_gcfin)
{
moo->sem_gcfin = (moo_oop_semaphore_t)moo->_nil;
}
if (sem->subtype == MOO_SMOOI_TO_OOP(MOO_SEMAPHORE_SUBTYPE_TIMED))
{
/* the semaphore is in the timed semaphore heap */
MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->u.timed.index) && MOO_OOP_TO_SMOOI(sem->u.timed.index) >= 0);
delete_from_sem_heap (moo, MOO_OOP_TO_SMOOI(sem->u.timed.index));
MOO_ASSERT (moo, sem->u.timed.index == moo->_nil);
}
else if (sem->subtype == MOO_SMOOI_TO_OOP(MOO_SEMAPHORE_SUBTYPE_IO))
{
/* the semaphore is associated with IO */
MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->u.io.index) && MOO_OOP_TO_SMOOI(sem->u.io.index) >= 0);
MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->u.io.type));
MOO_ASSERT (moo, MOO_OOP_IS_SMOOI(sem->u.io.handle) && MOO_OOP_TO_SMOOI(sem->u.io.handle) >= 0);
if (delete_sem_from_sem_io_tuple(moo, sem, 0) <= -1)
{
const moo_ooch_t* oldmsg = moo_backuperrmsg(moo);
moo_seterrbfmt (moo, moo->errnum, "cannot delete the handle %zd from the multiplexer - %js", MOO_OOP_TO_SMOOI(sem->u.io.handle), oldmsg);
return MOO_PF_FAILURE;
}
MOO_ASSERT (moo, (moo_oop_t)sem->u.io.index == moo->_nil);
}
MOO_ASSERT (moo, sem->subtype == moo->_nil);
MOO_STACK_SETRETTORCV (moo, nargs); /* ^self */
return MOO_PF_SUCCESS;
}
/* ------------------------------------------------------------------ */
static moo_pfrc_t pf_system_return_value_to_context (moo_t* moo, moo_mod_t* mod, moo_ooi_t nargs)
{
moo_oop_t ret, ctx;
@ -3543,6 +3506,12 @@ static pf_t pftab[] =
{ "SemaphoreGroup_wait", { pf_semaphore_group_wait, 0, 0 } },
{ "Semaphore_signal", { pf_semaphore_signal, 0, 0 } },
{ "Semaphore_signalAfterSecs:", { pf_semaphore_signal_timed, 1, 1 } },
{ "Semaphore_signalAfterSecs:nanosecs:", { pf_semaphore_signal_timed, 2, 2 } },
{ "Semaphore_signalOnGCFin", { pf_semaphore_signal_on_gcfin, 0, 0 } },
{ "Semaphore_signalOnInput:", { pf_semaphore_signal_on_input, 1, 1 } },
{ "Semaphore_signalOnOutput:", { pf_semaphore_signal_on_output, 1, 1 } },
{ "Semaphore_unsignal", { pf_semaphore_unsignal, 0, 0 } },
{ "Semaphore_wait", { pf_semaphore_wait, 0, 0 } },
{ "SmallInteger_asCharacter", { pf_smooi_as_character, 0, 0 } },
@ -3600,12 +3569,6 @@ static pf_t pftab[] =
{ "System_putUint64", { moo_pf_system_put_uint64, 3, 3 } },
{ "System_putUint8", { moo_pf_system_put_uint8, 3, 3 } },
{ "System_return:to:", { pf_system_return_value_to_context, 2, 2 } },
{ "System_signal:afterSecs:", { pf_system_add_timed_semaphore, 2, 2 } },
{ "System_signal:afterSecs:nanosecs:", { pf_system_add_timed_semaphore, 3, 3 } },
{ "System_signal:onInput:", { pf_system_add_input_semaphore, 2, 2 } },
{ "System_signal:onOutput:", { pf_system_add_output_semaphore, 2, 2 } },
{ "System_signalOnGCFin:", { pf_system_add_gcfin_semaphore, 1, 1 } },
{ "System_unsignal:", { pf_system_remove_semaphore, 1, 1 } },
{ "_dump", { pf_dump, 0, MA } },
@ -4202,7 +4165,6 @@ static MOO_INLINE int switch_process_if_needed (moo_t* moo)
do
{
MOO_INITNTIME (&ft, 3, 0); /* TODO: use a configured time */
vm_muxwait (moo, &ft);
}
@ -4215,9 +4177,9 @@ static MOO_INLINE int switch_process_if_needed (moo_t* moo)
* before proceeding to handle normal process scheduling */
/* NOTE: the check with the multiplexer may happen too frequently
* because this is called everytime process switching is requested
* the actual callback implementation may avoid using actual
* system calls for the check too frequently */
* because this is called everytime process switching is requested.
* the actual callback implementation should try to avoid invoking
* actual system calls too frequently for less overhead. */
vm_muxwait (moo, MOO_NULL);
}
}

View File

@ -1832,7 +1832,7 @@ static void vm_muxwait (moo_t* moo, const moo_ntime_t* dur, moo_vmprim_muxwait_c
pthread_mutex_unlock (&xtn->ev.mtx);
}
#else
#else /* USE_THREAD */
int tmout = 0, n;
#if defined(USE_DEVPOLL)
struct dvpoll dvp;
@ -1882,7 +1882,7 @@ static void vm_muxwait (moo_t* moo, const moo_ntime_t* dur, moo_vmprim_muxwait_c
maxfd = xtn->ev.reg.maxfd;
MOO_MEMCPY (&rfds, &xtn->ev.reg.rfds, MOO_SIZEOF(rfds));
MOO_MEMCPY (&wfds, &xtn->ev.reg.wfds, MOO_SIZEOF(wfds));
n = select (maxfd + 1, &rfds, &wfds, NULL, &tv);
n = select(maxfd + 1, &rfds, &wfds, NULL, &tv);
if (n > 0)
{
int fd, count = 0;
@ -1956,7 +1956,7 @@ static void vm_muxwait (moo_t* moo, const moo_ntime_t* dur, moo_vmprim_muxwait_c
}
xtn->ev.len = 0;
#endif
#endif /* USE_THREAD */
}
static void vm_sleep (moo_t* moo, const moo_ntime_t* dur)