diff --git a/moo/kernel/Http.moo b/moo/kernel/Http.moo index 9c502a2..cf94f90 100644 --- a/moo/kernel/Http.moo +++ b/moo/kernel/Http.moo @@ -134,7 +134,7 @@ class HttpSocket(SyncSocket) } } -class HttpListener(ServerSocket) +class HttpListener(AsyncServerSocket) { var(#get) server := nil. var(#get) rid := -1. @@ -274,7 +274,7 @@ class MyObject(Object) method(#class) start_server_socket { | s2 buf | - s2 := ServerSocket family: Socket.Family.INET type: Socket.Type.STREAM. + s2 := AsyncServerSocket family: Socket.Family.INET type: Socket.Type.STREAM. buf := ByteArray new: 128. s2 onEvent: #accepted do: [ :sck :clisck :cliaddr | @@ -311,7 +311,7 @@ class MyObject(Object) method(#class) start_client_socket { | s buf count | - s := ClientSocket family: Socket.Family.INET type: Socket.Type.STREAM. + s := AsyncClientSocket family: Socket.Family.INET type: Socket.Type.STREAM. buf := ByteArray new: 128. count := 0. @@ -407,7 +407,17 @@ class MyObject(Object) method(#class) main { - | httpd | + | httpd addr | + +(* +[ +addr := SocketAddress fromString: '1.2.3.4:5555'. +##addr := SocketAddress fromString: '127.0.0.1:22'. +httpd := SyncSocket family: (addr family) type: Socket.Type.STREAM. +httpd timeout: 5. +httpd connect: addr. +] on: Exception do: [:ex | ]. +*) [ self another_proc: 5000 ] fork. [ self another_proc: 5100 ] fork. @@ -434,7 +444,6 @@ class MyObject(Object) ] on: Exception do: [:ex | ('Exception - ' & ex messageText) dump]. - '----- END OF MAIN ------' dump. } } diff --git a/moo/kernel/Socket.moo b/moo/kernel/Socket.moo index d47c988..304c685 100644 --- a/moo/kernel/Socket.moo +++ b/moo/kernel/Socket.moo @@ -225,7 +225,7 @@ class(#byte) SocketAddress(Object) from 'sck.addr' } } -class CoreSocket(Object) from 'sck' +class Socket(Object) from 'sck' { ## the handle must be the first field in this object to match ## the internal representation used by various modules. (e.g. sck) @@ -243,11 +243,28 @@ class CoreSocket(Object) from 'sck' method(#primitive) _connect: addr. method(#primitive) _socketError. - method(#primitive) readBytes: bytes. - method(#primitive) readBytes: bytes offset: offset length: length. + method(#primitive) _readBytes: bytes. + method(#primitive) _readBytes: bytes offset: offset length: length. method(#primitive) _writeBytes: bytes. method(#primitive) _writeBytes: bytes offset: offset length: length. + method(#class) new { self messageProhibited: #new } + method(#class) new: size { self messageProhibited: #new: } + + method(#class) __with: handle + { + ###self addToBeFinalized. + ^(self _basicNew initialize) __open(handle) + } + + method(#class) family: family type: type + { + ###self addToBeFinalized. + + ## new is prohibited. so use _basicNew with initialize. + ##^(self new) open(family, type, 0) + ^(self _basicNew initialize) open(family, type, 0) + } method close { @@ -267,7 +284,20 @@ class CoreSocket(Object) from 'sck' } } -class SyncSocket(CoreSocket) +(* TODO: generate these family and type from the C header *) +pooldic Socket.Family +{ + INET := 2. + INET6 := 10. +} + +pooldic Socket.Type +{ + STREAM := 1. + DGRAM := 2. +} + +class SyncSocket(Socket) { var iosem, tmoutsem, sg. var tmoutsecs, tmoutnsecs. @@ -335,12 +365,33 @@ class SyncSocket(CoreSocket) if (s == self.tmoutsem) { Exception signal: 'timed out' }. } + method connect: addr + { + | soerr | + + ## an exception is thrown upon exception failure. + if ((super _connect: addr) <= -1) + { + ## connection in progress + while (true) + { + self __wait_for_output. + soerr := self _socketError. + if (soerr == 0) { break } + elsif (soerr > 0) + { + Exception signal: ('unable to connect - error ' & soerr asString). + }. + }. + }. + } + method readBytes: bytes { | n | while (true) { - n := super readBytes: bytes. + n := super _readBytes: bytes. if (n >= 0) { ^n }. self __wait_for_input. } @@ -351,7 +402,7 @@ class SyncSocket(CoreSocket) | n | while (true) { - n := super readBytes: bytes offset: offset length: length. + n := super _readBytes: bytes offset: offset length: length. if (n >= 0) { ^n }. self __wait_for_input. } @@ -380,44 +431,10 @@ class SyncSocket(CoreSocket) } } -class Socket(CoreSocket) +class AsyncSocket(Socket) { var pending_bytes, pending_offset, pending_length. var outreadysem, outdonesem, inreadysem. -} - -(* TODO: generate these family and type from the C header *) -pooldic Socket.Family -{ - INET := 2. - INET6 := 10. -} - -pooldic Socket.Type -{ - STREAM := 1. - DGRAM := 2. -} - -extend Socket -{ - method(#class) new { self messageProhibited: #new } - method(#class) new: size { self messageProhibited: #new: } - - method(#class) __with: handle - { - ###self addToBeFinalized. - ^(self _basicNew initialize) __open(handle) - } - - method(#class) family: family type: type - { - ###self addToBeFinalized. - - ## new is prohibited. so use _basicNew with initialize. - ##^(self new) open(family, type, 0) - ^(self _basicNew initialize) open(family, type, 0) - } (* ------------------- socket call-back methods @@ -512,6 +529,16 @@ extend Socket thisProcess addAsyncSemaphore: self.outdonesem. } + method readBytes: bytes + { + ^super _readBytes: bytes. + } + + method readBytes: bytes offset: offset length: length + { + ^super _readBytes: bytes offset: offset length: length. + } + method writeBytes: bytes offset: offset length: length { | n pos rem | @@ -569,7 +596,7 @@ extend Socket } } -class ClientSocket(Socket) +class AsyncClientSocket(AsyncSocket) { var(#get) connectedEventAction. var connsem. @@ -633,7 +660,7 @@ class ClientSocket(Socket) } } -class ServerSocket(Socket) +class AsyncServerSocket(AsyncSocket) { method initialize { diff --git a/moo/lib/exec.c b/moo/lib/exec.c index aa83912..2d86264 100644 --- a/moo/lib/exec.c +++ b/moo/lib/exec.c @@ -815,7 +815,9 @@ static moo_oop_process_t signal_semaphore (moo_t* moo, moo_oop_semaphore_t sem) MOO_ASSERT (moo, MOO_OOP_TO_SMOOI(proc->sp) < (moo_ooi_t)(MOO_OBJ_GET_SIZE(proc) - MOO_PROCESS_NAMED_INSTVARS)); proc->slot[MOO_OOP_TO_SMOOI(proc->sp)] = (moo_oop_t)sem; - if (sem->subtype == MOO_SMOOI_TO_OOP(MOO_SEMAPHORE_SUBTYPE_IO)) moo->sem_io_wait_count--; + /* i should decrement the counter as long as the group being + * signaled contains an IO semaphore */ + if (MOO_OOP_TO_SMOOI(sg->sem_io_count) > 0) moo->sem_io_wait_count--; return proc; } } @@ -881,7 +883,7 @@ static MOO_INLINE void await_semaphore (moo_t* moo, moo_oop_semaphore_t sem) semgrp = sem->group; - /* the caller of this function ensure that the semaphore doesn't belong to a group */ + /* the caller of this function must ensure that the semaphore doesn't belong to a group */ MOO_ASSERT (moo, (moo_oop_t)semgrp == moo->_nil); count = MOO_OOP_TO_SMOOI(sem->count);