From 76c3d78d4640498ddef8c0d36d7efaf05698e793 Mon Sep 17 00:00:00 2001 From: "hyunghwan.chung" Date: Sun, 4 Feb 2018 16:35:45 +0000 Subject: [PATCH] some code changes related to async socket io --- moo/kernel/Socket.moo | 311 +++++++++++++++++++++++------------------- moo/lib/exec.c | 2 +- moo/mod/sck.c | 31 +++-- 3 files changed, 190 insertions(+), 154 deletions(-) diff --git a/moo/kernel/Socket.moo b/moo/kernel/Socket.moo index e6a61bc..21ced5d 100644 --- a/moo/kernel/Socket.moo +++ b/moo/kernel/Socket.moo @@ -225,10 +225,10 @@ class(#byte) SocketAddress(Object) from 'sck.addr' } } -class Socket(Object) from 'sck' +class AsyncHandle(Object) { - ## the handle must be the first field in the Socket object to match - ## the internal socket representation used by the sck module. + ## the handle must be the first field in this object to match + ## the internal representation used by various modules. (e.g. sck) var(#get) handle := -1. var inwc := 0, outwc := 0. ## input watcher count and ouput watcher count @@ -236,48 +236,10 @@ class Socket(Object) from 'sck' var(#get,#set) inputAction, outputAction. var(#get) inputReady := false, outputReady := false. - method(#primitive) open(domain, type, proto). - method(#primitive) _close. - method(#primitive) bind: addr. - method(#primitive) _listen: backlog. - method(#primitive) accept: addr. - method(#primitive) _connect: addr. - method(#primitive) _socketError. - - method(#primitive) readBytes: bytes. - method(#primitive) _writeBytes: bytes. - method(#primitive) _writeBytes: bytes offset: offset length: length. -} - -(* TODO: generate these domain and type from the C header *) -pooldic Socket.Domain -{ - INET := 2. - INET6 := 10. -} - -pooldic Socket.Type -{ - STREAM := 1. - DGRAM := 2. -} - -extend Socket -{ - method(#class) new { self messageProhibited: #new } - method(#class) new: size { self messageProhibited: #new: } - - method(#class) domain: domain type: type - { - ^super new open(domain, type, 0). - } - method close { if (self.handle >= 0) { - ## this primitive method may return failure. - ## but ignore it here. if (self.insem notNil) { System unsignal: self.insem; @@ -291,96 +253,15 @@ extend Socket self.outsem := nil. }. + self.outwc := 0. + self.inwc := 0. + self _close. self.handle := -1. } } - method listen: backlog do: acceptBlock - { - self.inputAction := acceptBlock. - self watchInput. - ^self _listen: backlog. - } - - method connectTo: target do: connectBlock - { - | s1 s2 sa | - - s1 := Semaphore new. - s2 := Semaphore new. - - sa := [:sem | - - | connected | - - connected := false. - System unsignal: s1; - unsignal: s2; - removeAsyncSemaphore: s1; - removeAsyncSemaphore: s2. - - if (sem == s1) - { - [ connected := (self _socketError == 0) ] ifCurtailed: [ connected := false ]. - }. - - connectBlock value: self value: connected. - ]. - - s1 signalAction: sa. - s2 signalAction: sa. - - [ - System signal: s1 onOutput: self.handle; - signal: s2 afterSecs: 10; - addAsyncSemaphore: s1; - addAsyncSemaphore: s2. - self _connect: target. - ] ifCurtailed: [ - ## rollback - sa value: s2. - ] - } - - method writeBytes: bytes offset: offset length: length - { - | old_output_action | - - if (self.outputReady) - { - if ((self _writeBytes: bytes offset: offset length: length) >= 0) { ^self }. - self.outputReady := false. - }. - - old_output_action := self.outputAction. - self.outputAction := [ :sck :state | - if ((self _writeBytes: bytes offset: offset length: length) <= -1) - { - ## EAGAIN - self.outputReady := false. - ^self. - }. -## TODO: handle _writeBytes may not write in full. - - ## restore the output action block before executing the previous - ## one. i don't want this action block to be chained by the - ## previous block if it ever does - self.outputAction := old_output_action. - if (old_output_action notNil) { old_output_action value: self value: true }. - self unwatchOutput. - ]. - - self watchOutput. - } - - method writeBytes: bytes - { - ^self writeBytes: bytes offset: 0 length: (bytes size) - } - ## TODO: how to specify a timeout for an action? using another semaphore?? - method watchInput { if (self.inwc == 0) @@ -432,6 +313,7 @@ extend Socket self.outwc := self.outwc + 1. } + method unwatchOutput { if (self.outwc > 0) @@ -439,11 +321,159 @@ extend Socket self.outwc := self.outwc - 1. if (self.outwc == 0) { - ##if (self.outsem notNil) { System unsignal: self.outsem }. + ## self.outsem must not be nil here. System unsignal: self.outsem. }. }. } + + method writeBytes: bytes offset: offset length: length + { + | oldact | + +####################################### +## TODO: if data still in progress, failure... or success while concatening the message? +## for a stream, concatening is not bad. but it's not good if the socket requires message boundary preservation. +###################################### + + if (self.outputReady) + { + if ((self _writeBytes: bytes offset: offset length: length) >= 0) { ^self }. + self.outputReady := false. + }. + + oldact := self.outputAction. + self.outputAction := [ :sck :state | + if (state) + { + if ((self _writeBytes: bytes offset: offset length: length) <= -1) + { + ## EAGAIN + self.outputReady := false. + ^self. + }. +## TODO: handle _writeBytes may not write in full. + }. + + self.outputAction := oldact. + self unwatchOutput. + ]. + + ## TODO: set timeout? + self watchOutput. + } + + method writeBytes: bytes + { + ^self writeBytes: bytes offset: 0 length: (bytes size) + } +} + +class Socket(AsyncHandle) from 'sck' +{ + method(#primitive) open(domain, type, proto). + method(#primitive) _close. + method(#primitive) bind: addr. + method(#primitive) _listen: backlog. + method(#primitive) accept: addr. + method(#primitive) _connect: addr. + method(#primitive) _socketError. + + method(#primitive) readBytes: bytes. + method(#primitive) _writeBytes: bytes. + method(#primitive) _writeBytes: bytes offset: offset length: length. +} + +(* TODO: generate these domain and type from the C header *) +pooldic Socket.Domain +{ + INET := 2. + INET6 := 10. +} + +pooldic Socket.Type +{ + STREAM := 1. + DGRAM := 2. +} + +extend Socket +{ + method(#class) new { self messageProhibited: #new } + method(#class) new: size { self messageProhibited: #new: } + + method(#class) domain: domain type: type + { + ^super new open(domain, type, 0). + } + + method listen: backlog do: acceptBlock + { + self.inputAction := acceptBlock. + self watchInput. + ^self _listen: backlog. + } + + method connect: target do: connectBlock + { + | conblk oldact | + + if ((self _connect: target) <= -1) + { + ## connection in progress + + oldact := self.outputAction. + self.outputAction := [ :sck :state | + | soerr | + + if (state) + { + ## i don't map a connection error to an exception. + ## it's not a typical exception. it is a normal failure + ## that is caused by an external system. + ## + ## or should i map an error to an exception? + ## i can treat EINPROGRESS, ECONNREFUSED as failure. + ## all other errors may get treated as an exception? + ## what about timeout??? + + soerr := self _socketError. + if (soerr >= 0) + { + ## finalize connection if not in progress + self.outputAction := oldact. + self unwatchOutput. + if (connectBlock notNil) + { + connectBlock value: sck value: (soerr == 0). + }. + }. + } + else + { + ## timed out + self.outputAction := oldact. + self unwatchOutput. + if (connectBlock notNil) + { + ## TODO: tri-state? success, failure, timeout? or boolean with extra error code + connectBlock value: sck value: false. + }. + }. + ]. + + ###self.outputTimeout: 10 do: xxxx. + self watchOutput. + } + else + { + ## connected immediately. + if (connectBlock notNil) + { + connectBlock value: self value: true. + } + } + } } class MyObject(Object) @@ -502,7 +532,7 @@ thisProcess terminate. | data n | (* end of data -> 0. -no data -> -1. (e.g. EINPROGRESS) +no data -> -1. (e.g. EAGAIN) has data -> 1 or more error -> exception *) @@ -531,7 +561,6 @@ error -> exception ## TODO: what should it accept as block parameter ## socket, output result? , output object? outact := [:sck :state | - if (state) { ## what if i want write more data??? @@ -547,9 +576,14 @@ error -> exception if (state) { 'CONNECTED NOW.............' dump. + + ###sck inputTimeout: 10; outputTimeout: 10; connectTimeout: 10. + + sck outputAction: outact. + sck writeBytes: #[ $h, $e, $l, $l, $o, $-, $m, $o, $o, C'\n' ]. + + sck inputAction: inact. sck watchInput. - sck writeBytes: #[ $h, $e, $l, $l, $o, $w, $o, C'\n' ]. - ###sck watchInput; watchOutput. } else { @@ -577,17 +611,14 @@ error -> exception | s s2 | [ s := Socket domain: Socket.Domain.INET type: Socket.Type.STREAM. - s inputAction: inact; outputAction: outact. - s connectTo: (SocketAddress fromString: '127.0.0.1:9999') do: conact. + ##s connect: (SocketAddress fromString: '127.0.0.1:9999') do: conact. + s connect: (SocketAddress fromString: '127.0.0.1:9999') do: conact. - s2 := Socket domain: Socket.Domain.INET type: Socket.Type.STREAM. - s2 bind: (SocketAddress fromString: '0.0.0.0:9998'). - ##s2 inputAction: accact. - ###s2 listen: 10; watchInput. - s2 listen: 10 do: accact. - -### when there is an exception something is not right.... -Exception signal: 'XXXXXXXXXX'. +## s2 := Socket domain: Socket.Domain.INET type: Socket.Type.STREAM. +## s2 bind: (SocketAddress fromString: '0.0.0.0:9998'). +## ##s2 inputAction: accact. +## ###s2 listen: 10; watchInput. +## s2 listen: 10 do: accact. while (true) { diff --git a/moo/lib/exec.c b/moo/lib/exec.c index edaa541..1754706 100644 --- a/moo/lib/exec.c +++ b/moo/lib/exec.c @@ -4190,7 +4190,7 @@ static MOO_INLINE int switch_process_if_needed (moo_t* moo) { /* there exist suspended processes while no processes are runnable. * most likely, the running program contains process/semaphore related bugs */ - MOO_LOG1 (moo, MOO_LOG_IC | MOO_LOG_DEBUG, + MOO_LOG1 (moo, MOO_LOG_IC | MOO_LOG_WARN, "%zd suspended process(es) found - check your program\n", MOO_OOP_TO_SMOOI(moo->processor->suspended.count)); } diff --git a/moo/mod/sck.c b/moo/mod/sck.c index 3bbb144..7a94803 100644 --- a/moo/mod/sck.c +++ b/moo/mod/sck.c @@ -299,7 +299,7 @@ static moo_pfrc_t pf_listen_socket (moo_t* moo, moo_ooi_t nargs) } -static moo_pfrc_t pf_connect (moo_t* moo, moo_ooi_t nargs) +static moo_pfrc_t pf_connect_socket (moo_t* moo, moo_ooi_t nargs) { oop_sck_t sck; int fd, n; @@ -321,19 +321,23 @@ static moo_pfrc_t pf_connect (moo_t* moo, moo_ooi_t nargs) return MOO_PF_FAILURE; } - do + n = connect(fd, (struct sockaddr*)MOO_OBJ_GET_BYTE_SLOT(arg), moo_sck_addr_len((sck_addr_t*)MOO_OBJ_GET_BYTE_SLOT(arg))); + if (n == -1) { - n = connect(fd, (struct sockaddr*)MOO_OBJ_GET_BYTE_SLOT(arg), moo_sck_addr_len((sck_addr_t*)MOO_OBJ_GET_BYTE_SLOT(arg))); - } - while (n == -1 && errno == EINTR); - - if (n == -1 && errno != EINPROGRESS) - { - moo_seterrwithsyserr (moo, errno); - return MOO_PF_FAILURE; + if (errno == EINPROGRESS) + { + /* have the primitive function to return -1 */ + MOO_STACK_SETRET (moo, nargs, MOO_SMOOI_TO_OOP(-1)); + return MOO_PF_SUCCESS; + } + else + { + moo_seterrwithsyserr (moo, errno); + return MOO_PF_FAILURE; + } } - MOO_STACK_SETRETTORCV (moo, nargs); + MOO_STACK_SETRET (moo, nargs, MOO_SMOOI_TO_OOP(0)); return MOO_PF_SUCCESS; } @@ -364,7 +368,8 @@ static moo_pfrc_t pf_get_socket_error (moo_t* moo, moo_ooi_t nargs) return MOO_PF_FAILURE; } - /* if ret == EINPROGRESS .. it's in progress */ + if (ret == EINPROGRESS) ret = -1; /* map EINPROGRESS to -1. all others are returned without change */ + MOO_STACK_SETRET (moo, nargs, MOO_SMOOI_TO_OOP(ret)); return MOO_PF_SUCCESS; } @@ -505,7 +510,7 @@ static moo_pfinfo_t pfinfos[] = { I, { 'a','c','c','e','p','t',':','\0' }, 0, { pf_accept_socket, 1, 1 } }, { I, { 'b','i','n','d',':','\0' }, 0, { pf_bind_socket, 1, 1 } }, { I, { 'c','l','o','s','e','\0' }, 0, { pf_close_socket, 0, 0 } }, - { I, { 'c','o','n','n','e','c','t',':','\0' }, 0, { pf_connect, 1, 1 } }, + { I, { 'c','o','n','n','e','c','t',':','\0' }, 0, { pf_connect_socket, 1, 1 } }, { I, { 'l','i','s','t','e','n',':','\0' }, 0, { pf_listen_socket, 1, 1 } }, { I, { 'o','p','e','n','\0' }, 0, { pf_open_socket, 3, 3 } }, { I, { 'r','e','a','d','B','y','t','e','s',':','\0' }, 0, { pf_read_socket, 1, 1 } },