moo/moo/kernel/Process.moo

542 lines
11 KiB
Smalltalk
Raw Normal View History

class(#pointer,#final,#limited) Process(Object)
2015-10-15 14:40:08 +00:00
{
var(#get) initialContext, currentContext, id, state.
var sp.
var(#get) ps_prev, ps_next, sem_wait_prev, sem_wait_next.
var sem, perr, perrmsg.
2015-10-15 14:40:08 +00:00
method primError { ^self.perr }
method primErrorMessage { ^self.perrmsg }
2015-10-18 15:06:17 +00:00
method(#primitive) sp.
method(#primitive) resume.
method(#primitive) yield.
method(#primitive) suspend.
method(#primitive) _terminate.
method terminate
{
## search from the top context of the process down to intial_context and find ensure blocks and execute them.
## if a different process calls 'terminate' on a process,
## the ensureblock is not executed in the context of the
## process being terminated, but in the context of terminatig process.
##
## 1) process termianted by another process
## p := [
## [ 1 to: 10000 by: 1 do: [:ex | System logNl: i asString] ] ensure: [System logNl: 'ensured....']
## ] newProcess.
## p resume.
## p terminate.
##
## 2) process terminated by itself
## p := [
## [ thisProcess terminate. ] ensure: [System logNl: 'ensured....']
## ] newProcess.
## p resume.
## p terminate.
## ----------------------------------------------------------------------------------------------------------
## the process must be frozen first. while unwinding is performed,
## the process must not be scheduled.
## ----------------------------------------------------------------------------------------------------------
##if (Processor activeProcess ~~ self) { self suspend }.
if (thisProcess ~~ self) { self suspend }.
self.currentContext unwindTo: self.initialContext return: nil.
^self _terminate
}
2015-10-15 14:40:08 +00:00
}
class Semaphore(Object)
{
var waiting_head := nil,
waiting_tail := nil,
count := 0,
heapIndex := -1,
fireTimeSec := 0,
fireTimeNsec := 0,
ioIndex := -1,
2017-07-21 16:54:43 +00:00
ioHandle := nil,
ioMask := 0.
var(#get,#set) signalAction := nil.
var(#get,#set) _group := nil,
_grm_next := nil,
_grm_prev := nil.
## ==================================================================
method(#primitive) signal.
method(#primitive) _wait.
method wait
{
| k |
k := self _wait.
if (self.signalAction notNil) { self.signalAction value: self }.
^k
}
2016-03-16 02:27:18 +00:00
## ==================================================================
method heapIndex
2016-03-16 02:27:18 +00:00
{
^heapIndex
}
method heapIndex: anIndex
2016-03-16 02:27:18 +00:00
{
heapIndex := anIndex
}
method fireTime
2016-03-16 02:27:18 +00:00
{
^fireTimeSec
2016-03-16 02:27:18 +00:00
}
method fireTime: anInteger
2016-03-16 02:27:18 +00:00
{
self.fireTimeSec := anInteger.
2016-03-16 02:27:18 +00:00
}
method youngerThan: aSemaphore
2016-03-16 02:27:18 +00:00
{
^self.fireTimeSec < (aSemaphore fireTime)
2016-03-16 02:27:18 +00:00
}
2017-07-21 16:54:43 +00:00
method notYoungerThan: aSemaphore
{
^self.fireTimeSec >= (aSemaphore fireTime)
}
2016-03-16 02:27:18 +00:00
}
class Mutex(Semaphore)
{
method(#class) new
{
| s |
s := super new.
s signal.
^s.
}
(*
TODO: how to prohibit wait and signal???
method(#prohibited) wait.
method(#prohibited) signal.
*)
method lock { ^super wait }
method unlock { ^super signal }
method critical: block
{
self wait.
^block ensure: [ self signal ]
}
}
(*
xxx := Semaphore new.
xxx on: #signal do: [ ].
========= CASE 1 ====================
sg := SemaphoreGroup with (xxx, yyy, zzz).
Processor signal: xxx onInput: aaa.
Processor signal: yyy onInput: bbb.
Processor signal: zzz onOutput: ccc.
while (true)
{
sem := sg wait.
if (sem == xxx)
{
}
elsif (sem == yyy)
{
}
elsif (sem == zzz)
{
}.
}
============ CASE 2====================
### ASSOCIATE CALLBACK WITH SEMAPHORE.
sg := SemaphoreGroup with (xxx, yyy, zzz).
oldaction := xxx signalAction: [ ... ]. ### similar interface like unix system call signal()???? method signalAction: block {} , method signalAction { ^self.signalAction }
yyy signalAction: [ ... ].
zzz signalAction: [ ... ].
Processor signal: xxx onInput: aaa.
Processor signal: yyy onInput: bbb.
Processor signal: zzz onOutput: ccc.
while (true)
{
sem := sg wait. ### the action associated with the semaphore must get executed. => wait may be a primitive. the primitive handler may return failure... if so, the actual primitive body can execute the action easily
}
Semaphore>>method wait
{
<primitive: #Semaphore_wait>
if (errorCode == NO ERROR)
{
self.signalAction value. ## which is better???
self.sginalAction value: self.
}
}
*)
2017-08-22 13:45:37 +00:00
class SemaphoreGroup(Object)
{
var waiting_head := nil,
waiting_tail := nil,
first_sem := nil,
last_sem := nil,
first_sigsem := nil,
last_sigsem := nil.
2017-08-22 13:45:37 +00:00
(* TODO: good idea to a shortcut way to prohibit a certain method in the heirarchy chain?
method(#class,#prohibited) new.
method(#class,#prohibited) new: size.
method(#class,#abstract) xxx. => method(#class) xxx { self subclassResponsibility: #xxxx }
*)
(*
2017-08-22 13:45:37 +00:00
method(#class) new { self messageProhibited: #new }
method(#class) new: size { self messageProhibited: #new: }
*)
2017-08-22 13:45:37 +00:00
method(#primitive) _addSemaphore: sem.
method(#primitive) _removeSemaphore: sem.
method(#primitive) _wait.
method wait
{
| r |
r := self _wait.
if (r signalAction notNil) { r signalAction value: r }.
^r
}
method waitWithTimeout: seconds
{
| s r |
## create an internal semaphore for timeout notification.
s := Semaphore new.
## grant the partial membership to the internal semaphore.
## it's partial because it's not added to self.semarr.
##s _group: self.
self _addSemaphore: s.
## arrange the processor to notify upon timeout.
Processor signal: s after: seconds.
## wait on the semaphore group.
r := self wait.
## if the internal semaphore has been signaled,
## arrange to return nil to indicate timeout.
if (r == s) { r := nil }
elsif (r signalAction notNil) { r signalAction value: r }.
## nullify the membership
##s _group: nil.
self _removeSemaphore: s.
## cancel the notification arrangement in case it didn't time out.
Processor unsignal: s.
^r.
}
2017-08-22 13:45:37 +00:00
}
2016-03-16 02:27:18 +00:00
class SemaphoreHeap(Object)
2016-03-16 02:27:18 +00:00
{
var arr, size.
2016-03-16 02:27:18 +00:00
method initialize
2016-03-16 02:27:18 +00:00
{
self.size := 0.
self.arr := Array new: 100.
}
method size
2016-03-16 02:27:18 +00:00
{
^self.size
}
method at: anIndex
{
^self.arr at: anIndex.
}
method insert: aSemaphore
2016-03-16 02:27:18 +00:00
{
2017-08-22 13:45:37 +00:00
| index newarr newsize |
2016-03-16 14:05:34 +00:00
index := self.size.
2017-08-22 13:45:37 +00:00
if (index >= (self.arr size))
{
2016-03-16 02:27:18 +00:00
newsize := (self.arr size) * 2.
newarr := Array new: newsize.
newarr copy: self.arr.
self.arr := newarr.
2017-08-22 13:45:37 +00:00
}.
2016-03-16 02:27:18 +00:00
2016-03-16 14:05:34 +00:00
self.arr at: index put: aSemaphore.
aSemaphore heapIndex: index.
2016-03-16 02:27:18 +00:00
self.size := self.size + 1.
2016-03-16 14:05:34 +00:00
^self siftUp: index
2016-03-16 02:27:18 +00:00
}
method popTop
2016-03-16 02:27:18 +00:00
{
| top |
2016-03-16 14:05:34 +00:00
top := self.arr at: 0.
self deleteAt: 0.
2016-03-16 02:27:18 +00:00
^top
}
method updateAt: anIndex with: aSemaphore
2016-03-16 02:27:18 +00:00
{
| item |
item := self.arr at: anIndex.
item heapIndex: -1.
self.arr at: anIndex put: aSemaphore.
aSemaphore heapIndex: anIndex.
2017-07-21 16:54:43 +00:00
^if (aSemaphore youngerThan: item) { self siftUp: anIndex } else { self siftDown: anIndex }.
2016-03-16 02:27:18 +00:00
}
method deleteAt: anIndex
2016-03-16 02:27:18 +00:00
{
2017-07-21 16:54:43 +00:00
| item xitem |
2016-03-16 02:27:18 +00:00
item := self.arr at: anIndex.
item heapIndex: -1.
2016-03-16 14:05:34 +00:00
self.size := self.size - 1.
2017-07-21 16:54:43 +00:00
if (anIndex == self.size)
{
## the last item
self.arr at: self.size put: nil.
}
else
{
xitem := self.arr at: self.size.
self.arr at: anIndex put: xitem.
xitem heapIndex: anIndex.
self.arr at: self.size put: nil.
if (xitem youngerThan: item) { self siftUp: anIndex } else { self siftDown: anIndex }.
}
2016-03-16 02:27:18 +00:00
}
method parentIndex: anIndex
2016-03-16 02:27:18 +00:00
{
2016-03-16 14:05:34 +00:00
^(anIndex - 1) quo: 2
2016-03-16 02:27:18 +00:00
}
method leftChildIndex: anIndex
2016-03-16 02:27:18 +00:00
{
2016-03-16 14:05:34 +00:00
^(anIndex * 2) + 1.
2016-03-16 02:27:18 +00:00
}
method rightChildIndex: anIndex
2016-03-16 02:27:18 +00:00
{
2016-03-16 14:05:34 +00:00
^(anIndex * 2) + 2.
2016-03-16 02:27:18 +00:00
}
method siftUp: anIndex
2016-03-16 02:27:18 +00:00
{
2017-07-21 16:54:43 +00:00
| pindex cindex par item |
2016-03-16 02:27:18 +00:00
2017-07-21 16:54:43 +00:00
if (anIndex <= 0) { ^anIndex }.
2016-03-16 02:27:18 +00:00
pindex := anIndex.
item := self.arr at: anIndex.
2016-03-16 02:27:18 +00:00
2017-07-21 16:54:43 +00:00
while (true)
{
2016-03-16 02:27:18 +00:00
cindex := pindex.
2017-07-21 16:54:43 +00:00
if (pindex <= 0) { break }.
pindex := self parentIndex: cindex.
par := self.arr at: pindex.
if (item notYoungerThan: par) { break }.
2016-03-16 02:27:18 +00:00
2017-07-21 16:54:43 +00:00
## item is younger than the parent.
## move the parent down
self.arr at: cindex put: par.
par heapIndex: cindex.
}.
## place the item as high as it can
self.arr at: cindex put: item.
item heapIndex: cindex.
2016-03-16 02:27:18 +00:00
^cindex
}
method siftDown: anIndex
2016-03-16 02:27:18 +00:00
{
2017-07-21 16:54:43 +00:00
| base capa cindex item
left right younger xitem |
2016-03-16 02:27:18 +00:00
2016-03-16 14:05:34 +00:00
base := self.size quo: 2.
2017-07-21 16:54:43 +00:00
if (anIndex >= base) { ^anIndex }.
2016-03-16 02:27:18 +00:00
cindex := anIndex.
item := self.arr at: cindex.
2017-07-21 16:54:43 +00:00
while (cindex < base)
{
2016-03-16 02:27:18 +00:00
left := self leftChildIndex: cindex.
right := self rightChildIndex: cindex.
2017-07-21 16:54:43 +00:00
younger := if ((right < self.size) and: [(self.arr at: right) youngerThan: (self.arr at: left)]) { right } else { left }.
2016-03-16 02:27:18 +00:00
xitem := self.arr at: younger.
2017-07-21 16:54:43 +00:00
if (item youngerThan: xitem) { break }.
self.arr at: cindex put: xitem.
xitem heapIndex: cindex.
cindex := younger.
}.
2016-03-16 02:27:18 +00:00
self.arr at: cindex put: item.
item heapIndex: cindex.
^cindex
}
}
class(#final,#limited) ProcessScheduler(Object)
2015-10-15 14:40:08 +00:00
{
var(#get) active, should_exit := false, total_count := 0.
var(#get) runnable_count := 0.
var runnable_head, runnable_tail.
var(#get) suspended_count := 0.
var suspended_head, suspended_tail.
2015-10-15 14:40:08 +00:00
method activeProcess
2015-10-15 14:40:08 +00:00
{
^self.active.
}
method resume: process
{
<primitive: #_processor_schedule>
self primitiveFailed.
2015-10-15 14:40:08 +00:00
2017-07-21 16:54:43 +00:00
(* The primitive does something like the following in principle:
(self.tally == 0)
2015-10-15 14:40:08 +00:00
ifTrue: [
self.head := process.
self.tail := process.
2015-10-15 14:40:08 +00:00
self.tally := 1.
]
ifFalse: [
process ps_next: self.head.
self.head ps_prev: process.
self.head := process.
2015-10-15 14:40:08 +00:00
self.tally := self.tally + 1.
].
2017-07-21 16:54:43 +00:00
*)
2015-10-18 15:06:17 +00:00
}
2017-07-21 16:54:43 +00:00
(* -------------------
method yield
2015-10-15 14:40:08 +00:00
{
<primitive: #_processor_yield>
2015-10-15 14:40:08 +00:00
self primitiveFailed
}
2017-07-21 16:54:43 +00:00
----------------- *)
2016-03-16 02:27:18 +00:00
method signal: semaphore after: secs
2016-03-16 02:27:18 +00:00
{
<primitive: #_processor_add_timed_semaphore>
self primitiveFailed.
}
method signal: semaphore after: secs and: nanosecs
{
<primitive: #_processor_add_timed_semaphore>
self primitiveFailed.
2016-03-16 02:27:18 +00:00
}
method unsignal: semaphore
{
<primitive: #_processor_remove_semaphore>
self primitiveFailed.
}
method signalOnGCFin: semaphore
{
<primitive: #_processor_add_gcfin_semaphore>
self primitiveFailed.
}
method signal: semaphore onInput: file
{
<primitive: #_processor_add_input_semaphore>
self primitiveFailed.
}
method signal: semaphore onOutput: file
{
<primitive: #_processor_add_output_semaphore>
self primitiveFailed.
}
method signal: semaphore onInOutput: file
{
<primitive: #_processor_add_inoutput_semaphore>
self primitiveFailed.
}
method return: object to: context
{
<primitive: #_processor_return_to>
self primitiveFailed.
}
method sleepFor: secs
{
## -----------------------------------------------------
## put the calling process to sleep for given seconds.
## -----------------------------------------------------
| s |
s := Semaphore new.
self signal: s after: secs.
s wait.
}
method sleepFor: secs and: nanosecs
{
## -----------------------------------------------------
## put the calling process to sleep for given seconds.
## -----------------------------------------------------
| s |
s := Semaphore new.
self signal: s after: secs and: nanosecs.
s wait.
}
2015-10-15 14:40:08 +00:00
}