Added asyncsg field into Process

This commit is contained in:
hyunghwan.chung 2018-05-15 16:38:37 +00:00
parent 2d24c53481
commit da0b655d40
4 changed files with 81 additions and 22 deletions

View File

@ -90,7 +90,7 @@ class HttpSocket(Socket)
method onSocketDataIn method onSocketDataIn
{ {
'CLIENT got DATA' dump. 'CLIENT got DATA' dump.
self readBytes ###self readBytes: buf.
self close. self close.
} }
@ -333,9 +333,9 @@ class MyObject(Object)
while (true) while (true)
{ {
ss := System handleAsyncEvent. ss := thisProcess handleAsyncEvent.
if (ss isError) { break }. if (ss isError) { break }.
###if (ss == st) { System removeAsyncSemaphore: st }. ###if (ss == st) { thisProcess removeAsyncSemaphore: st }.
}. }.
] ]
ensure: ensure:
@ -350,11 +350,43 @@ class MyObject(Object)
} }
method(#class) main method(#class) another_proc
{ {
| httpd | | httpd |
[ [
thisProcess initAsync.
httpd := HttpServer new.
[
| ss |
httpd start: %(
SocketAddress fromString: '[::]:8666',
SocketAddress fromString: '0.0.0.0:8665'
).
while (true)
{
ss := thisProcess handleAsyncEvent.
if (ss isError) { break }.
}.
] ensure: [
if (httpd notNil) { httpd close }
]
] on: Exception do: [:ex | ('Exception - ' & ex messageText) dump].
'----- END OF ANOTHER PROC ------' dump.
}
method(#class) main
{
| httpd |
[ self another_proc ] fork.
###[ self another_proc ] fork.
[
thisProcess initAsync.
httpd := HttpServer new. httpd := HttpServer new.
[ [
| ss | | ss |
@ -363,8 +395,9 @@ class MyObject(Object)
SocketAddress fromString: '0.0.0.0:7776' SocketAddress fromString: '0.0.0.0:7776'
). ).
while (true) { while (true)
ss := System handleAsyncEvent. {
ss := thisProcess handleAsyncEvent.
if (ss isError) { break }. if (ss isError) { break }.
}. }.
] ensure: [ ] ensure: [

View File

@ -6,6 +6,8 @@ class(#pointer,#final,#limited) Process(Object)
var(#get) ps_prev, ps_next, sem_wait_prev, sem_wait_next. var(#get) ps_prev, ps_next, sem_wait_prev, sem_wait_next.
var sem, perr, perrmsg. var sem, perr, perrmsg.
var asyncsg.
method primError { ^self.perr } method primError { ^self.perr }
method primErrorMessage { ^self.perrmsg } method primErrorMessage { ^self.perrmsg }
@ -46,6 +48,26 @@ class(#pointer,#final,#limited) Process(Object)
self.currentContext unwindTo: self.initialContext return: nil. self.currentContext unwindTo: self.initialContext return: nil.
^self _terminate ^self _terminate
} }
method initAsync
{
if (self.asyncsg isNil) { self.asyncsg := SemaphoreGroup new }.
}
method addAsyncSemaphore: sem
{
^self.asyncsg addSemaphore: sem
}
method removeAsyncSemaphore: sem
{
^self.asyncsg removeSemaphore: sem
}
method handleAsyncEvent
{
^self.asyncsg wait.
}
} }
class Semaphore(Object) class Semaphore(Object)

View File

@ -353,21 +353,21 @@ socketConnected:
if (self.outdonesem notNil) if (self.outdonesem notNil)
{ {
System unsignal: self.outdonesem. System unsignal: self.outdonesem.
if (self.outdonesem _group notNil) { System removeAsyncSemaphore: self.outdonesem }. if (self.outdonesem _group notNil) { thisProcess removeAsyncSemaphore: self.outdonesem }.
self.outdonesem := nil. self.outdonesem := nil.
}. }.
if (self.outreadysem notNil) if (self.outreadysem notNil)
{ {
System unsignal: self.outreadysem. System unsignal: self.outreadysem.
if (self.outreadysem _group notNil) { System removeAsyncSemaphore: self.outreadysem }. if (self.outreadysem _group notNil) { thisProcess removeAsyncSemaphore: self.outreadysem }.
self.outreadysem := nil. self.outreadysem := nil.
}. }.
if (self.inreadysem notNil) if (self.inreadysem notNil)
{ {
System unsignal: self.inreadysem. System unsignal: self.inreadysem.
if (self.inreadysem _group notNil) { System removeAsyncSemaphore: self.inreadysem }. if (self.inreadysem _group notNil) { thisProcess removeAsyncSemaphore: self.inreadysem }.
self.inreadysem := nil. self.inreadysem := nil.
}. }.
@ -406,9 +406,9 @@ socketConnected:
method beWatched method beWatched
{ {
System addAsyncSemaphore: self.inreadysem. thisProcess addAsyncSemaphore: self.inreadysem.
System signal: self.inreadysem onInput: self.handle. System signal: self.inreadysem onInput: self.handle.
System addAsyncSemaphore: self.outdonesem. thisProcess addAsyncSemaphore: self.outdonesem.
} }
method writeBytes: bytes offset: offset length: length method writeBytes: bytes offset: offset length: length
@ -443,9 +443,9 @@ socketConnected:
self.pending_bytes := bytes. self.pending_bytes := bytes.
self.pending_offset := pos. self.pending_offset := pos.
self.pending_length := rem self.pending_length := rem.
System addAsyncSemaphore: self.outreadysem. thisProcess addAsyncSemaphore: self.outreadysem.
System signal: self.outreadysem onOutput: self.handle. System signal: self.outreadysem onOutput: self.handle.
} }
@ -486,7 +486,7 @@ class ClientSocket(Socket)
{ {
## finalize connection if not in progress ## finalize connection if not in progress
System unsignal: sem. System unsignal: sem.
System removeAsyncSemaphore: sem. thisProcess removeAsyncSemaphore: sem.
##self.connectedEventAction value: self value: (soerr == 0). ##self.connectedEventAction value: self value: (soerr == 0).
self onSocketConnected: (soerr == 0). self onSocketConnected: (soerr == 0).
@ -501,7 +501,7 @@ class ClientSocket(Socket)
if (self.connsem notNil) if (self.connsem notNil)
{ {
System unsignal: self.connsem. System unsignal: self.connsem.
if (self.connsem _group notNil) { System removeAsyncSemaphore: self.connsem }. if (self.connsem _group notNil) { thisProcess removeAsyncSemaphore: self.connsem }.
self.connsem := nil. self.connsem := nil.
}. }.
^super close ^super close
@ -522,7 +522,7 @@ class ClientSocket(Socket)
| sem | | sem |
if ((self _connect: target) <= -1) if ((self _connect: target) <= -1)
{ {
System addAsyncSemaphore: self.connsem. thisProcess addAsyncSemaphore: self.connsem.
System signal: self.connsem onOutput: self.handle. System signal: self.connsem onOutput: self.handle.
} }
else else
@ -532,9 +532,9 @@ class ClientSocket(Socket)
###self.connectedEventAction value: self value: true. ###self.connectedEventAction value: self value: true.
self onSocketConnected: true. self onSocketConnected: true.
System addAsyncSemaphore: self.inreadysem. thisProcess addAsyncSemaphore: self.inreadysem.
System signal: self.inreadysem onInput: self.handle. System signal: self.inreadysem onInput: self.handle.
System addAsyncSemaphore: self.outdonesem. thisProcess addAsyncSemaphore: self.outdonesem.
} }
} }
@ -556,8 +556,10 @@ class ServerSocket(Socket)
| cliaddr clisck cliact fd | | cliaddr clisck cliact fd |
cliaddr := SocketAddress new. cliaddr := SocketAddress new.
'IN READYSEM action performing.........' dump.
fd := self _accept: cliaddr. fd := self _accept: cliaddr.
if (fd >= 0) ##if (fd >= 0)
if (fd notNil)
{ {
clisck := (self acceptedSocketClass) __with: fd. clisck := (self acceptedSocketClass) __with: fd.
clisck beWatched. clisck beWatched.
@ -592,7 +594,7 @@ class ServerSocket(Socket)
if (self.inreadysem notNil) if (self.inreadysem notNil)
{ {
System unsignal: self.inreadysem. System unsignal: self.inreadysem.
if (self.inreadysem _group notNil) { System removeAsyncSemaphore: self.inreadysem }. if (self.inreadysem _group notNil) { thisProcess removeAsyncSemaphore: self.inreadysem }.
self.inreadysem := nil. self.inreadysem := nil.
}. }.
@ -612,7 +614,7 @@ class ServerSocket(Socket)
method listen: backlog method listen: backlog
{ {
System addAsyncSemaphore: self.inreadysem. thisProcess addAsyncSemaphore: self.inreadysem.
System signal: self.inreadysem onInput: self.handle. System signal: self.inreadysem onInput: self.handle.
^self _listen: backlog. ^self _listen: backlog.
} }

View File

@ -753,7 +753,7 @@ struct moo_context_t
}; };
#define MOO_PROCESS_NAMED_INSTVARS 12 #define MOO_PROCESS_NAMED_INSTVARS 13
typedef struct moo_process_t moo_process_t; typedef struct moo_process_t moo_process_t;
typedef struct moo_process_t* moo_oop_process_t; typedef struct moo_process_t* moo_oop_process_t;
@ -791,6 +791,8 @@ struct moo_process_t
moo_oop_t perr; /* last error set by a primitive function */ moo_oop_t perr; /* last error set by a primitive function */
moo_oop_t perrmsg; moo_oop_t perrmsg;
moo_oop_t asyncsg;
/* == variable indexed part == */ /* == variable indexed part == */
moo_oop_t slot[1]; /* process stack */ moo_oop_t slot[1]; /* process stack */
}; };