added some asycn socket code

This commit is contained in:
hyunghwan.chung 2018-05-08 10:18:50 +00:00
parent 204a6b16b9
commit 82160e03dd

View File

@ -258,7 +258,9 @@ class AsyncHandle(Object)
class Socket(AsyncHandle) from 'sck' class Socket(AsyncHandle) from 'sck'
{ {
var(#get) eventActions. var(#get) dataInEventAction.
var(#get) dataOutEventAction.
var pending_bytes, pending_offset, pending_length. var pending_bytes, pending_offset, pending_length.
var outreadysem, outdonesem, inreadysem. var outreadysem, outdonesem, inreadysem.
@ -288,13 +290,6 @@ pooldic Socket.Type
DGRAM := 2. DGRAM := 2.
} }
pooldic Socket.EventType
{
CONNECTED := 0.
DATA_IN := 1.
DATA_OUT := 2.
}
extend Socket extend Socket
{ {
method(#class) new { self messageProhibited: #new } method(#class) new { self messageProhibited: #new }
@ -308,31 +303,43 @@ extend Socket
method initialize method initialize
{ {
super initialize. super initialize.
self.eventActions := %(nil, nil, nil).
self.outdonesem := Semaphore new. self.outdonesem := Semaphore new.
self.outreadysem := Semaphore new. self.outreadysem := Semaphore new.
self.inreadysem := Semaphore new. self.inreadysem := Semaphore new.
self.outdonesem signalAction: [ :sem | self.outdonesem signalAction: [ :sem |
(self.eventActions at: Socket.EventType.DATA_OUT) value: self. self.dataOutEventAction value: self.
##(self.eventActions at: Socket.EventType.DATA_OUT) value: self.
System unsignal: self.outreadysem. System unsignal: self.outreadysem.
]. ].
self.outreadysem signalAction: [ :sem | self.outreadysem signalAction: [ :sem |
| nwritten | | nbytes pos rem |
nwritten := self _writeBytes: self.pending_bytes offset: self.pending_offset length: self.pending_length.
if (nwritten >= 0) pos := self.pending_offset.
rem := self.pending_length.
while (rem > 0)
{
nbytes := self _writeBytes: self.pending_bytes offset: pos length: rem.
if (nbytes <= -1) { break }.
pos := pos + nbytes.
rem := rem - nbytes.
}.
if (rem <= 0)
{ {
self.pending_bytes := nil. self.pending_bytes := nil.
self.pending_offset := 0. self.pending_offset := 0.
self.pending_length := 0. self.pending_length := 0.
self.outdonesem signal. self.outdonesem signal.
} }.
]. ].
self.inreadysem signalAction: [ :sem | self.inreadysem signalAction: [ :sem |
(self.eventActions at: Socket.EventType.DATA_IN) value: self. ##(self.eventActions at: Socket.EventType.DATA_IN) value: self.
self.dataInEventAction value: self.
]. ].
} }
@ -370,14 +377,26 @@ extend Socket
^super close. ^super close.
} }
method onEvent: event_type do: action_block method onEvent: event_type do: action_block
{ {
self.eventActions at: event_type put: action_block. if (event_type == #data_in)
{
self.dataInEventAction := action_block.
}
elsif (event_type == #data_out)
{
self.dataOutEventAction := action_block.
}
else
{
Exception signal: 'unknown event type ' & event_type asString.
}
} }
method writeBytes: bytes offset: offset length: length method writeBytes: bytes offset: offset length: length
{ {
| n | | n pos rem |
if (self.outreadysem _group notNil) if (self.outreadysem _group notNil)
{ {
@ -387,40 +406,43 @@ extend Socket
## n >= 0: written ## n >= 0: written
## n <= -1: tolerable error (e.g. EAGAIN) ## n <= -1: tolerable error (e.g. EAGAIN)
## exception: fatal error ## exception: fatal error
##while (true) ## TODO: loop to write as much as possible
##{ pos := offset.
n := self _writeBytes: bytes offset: offset length: length. rem := length.
if (n >= 0)
while (rem > 0) ## TODO: loop to write as much as possible
{
n := self _writeBytes: bytes offset: pos length: rem.
if (n <= -1) { break }.
rem := rem - n.
pos := pos + n.
}.
if (rem <= 0)
{ {
self.outdonesem signal. self.outdonesem signal.
^n ^length
}. }.
##}.
## TODO: adjust offset and length
self.pending_bytes := bytes. self.pending_bytes := bytes.
self.pending_offset := offset. self.pending_offset := pos.
self.pending_length := length. self.pending_length := rem
System addAsyncSemaphore: self.outreadysem. System addAsyncSemaphore: self.outreadysem.
System signal: self.outreadysem onOutput: self.handle. System signal: self.outreadysem onOutput: self.handle.
} }
method beWatched method beWatched
{ {
System addAsyncSemaphore: self.inreadysem. System addAsyncSemaphore: self.inreadysem.
System signal: self.inreadysem onInput: self.handle. System signal: self.inreadysem onInput: self.handle.
System addAsyncSemaphore: self.outdonesem. System addAsyncSemaphore: self.outdonesem.
} }
method beUnwatched
{
}
} }
class ClientSocket(Socket) class ClientSocket(Socket)
{ {
var(#get) connectedEventAction.
var connsem. var connsem.
method initialize method initialize
@ -438,13 +460,8 @@ class ClientSocket(Socket)
System unsignal: sem. System unsignal: sem.
System removeAsyncSemaphore: sem. System removeAsyncSemaphore: sem.
'CHECKING FOR CONNECTION.....' dump. self.connectedEventAction value: self value: (soerr == 0).
(self.eventActions at: Socket.EventType.CONNECTED) value: self value: (soerr == 0). if (soerr == 0) { self beWatched }.
if (soerr == 0)
{
self beWatched
}.
}. }.
(* HOW TO HANDLE TIMEOUT? *) (* HOW TO HANDLE TIMEOUT? *)
]. ].
@ -461,6 +478,16 @@ class ClientSocket(Socket)
^super close ^super close
} }
method onEvent: event_type do: action_block
{
if (event_type == #connected)
{
self.connectedEventAction := action_block.
^self.
}.
^super onEvent: event_type do: action_block
}
method connect: target method connect: target
{ {
| sem | | sem |
@ -473,7 +500,7 @@ class ClientSocket(Socket)
{ {
## connected immediately ## connected immediately
'IMMEDIATELY CONNECTED.....' dump. 'IMMEDIATELY CONNECTED.....' dump.
(self.eventActions at: Socket.EventType.CONNECTED) value: self value: true. self.connectedEventAction value: self value: true.
System addAsyncSemaphore: self.inreadysem. System addAsyncSemaphore: self.inreadysem.
System signal: self.inreadysem onInput: self.handle. System signal: self.inreadysem onInput: self.handle.
@ -484,6 +511,8 @@ class ClientSocket(Socket)
class ServerSocket(Socket) class ServerSocket(Socket)
{ {
var(#get) acceptedEventAction.
method initialize method initialize
{ {
'Server Socket initialize...........' dump. 'Server Socket initialize...........' dump.
@ -499,10 +528,9 @@ class ServerSocket(Socket)
## i should invoke it manually here. ## i should invoke it manually here.
clisck initialize. clisck initialize.
cliact := self.eventActions at: Socket.EventType.CONNECTED. if (self.acceptedEventAction notNil)
if (cliact notNil)
{ {
cliact value: self value: clisck (* value: cliaddr *). self.acceptedEventAction value: self value: clisck value: cliaddr.
clisck beWatched. clisck beWatched.
} }
else { clisck close }. else { clisck close }.
@ -523,6 +551,17 @@ class ServerSocket(Socket)
^super close. ^super close.
} }
method onEvent: event_type do: action_block
{
if (event_type == #accepted)
{
self.acceptedEventAction := action_block.
^self.
}.
^super onEvent: event_type do: action_block
}
method listen: backlog method listen: backlog
{ {
System addAsyncSemaphore: self.inreadysem. System addAsyncSemaphore: self.inreadysem.
@ -533,45 +572,49 @@ class ServerSocket(Socket)
class MyObject(Object) class MyObject(Object)
{ {
method(#class) main method(#class) start_server_socket
{ {
[ | s2 buf |
| s s2 st sg ss buf count |
count := 0.
[
buf := ByteArray new: 128.
s := ClientSocket domain: Socket.Domain.INET type: Socket.Type.STREAM.
s2 := ServerSocket domain: Socket.Domain.INET type: Socket.Type.STREAM. s2 := ServerSocket domain: Socket.Domain.INET type: Socket.Type.STREAM.
buf := ByteArray new: 128.
s2 onEvent: Socket.EventType.CONNECTED do: [ :sck :clisck | s2 onEvent: #accepted do: [ :sck :clisck :cliaddr |
'SERVER ACCEPTED new client' dump. 'SERVER ACCEPTED new client' dump.
clisck onEvent: Socket.EventType.DATA_IN do: [ :csck | clisck onEvent: #data_in do: [ :csck |
| nbytes | | nbytes |
while (true)
{
nbytes := csck readBytes: buf. nbytes := csck readBytes: buf.
if (nbytes == 0) if (nbytes <= 0)
{ {
csck close. if (nbytes == 0) { csck close }.
}.
('Got ' & (nbytes asString)) dump. ('Got ' & (nbytes asString)) dump.
break.
if (nbytes > 0) }.
{
buf dump. buf dump.
csck writeBytes: buf offset: 0 length: nbytes. csck writeBytes: buf offset: 0 length: nbytes.
}. }.
]. ].
clisck onEvent: Socket.EventType.DATA_OUT do: [ :csck | clisck onEvent: #data_out do: [ :csck |
##csck writeBytes: #[ $a, $b, C'\n' ]. ##csck writeBytes: #[ $a, $b, C'\n' ].
]. ].
###clisck close.
]. ].
s2 bind: (SocketAddress fromString: '0.0.0.0:7777'). s2 bind: (SocketAddress fromString: '0.0.0.0:7777').
s2 listen: 10. s2 listen: 10.
^s2.
}
s onEvent: Socket.EventType.CONNECTED do: [ :sck :state | method(#class) start_client_socket
{
| s buf count |
s := ClientSocket domain: Socket.Domain.INET type: Socket.Type.STREAM.
buf := ByteArray new: 128.
count := 0.
s onEvent: #connected do: [ :sck :state |
if (state) if (state)
{ {
s writeBytes: #[ $a, $b, $c ]. s writeBytes: #[ $a, $b, $c ].
@ -583,22 +626,36 @@ class MyObject(Object)
}. }.
]. ].
s onEvent: Socket.EventType.DATA_IN do: [ :sck | s onEvent: #data_in do: [ :sck |
| nbytes | | nbytes |
nbytes := sck readBytes: buf. while (true)
if (nbytes == 0)
{ {
sck close. nbytes := sck readBytes: buf.
s := nil. if (nbytes <= 0)
{
if (nbytes == 0) { sck close }.
break.
}. }.
('Got ' & (nbytes asString)) dump. ('Got ' & (nbytes asString)) dump.
buf dump. buf dump.
}.
]. ].
s onEvent: Socket.EventType.DATA_OUT do: [ :sck | s onEvent: #data_out do: [ :sck |
if (count < 10) { sck writeBytes: #[ $a, $b, C'\n' ]. count := count + 1. }. if (count < 10) { sck writeBytes: #[ $a, $b, C'\n' ]. count := count + 1. }.
]. ].
s connect: (SocketAddress fromString: '127.0.0.1:9999'). s connect: (SocketAddress fromString: '127.0.0.1:9999').
}
method(#class) main
{
[
| s s2 ss |
[
s := self start_client_socket.
s2 := self start_server_socket.
while (true) while (true)
{ {
ss := System handleAsyncEvent. ss := System handleAsyncEvent.
@ -609,6 +666,7 @@ class MyObject(Object)
ensure: ensure:
[ [
if (s notNil) { s close }. if (s notNil) { s close }.
if (s2 notNil) { s2 close }.
] ]
] on: Exception do: [:ex | ('Exception - ' & ex messageText) dump ]. ] on: Exception do: [:ex | ('Exception - ' & ex messageText) dump ].