some code changes related to async socket io

This commit is contained in:
hyunghwan.chung 2018-02-04 16:35:45 +00:00
parent 0269e10fd8
commit 76c3d78d46
3 changed files with 190 additions and 154 deletions

View File

@ -225,10 +225,10 @@ class(#byte) SocketAddress(Object) from 'sck.addr'
}
}
class Socket(Object) from 'sck'
class AsyncHandle(Object)
{
## the handle must be the first field in the Socket object to match
## the internal socket representation used by the sck module.
## the handle must be the first field in this object to match
## the internal representation used by various modules. (e.g. sck)
var(#get) handle := -1.
var inwc := 0, outwc := 0. ## input watcher count and ouput watcher count
@ -236,48 +236,10 @@ class Socket(Object) from 'sck'
var(#get,#set) inputAction, outputAction.
var(#get) inputReady := false, outputReady := false.
method(#primitive) open(domain, type, proto).
method(#primitive) _close.
method(#primitive) bind: addr.
method(#primitive) _listen: backlog.
method(#primitive) accept: addr.
method(#primitive) _connect: addr.
method(#primitive) _socketError.
method(#primitive) readBytes: bytes.
method(#primitive) _writeBytes: bytes.
method(#primitive) _writeBytes: bytes offset: offset length: length.
}
(* TODO: generate these domain and type from the C header *)
pooldic Socket.Domain
{
INET := 2.
INET6 := 10.
}
pooldic Socket.Type
{
STREAM := 1.
DGRAM := 2.
}
extend Socket
{
method(#class) new { self messageProhibited: #new }
method(#class) new: size { self messageProhibited: #new: }
method(#class) domain: domain type: type
{
^super new open(domain, type, 0).
}
method close
{
if (self.handle >= 0)
{
## this primitive method may return failure.
## but ignore it here.
if (self.insem notNil)
{
System unsignal: self.insem;
@ -291,96 +253,15 @@ extend Socket
self.outsem := nil.
}.
self.outwc := 0.
self.inwc := 0.
self _close.
self.handle := -1.
}
}
method listen: backlog do: acceptBlock
{
self.inputAction := acceptBlock.
self watchInput.
^self _listen: backlog.
}
method connectTo: target do: connectBlock
{
| s1 s2 sa |
s1 := Semaphore new.
s2 := Semaphore new.
sa := [:sem |
| connected |
connected := false.
System unsignal: s1;
unsignal: s2;
removeAsyncSemaphore: s1;
removeAsyncSemaphore: s2.
if (sem == s1)
{
[ connected := (self _socketError == 0) ] ifCurtailed: [ connected := false ].
}.
connectBlock value: self value: connected.
].
s1 signalAction: sa.
s2 signalAction: sa.
[
System signal: s1 onOutput: self.handle;
signal: s2 afterSecs: 10;
addAsyncSemaphore: s1;
addAsyncSemaphore: s2.
self _connect: target.
] ifCurtailed: [
## rollback
sa value: s2.
]
}
method writeBytes: bytes offset: offset length: length
{
| old_output_action |
if (self.outputReady)
{
if ((self _writeBytes: bytes offset: offset length: length) >= 0) { ^self }.
self.outputReady := false.
}.
old_output_action := self.outputAction.
self.outputAction := [ :sck :state |
if ((self _writeBytes: bytes offset: offset length: length) <= -1)
{
## EAGAIN
self.outputReady := false.
^self.
}.
## TODO: handle _writeBytes may not write in full.
## restore the output action block before executing the previous
## one. i don't want this action block to be chained by the
## previous block if it ever does
self.outputAction := old_output_action.
if (old_output_action notNil) { old_output_action value: self value: true }.
self unwatchOutput.
].
self watchOutput.
}
method writeBytes: bytes
{
^self writeBytes: bytes offset: 0 length: (bytes size)
}
## TODO: how to specify a timeout for an action? using another semaphore??
method watchInput
{
if (self.inwc == 0)
@ -432,6 +313,7 @@ extend Socket
self.outwc := self.outwc + 1.
}
method unwatchOutput
{
if (self.outwc > 0)
@ -439,11 +321,159 @@ extend Socket
self.outwc := self.outwc - 1.
if (self.outwc == 0)
{
##if (self.outsem notNil) { System unsignal: self.outsem }.
## self.outsem must not be nil here.
System unsignal: self.outsem.
}.
}.
}
method writeBytes: bytes offset: offset length: length
{
| oldact |
#######################################
## TODO: if data still in progress, failure... or success while concatening the message?
## for a stream, concatening is not bad. but it's not good if the socket requires message boundary preservation.
######################################
if (self.outputReady)
{
if ((self _writeBytes: bytes offset: offset length: length) >= 0) { ^self }.
self.outputReady := false.
}.
oldact := self.outputAction.
self.outputAction := [ :sck :state |
if (state)
{
if ((self _writeBytes: bytes offset: offset length: length) <= -1)
{
## EAGAIN
self.outputReady := false.
^self.
}.
## TODO: handle _writeBytes may not write in full.
}.
self.outputAction := oldact.
self unwatchOutput.
].
## TODO: set timeout?
self watchOutput.
}
method writeBytes: bytes
{
^self writeBytes: bytes offset: 0 length: (bytes size)
}
}
class Socket(AsyncHandle) from 'sck'
{
method(#primitive) open(domain, type, proto).
method(#primitive) _close.
method(#primitive) bind: addr.
method(#primitive) _listen: backlog.
method(#primitive) accept: addr.
method(#primitive) _connect: addr.
method(#primitive) _socketError.
method(#primitive) readBytes: bytes.
method(#primitive) _writeBytes: bytes.
method(#primitive) _writeBytes: bytes offset: offset length: length.
}
(* TODO: generate these domain and type from the C header *)
pooldic Socket.Domain
{
INET := 2.
INET6 := 10.
}
pooldic Socket.Type
{
STREAM := 1.
DGRAM := 2.
}
extend Socket
{
method(#class) new { self messageProhibited: #new }
method(#class) new: size { self messageProhibited: #new: }
method(#class) domain: domain type: type
{
^super new open(domain, type, 0).
}
method listen: backlog do: acceptBlock
{
self.inputAction := acceptBlock.
self watchInput.
^self _listen: backlog.
}
method connect: target do: connectBlock
{
| conblk oldact |
if ((self _connect: target) <= -1)
{
## connection in progress
oldact := self.outputAction.
self.outputAction := [ :sck :state |
| soerr |
if (state)
{
## i don't map a connection error to an exception.
## it's not a typical exception. it is a normal failure
## that is caused by an external system.
##
## or should i map an error to an exception?
## i can treat EINPROGRESS, ECONNREFUSED as failure.
## all other errors may get treated as an exception?
## what about timeout???
soerr := self _socketError.
if (soerr >= 0)
{
## finalize connection if not in progress
self.outputAction := oldact.
self unwatchOutput.
if (connectBlock notNil)
{
connectBlock value: sck value: (soerr == 0).
}.
}.
}
else
{
## timed out
self.outputAction := oldact.
self unwatchOutput.
if (connectBlock notNil)
{
## TODO: tri-state? success, failure, timeout? or boolean with extra error code
connectBlock value: sck value: false.
}.
}.
].
###self.outputTimeout: 10 do: xxxx.
self watchOutput.
}
else
{
## connected immediately.
if (connectBlock notNil)
{
connectBlock value: self value: true.
}
}
}
}
class MyObject(Object)
@ -502,7 +532,7 @@ thisProcess terminate.
| data n |
(*
end of data -> 0.
no data -> -1. (e.g. EINPROGRESS)
no data -> -1. (e.g. EAGAIN)
has data -> 1 or more
error -> exception
*)
@ -531,7 +561,6 @@ error -> exception
## TODO: what should it accept as block parameter
## socket, output result? , output object?
outact := [:sck :state |
if (state)
{
## what if i want write more data???
@ -547,9 +576,14 @@ error -> exception
if (state)
{
'CONNECTED NOW.............' dump.
###sck inputTimeout: 10; outputTimeout: 10; connectTimeout: 10.
sck outputAction: outact.
sck writeBytes: #[ $h, $e, $l, $l, $o, $-, $m, $o, $o, C'\n' ].
sck inputAction: inact.
sck watchInput.
sck writeBytes: #[ $h, $e, $l, $l, $o, $w, $o, C'\n' ].
###sck watchInput; watchOutput.
}
else
{
@ -577,17 +611,14 @@ error -> exception
| s s2 |
[
s := Socket domain: Socket.Domain.INET type: Socket.Type.STREAM.
s inputAction: inact; outputAction: outact.
s connectTo: (SocketAddress fromString: '127.0.0.1:9999') do: conact.
##s connect: (SocketAddress fromString: '127.0.0.1:9999') do: conact.
s connect: (SocketAddress fromString: '127.0.0.1:9999') do: conact.
s2 := Socket domain: Socket.Domain.INET type: Socket.Type.STREAM.
s2 bind: (SocketAddress fromString: '0.0.0.0:9998').
##s2 inputAction: accact.
###s2 listen: 10; watchInput.
s2 listen: 10 do: accact.
### when there is an exception something is not right....
Exception signal: 'XXXXXXXXXX'.
## s2 := Socket domain: Socket.Domain.INET type: Socket.Type.STREAM.
## s2 bind: (SocketAddress fromString: '0.0.0.0:9998').
## ##s2 inputAction: accact.
## ###s2 listen: 10; watchInput.
## s2 listen: 10 do: accact.
while (true)
{

View File

@ -4190,7 +4190,7 @@ static MOO_INLINE int switch_process_if_needed (moo_t* moo)
{
/* there exist suspended processes while no processes are runnable.
* most likely, the running program contains process/semaphore related bugs */
MOO_LOG1 (moo, MOO_LOG_IC | MOO_LOG_DEBUG,
MOO_LOG1 (moo, MOO_LOG_IC | MOO_LOG_WARN,
"%zd suspended process(es) found - check your program\n",
MOO_OOP_TO_SMOOI(moo->processor->suspended.count));
}

View File

@ -299,7 +299,7 @@ static moo_pfrc_t pf_listen_socket (moo_t* moo, moo_ooi_t nargs)
}
static moo_pfrc_t pf_connect (moo_t* moo, moo_ooi_t nargs)
static moo_pfrc_t pf_connect_socket (moo_t* moo, moo_ooi_t nargs)
{
oop_sck_t sck;
int fd, n;
@ -321,19 +321,23 @@ static moo_pfrc_t pf_connect (moo_t* moo, moo_ooi_t nargs)
return MOO_PF_FAILURE;
}
do
{
n = connect(fd, (struct sockaddr*)MOO_OBJ_GET_BYTE_SLOT(arg), moo_sck_addr_len((sck_addr_t*)MOO_OBJ_GET_BYTE_SLOT(arg)));
if (n == -1)
{
if (errno == EINPROGRESS)
{
/* have the primitive function to return -1 */
MOO_STACK_SETRET (moo, nargs, MOO_SMOOI_TO_OOP(-1));
return MOO_PF_SUCCESS;
}
while (n == -1 && errno == EINTR);
if (n == -1 && errno != EINPROGRESS)
else
{
moo_seterrwithsyserr (moo, errno);
return MOO_PF_FAILURE;
}
}
MOO_STACK_SETRETTORCV (moo, nargs);
MOO_STACK_SETRET (moo, nargs, MOO_SMOOI_TO_OOP(0));
return MOO_PF_SUCCESS;
}
@ -364,7 +368,8 @@ static moo_pfrc_t pf_get_socket_error (moo_t* moo, moo_ooi_t nargs)
return MOO_PF_FAILURE;
}
/* if ret == EINPROGRESS .. it's in progress */
if (ret == EINPROGRESS) ret = -1; /* map EINPROGRESS to -1. all others are returned without change */
MOO_STACK_SETRET (moo, nargs, MOO_SMOOI_TO_OOP(ret));
return MOO_PF_SUCCESS;
}
@ -505,7 +510,7 @@ static moo_pfinfo_t pfinfos[] =
{ I, { 'a','c','c','e','p','t',':','\0' }, 0, { pf_accept_socket, 1, 1 } },
{ I, { 'b','i','n','d',':','\0' }, 0, { pf_bind_socket, 1, 1 } },
{ I, { 'c','l','o','s','e','\0' }, 0, { pf_close_socket, 0, 0 } },
{ I, { 'c','o','n','n','e','c','t',':','\0' }, 0, { pf_connect, 1, 1 } },
{ I, { 'c','o','n','n','e','c','t',':','\0' }, 0, { pf_connect_socket, 1, 1 } },
{ I, { 'l','i','s','t','e','n',':','\0' }, 0, { pf_listen_socket, 1, 1 } },
{ I, { 'o','p','e','n','\0' }, 0, { pf_open_socket, 3, 3 } },
{ I, { 'r','e','a','d','B','y','t','e','s',':','\0' }, 0, { pf_read_socket, 1, 1 } },