From 0a5896b16b4ce727edfa92474d6a2860f435d8d8 Mon Sep 17 00:00:00 2001 From: "hyunghwan.chung" Date: Mon, 7 May 2018 16:53:37 +0000 Subject: [PATCH] added more async socket code --- moo/kernel/Socket.moo | 100 ++++++++++++++++++++++++++++++++++-------- 1 file changed, 81 insertions(+), 19 deletions(-) diff --git a/moo/kernel/Socket.moo b/moo/kernel/Socket.moo index d9b871b..38d73ca 100644 --- a/moo/kernel/Socket.moo +++ b/moo/kernel/Socket.moo @@ -258,7 +258,7 @@ class AsyncHandle(Object) class Socket(AsyncHandle) from 'sck' { - var eventActions. + var(#get) eventActions. var pending_bytes, pending_offset, pending_length. var outreadysem, outdonesem, inreadysem. @@ -266,7 +266,7 @@ class Socket(AsyncHandle) from 'sck' method(#primitive) _close. method(#primitive) bind: addr. method(#primitive) _listen: backlog. - method(#primitive) accept: addr. + method(#primitive) _accept: addr. method(#primitive) _connect: addr. method(#primitive) _socketError. @@ -308,7 +308,7 @@ extend Socket method initialize { super initialize. - self.eventActions := #(nil nil nil). + self.eventActions := %(nil, nil, nil). self.outdonesem := Semaphore new. self.outreadysem := Semaphore new. @@ -406,6 +406,17 @@ extend Socket System signal: self.outreadysem onOutput: self.handle. } + + method beWatched + { + System addAsyncSemaphore: self.inreadysem. + System signal: self.inreadysem onInput: self.handle. + System addAsyncSemaphore: self.outdonesem. + } + + method beUnwatched + { + } } class ClientSocket(Socket) @@ -432,9 +443,7 @@ class ClientSocket(Socket) if (soerr == 0) { - System addAsyncSemaphore: self.inreadysem. - System signal: self.inreadysem onInput: self.handle. - System addAsyncSemaphore: self.outdonesem. + self beWatched }. }. (* HOW TO HANDLE TIMEOUT? *) @@ -477,13 +486,44 @@ class ServerSocket(Socket) { method initialize { +'Server Socket initialize...........' dump. super initialize. - self.inreadysem := [ :sem | - | xxx | - System accept: xxx. + + self.inreadysem signalAction: [ :sem | + | cliaddr clisck cliact | + cliaddr := SocketAddress new. + clisck := self _accept: cliaddr. + if (clisck notError) + { + ## the _accept method doesn't invoke the initialize method. + ## i should invoke it manually here. + clisck initialize. + + cliact := self.eventActions at: Socket.EventType.CONNECTED. + if (cliact notNil) + { + cliact value: self value: clisck (* value: cliaddr *). + clisck beWatched. + } + else { clisck close }. + + }. ]. } + method close + { +'CLOSING SERVER SOCEKT.... ' dump. + if (self.inreadysem notNil) + { + System unsignal: self.inreadysem. + if (self.inreadysem _group notNil) { System removeAsyncSemaphore: self.inreadysem }. + self.inreadysem := nil. + }. + + ^super close. + } + method listen: backlog { System addAsyncSemaphore: self.inreadysem. @@ -504,15 +544,37 @@ class MyObject(Object) buf := ByteArray new: 128. s := ClientSocket domain: Socket.Domain.INET type: Socket.Type.STREAM. s2 := ServerSocket domain: Socket.Domain.INET type: Socket.Type.STREAM. - - s2 onEvent: Socket.EventType.DATA_IN do: [:sck | - ### accept... + + s2 onEvent: Socket.EventType.CONNECTED do: [ :sck :clisck | +'SERVER ACCEPTED new client' dump. + clisck onEvent: Socket.EventType.DATA_IN do: [ :csck | + | nbytes | + nbytes := csck readBytes: buf. + if (nbytes == 0) + { + csck close. + }. + ('Got ' & (nbytes asString)) dump. + + if (nbytes > 0) + { + + buf dump. + csck writeBytes: buf offset: 0 length: nbytes. + }. + ]. + clisck onEvent: Socket.EventType.DATA_OUT do: [ :csck | + ##csck writeBytes: #[ $a, $b, C'\n' ]. + ]. + ###clisck close. ]. - + + s2 bind: (SocketAddress fromString: '0.0.0.0:7777'). + s2 listen: 10. + s onEvent: Socket.EventType.CONNECTED do: [ :sck :state | if (state) - { - 'AAAAAAAA' dump. + { s writeBytes: #[ $a, $b, $c ]. s writeBytes: #[ $d, $e, $f ]. } @@ -520,10 +582,11 @@ class MyObject(Object) { 'FAILED TO CONNECT' dump. }. - ]. + ]. + s onEvent: Socket.EventType.DATA_IN do: [ :sck | | nbytes | - nbytes := s readBytes: buf. + nbytes := sck readBytes: buf. if (nbytes == 0) { sck close. @@ -533,11 +596,10 @@ class MyObject(Object) buf dump. ]. s onEvent: Socket.EventType.DATA_OUT do: [ :sck | - if (count < 10) { s writeBytes: #[ $a, $b, C'\n' ]. count := count + 1. }. + if (count < 10) { sck writeBytes: #[ $a, $b, C'\n' ]. count := count + 1. }. ]. s connect: (SocketAddress fromString: '127.0.0.1:9999'). - while (true) { ss := System handleAsyncEvent.