renamed various socket functions

fixed a buggy condition that prevented moo->sem_io_wait_count from being decremented in signal_semaphore()
This commit is contained in:
hyunghwan.chung 2018-05-18 09:08:05 +00:00
parent 4da0731312
commit 4a73109340
3 changed files with 88 additions and 50 deletions

View File

@ -134,7 +134,7 @@ class HttpSocket(SyncSocket)
} }
} }
class HttpListener(ServerSocket) class HttpListener(AsyncServerSocket)
{ {
var(#get) server := nil. var(#get) server := nil.
var(#get) rid := -1. var(#get) rid := -1.
@ -274,7 +274,7 @@ class MyObject(Object)
method(#class) start_server_socket method(#class) start_server_socket
{ {
| s2 buf | | s2 buf |
s2 := ServerSocket family: Socket.Family.INET type: Socket.Type.STREAM. s2 := AsyncServerSocket family: Socket.Family.INET type: Socket.Type.STREAM.
buf := ByteArray new: 128. buf := ByteArray new: 128.
s2 onEvent: #accepted do: [ :sck :clisck :cliaddr | s2 onEvent: #accepted do: [ :sck :clisck :cliaddr |
@ -311,7 +311,7 @@ class MyObject(Object)
method(#class) start_client_socket method(#class) start_client_socket
{ {
| s buf count | | s buf count |
s := ClientSocket family: Socket.Family.INET type: Socket.Type.STREAM. s := AsyncClientSocket family: Socket.Family.INET type: Socket.Type.STREAM.
buf := ByteArray new: 128. buf := ByteArray new: 128.
count := 0. count := 0.
@ -407,7 +407,17 @@ class MyObject(Object)
method(#class) main method(#class) main
{ {
| httpd | | httpd addr |
(*
[
addr := SocketAddress fromString: '1.2.3.4:5555'.
##addr := SocketAddress fromString: '127.0.0.1:22'.
httpd := SyncSocket family: (addr family) type: Socket.Type.STREAM.
httpd timeout: 5.
httpd connect: addr.
] on: Exception do: [:ex | ].
*)
[ self another_proc: 5000 ] fork. [ self another_proc: 5000 ] fork.
[ self another_proc: 5100 ] fork. [ self another_proc: 5100 ] fork.
@ -434,7 +444,6 @@ class MyObject(Object)
] on: Exception do: [:ex | ('Exception - ' & ex messageText) dump]. ] on: Exception do: [:ex | ('Exception - ' & ex messageText) dump].
'----- END OF MAIN ------' dump. '----- END OF MAIN ------' dump.
} }
} }

View File

@ -225,7 +225,7 @@ class(#byte) SocketAddress(Object) from 'sck.addr'
} }
} }
class CoreSocket(Object) from 'sck' class Socket(Object) from 'sck'
{ {
## the handle must be the first field in this object to match ## the handle must be the first field in this object to match
## the internal representation used by various modules. (e.g. sck) ## the internal representation used by various modules. (e.g. sck)
@ -243,11 +243,28 @@ class CoreSocket(Object) from 'sck'
method(#primitive) _connect: addr. method(#primitive) _connect: addr.
method(#primitive) _socketError. method(#primitive) _socketError.
method(#primitive) readBytes: bytes. method(#primitive) _readBytes: bytes.
method(#primitive) readBytes: bytes offset: offset length: length. method(#primitive) _readBytes: bytes offset: offset length: length.
method(#primitive) _writeBytes: bytes. method(#primitive) _writeBytes: bytes.
method(#primitive) _writeBytes: bytes offset: offset length: length. method(#primitive) _writeBytes: bytes offset: offset length: length.
method(#class) new { self messageProhibited: #new }
method(#class) new: size { self messageProhibited: #new: }
method(#class) __with: handle
{
###self addToBeFinalized.
^(self _basicNew initialize) __open(handle)
}
method(#class) family: family type: type
{
###self addToBeFinalized.
## new is prohibited. so use _basicNew with initialize.
##^(self new) open(family, type, 0)
^(self _basicNew initialize) open(family, type, 0)
}
method close method close
{ {
@ -267,7 +284,20 @@ class CoreSocket(Object) from 'sck'
} }
} }
class SyncSocket(CoreSocket) (* TODO: generate these family and type from the C header *)
pooldic Socket.Family
{
INET := 2.
INET6 := 10.
}
pooldic Socket.Type
{
STREAM := 1.
DGRAM := 2.
}
class SyncSocket(Socket)
{ {
var iosem, tmoutsem, sg. var iosem, tmoutsem, sg.
var tmoutsecs, tmoutnsecs. var tmoutsecs, tmoutnsecs.
@ -335,12 +365,33 @@ class SyncSocket(CoreSocket)
if (s == self.tmoutsem) { Exception signal: 'timed out' }. if (s == self.tmoutsem) { Exception signal: 'timed out' }.
} }
method connect: addr
{
| soerr |
## an exception is thrown upon exception failure.
if ((super _connect: addr) <= -1)
{
## connection in progress
while (true)
{
self __wait_for_output.
soerr := self _socketError.
if (soerr == 0) { break }
elsif (soerr > 0)
{
Exception signal: ('unable to connect - error ' & soerr asString).
}.
}.
}.
}
method readBytes: bytes method readBytes: bytes
{ {
| n | | n |
while (true) while (true)
{ {
n := super readBytes: bytes. n := super _readBytes: bytes.
if (n >= 0) { ^n }. if (n >= 0) { ^n }.
self __wait_for_input. self __wait_for_input.
} }
@ -351,7 +402,7 @@ class SyncSocket(CoreSocket)
| n | | n |
while (true) while (true)
{ {
n := super readBytes: bytes offset: offset length: length. n := super _readBytes: bytes offset: offset length: length.
if (n >= 0) { ^n }. if (n >= 0) { ^n }.
self __wait_for_input. self __wait_for_input.
} }
@ -380,44 +431,10 @@ class SyncSocket(CoreSocket)
} }
} }
class Socket(CoreSocket) class AsyncSocket(Socket)
{ {
var pending_bytes, pending_offset, pending_length. var pending_bytes, pending_offset, pending_length.
var outreadysem, outdonesem, inreadysem. var outreadysem, outdonesem, inreadysem.
}
(* TODO: generate these family and type from the C header *)
pooldic Socket.Family
{
INET := 2.
INET6 := 10.
}
pooldic Socket.Type
{
STREAM := 1.
DGRAM := 2.
}
extend Socket
{
method(#class) new { self messageProhibited: #new }
method(#class) new: size { self messageProhibited: #new: }
method(#class) __with: handle
{
###self addToBeFinalized.
^(self _basicNew initialize) __open(handle)
}
method(#class) family: family type: type
{
###self addToBeFinalized.
## new is prohibited. so use _basicNew with initialize.
##^(self new) open(family, type, 0)
^(self _basicNew initialize) open(family, type, 0)
}
(* ------------------- (* -------------------
socket call-back methods socket call-back methods
@ -512,6 +529,16 @@ extend Socket
thisProcess addAsyncSemaphore: self.outdonesem. thisProcess addAsyncSemaphore: self.outdonesem.
} }
method readBytes: bytes
{
^super _readBytes: bytes.
}
method readBytes: bytes offset: offset length: length
{
^super _readBytes: bytes offset: offset length: length.
}
method writeBytes: bytes offset: offset length: length method writeBytes: bytes offset: offset length: length
{ {
| n pos rem | | n pos rem |
@ -569,7 +596,7 @@ extend Socket
} }
} }
class ClientSocket(Socket) class AsyncClientSocket(AsyncSocket)
{ {
var(#get) connectedEventAction. var(#get) connectedEventAction.
var connsem. var connsem.
@ -633,7 +660,7 @@ class ClientSocket(Socket)
} }
} }
class ServerSocket(Socket) class AsyncServerSocket(AsyncSocket)
{ {
method initialize method initialize
{ {

View File

@ -815,7 +815,9 @@ static moo_oop_process_t signal_semaphore (moo_t* moo, moo_oop_semaphore_t sem)
MOO_ASSERT (moo, MOO_OOP_TO_SMOOI(proc->sp) < (moo_ooi_t)(MOO_OBJ_GET_SIZE(proc) - MOO_PROCESS_NAMED_INSTVARS)); MOO_ASSERT (moo, MOO_OOP_TO_SMOOI(proc->sp) < (moo_ooi_t)(MOO_OBJ_GET_SIZE(proc) - MOO_PROCESS_NAMED_INSTVARS));
proc->slot[MOO_OOP_TO_SMOOI(proc->sp)] = (moo_oop_t)sem; proc->slot[MOO_OOP_TO_SMOOI(proc->sp)] = (moo_oop_t)sem;
if (sem->subtype == MOO_SMOOI_TO_OOP(MOO_SEMAPHORE_SUBTYPE_IO)) moo->sem_io_wait_count--; /* i should decrement the counter as long as the group being
* signaled contains an IO semaphore */
if (MOO_OOP_TO_SMOOI(sg->sem_io_count) > 0) moo->sem_io_wait_count--;
return proc; return proc;
} }
} }
@ -881,7 +883,7 @@ static MOO_INLINE void await_semaphore (moo_t* moo, moo_oop_semaphore_t sem)
semgrp = sem->group; semgrp = sem->group;
/* the caller of this function ensure that the semaphore doesn't belong to a group */ /* the caller of this function must ensure that the semaphore doesn't belong to a group */
MOO_ASSERT (moo, (moo_oop_t)semgrp == moo->_nil); MOO_ASSERT (moo, (moo_oop_t)semgrp == moo->_nil);
count = MOO_OOP_TO_SMOOI(sem->count); count = MOO_OOP_TO_SMOOI(sem->count);