changed the scheduler to reset the return value of the wait context of a semaphore group forcibly to a singaled semaphore.
added the waitWithTimeout method to a semaphore group
This commit is contained in:
parent
7ee4453bf3
commit
a05c86dd27
@ -79,8 +79,12 @@ class Semaphore(Object)
|
|||||||
TODO: timed wait...
|
TODO: timed wait...
|
||||||
method waitWithTimeout: seconds
|
method waitWithTimeout: seconds
|
||||||
{
|
{
|
||||||
<primitive: #_semaphore_wait>
|
| s |
|
||||||
self primitiveFailed
|
s := Semaphore new.
|
||||||
|
Processor signal: s after: seconds.
|
||||||
|
self waitWithTimedSemaphore: s.
|
||||||
|
|
||||||
|
if (self.
|
||||||
}
|
}
|
||||||
|
|
||||||
method waitWithTimeout: seconds and: nanoSeconds
|
method waitWithTimeout: seconds and: nanoSeconds
|
||||||
@ -89,6 +93,7 @@ TODO: timed wait...
|
|||||||
self primitiveFailed
|
self primitiveFailed
|
||||||
}
|
}
|
||||||
*)
|
*)
|
||||||
|
|
||||||
method critical: aBlock
|
method critical: aBlock
|
||||||
{
|
{
|
||||||
self wait.
|
self wait.
|
||||||
@ -194,7 +199,6 @@ class SemaphoreGroup(Object)
|
|||||||
size := 0,
|
size := 0,
|
||||||
pos := 0,
|
pos := 0,
|
||||||
semarr := nil.
|
semarr := nil.
|
||||||
var(#get,#set) signaledSemaphore := nil.
|
|
||||||
|
|
||||||
(* TODO: good idea to a shortcut way to prohibit a certain method in the heirarchy chain?
|
(* TODO: good idea to a shortcut way to prohibit a certain method in the heirarchy chain?
|
||||||
|
|
||||||
@ -250,6 +254,36 @@ method(#class,#abstract) xxx. => method(#class) xxx { self subclassResponsibilit
|
|||||||
}
|
}
|
||||||
|
|
||||||
method(#primitive) wait.
|
method(#primitive) wait.
|
||||||
|
|
||||||
|
method waitWithTimeout: seconds
|
||||||
|
{
|
||||||
|
| s r |
|
||||||
|
|
||||||
|
## create an internal semaphore for timeout notification.
|
||||||
|
s := Semaphore new.
|
||||||
|
|
||||||
|
## grant the partial membership to the internal semaphore.
|
||||||
|
## it's partial because it's not added to self.semarr.
|
||||||
|
s _group: self.
|
||||||
|
|
||||||
|
## arrange the processor to notify upon timeout.
|
||||||
|
Processor signal: s after: seconds.
|
||||||
|
|
||||||
|
## wait on the semaphore group.
|
||||||
|
r := self wait.
|
||||||
|
|
||||||
|
## if the internal semaphore has been signaled,
|
||||||
|
## arrange to return nil to indicate timeout.
|
||||||
|
if (r == s) { r := nil }.
|
||||||
|
|
||||||
|
## nullify the membership
|
||||||
|
s _group: nil.
|
||||||
|
|
||||||
|
## cancel the notification arrangement in case it didn't time out.
|
||||||
|
Processor unsignal: s.
|
||||||
|
|
||||||
|
^r.
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class SemaphoreHeap(Object)
|
class SemaphoreHeap(Object)
|
||||||
|
@ -214,7 +214,6 @@ static MOO_INLINE int prepare_to_alloc_pid (moo_t* moo)
|
|||||||
moo->proc_map = tmp;
|
moo->proc_map = tmp;
|
||||||
moo->proc_map_capa = new_capa;
|
moo->proc_map_capa = new_capa;
|
||||||
|
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -745,8 +744,15 @@ static moo_oop_process_t signal_semaphore (moo_t* moo, moo_oop_semaphore_t sem)
|
|||||||
unchain_from_semaphore (moo, proc);
|
unchain_from_semaphore (moo, proc);
|
||||||
resume_process (moo, proc);
|
resume_process (moo, proc);
|
||||||
|
|
||||||
/* store the last signaled semaphore into the semaphore group */
|
/* the waiting process has been suspended after a waiting
|
||||||
semgrp->sigsem = sem;
|
* primitive function in Semaphore or SemaphoreGroup.
|
||||||
|
* the top of the stack of the process must hold the temporary
|
||||||
|
* return value set by await_semaphore() or await_semaphore_group().
|
||||||
|
* change the return value forcibly to the actual signaled
|
||||||
|
* semaphore */
|
||||||
|
MOO_ASSERT (moo, MOO_OOP_TO_SMOOI(proc->sp) < (moo_ooi_t)(MOO_OBJ_GET_SIZE(proc) - MOO_PROCESS_NAMED_INSTVARS));
|
||||||
|
proc->slot[MOO_OOP_TO_SMOOI(proc->sp)] = (moo_oop_t)sem;
|
||||||
|
|
||||||
return proc;
|
return proc;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -790,7 +796,7 @@ static moo_oop_process_t signal_semaphore (moo_t* moo, moo_oop_semaphore_t sem)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static MOO_INLINE int await_semaphore (moo_t* moo, moo_oop_semaphore_t sem)
|
static MOO_INLINE void await_semaphore (moo_t* moo, moo_oop_semaphore_t sem)
|
||||||
{
|
{
|
||||||
/* TODO: support timeout */
|
/* TODO: support timeout */
|
||||||
moo_oop_process_t proc;
|
moo_oop_process_t proc;
|
||||||
@ -831,10 +837,12 @@ static MOO_INLINE int await_semaphore (moo_t* moo, moo_oop_semaphore_t sem)
|
|||||||
MOO_ASSERT (moo, moo->processor->active != proc);
|
MOO_ASSERT (moo, moo->processor->active != proc);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
return 0;
|
return 0;
|
||||||
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
static MOO_INLINE moo_oop_t await_semaphore_group (moo_t* moo, moo_oop_semaphore_group_t semgrp)
|
static MOO_INLINE moo_oop_t await_semaphore_group (moo_t* moo, moo_oop_semaphore_group_t semgrp, const moo_ntime_t* tmout)
|
||||||
{
|
{
|
||||||
/* TODO: support timeout and wait all */
|
/* TODO: support timeout and wait all */
|
||||||
/* wait for one of semaphores in the group to be signaled */
|
/* wait for one of semaphores in the group to be signaled */
|
||||||
@ -859,13 +867,22 @@ static MOO_INLINE moo_oop_t await_semaphore_group (moo_t* moo, moo_oop_semaphore
|
|||||||
count--;
|
count--;
|
||||||
sem->count = MOO_SMOOI_TO_OOP(count);
|
sem->count = MOO_SMOOI_TO_OOP(count);
|
||||||
semgrp->pos = MOO_SMOOI_TO_OOP(sempos); /* position of the last inspected semaphore */
|
semgrp->pos = MOO_SMOOI_TO_OOP(sempos); /* position of the last inspected semaphore */
|
||||||
semgrp->sigsem = sem; /* remember the last signaled semaphore */
|
|
||||||
return (moo_oop_t)sem;
|
return (moo_oop_t)sem;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* no semaphores have been signaled. suspend the current process
|
/* no semaphores have been signaled. suspend the current process
|
||||||
* until the at least one of them is signaled */
|
* until the at least one of them is signaled */
|
||||||
|
|
||||||
|
#if 0
|
||||||
|
if (tmout)
|
||||||
|
{
|
||||||
|
/* create an internal semaphore for timeout signaling */
|
||||||
|
/* TODO: */
|
||||||
|
if (add_to_sem_heap (moo, tmout_sem) <= -1) return MOO_PF_HARD_FAILURE;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
proc = moo->processor->active;
|
proc = moo->processor->active;
|
||||||
|
|
||||||
/* suspend the active process */
|
/* suspend the active process */
|
||||||
@ -2471,22 +2488,25 @@ static moo_pfrc_t pf_semaphore_wait (moo_t* moo, moo_ooi_t nargs)
|
|||||||
* await_semaphore() may switch the active process and the stack
|
* await_semaphore() may switch the active process and the stack
|
||||||
* manipulation macros target at the active process. i'm not supposed
|
* manipulation macros target at the active process. i'm not supposed
|
||||||
* to change the return value of a new active process. */
|
* to change the return value of a new active process. */
|
||||||
|
|
||||||
MOO_STACK_SETRETTORCV (moo, nargs);
|
MOO_STACK_SETRETTORCV (moo, nargs);
|
||||||
|
|
||||||
|
#if 0
|
||||||
if (await_semaphore (moo, (moo_oop_semaphore_t)rcv) <= -1)
|
if (await_semaphore (moo, (moo_oop_semaphore_t)rcv) <= -1)
|
||||||
{
|
{
|
||||||
/* i must switch the top because the return value has been set already */
|
/* i must switch the top because the return value has been set already */
|
||||||
MOO_STACK_SETTOP (moo, MOO_ERROR_TO_OOP(moo->errnum));
|
MOO_STACK_SETTOP (moo, MOO_ERROR_TO_OOP(moo->errnum));
|
||||||
return MOO_PF_SUCCESS;
|
return MOO_PF_SUCCESS;
|
||||||
}
|
}
|
||||||
|
#else
|
||||||
|
await_semaphore (moo, (moo_oop_semaphore_t)rcv);
|
||||||
|
#endif
|
||||||
|
|
||||||
return MOO_PF_SUCCESS;
|
return MOO_PF_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
static moo_pfrc_t pf_semaphore_group_wait (moo_t* moo, moo_ooi_t nargs)
|
static moo_pfrc_t pf_semaphore_group_wait (moo_t* moo, moo_ooi_t nargs)
|
||||||
{
|
{
|
||||||
moo_oop_t rcv;
|
moo_oop_t rcv, sem;
|
||||||
|
|
||||||
rcv = MOO_STACK_GETRCV(moo, nargs);
|
rcv = MOO_STACK_GETRCV(moo, nargs);
|
||||||
MOO_PF_CHECK_RCV (moo, MOO_CLASSOF(moo,rcv) == moo->_semaphore_group);
|
MOO_PF_CHECK_RCV (moo, MOO_CLASSOF(moo,rcv) == moo->_semaphore_group);
|
||||||
@ -2500,7 +2520,17 @@ static moo_pfrc_t pf_semaphore_group_wait (moo_t* moo, moo_ooi_t nargs)
|
|||||||
* the stack from this moment on. */
|
* the stack from this moment on. */
|
||||||
MOO_STACK_SETRETTORCV (moo, nargs);
|
MOO_STACK_SETRETTORCV (moo, nargs);
|
||||||
|
|
||||||
await_semaphore_group (moo, (moo_oop_semaphore_group_t)rcv);
|
sem = await_semaphore_group (moo, (moo_oop_semaphore_group_t)rcv, MOO_NULL);
|
||||||
|
if (sem != moo->_nil)
|
||||||
|
{
|
||||||
|
/* there was a singaled semaphore. the active process won't get
|
||||||
|
* suspended. change the return value of the current process
|
||||||
|
* forcibly to the signaled semaphore */
|
||||||
|
MOO_STACK_SETTOP (moo, sem);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* the return value will get changed to an actual semaphore signaled
|
||||||
|
* when the semaphore is signaled. see signal_semaphore() */
|
||||||
return MOO_PF_SUCCESS;
|
return MOO_PF_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -4763,7 +4793,7 @@ static MOO_INLINE int switch_process_if_needed (moo_t* moo)
|
|||||||
do
|
do
|
||||||
{
|
{
|
||||||
vm_gettime (moo, &now);
|
vm_gettime (moo, &now);
|
||||||
now.sec += 3;
|
now.sec += 3; /* TODO: use a configured value? */
|
||||||
vm_muxwait (moo, &now);
|
vm_muxwait (moo, &now);
|
||||||
}
|
}
|
||||||
while (moo->processor->active == moo->nil_process && !moo->abort_req);
|
while (moo->processor->active == moo->nil_process && !moo->abort_req);
|
||||||
|
@ -752,7 +752,7 @@ typedef struct moo_process_t* moo_oop_process_t;
|
|||||||
typedef struct moo_semaphore_t moo_semaphore_t;
|
typedef struct moo_semaphore_t moo_semaphore_t;
|
||||||
typedef struct moo_semaphore_t* moo_oop_semaphore_t;
|
typedef struct moo_semaphore_t* moo_oop_semaphore_t;
|
||||||
|
|
||||||
#define MOO_SEMAPHORE_GROUP_NAMED_INSTVARS 6
|
#define MOO_SEMAPHORE_GROUP_NAMED_INSTVARS 5
|
||||||
typedef struct moo_semaphore_group_t moo_semaphore_group_t;
|
typedef struct moo_semaphore_group_t moo_semaphore_group_t;
|
||||||
typedef struct moo_semaphore_group_t* moo_oop_semaphore_group_t;
|
typedef struct moo_semaphore_group_t* moo_oop_semaphore_group_t;
|
||||||
|
|
||||||
@ -833,7 +833,6 @@ struct moo_semaphore_group_t
|
|||||||
moo_oop_t size; /* SmallInteger */
|
moo_oop_t size; /* SmallInteger */
|
||||||
moo_oop_t pos; /* current processing position */
|
moo_oop_t pos; /* current processing position */
|
||||||
moo_oop_oop_t semarr; /* Array of Semaphores */
|
moo_oop_oop_t semarr; /* Array of Semaphores */
|
||||||
moo_oop_semaphore_t sigsem; /* Last signaled semaphore */
|
|
||||||
};
|
};
|
||||||
|
|
||||||
#define MOO_PROCESS_SCHEDULER_NAMED_INSTVARS 9
|
#define MOO_PROCESS_SCHEDULER_NAMED_INSTVARS 9
|
||||||
|
Loading…
x
Reference in New Issue
Block a user