diff --git a/moo/kernel/Socket.moo b/moo/kernel/Socket.moo index 29c7a7e..ba4db53 100644 --- a/moo/kernel/Socket.moo +++ b/moo/kernel/Socket.moo @@ -234,6 +234,7 @@ class Socket(Object) from 'sck' var inwc := 0, outwc := 0. ## input watcher count and ouput watcher count var insem, outsem. var(#get,#set) inputAction, outputAction. + var(#get) inputReady := false, outputReady := false. method(#primitive) open(domain, type, proto). method(#primitive) _close. @@ -339,9 +340,20 @@ extend Socket { | old_output_action | + if (self.outputReady) + { + if ((self _writeBytes: bytes) >= 0) { ^self }. + self.outputReady := false. + }. + old_output_action := self.outputAction. self.outputAction := [ :sck :state | - self _writeBytes: bytes. + if ((self _writeBytes: bytes) <= -1) + { + ## EAGAIN + self.outputReady := false. + ^self. + }. ## TODO: handle _writeBytes may not write in full. ## restore the output action block before executing the previous @@ -364,9 +376,13 @@ extend Socket if (self.insem isNil) { self.insem := Semaphore new. - self.insem signalAction: [:sem | self.inputAction value: self value: true]. + self.insem signalAction: [:sem | + self.inputReady := true. + self.inputAction value: self value: true + ]. System addAsyncSemaphore: self.insem. }. + self.inputReady := false. System signal: self.insem onInput: self.handle }. self.inwc := self.inwc + 1. @@ -392,9 +408,13 @@ extend Socket if (self.outsem isNil) { self.outsem := Semaphore new. - self.outsem signalAction: [:sem | self.outputAction value: self value: true]. + self.outsem signalAction: [:sem | + self.outputReady := true. + self.outputAction value: self value: true + ]. System addAsyncSemaphore: self.outsem. }. + self.outputReady := false. System signal: self.outsem onOutput: self.handle. }. self.outwc := self.outwc + 1. diff --git a/moo/lib/main.c b/moo/lib/main.c index 038a1a2..fbbfef5 100644 --- a/moo/lib/main.c +++ b/moo/lib/main.c @@ -859,7 +859,7 @@ static int _add_poll_fd (moo_t* moo, int fd, int event_mask) /* epoll_wait may return again if the worker thread consumes events. * switch to level-trigger. */ /* TODO: verify if EPOLLLET is desired */ - //ev.events |= EPOLLET; + ev.events |= EPOLLET; #endif /*ev.data.ptr = (void*)event_data;*/ ev.data.fd = fd; @@ -1043,7 +1043,7 @@ static int _mod_poll_fd (moo_t* moo, int fd, int event_mask) /* epoll_wait may return again if the worker thread consumes events. * switch to level-trigger. */ /* TODO: verify if EPOLLLET is desired */ - //ev.events |= EPOLLET; + ev.events |= EPOLLET; #endif ev.data.fd = fd; if (epoll_ctl (xtn->ep, EPOLL_CTL_MOD, fd, &ev) == -1) @@ -1357,7 +1357,7 @@ static int vm_muxadd (moo_t* moo, moo_ooi_t io_handle, moo_ooi_t mask) { int event_mask; - event_mask = 0; /*EPOLLET; */ /* TODO: use edge trigger(EPOLLLET)? */ + event_mask = 0; if (mask & MOO_SEMAPHORE_IO_MASK_INPUT) event_mask |= XPOLLIN; if (mask & MOO_SEMAPHORE_IO_MASK_OUTPUT) event_mask |= XPOLLOUT; @@ -1368,14 +1368,14 @@ static int vm_muxadd (moo_t* moo, moo_ooi_t io_handle, moo_ooi_t mask) return -1; } - return _add_poll_fd (moo, io_handle, event_mask); + return _add_poll_fd(moo, io_handle, event_mask); } static int vm_muxmod (moo_t* moo, moo_ooi_t io_handle, moo_ooi_t mask) { int event_mask; - event_mask = 0; /*EPOLLET; */ /* TODO: use edge trigger(EPOLLLET)? */ + event_mask = 0; if (mask & MOO_SEMAPHORE_IO_MASK_INPUT) event_mask |= XPOLLIN; if (mask & MOO_SEMAPHORE_IO_MASK_OUTPUT) event_mask |= XPOLLOUT; @@ -1386,12 +1386,12 @@ static int vm_muxmod (moo_t* moo, moo_ooi_t io_handle, moo_ooi_t mask) return -1; } - return _mod_poll_fd (moo, io_handle, event_mask); + return _mod_poll_fd(moo, io_handle, event_mask); } static int vm_muxdel (moo_t* moo, moo_ooi_t io_handle) { - return _del_poll_fd (moo, io_handle); + return _del_poll_fd(moo, io_handle); } #if defined(USE_THREAD) diff --git a/moo/mod/sck.c b/moo/mod/sck.c index 132221b..9df0284 100644 --- a/moo/mod/sck.c +++ b/moo/mod/sck.c @@ -396,13 +396,15 @@ static moo_pfrc_t pf_read_socket (moo_t* moo, moo_ooi_t nargs) return MOO_PF_FAILURE; } - n = recv (fd, MOO_OBJ_GET_BYTE_SLOT(buf), MOO_OBJ_GET_SIZE(buf), 0); - if (n <= -1 && errno != EWOULDBLOCK) + n = recv(fd, MOO_OBJ_GET_BYTE_SLOT(buf), MOO_OBJ_GET_SIZE(buf), 0); + if (n <= -1 && errno != EWOULDBLOCK && errno != EAGAIN) { moo_seterrwithsyserr (moo, errno); return MOO_PF_FAILURE; } + /* NOTE: on EWOULDBLOCK or EGAIN, -1 is returned */ + MOO_ASSERT (moo, MOO_IN_SMOOI_RANGE(n)); MOO_STACK_SETRET (moo, nargs, MOO_SMOOI_TO_OOP(n)); @@ -438,7 +440,6 @@ static moo_pfrc_t pf_write_socket (moo_t* moo, moo_ooi_t nargs) return MOO_PF_FAILURE; } - offset = 0; maxlen = MOO_OBJ_GET_SIZE(buf); length = maxlen; @@ -457,7 +458,7 @@ static moo_pfrc_t pf_write_socket (moo_t* moo, moo_ooi_t nargs) if (nargs >= 3) { tmp = MOO_STACK_GETARG(moo, nargs, 2); - if (moo_inttooow (moo, tmp, &length) <= 0) + if (moo_inttooow(moo, tmp, &length) <= 0) { moo_seterrbfmt (moo, MOO_EINVAL, "invalid length - %O", tmp); return MOO_PF_FAILURE; @@ -468,13 +469,14 @@ static moo_pfrc_t pf_write_socket (moo_t* moo, moo_ooi_t nargs) if (length > maxlen - offset) length = maxlen - offset; } - n = send (fd, &MOO_OBJ_GET_BYTE_SLOT(buf)[offset], length, 0); - if (n <= -1 && errno != EWOULDBLOCK) + n = send(fd, &MOO_OBJ_GET_BYTE_SLOT(buf)[offset], length, 0); + if (n <= -1 && errno != EWOULDBLOCK && errno != EAGAIN) { moo_seterrwithsyserr (moo, errno); return MOO_PF_FAILURE; } + /* NOTE: on EWOULDBLOCK or EGAIN, -1 is returned */ MOO_ASSERT (moo, MOO_IN_SMOOI_RANGE(n)); MOO_STACK_SETRET (moo, nargs, MOO_SMOOI_TO_OOP(n));