changed Socket to work in the edge trigger mode

This commit is contained in:
hyunghwan.chung 2018-01-28 15:31:23 +00:00
parent 1bae32bf30
commit 7f7f9e2977
3 changed files with 38 additions and 16 deletions

View File

@ -234,6 +234,7 @@ class Socket(Object) from 'sck'
var inwc := 0, outwc := 0. ## input watcher count and ouput watcher count var inwc := 0, outwc := 0. ## input watcher count and ouput watcher count
var insem, outsem. var insem, outsem.
var(#get,#set) inputAction, outputAction. var(#get,#set) inputAction, outputAction.
var(#get) inputReady := false, outputReady := false.
method(#primitive) open(domain, type, proto). method(#primitive) open(domain, type, proto).
method(#primitive) _close. method(#primitive) _close.
@ -339,9 +340,20 @@ extend Socket
{ {
| old_output_action | | old_output_action |
if (self.outputReady)
{
if ((self _writeBytes: bytes) >= 0) { ^self }.
self.outputReady := false.
}.
old_output_action := self.outputAction. old_output_action := self.outputAction.
self.outputAction := [ :sck :state | self.outputAction := [ :sck :state |
self _writeBytes: bytes. if ((self _writeBytes: bytes) <= -1)
{
## EAGAIN
self.outputReady := false.
^self.
}.
## TODO: handle _writeBytes may not write in full. ## TODO: handle _writeBytes may not write in full.
## restore the output action block before executing the previous ## restore the output action block before executing the previous
@ -364,9 +376,13 @@ extend Socket
if (self.insem isNil) if (self.insem isNil)
{ {
self.insem := Semaphore new. self.insem := Semaphore new.
self.insem signalAction: [:sem | self.inputAction value: self value: true]. self.insem signalAction: [:sem |
self.inputReady := true.
self.inputAction value: self value: true
].
System addAsyncSemaphore: self.insem. System addAsyncSemaphore: self.insem.
}. }.
self.inputReady := false.
System signal: self.insem onInput: self.handle System signal: self.insem onInput: self.handle
}. }.
self.inwc := self.inwc + 1. self.inwc := self.inwc + 1.
@ -392,9 +408,13 @@ extend Socket
if (self.outsem isNil) if (self.outsem isNil)
{ {
self.outsem := Semaphore new. self.outsem := Semaphore new.
self.outsem signalAction: [:sem | self.outputAction value: self value: true]. self.outsem signalAction: [:sem |
self.outputReady := true.
self.outputAction value: self value: true
].
System addAsyncSemaphore: self.outsem. System addAsyncSemaphore: self.outsem.
}. }.
self.outputReady := false.
System signal: self.outsem onOutput: self.handle. System signal: self.outsem onOutput: self.handle.
}. }.
self.outwc := self.outwc + 1. self.outwc := self.outwc + 1.

View File

@ -859,7 +859,7 @@ static int _add_poll_fd (moo_t* moo, int fd, int event_mask)
/* epoll_wait may return again if the worker thread consumes events. /* epoll_wait may return again if the worker thread consumes events.
* switch to level-trigger. */ * switch to level-trigger. */
/* TODO: verify if EPOLLLET is desired */ /* TODO: verify if EPOLLLET is desired */
//ev.events |= EPOLLET; ev.events |= EPOLLET;
#endif #endif
/*ev.data.ptr = (void*)event_data;*/ /*ev.data.ptr = (void*)event_data;*/
ev.data.fd = fd; ev.data.fd = fd;
@ -1043,7 +1043,7 @@ static int _mod_poll_fd (moo_t* moo, int fd, int event_mask)
/* epoll_wait may return again if the worker thread consumes events. /* epoll_wait may return again if the worker thread consumes events.
* switch to level-trigger. */ * switch to level-trigger. */
/* TODO: verify if EPOLLLET is desired */ /* TODO: verify if EPOLLLET is desired */
//ev.events |= EPOLLET; ev.events |= EPOLLET;
#endif #endif
ev.data.fd = fd; ev.data.fd = fd;
if (epoll_ctl (xtn->ep, EPOLL_CTL_MOD, fd, &ev) == -1) if (epoll_ctl (xtn->ep, EPOLL_CTL_MOD, fd, &ev) == -1)
@ -1357,7 +1357,7 @@ static int vm_muxadd (moo_t* moo, moo_ooi_t io_handle, moo_ooi_t mask)
{ {
int event_mask; int event_mask;
event_mask = 0; /*EPOLLET; */ /* TODO: use edge trigger(EPOLLLET)? */ event_mask = 0;
if (mask & MOO_SEMAPHORE_IO_MASK_INPUT) event_mask |= XPOLLIN; if (mask & MOO_SEMAPHORE_IO_MASK_INPUT) event_mask |= XPOLLIN;
if (mask & MOO_SEMAPHORE_IO_MASK_OUTPUT) event_mask |= XPOLLOUT; if (mask & MOO_SEMAPHORE_IO_MASK_OUTPUT) event_mask |= XPOLLOUT;
@ -1368,14 +1368,14 @@ static int vm_muxadd (moo_t* moo, moo_ooi_t io_handle, moo_ooi_t mask)
return -1; return -1;
} }
return _add_poll_fd (moo, io_handle, event_mask); return _add_poll_fd(moo, io_handle, event_mask);
} }
static int vm_muxmod (moo_t* moo, moo_ooi_t io_handle, moo_ooi_t mask) static int vm_muxmod (moo_t* moo, moo_ooi_t io_handle, moo_ooi_t mask)
{ {
int event_mask; int event_mask;
event_mask = 0; /*EPOLLET; */ /* TODO: use edge trigger(EPOLLLET)? */ event_mask = 0;
if (mask & MOO_SEMAPHORE_IO_MASK_INPUT) event_mask |= XPOLLIN; if (mask & MOO_SEMAPHORE_IO_MASK_INPUT) event_mask |= XPOLLIN;
if (mask & MOO_SEMAPHORE_IO_MASK_OUTPUT) event_mask |= XPOLLOUT; if (mask & MOO_SEMAPHORE_IO_MASK_OUTPUT) event_mask |= XPOLLOUT;
@ -1386,12 +1386,12 @@ static int vm_muxmod (moo_t* moo, moo_ooi_t io_handle, moo_ooi_t mask)
return -1; return -1;
} }
return _mod_poll_fd (moo, io_handle, event_mask); return _mod_poll_fd(moo, io_handle, event_mask);
} }
static int vm_muxdel (moo_t* moo, moo_ooi_t io_handle) static int vm_muxdel (moo_t* moo, moo_ooi_t io_handle)
{ {
return _del_poll_fd (moo, io_handle); return _del_poll_fd(moo, io_handle);
} }
#if defined(USE_THREAD) #if defined(USE_THREAD)

View File

@ -396,13 +396,15 @@ static moo_pfrc_t pf_read_socket (moo_t* moo, moo_ooi_t nargs)
return MOO_PF_FAILURE; return MOO_PF_FAILURE;
} }
n = recv (fd, MOO_OBJ_GET_BYTE_SLOT(buf), MOO_OBJ_GET_SIZE(buf), 0); n = recv(fd, MOO_OBJ_GET_BYTE_SLOT(buf), MOO_OBJ_GET_SIZE(buf), 0);
if (n <= -1 && errno != EWOULDBLOCK) if (n <= -1 && errno != EWOULDBLOCK && errno != EAGAIN)
{ {
moo_seterrwithsyserr (moo, errno); moo_seterrwithsyserr (moo, errno);
return MOO_PF_FAILURE; return MOO_PF_FAILURE;
} }
/* NOTE: on EWOULDBLOCK or EGAIN, -1 is returned */
MOO_ASSERT (moo, MOO_IN_SMOOI_RANGE(n)); MOO_ASSERT (moo, MOO_IN_SMOOI_RANGE(n));
MOO_STACK_SETRET (moo, nargs, MOO_SMOOI_TO_OOP(n)); MOO_STACK_SETRET (moo, nargs, MOO_SMOOI_TO_OOP(n));
@ -438,7 +440,6 @@ static moo_pfrc_t pf_write_socket (moo_t* moo, moo_ooi_t nargs)
return MOO_PF_FAILURE; return MOO_PF_FAILURE;
} }
offset = 0; offset = 0;
maxlen = MOO_OBJ_GET_SIZE(buf); maxlen = MOO_OBJ_GET_SIZE(buf);
length = maxlen; length = maxlen;
@ -457,7 +458,7 @@ static moo_pfrc_t pf_write_socket (moo_t* moo, moo_ooi_t nargs)
if (nargs >= 3) if (nargs >= 3)
{ {
tmp = MOO_STACK_GETARG(moo, nargs, 2); tmp = MOO_STACK_GETARG(moo, nargs, 2);
if (moo_inttooow (moo, tmp, &length) <= 0) if (moo_inttooow(moo, tmp, &length) <= 0)
{ {
moo_seterrbfmt (moo, MOO_EINVAL, "invalid length - %O", tmp); moo_seterrbfmt (moo, MOO_EINVAL, "invalid length - %O", tmp);
return MOO_PF_FAILURE; return MOO_PF_FAILURE;
@ -468,13 +469,14 @@ static moo_pfrc_t pf_write_socket (moo_t* moo, moo_ooi_t nargs)
if (length > maxlen - offset) length = maxlen - offset; if (length > maxlen - offset) length = maxlen - offset;
} }
n = send (fd, &MOO_OBJ_GET_BYTE_SLOT(buf)[offset], length, 0); n = send(fd, &MOO_OBJ_GET_BYTE_SLOT(buf)[offset], length, 0);
if (n <= -1 && errno != EWOULDBLOCK) if (n <= -1 && errno != EWOULDBLOCK && errno != EAGAIN)
{ {
moo_seterrwithsyserr (moo, errno); moo_seterrwithsyserr (moo, errno);
return MOO_PF_FAILURE; return MOO_PF_FAILURE;
} }
/* NOTE: on EWOULDBLOCK or EGAIN, -1 is returned */
MOO_ASSERT (moo, MOO_IN_SMOOI_RANGE(n)); MOO_ASSERT (moo, MOO_IN_SMOOI_RANGE(n));
MOO_STACK_SETRET (moo, nargs, MOO_SMOOI_TO_OOP(n)); MOO_STACK_SETRET (moo, nargs, MOO_SMOOI_TO_OOP(n));