diff --git a/moo/kernel/Socket.moo b/moo/kernel/Socket.moo index 8fe7036..5a7ba88 100644 --- a/moo/kernel/Socket.moo +++ b/moo/kernel/Socket.moo @@ -258,7 +258,9 @@ class AsyncHandle(Object) class Socket(AsyncHandle) from 'sck' { - var(#get) eventActions. + var(#get) dataInEventAction. + var(#get) dataOutEventAction. + var pending_bytes, pending_offset, pending_length. var outreadysem, outdonesem, inreadysem. @@ -288,13 +290,6 @@ pooldic Socket.Type DGRAM := 2. } -pooldic Socket.EventType -{ - CONNECTED := 0. - DATA_IN := 1. - DATA_OUT := 2. -} - extend Socket { method(#class) new { self messageProhibited: #new } @@ -308,31 +303,43 @@ extend Socket method initialize { super initialize. - self.eventActions := %(nil, nil, nil). self.outdonesem := Semaphore new. self.outreadysem := Semaphore new. self.inreadysem := Semaphore new. self.outdonesem signalAction: [ :sem | - (self.eventActions at: Socket.EventType.DATA_OUT) value: self. + self.dataOutEventAction value: self. + ##(self.eventActions at: Socket.EventType.DATA_OUT) value: self. System unsignal: self.outreadysem. ]. self.outreadysem signalAction: [ :sem | - | nwritten | - nwritten := self _writeBytes: self.pending_bytes offset: self.pending_offset length: self.pending_length. - if (nwritten >= 0) + | nbytes pos rem | + + pos := self.pending_offset. + rem := self.pending_length. + + while (rem > 0) + { + nbytes := self _writeBytes: self.pending_bytes offset: pos length: rem. + if (nbytes <= -1) { break }. + pos := pos + nbytes. + rem := rem - nbytes. + }. + + if (rem <= 0) { self.pending_bytes := nil. self.pending_offset := 0. self.pending_length := 0. self.outdonesem signal. - } + }. ]. self.inreadysem signalAction: [ :sem | - (self.eventActions at: Socket.EventType.DATA_IN) value: self. + ##(self.eventActions at: Socket.EventType.DATA_IN) value: self. + self.dataInEventAction value: self. ]. } @@ -370,14 +377,26 @@ extend Socket ^super close. } + method onEvent: event_type do: action_block { - self.eventActions at: event_type put: action_block. + if (event_type == #data_in) + { + self.dataInEventAction := action_block. + } + elsif (event_type == #data_out) + { + self.dataOutEventAction := action_block. + } + else + { + Exception signal: 'unknown event type ' & event_type asString. + } } method writeBytes: bytes offset: offset length: length { - | n | + | n pos rem | if (self.outreadysem _group notNil) { @@ -387,40 +406,43 @@ extend Socket ## n >= 0: written ## n <= -1: tolerable error (e.g. EAGAIN) ## exception: fatal error - ##while (true) ## TODO: loop to write as much as possible - ##{ - n := self _writeBytes: bytes offset: offset length: length. - if (n >= 0) - { - self.outdonesem signal. - ^n - }. - ##}. - ## TODO: adjust offset and length + pos := offset. + rem := length. + + while (rem > 0) ## TODO: loop to write as much as possible + { + n := self _writeBytes: bytes offset: pos length: rem. + if (n <= -1) { break }. + rem := rem - n. + pos := pos + n. + }. + + if (rem <= 0) + { + self.outdonesem signal. + ^length + }. + self.pending_bytes := bytes. - self.pending_offset := offset. - self.pending_length := length. + self.pending_offset := pos. + self.pending_length := rem System addAsyncSemaphore: self.outreadysem. System signal: self.outreadysem onOutput: self.handle. } - method beWatched { System addAsyncSemaphore: self.inreadysem. System signal: self.inreadysem onInput: self.handle. System addAsyncSemaphore: self.outdonesem. } - - method beUnwatched - { - } } class ClientSocket(Socket) { + var(#get) connectedEventAction. var connsem. method initialize @@ -438,13 +460,8 @@ class ClientSocket(Socket) System unsignal: sem. System removeAsyncSemaphore: sem. -'CHECKING FOR CONNECTION.....' dump. - (self.eventActions at: Socket.EventType.CONNECTED) value: self value: (soerr == 0). - - if (soerr == 0) - { - self beWatched - }. + self.connectedEventAction value: self value: (soerr == 0). + if (soerr == 0) { self beWatched }. }. (* HOW TO HANDLE TIMEOUT? *) ]. @@ -461,6 +478,16 @@ class ClientSocket(Socket) ^super close } + method onEvent: event_type do: action_block + { + if (event_type == #connected) + { + self.connectedEventAction := action_block. + ^self. + }. + + ^super onEvent: event_type do: action_block + } method connect: target { | sem | @@ -473,7 +500,7 @@ class ClientSocket(Socket) { ## connected immediately 'IMMEDIATELY CONNECTED.....' dump. - (self.eventActions at: Socket.EventType.CONNECTED) value: self value: true. + self.connectedEventAction value: self value: true. System addAsyncSemaphore: self.inreadysem. System signal: self.inreadysem onInput: self.handle. @@ -484,6 +511,8 @@ class ClientSocket(Socket) class ServerSocket(Socket) { + var(#get) acceptedEventAction. + method initialize { 'Server Socket initialize...........' dump. @@ -499,10 +528,9 @@ class ServerSocket(Socket) ## i should invoke it manually here. clisck initialize. - cliact := self.eventActions at: Socket.EventType.CONNECTED. - if (cliact notNil) + if (self.acceptedEventAction notNil) { - cliact value: self value: clisck (* value: cliaddr *). + self.acceptedEventAction value: self value: clisck value: cliaddr. clisck beWatched. } else { clisck close }. @@ -523,6 +551,17 @@ class ServerSocket(Socket) ^super close. } + method onEvent: event_type do: action_block + { + if (event_type == #accepted) + { + self.acceptedEventAction := action_block. + ^self. + }. + + ^super onEvent: event_type do: action_block + } + method listen: backlog { System addAsyncSemaphore: self.inreadysem. @@ -533,72 +572,90 @@ class ServerSocket(Socket) class MyObject(Object) { + method(#class) start_server_socket + { + | s2 buf | + s2 := ServerSocket domain: Socket.Domain.INET type: Socket.Type.STREAM. + buf := ByteArray new: 128. + + s2 onEvent: #accepted do: [ :sck :clisck :cliaddr | +'SERVER ACCEPTED new client' dump. + clisck onEvent: #data_in do: [ :csck | + | nbytes | + while (true) + { + nbytes := csck readBytes: buf. + if (nbytes <= 0) + { + if (nbytes == 0) { csck close }. + ('Got ' & (nbytes asString)) dump. + break. + }. + + buf dump. + csck writeBytes: buf offset: 0 length: nbytes. + }. + ]. + clisck onEvent: #data_out do: [ :csck | + ##csck writeBytes: #[ $a, $b, C'\n' ]. + ]. + ]. + + s2 bind: (SocketAddress fromString: '0.0.0.0:7777'). + s2 listen: 10. + ^s2. + } + + method(#class) start_client_socket + { + | s buf count | + s := ClientSocket domain: Socket.Domain.INET type: Socket.Type.STREAM. + buf := ByteArray new: 128. + + count := 0. + + s onEvent: #connected do: [ :sck :state | + if (state) + { + s writeBytes: #[ $a, $b, $c ]. + s writeBytes: #[ $d, $e, $f ]. + } + else + { + 'FAILED TO CONNECT' dump. + }. + ]. + + s onEvent: #data_in do: [ :sck | + | nbytes | + while (true) + { + nbytes := sck readBytes: buf. + if (nbytes <= 0) + { + if (nbytes == 0) { sck close }. + break. + }. + ('Got ' & (nbytes asString)) dump. + buf dump. + }. + ]. + s onEvent: #data_out do: [ :sck | + if (count < 10) { sck writeBytes: #[ $a, $b, C'\n' ]. count := count + 1. }. + ]. + + s connect: (SocketAddress fromString: '127.0.0.1:9999'). + } + method(#class) main { [ - | s s2 st sg ss buf count | + | s s2 ss | - count := 0. [ - buf := ByteArray new: 128. - s := ClientSocket domain: Socket.Domain.INET type: Socket.Type.STREAM. - s2 := ServerSocket domain: Socket.Domain.INET type: Socket.Type.STREAM. - - s2 onEvent: Socket.EventType.CONNECTED do: [ :sck :clisck | -'SERVER ACCEPTED new client' dump. - clisck onEvent: Socket.EventType.DATA_IN do: [ :csck | - | nbytes | - nbytes := csck readBytes: buf. - if (nbytes == 0) - { - csck close. - }. - ('Got ' & (nbytes asString)) dump. - - if (nbytes > 0) - { - - buf dump. - csck writeBytes: buf offset: 0 length: nbytes. - }. - ]. - clisck onEvent: Socket.EventType.DATA_OUT do: [ :csck | - ##csck writeBytes: #[ $a, $b, C'\n' ]. - ]. - ###clisck close. - ]. + s := self start_client_socket. + s2 := self start_server_socket. - s2 bind: (SocketAddress fromString: '0.0.0.0:7777'). - s2 listen: 10. - - s onEvent: Socket.EventType.CONNECTED do: [ :sck :state | - if (state) - { - s writeBytes: #[ $a, $b, $c ]. - s writeBytes: #[ $d, $e, $f ]. - } - else - { - 'FAILED TO CONNECT' dump. - }. - ]. - - s onEvent: Socket.EventType.DATA_IN do: [ :sck | - | nbytes | - nbytes := sck readBytes: buf. - if (nbytes == 0) - { - sck close. - s := nil. - }. - ('Got ' & (nbytes asString)) dump. - buf dump. - ]. - s onEvent: Socket.EventType.DATA_OUT do: [ :sck | - if (count < 10) { sck writeBytes: #[ $a, $b, C'\n' ]. count := count + 1. }. - ]. - - s connect: (SocketAddress fromString: '127.0.0.1:9999'). while (true) { ss := System handleAsyncEvent. @@ -609,6 +666,7 @@ class MyObject(Object) ensure: [ if (s notNil) { s close }. + if (s2 notNil) { s2 close }. ] ] on: Exception do: [:ex | ('Exception - ' & ex messageText) dump ].