work in progress. trying a different way of implementing async socket

This commit is contained in:
hyunghwan.chung 2018-05-02 09:53:02 +00:00
parent 216e7f3b15
commit acb71f521c
4 changed files with 204 additions and 360 deletions

View File

@ -498,8 +498,21 @@ extend Apex
method(#dual) doesNotUnderstand: message_name method(#dual) doesNotUnderstand: message_name
{ {
## TODO: implement this properly ## TODO: implement this properly
| class_name | | class_name ctx |
class_name := if (self class == Class) { self name } else { self class name }. class_name := if (self class == Class) { self name } else { self class name }.
## TOOD: IMPROVE THIS EXPERIMENTAL BACKTRACE...
System logNl: '== BACKTRACE =='.
ctx := thisContext.
while (ctx notNil)
{
if (ctx class == MethodContext) { System logNl: (' ' & ctx method owner name & '>>' & ctx method name) }.
## TODO: include blockcontext???
ctx := ctx sender.
}.
System logNl: '== END OF BACKTRACE =='.
NoSuchMessageException signal: (message_name & ' not understood by ' & class_name). NoSuchMessageException signal: (message_name & ' not understood by ' & class_name).
} }

View File

@ -230,22 +230,23 @@ class AsyncHandle(Object)
## the handle must be the first field in this object to match ## the handle must be the first field in this object to match
## the internal representation used by various modules. (e.g. sck) ## the internal representation used by various modules. (e.g. sck)
var(#get) handle := -1. var(#get) handle := -1.
var outsem := nil.
var inwc := 0, outwc := 0. ## input watcher count and ouput watcher count ##method initialize
var insem, outsem. ##{
var(#get,#set) inputAction, outputAction. ## ^super initialize
var(#get) inputReady := false, outputReady := false. ##}
method close method close
{ {
if (self.handle >= 0) if (self.handle >= 0)
{ {
if (self.insem notNil) ###if (self.insem notNil)
{ ###{
System unsignal: self.insem; ### System unsignal: self.insem;
removeAsyncSemaphore: self.insem. ### removeAsyncSemaphore: self.insem.
self.insem := nil. ### self.insem := nil.
}. ###}.
if (self.outsem notNil) if (self.outsem notNil)
{ {
System unsignal: self.outsem; System unsignal: self.outsem;
@ -253,151 +254,34 @@ class AsyncHandle(Object)
self.outsem := nil. self.outsem := nil.
}. }.
self.outwc := 0.
self.inwc := 0.
self _close. self _close.
self.handle := -1. self.handle := -1.
} }
} }
## TODO: how to specify a timeout for an action? using another semaphore??
method watchInput
{
if (self.inwc == 0)
{
if (self.insem isNil)
{
self.insem := Semaphore new.
self.insem signalAction: [:sem |
self.inputReady := true.
self.inputAction value: self value: true
].
System addAsyncSemaphore: self.insem.
}.
self.inputReady := false.
System signal: self.insem onInput: self.handle
}.
self.inwc := self.inwc + 1.
}
method unwatchInput
{
if (self.inwc > 0)
{
self.inwc := self.inwc - 1.
if (self.inwc == 0)
{
##if (self.insem notNil) { System unsignal: self.insem }.
System unsignal: self.insem.
System removeAsyncSemaphore: self.insem.
self.insem := nil.
}.
}.
}
method watchOutput
{
if (self.outwc == 0)
{
if (self.outsem isNil)
{
self.outsem := Semaphore new.
self.outsem signalAction: [:sem |
self.outputReady := true.
self.outputAction value: self value: true
].
System addAsyncSemaphore: self.outsem.
}.
self.outputReady := false.
System signal: self.outsem onOutput: self.handle.
}.
self.outwc := self.outwc + 1.
}
method unwatchOutput
{
if (self.outwc > 0)
{
self.outwc := self.outwc - 1.
if (self.outwc == 0)
{
## self.outsem must not be nil here.
System unsignal: self.outsem.
System removeAsyncSemaphore: self.outsem.
self.outsem := nil.
}.
}.
}
method writeBytes: bytes offset: offset length: length signal: sem
{
| oldact n |
#######################################
## TODO: if data still in progress, failure... or success while concatening the message?
## for a stream, concatening is not bad. but it's not good if the socket requires message boundary preservation.
######################################
if (self.outputReady)
{
## n >= 0: written
## n <= -1: tolerable error (e.g. EAGAIN)
## exception: fatal error
##while (true) ## TODO: loop to write as much as possible
##{
n := self _writeBytes: bytes offset: offset length: length.
if (n >= 0)
{
if (sem notNil) { sem signal }.
^n
}.
##}.
self.outputReady := false.
}.
oldact := self.outputAction.
self.outputAction := [ :sck :state |
##### schedule write.
if (state)
{
if ((self _writeBytes: bytes offset: offset length: length) <= -1)
{
## EAGAIN
self.outputReady := false.
^self.
}.
## TODO: handle _writeBytes may not write in full.
}.
self.outputAction := oldact.
self unwatchOutput.
].
## TODO: set timeout?
self watchOutput.
}
method writeBytes: bytes signal: sem method writeBytes: bytes signal: sem
{ {
^self writeBytes: bytes offset: 0 length: (bytes size) signal: sem. ^self writeBytes: bytes offset: 0 length: (bytes size)
} }
method writeBytes: bytes offset: offset length: length method writeBytes: bytes offset: offset length: length
{ {
^self writeBytes: bytes offset: offset length: length signal: nil. ^self writeBytes: bytes offset: offset length: length.
} }
method writeBytes: bytes method writeBytes: bytes
{ {
^self writeBytes: bytes offset: 0 length: (bytes size) signal: nil. ^self writeBytes: bytes offset: 0 length: (bytes size)
} }
} }
class Socket(AsyncHandle) from 'sck' class Socket(AsyncHandle) from 'sck'
{ {
method(#primitive) open(domain, type, proto). var eventActions.
var pending_bytes, pending_offset, pending_length.
var outreadysem, outdonesem, inreadysem.
method(#primitive) _open(domain, type, proto).
method(#primitive) _close. method(#primitive) _close.
method(#primitive) bind: addr. method(#primitive) bind: addr.
method(#primitive) _listen: backlog. method(#primitive) _listen: backlog.
@ -423,6 +307,13 @@ pooldic Socket.Type
DGRAM := 2. DGRAM := 2.
} }
pooldic Socket.EventType
{
CONNECTED := 0.
DATA_IN := 1.
DATA_OUT := 2.
}
extend Socket extend Socket
{ {
method(#class) new { self messageProhibited: #new } method(#class) new { self messageProhibited: #new }
@ -430,266 +321,191 @@ extend Socket
method(#class) domain: domain type: type method(#class) domain: domain type: type
{ {
^super new open(domain, type, 0). ^(super new) open(domain, type, 0)
} }
method listen: backlog do: acceptBlock method initialize
{ {
self.inputAction := acceptBlock. super initialize.
self watchInput. self.eventActions := #(nil nil nil).
^self _listen: backlog.
self.outdonesem := Semaphore new.
self.outreadysem := Semaphore new.
self.inreadysem := Semaphore new.
self.outdonesem signalAction: [ :xsem |
(self.eventActions at: Socket.EventType.DATA_OUT) value: self.
System unsignal: self.outreadysem.
].
self.outreadysem signalAction: [ :xsem |
| nwritten |
nwritten := self _writeBytes: self.pending_bytes offset: self.pending_offset length: self.pending_length.
if (nwritten >= 0)
{
self.pending_bytes := nil.
self.pending_offset := 0.
self.pending_length := 0.
self.outdonesem signal.
}
].
self.inreadysem signalAction: [ :ysem |
(self.eventActions at: Socket.EventType.DATA_IN) value: self.
].
} }
method connect: target do: connectBlock method open(domain, type, proto)
{ {
| conblk oldact | | sck |
sck := self _open(domain, type, proto).
if (self.handle >= 0)
{
System addAsyncSemaphore: self.outdonesem.
System addAsyncSemaphore: self.outreadysem.
}.
^sck
}
method close
{
'Socket close' dump.
System removeAsyncSemaphore: self.outdonesem.
System removeAsyncSemaphore: self.outreadysem.
^super close.
}
### method listen: backlog do: acceptBlock
### {
### self.inputAction := acceptBlock.
### self watchInput.
### ^self _listen: backlog.
### }
method onEvent: event_type do: action_block
{
self.eventActions at: event_type put: action_block.
}
method connect: target
{
| sem |
if ((self _connect: target) <= -1) if ((self _connect: target) <= -1)
{ {
## connection in progress sem := Semaphore new.
sem signalAction: [ :xsem |
oldact := self.outputAction. | soerr dra |
self.outputAction := [ :sck :state | soerr := self _socketError.
| soerr | if (soerr >= 0)
if (state)
{ {
## i don't map a connection error to an exception. ## finalize connection if not in progress
## it's not a typical exception. it is a normal failure 'CHECKING FOR CONNECTION.....' dump.
## that is caused by an external system. System unsignal: xsem.
## System removeAsyncSemaphore: xsem.
## or should i map an error to an exception?
## i can treat EINPROGRESS, ECONNREFUSED as failure.
## all other errors may get treated as an exception?
## what about timeout???
soerr := self _socketError. (self.eventActions at: Socket.EventType.CONNECTED) value: self value: (soerr == 0).
if (soerr >= 0)
if (soerr == 0)
{ {
## finalize connection if not in progress if ((self.eventActions at: Socket.EventType.DATA_IN) notNil)
self.outputAction := oldact.
self unwatchOutput.
if (connectBlock notNil)
{ {
connectBlock value: sck value: (soerr == 0). xsem signalAction: [ :ysem |
(self.eventActions at: Socket.EventType.DATA_IN) value: self.
].
System addAsyncSemaphore: xsem.
System signal: xsem onInput: self.handle.
}. }.
}. }.
}
else
{
## timed out
self.outputAction := oldact.
self unwatchOutput.
if (connectBlock notNil)
{
## TODO: tri-state? success, failure, timeout? or boolean with extra error code
connectBlock value: sck value: false.
}.
}. }.
(* HOW TO HANDLE TIMEOUT? *)
]. ].
###self.outputTimeout: 10 do: xxxx. System addAsyncSemaphore: sem.
self watchOutput. System signal: sem onOutput: self.handle.
} }
else else
{ {
## connected immediately. ## connected immediately
if (connectBlock notNil) 'IMMEDIATELY CONNECTED.....' dump.
(self.eventActions at: Socket.EventType.CONNECTED) value: self value: true.
if ((self.eventActions at: Socket.EventType.DATA_IN) notNil)
{ {
connectBlock value: self value: true. sem := Semaphore new.
} sem signalAction: [ :xsem |
(self.eventActions at: Socket.EventType.DATA_IN) value: self.
].
System addAsyncSemaphore: sem.
System signal: sem onInput: self.handle.
}.
} }
} }
method writeBytes: bytes offset: offset length: length
{
| n |
## n >= 0: written
## n <= -1: tolerable error (e.g. EAGAIN)
## exception: fatal error
##while (true) ## TODO: loop to write as much as possible
##{
n := self _writeBytes: bytes offset: offset length: length.
if (n >= 0)
{
self.outdonesem signal.
^n
}.
##}.
## TODO: adjust offset and length
self.pending_bytes := bytes.
self.pending_offset := offset.
self.pending_length := length.
System signal: self.outreadysem onOutput: self.handle.
}
} }
class MyObject(Object) class MyObject(Object)
{ {
method(#class) main method(#class) main
{ {
| conact inact outact accact | [
| s s2 st sg ss buf count |
(SocketAddress fromString: '192.168.123.232:99') dump.
'****************************' dump.
(*
s:= X new: 20.
s basicSize dump.
'****************************' dump.
s := Y new: 10.
s x.
s basicAt: 1 put: 20.
s dump.
s basicSize dump.
'****************************' dump.
*)
(***********************************
s := ByteArray new: 100.
s basicFillFrom: 0 with: ($a asInteger) count: 100.
s basicFillFrom: 50 with: ($b asInteger) count: 50.
(s basicShiftFrom: 50 to: 94 count: 10) dump.
s dump.
##thisProcess terminate.
s := IP4Address fromString: '192.168.123.232'.
s dump.
s basicSize dump.
s := IP6Address fromString: 'fe80::c225:e9ff:fe47:99.2.3.4'.
##s := IP6Address fromString: '::99.12.34.54'.
##s := IP6Address fromString: '::FFFF:0:0'.
##s := IP6Address fromString: 'fe80::'.
s dump.
s basicSize dump.
s := IP6Address fromString: 'fe80::c225:e9ff:fe47:b1b6'.
s dump.
s basicSize dump.
##s := IP6Address new.
##s dump.
##s := IP4SocketAddress new.
##s dump.
thisProcess terminate.
**************************)
inact := [:sck :state |
| data n |
(*
end of data -> 0.
no data -> -1. (e.g. EAGAIN)
has data -> 1 or more
error -> exception
*)
data := ByteArray new: 100.
do
{
n := sck readBytes: data.
if (n <= 0)
{
if (n == 0) { sck close }. ## end of data
break.
}
elsif (n > 0)
{
(n asString & ' bytes read') dump.
data dump.
##sck writeBytes: #[ $h, $e, $l, $l, $o, $., $., $., C'\n' ].
sck writeBytes: data offset: 0 length: n.
}.
}
while (true).
].
## TODO: what should it accept as block parameter
## socket, output result? , output object?
outact := [:sck :state |
if (state)
{
## what if i want write more data???
##[ sck writeBytes: #[ $h, $e, $l, $l, $o, C'\n' ] ]
## on: Exception do: [:ex | sck close. ].
}
else
{
}
].
conact := [:sck :state |
| x write_more count |
count := 0. count := 0.
if (state) [
{ buf := ByteArray new: 128.
'CONNECTED NOW.............' dump. s := Socket domain: Socket.Domain.INET type: Socket.Type.STREAM.
###sck inputTimeout: 10; outputTimeout: 10; connectTimeout: 10.
s onEvent: Socket.EventType.CONNECTED do: [ :sck :state |
############################################# if (state)
write_more := [:sem |
if (count <= 26)
{ {
sck writeBytes: %[ $h, $e, $l, $l, $o, $-, $m, $o, count + 65, $o, $o, C'\n' ] signal: x. 'AAAAAAAA' dump.
count := count + 1. s writeBytes: #[ $a, $b, $c ]
} }
else else
{ {
System removeAsyncSemaphore: x. 'FAILED TO CONNECT' dump.
}. }.
]. ].
s onEvent: Socket.EventType.DATA_IN do: [ :sck |
| nbytes |
nbytes := s readBytes: buf.
if (nbytes == 0)
{
sck close
}.
('Got ' & (nbytes asString)) dump.
buf dump.
].
s onEvent: Socket.EventType.DATA_OUT do: [ :sck |
if (count < 10) { s writeBytes: #[ $a, $b, C'\n' ]. count := count + 1. }.
].
x := Semaphore new. s connect: (SocketAddress fromString: '127.0.0.1:9999').
x signalAction: write_more.
System addAsyncSemaphore: x.
x signal.
##sck outputAction: outact.
##sck writeBytes: #[ $h, $e, $l, $l, $o, $-, $m, $o, $o, C'\n' ] signal: x.
###############################################
sck inputAction: inact.
sck watchInput.
}
else
{
'UNABLE TO CONNECT............' dump.
}
].
## ------------------------------------------------------
accact := [:sck :state |
| newsck newaddr |
newaddr := SocketAddress new.
newsck := sck accept: newaddr.
System log: 'new connection - '; log: newaddr; log: ' '; log: (newsck handle); logNl.
newsck inputAction: inact; outputAction: outact.
##newsck watchInput; watchOutput.
newsck watchInput.
newsck writeBytes: #[ $W, $e, $l, $c, $o, $m, $e, $., C'\n' ].
].
[
| s s2 st sg ss |
[
s := Socket domain: Socket.Domain.INET type: Socket.Type.STREAM.
s connect: (SocketAddress fromString: '127.0.0.1:9999') do: conact.
## s2 := Socket domain: Socket.Domain.INET type: Socket.Type.STREAM.
## s2 bind: (SocketAddress fromString: '0.0.0.0:9998').
## ##s2 inputAction: accact.
## ###s2 listen: 10; watchInput.
## s2 listen: 10 do: accact.
(*
st := Semaphore new.
System addAsyncSemaphore: st.
System signal: st afterSecs: 5.
'JJJJJJJJJJJ' dump.
sg := SemaphoreGroup new.
'JJJJJJJJJJJ' dump.
sg wait.
'YYYYYYYYYYYYYYY' dump.
*)
###[ while (1) { '1111' dump. System sleepForSecs: 1 } ] fork.
(*
st := Semaphore new.
System addAsyncSemaphore: st.
System signal: st afterSecs: 20.
*)
while (true) while (true)
{ {

View File

@ -373,7 +373,7 @@ moo_pfrc_t moo_pf_basic_at_put (moo_t* moo, moo_ooi_t nargs)
if (MOO_OBJ_GET_FLAGS_RDONLY(rcv)) if (MOO_OBJ_GET_FLAGS_RDONLY(rcv))
{ {
moo_seterrbfmt (moo, MOO_EPERM, "now allowed to change a read-only object - %O", rcv); moo_seterrbfmt (moo, MOO_EPERM, "not allowed to change a read-only object - %O", rcv);
return MOO_PF_FAILURE; return MOO_PF_FAILURE;
} }
@ -471,7 +471,7 @@ moo_pfrc_t moo_pf_basic_fill (moo_t* moo, moo_ooi_t nargs)
if (MOO_OBJ_GET_FLAGS_RDONLY(rcv)) if (MOO_OBJ_GET_FLAGS_RDONLY(rcv))
{ {
moo_seterrbfmt (moo, MOO_EPERM, "now allowed to change a read-only object - %O", rcv); moo_seterrbfmt (moo, MOO_EPERM, "not allowed to change a read-only object - %O", rcv);
return MOO_PF_FAILURE; return MOO_PF_FAILURE;
} }
@ -588,7 +588,7 @@ moo_pfrc_t moo_pf_basic_shift (moo_t* moo, moo_ooi_t nargs)
if (MOO_OBJ_GET_FLAGS_RDONLY(rcv)) if (MOO_OBJ_GET_FLAGS_RDONLY(rcv))
{ {
moo_seterrbfmt (moo, MOO_EPERM, "now allowed to change a read-only object - %O", rcv); moo_seterrbfmt (moo, MOO_EPERM, "not allowed to change a read-only object - %O", rcv);
return MOO_PF_FAILURE; return MOO_PF_FAILURE;
} }

View File

@ -71,3 +71,18 @@ binary-argument := expression-primary unary-selector*
#main #main
================================================
#library Dynlib from 'dyn-lib.so' <--- this is a generic shared library
{
## no instance variables allowed
## class-level(static) functions only.
int abc (int, float, void*) <--- call this as Dynlib.abc (...). anc proper type conversion back and forth must occur.
int abc (int, float, void*) as abc: aaa x: x y: y <--- remap the original name to a moo-style name.
}
#class abc from 'x11.so' <--- this is a moo-only shared library
{
}