From acb71f521c1ac64256a58cc2fd385b2c41ebfa5b Mon Sep 17 00:00:00 2001 From: "hyunghwan.chung" Date: Wed, 2 May 2018 09:53:02 +0000 Subject: [PATCH] work in progress. trying a different way of implementing async socket --- moo/kernel/Except.moo | 15 +- moo/kernel/Socket.moo | 528 ++++++++++++++---------------------------- moo/lib/pf-basic.c | 6 +- moo/lib/syntax.txt | 15 ++ 4 files changed, 204 insertions(+), 360 deletions(-) diff --git a/moo/kernel/Except.moo b/moo/kernel/Except.moo index 5db9311..067ebfe 100644 --- a/moo/kernel/Except.moo +++ b/moo/kernel/Except.moo @@ -498,8 +498,21 @@ extend Apex method(#dual) doesNotUnderstand: message_name { ## TODO: implement this properly - | class_name | + | class_name ctx | class_name := if (self class == Class) { self name } else { self class name }. + +## TOOD: IMPROVE THIS EXPERIMENTAL BACKTRACE... +System logNl: '== BACKTRACE =='. +ctx := thisContext. +while (ctx notNil) +{ + if (ctx class == MethodContext) { System logNl: (' ' & ctx method owner name & '>>' & ctx method name) }. + ## TODO: include blockcontext??? + ctx := ctx sender. +}. +System logNl: '== END OF BACKTRACE =='. + + NoSuchMessageException signal: (message_name & ' not understood by ' & class_name). } diff --git a/moo/kernel/Socket.moo b/moo/kernel/Socket.moo index ff1a53a..fc2c1e9 100644 --- a/moo/kernel/Socket.moo +++ b/moo/kernel/Socket.moo @@ -230,22 +230,23 @@ class AsyncHandle(Object) ## the handle must be the first field in this object to match ## the internal representation used by various modules. (e.g. sck) var(#get) handle := -1. + var outsem := nil. - var inwc := 0, outwc := 0. ## input watcher count and ouput watcher count - var insem, outsem. - var(#get,#set) inputAction, outputAction. - var(#get) inputReady := false, outputReady := false. + ##method initialize + ##{ + ## ^super initialize + ##} method close { if (self.handle >= 0) { - if (self.insem notNil) - { - System unsignal: self.insem; - removeAsyncSemaphore: self.insem. - self.insem := nil. - }. + ###if (self.insem notNil) + ###{ + ### System unsignal: self.insem; + ### removeAsyncSemaphore: self.insem. + ### self.insem := nil. + ###}. if (self.outsem notNil) { System unsignal: self.outsem; @@ -253,151 +254,34 @@ class AsyncHandle(Object) self.outsem := nil. }. - self.outwc := 0. - self.inwc := 0. - self _close. self.handle := -1. } } -## TODO: how to specify a timeout for an action? using another semaphore?? - method watchInput - { - if (self.inwc == 0) - { - if (self.insem isNil) - { - self.insem := Semaphore new. - self.insem signalAction: [:sem | - self.inputReady := true. - self.inputAction value: self value: true - ]. - System addAsyncSemaphore: self.insem. - }. - self.inputReady := false. - System signal: self.insem onInput: self.handle - }. - self.inwc := self.inwc + 1. - } - - method unwatchInput - { - if (self.inwc > 0) - { - self.inwc := self.inwc - 1. - if (self.inwc == 0) - { - ##if (self.insem notNil) { System unsignal: self.insem }. - System unsignal: self.insem. - System removeAsyncSemaphore: self.insem. - self.insem := nil. - }. - }. - } - - method watchOutput - { - if (self.outwc == 0) - { - if (self.outsem isNil) - { - self.outsem := Semaphore new. - self.outsem signalAction: [:sem | - self.outputReady := true. - self.outputAction value: self value: true - ]. - System addAsyncSemaphore: self.outsem. - }. - self.outputReady := false. - System signal: self.outsem onOutput: self.handle. - }. - self.outwc := self.outwc + 1. - } - - - method unwatchOutput - { - if (self.outwc > 0) - { - self.outwc := self.outwc - 1. - if (self.outwc == 0) - { - ## self.outsem must not be nil here. - System unsignal: self.outsem. - System removeAsyncSemaphore: self.outsem. - self.outsem := nil. - }. - }. - } - - method writeBytes: bytes offset: offset length: length signal: sem - { - | oldact n | -####################################### -## TODO: if data still in progress, failure... or success while concatening the message? -## for a stream, concatening is not bad. but it's not good if the socket requires message boundary preservation. -###################################### - - if (self.outputReady) - { - ## n >= 0: written - ## n <= -1: tolerable error (e.g. EAGAIN) - ## exception: fatal error - ##while (true) ## TODO: loop to write as much as possible - ##{ - n := self _writeBytes: bytes offset: offset length: length. - if (n >= 0) - { - if (sem notNil) { sem signal }. - ^n - }. - ##}. - - self.outputReady := false. - }. - - oldact := self.outputAction. - self.outputAction := [ :sck :state | - ##### schedule write. - if (state) - { - if ((self _writeBytes: bytes offset: offset length: length) <= -1) - { - ## EAGAIN - self.outputReady := false. - ^self. - }. -## TODO: handle _writeBytes may not write in full. - }. - - self.outputAction := oldact. - self unwatchOutput. - ]. - - ## TODO: set timeout? - self watchOutput. - } - method writeBytes: bytes signal: sem { - ^self writeBytes: bytes offset: 0 length: (bytes size) signal: sem. + ^self writeBytes: bytes offset: 0 length: (bytes size) } method writeBytes: bytes offset: offset length: length { - ^self writeBytes: bytes offset: offset length: length signal: nil. + ^self writeBytes: bytes offset: offset length: length. } method writeBytes: bytes { - ^self writeBytes: bytes offset: 0 length: (bytes size) signal: nil. + ^self writeBytes: bytes offset: 0 length: (bytes size) } } class Socket(AsyncHandle) from 'sck' { - method(#primitive) open(domain, type, proto). + var eventActions. + var pending_bytes, pending_offset, pending_length. + var outreadysem, outdonesem, inreadysem. + + method(#primitive) _open(domain, type, proto). method(#primitive) _close. method(#primitive) bind: addr. method(#primitive) _listen: backlog. @@ -423,6 +307,13 @@ pooldic Socket.Type DGRAM := 2. } +pooldic Socket.EventType +{ + CONNECTED := 0. + DATA_IN := 1. + DATA_OUT := 2. +} + extend Socket { method(#class) new { self messageProhibited: #new } @@ -430,266 +321,191 @@ extend Socket method(#class) domain: domain type: type { - ^super new open(domain, type, 0). + ^(super new) open(domain, type, 0) } - method listen: backlog do: acceptBlock + method initialize { - self.inputAction := acceptBlock. - self watchInput. - ^self _listen: backlog. + super initialize. + self.eventActions := #(nil nil nil). + + self.outdonesem := Semaphore new. + self.outreadysem := Semaphore new. + self.inreadysem := Semaphore new. + + self.outdonesem signalAction: [ :xsem | + (self.eventActions at: Socket.EventType.DATA_OUT) value: self. + System unsignal: self.outreadysem. + ]. + + self.outreadysem signalAction: [ :xsem | + | nwritten | + nwritten := self _writeBytes: self.pending_bytes offset: self.pending_offset length: self.pending_length. + if (nwritten >= 0) + { + self.pending_bytes := nil. + self.pending_offset := 0. + self.pending_length := 0. + self.outdonesem signal. + } + ]. + + self.inreadysem signalAction: [ :ysem | + (self.eventActions at: Socket.EventType.DATA_IN) value: self. + ]. } - method connect: target do: connectBlock + method open(domain, type, proto) { - | conblk oldact | + | sck | + sck := self _open(domain, type, proto). + if (self.handle >= 0) + { + System addAsyncSemaphore: self.outdonesem. + System addAsyncSemaphore: self.outreadysem. + }. + + ^sck + } + + method close + { +'Socket close' dump. + System removeAsyncSemaphore: self.outdonesem. + System removeAsyncSemaphore: self.outreadysem. + ^super close. + } +### method listen: backlog do: acceptBlock +### { +### self.inputAction := acceptBlock. +### self watchInput. +### ^self _listen: backlog. +### } + + method onEvent: event_type do: action_block + { + self.eventActions at: event_type put: action_block. + } + + method connect: target + { + | sem | if ((self _connect: target) <= -1) { - ## connection in progress - - oldact := self.outputAction. - self.outputAction := [ :sck :state | - | soerr | - - if (state) + sem := Semaphore new. + sem signalAction: [ :xsem | + | soerr dra | + soerr := self _socketError. + if (soerr >= 0) { - ## i don't map a connection error to an exception. - ## it's not a typical exception. it is a normal failure - ## that is caused by an external system. - ## - ## or should i map an error to an exception? - ## i can treat EINPROGRESS, ECONNREFUSED as failure. - ## all other errors may get treated as an exception? - ## what about timeout??? + ## finalize connection if not in progress +'CHECKING FOR CONNECTION.....' dump. + System unsignal: xsem. + System removeAsyncSemaphore: xsem. - soerr := self _socketError. - if (soerr >= 0) + (self.eventActions at: Socket.EventType.CONNECTED) value: self value: (soerr == 0). + + if (soerr == 0) { - ## finalize connection if not in progress - self.outputAction := oldact. - self unwatchOutput. - if (connectBlock notNil) + if ((self.eventActions at: Socket.EventType.DATA_IN) notNil) { - connectBlock value: sck value: (soerr == 0). + xsem signalAction: [ :ysem | + (self.eventActions at: Socket.EventType.DATA_IN) value: self. + ]. + System addAsyncSemaphore: xsem. + System signal: xsem onInput: self.handle. }. }. - } - else - { - ## timed out - self.outputAction := oldact. - self unwatchOutput. - if (connectBlock notNil) - { - ## TODO: tri-state? success, failure, timeout? or boolean with extra error code - connectBlock value: sck value: false. - }. }. + (* HOW TO HANDLE TIMEOUT? *) ]. - ###self.outputTimeout: 10 do: xxxx. - self watchOutput. + System addAsyncSemaphore: sem. + System signal: sem onOutput: self.handle. } else { - ## connected immediately. - if (connectBlock notNil) + ## connected immediately +'IMMEDIATELY CONNECTED.....' dump. + (self.eventActions at: Socket.EventType.CONNECTED) value: self value: true. + if ((self.eventActions at: Socket.EventType.DATA_IN) notNil) { - connectBlock value: self value: true. - } + sem := Semaphore new. + sem signalAction: [ :xsem | + (self.eventActions at: Socket.EventType.DATA_IN) value: self. + ]. + System addAsyncSemaphore: sem. + System signal: sem onInput: self.handle. + }. } } + + method writeBytes: bytes offset: offset length: length + { + | n | + + ## n >= 0: written + ## n <= -1: tolerable error (e.g. EAGAIN) + ## exception: fatal error + ##while (true) ## TODO: loop to write as much as possible + ##{ + n := self _writeBytes: bytes offset: offset length: length. + if (n >= 0) + { + self.outdonesem signal. + ^n + }. + ##}. + + ## TODO: adjust offset and length + self.pending_bytes := bytes. + self.pending_offset := offset. + self.pending_length := length. + + System signal: self.outreadysem onOutput: self.handle. + } + } class MyObject(Object) { method(#class) main { - | conact inact outact accact | - - -(SocketAddress fromString: '192.168.123.232:99') dump. -'****************************' dump. - -(* -s:= X new: 20. -s basicSize dump. -'****************************' dump. - -s := Y new: 10. -s x. -s basicAt: 1 put: 20. -s dump. -s basicSize dump. -'****************************' dump. -*) - -(*********************************** -s := ByteArray new: 100. -s basicFillFrom: 0 with: ($a asInteger) count: 100. -s basicFillFrom: 50 with: ($b asInteger) count: 50. -(s basicShiftFrom: 50 to: 94 count: 10) dump. -s dump. -##thisProcess terminate. - -s := IP4Address fromString: '192.168.123.232'. -s dump. -s basicSize dump. - -s := IP6Address fromString: 'fe80::c225:e9ff:fe47:99.2.3.4'. -##s := IP6Address fromString: '::99.12.34.54'. -##s := IP6Address fromString: '::FFFF:0:0'. -##s := IP6Address fromString: 'fe80::'. -s dump. -s basicSize dump. - -s := IP6Address fromString: 'fe80::c225:e9ff:fe47:b1b6'. -s dump. -s basicSize dump. -##s := IP6Address new. -##s dump. -##s := IP4SocketAddress new. -##s dump. -thisProcess terminate. -**************************) - - inact := [:sck :state | - | data n | -(* -end of data -> 0. -no data -> -1. (e.g. EAGAIN) -has data -> 1 or more -error -> exception -*) - - data := ByteArray new: 100. - do - { - n := sck readBytes: data. - if (n <= 0) - { - if (n == 0) { sck close }. ## end of data - break. - } - elsif (n > 0) - { - (n asString & ' bytes read') dump. - data dump. - - ##sck writeBytes: #[ $h, $e, $l, $l, $o, $., $., $., C'\n' ]. - sck writeBytes: data offset: 0 length: n. - }. - } - while (true). - ]. - -## TODO: what should it accept as block parameter -## socket, output result? , output object? - outact := [:sck :state | - if (state) - { - ## what if i want write more data??? - ##[ sck writeBytes: #[ $h, $e, $l, $l, $o, C'\n' ] ] - ## on: Exception do: [:ex | sck close. ]. - } - else - { - } - ]. - - - conact := [:sck :state | - - | x write_more count | + [ + | s s2 st sg ss buf count | count := 0. - if (state) - { - 'CONNECTED NOW.............' dump. - ###sck inputTimeout: 10; outputTimeout: 10; connectTimeout: 10. - -############################################# - write_more := [:sem | - if (count <= 26) + [ + buf := ByteArray new: 128. + s := Socket domain: Socket.Domain.INET type: Socket.Type.STREAM. + + s onEvent: Socket.EventType.CONNECTED do: [ :sck :state | + if (state) { - sck writeBytes: %[ $h, $e, $l, $l, $o, $-, $m, $o, count + 65, $o, $o, C'\n' ] signal: x. - count := count + 1. + 'AAAAAAAA' dump. + s writeBytes: #[ $a, $b, $c ] } else { - System removeAsyncSemaphore: x. + 'FAILED TO CONNECT' dump. }. ]. + s onEvent: Socket.EventType.DATA_IN do: [ :sck | + | nbytes | + nbytes := s readBytes: buf. + if (nbytes == 0) + { + sck close + }. + ('Got ' & (nbytes asString)) dump. + buf dump. + ]. + s onEvent: Socket.EventType.DATA_OUT do: [ :sck | + if (count < 10) { s writeBytes: #[ $a, $b, C'\n' ]. count := count + 1. }. + ]. - x := Semaphore new. - x signalAction: write_more. - System addAsyncSemaphore: x. - x signal. - - ##sck outputAction: outact. - ##sck writeBytes: #[ $h, $e, $l, $l, $o, $-, $m, $o, $o, C'\n' ] signal: x. -############################################### - - sck inputAction: inact. - sck watchInput. - } - else - { - 'UNABLE TO CONNECT............' dump. - } - ]. - - ## ------------------------------------------------------ - accact := [:sck :state | - | newsck newaddr | - - newaddr := SocketAddress new. - newsck := sck accept: newaddr. - - System log: 'new connection - '; log: newaddr; log: ' '; log: (newsck handle); logNl. - - newsck inputAction: inact; outputAction: outact. - ##newsck watchInput; watchOutput. - newsck watchInput. - - - newsck writeBytes: #[ $W, $e, $l, $c, $o, $m, $e, $., C'\n' ]. - ]. - - - - [ - | s s2 st sg ss | - [ - s := Socket domain: Socket.Domain.INET type: Socket.Type.STREAM. - s connect: (SocketAddress fromString: '127.0.0.1:9999') do: conact. - -## s2 := Socket domain: Socket.Domain.INET type: Socket.Type.STREAM. -## s2 bind: (SocketAddress fromString: '0.0.0.0:9998'). -## ##s2 inputAction: accact. -## ###s2 listen: 10; watchInput. -## s2 listen: 10 do: accact. - -(* -st := Semaphore new. -System addAsyncSemaphore: st. -System signal: st afterSecs: 5. -'JJJJJJJJJJJ' dump. -sg := SemaphoreGroup new. -'JJJJJJJJJJJ' dump. -sg wait. -'YYYYYYYYYYYYYYY' dump. -*) - -###[ while (1) { '1111' dump. System sleepForSecs: 1 } ] fork. - -(* -st := Semaphore new. -System addAsyncSemaphore: st. -System signal: st afterSecs: 20. -*) - + s connect: (SocketAddress fromString: '127.0.0.1:9999'). while (true) { diff --git a/moo/lib/pf-basic.c b/moo/lib/pf-basic.c index e7a386c..86836c1 100644 --- a/moo/lib/pf-basic.c +++ b/moo/lib/pf-basic.c @@ -373,7 +373,7 @@ moo_pfrc_t moo_pf_basic_at_put (moo_t* moo, moo_ooi_t nargs) if (MOO_OBJ_GET_FLAGS_RDONLY(rcv)) { - moo_seterrbfmt (moo, MOO_EPERM, "now allowed to change a read-only object - %O", rcv); + moo_seterrbfmt (moo, MOO_EPERM, "not allowed to change a read-only object - %O", rcv); return MOO_PF_FAILURE; } @@ -471,7 +471,7 @@ moo_pfrc_t moo_pf_basic_fill (moo_t* moo, moo_ooi_t nargs) if (MOO_OBJ_GET_FLAGS_RDONLY(rcv)) { - moo_seterrbfmt (moo, MOO_EPERM, "now allowed to change a read-only object - %O", rcv); + moo_seterrbfmt (moo, MOO_EPERM, "not allowed to change a read-only object - %O", rcv); return MOO_PF_FAILURE; } @@ -588,7 +588,7 @@ moo_pfrc_t moo_pf_basic_shift (moo_t* moo, moo_ooi_t nargs) if (MOO_OBJ_GET_FLAGS_RDONLY(rcv)) { - moo_seterrbfmt (moo, MOO_EPERM, "now allowed to change a read-only object - %O", rcv); + moo_seterrbfmt (moo, MOO_EPERM, "not allowed to change a read-only object - %O", rcv); return MOO_PF_FAILURE; } diff --git a/moo/lib/syntax.txt b/moo/lib/syntax.txt index 3cba1b1..5174661 100644 --- a/moo/lib/syntax.txt +++ b/moo/lib/syntax.txt @@ -71,3 +71,18 @@ binary-argument := expression-primary unary-selector* #main + + +================================================ +#library Dynlib from 'dyn-lib.so' <--- this is a generic shared library +{ +## no instance variables allowed +## class-level(static) functions only. + int abc (int, float, void*) <--- call this as Dynlib.abc (...). anc proper type conversion back and forth must occur. + int abc (int, float, void*) as abc: aaa x: x y: y <--- remap the original name to a moo-style name. +} + +#class abc from 'x11.so' <--- this is a moo-only shared library +{ +} +