moo/kernel/Socket.moo

810 lines
16 KiB
Smalltalk
Raw Permalink Normal View History

#include 'Moo.moo'.
//interface IPAddressInterface
//{
//}
// class XXX(Object,IPAddressInterface) {}
// Common superclass of the IP address classes defined below.
// It is declared as a byte-indexed class and defines no methods of its own.
class(#byte) IPAddress(Object)
{
}
//# TODO: extend the compiler
//# #byte(4) means the basic size is 4 bytes. basicNew: xxx creates an instance of the size 4 + xxx.
//# -> extend to support a fixed size of 4 bytes by throwing an error in basicNew:.
//# -> #byte(4,fixed)?
//# -> #byte -> byte variable/flexible
//# -> #byte(4) -> byte variable with the minimum size of 4
//# -> (TODO)-> #byte(4,10) -> byte variable with the minimum size of 4 and the maximum size of 10 => basicNew: should be allowed with up to 6.
//# -> #byte(4,4) -> it can emulate a fixed byte size. -> do i have space in spec to store the upper bound?
2018-01-24 10:31:34 +00:00
// IPv4 address kept as raw bytes. The parser below stores the four octets
// of a dotted-decimal string in textual order.
class(#byte(4)) IP4Address(IPAddress)
{
	/*method(#class) new
	{
		^self basicNew: 4.
	}*/

	// Creates a new instance and fills it from the dotted-decimal text in str.
	// An Exception is signaled by the instance-side fromString: when str is
	// not a valid address.
	method(#class) fromString: str
	{
		^self new fromString: str.
	}

	// Parses a dotted-decimal IPv4 address found in str.
	//
	//   str            - the text to parse
	//   string_offset  - index in str at which parsing starts; parsing always
	//                    runs to the end of str
	//   address_offset - byte index in the receiver where the first octet is
	//                    stored; the remaining octets follow it
	//
	// Returns the receiver on success and Error.Code.EINVAL for every kind of
	// malformed input (missing octet, too many dots, octet above 255, or any
	// character other than a decimal digit or a dot). Octets parsed before
	// the error was detected may already have been stored in the receiver.
	method __fromString: str startingAt: string_offset into: address_offset
	{
		| dots digits pos size c acc |

		pos := string_offset.
		size := str size.

		acc := 0.
		digits := 0.
		dots := 0.

		do
		{
			if (pos >= size)
			{
				// end of input. exactly three dots must have been seen and
				// the last octet must have at least one digit.
				if (dots < 3 or: [digits == 0]) { ^Error.Code.EINVAL }.
				self basicAt: (dots + address_offset) put: acc.
				break.
			}.

			c := str at: pos.
			pos := pos + 1.

			if ((c >= $0) and (c <= $9))
			{
				acc := acc * 10 + (c asInteger - $0 asInteger).
				// an out-of-range octet is reported the same way as any other
				// malformed input so that callers testing the result with
				// isError see it.
				if (acc > 255) { ^Error.Code.EINVAL }.
				digits := digits + 1.
			}
			elif (c == $.)
			{
				// a dot closes the current octet. it needs at least one digit
				// and no more than three dots are allowed.
				if (dots >= 3 or: [digits == 0]) { ^Error.Code.EINVAL }.
				self basicAt: (dots + address_offset) put: acc.
				dots := dots + 1.
				acc := 0.
				digits := 0.
			}
			else
			{
				// invalid character in the address
				^Error.Code.EINVAL
			}.
		}
		while (true).

		^self.
	}

	// Fills the receiver from the dotted-decimal text in str.
	// Signals an Exception when str is not a valid IPv4 address.
	method fromString: str
	{
		if ((self __fromString: str startingAt: 0 into: 0) isError)
		{
			Exception signal: ('invalid IPv4 address ' & str).
		}
	}
}
// IPv6 address kept as raw bytes. Each 16-bit group of the textual form is
// stored as two bytes, high byte first. The class-side fromString: is
// inherited from IP4Address.
class(#byte(16)) IP6Address(IP4Address)
{
	/*method(#class) new
	{
		^self basicNew: 16.
	}*/

	//method(#class) fromString: str
	//{
	//	^self new fromString: str.
	//}

	// Parses the textual IPv6 address in str into the receiver.
	//
	// Accepted input is a sequence of hexadecimal groups separated by colons,
	// with at most one '::' and optionally a trailing dotted-decimal IPv4
	// part, which is handed to the inherited __fromString:startingAt:into:.
	//
	// Returns the receiver on success and Error.Code.EINVAL when the text is
	// malformed. Bytes parsed before the error was detected may already have
	// been stored in the receiver.
	method __fromString: str
	{
		| pos size mysize ch tgpos v1 val curseg saw_xdigit colonpos |

		pos := 0.
		size := str size.
		mysize := self basicSize.

		// handle leading :: specially
		if ((size > 0) and ((str at: pos) == $:))
		{
			pos := pos + 1.
			if (pos >= size or: [ (str at: pos) ~~ $:]) { ^Error.Code.EINVAL }.
		}.

		tgpos := 0.       // next byte index to fill in the receiver
		curseg := pos.    // index in str where the current group starts
		val := 0.         // value of the group being accumulated
		saw_xdigit := false.
		colonpos := -1.   // byte index at which '::' was seen, -1 if none

		while (pos < size)
		{
			ch := str at: pos.
			pos := pos + 1.

			v1 := ch digitValue.
			if ((v1 >= 0) and (v1 <= 15))
			{
				val := (val bitShift: 4) bitOr: v1.
				if (val > 16rFFFF) { ^Error.Code.EINVAL }.
				saw_xdigit := true.
				continue.
			}.

			if (ch == $:)
			{
				curseg := pos.
				if (saw_xdigit not)
				{
					// no multiple double colons are allowed
					if (colonpos >= 0) { ^Error.Code.EINVAL }.

					// capture the target position when the double colons
					// are encountered.
					colonpos := tgpos.
					continue.
				}
				elif (pos >= size)
				{
					// a colon can't be the last character
					^Error.Code.EINVAL
				}.

				// reject a group that would not fit in the receiver instead
				// of storing beyond its last byte.
				if (tgpos + 2 > mysize) { ^Error.Code.EINVAL }.

				self basicAt: tgpos put: ((val bitShift: -8) bitAnd: 16rFF).
				tgpos := tgpos + 1.
				self basicAt: tgpos put: (val bitAnd: 16rFF).
				tgpos := tgpos + 1.

				saw_xdigit := false.
				val := 0.
				continue.
			}.

			if ((ch == $.) and (tgpos + 4 <= mysize))
			{
				// the current group is the start of an embedded IPv4 address.
				// let the inherited parser consume the rest of the string.
				//if ((super __fromString: (str copyFrom: curseg) startingAt: 0 into: tgpos) isError) { ^Error.Code.EINVAL }.
				if ((super __fromString: str startingAt: curseg into: tgpos) isError) { ^Error.Code.EINVAL }.
				tgpos := tgpos + 4.
				saw_xdigit := false.
				break.
			}.

			// invalid character in the address
			^Error.Code.EINVAL.
		}.

		if (saw_xdigit)
		{
			// store the final group, again only if it fits.
			if (tgpos + 2 > mysize) { ^Error.Code.EINVAL }.

			self basicAt: tgpos put: ((val bitShift: -8) bitAnd: 16rFF).
			tgpos := tgpos + 1.
			self basicAt: tgpos put: (val bitAnd: 16rFF).
			tgpos := tgpos + 1.
		}.

		if (colonpos >= 0)
		{
			// '::' must stand for at least one group. if the receiver is
			// already full, there is nothing left for it to expand to.
			if (tgpos >= mysize) { ^Error.Code.EINVAL }.

			// move the bytes stored after the double colon position to the
			// end of the receiver.
			// NOTE(review): this relies on basicShiftFrom:to:count: clearing
			// the vacated bytes - confirm against its implementation.
			self basicShiftFrom: colonpos to: (colonpos + (mysize - tgpos)) count: (tgpos - colonpos).
			//tgpos := tgpos + (mysize - tgpos).
		}
		elif (tgpos ~~ mysize)
		{
			// without '::', the groups must fill the receiver exactly.
			^Error.Code.EINVAL
		}.

		^self.
	}

	// Fills the receiver from the textual IPv6 address in str.
	// Signals an Exception when str is not a valid IPv6 address.
	method fromString: str
	{
		if ((self __fromString: str) isError)
		{
			Exception signal: ('invalid IPv6 address ' & str).
		}
	}

	//method toString
	//{
	//}
}
// Byte-indexed socket address object. The instance-side operations are
// primitives supplied by the 'sck.addr' module.
class(#byte) SocketAddress(Object) from 'sck.addr'
{
	method(#primitive) family.
	method(#primitive) fromString: str.

	// Creates a new address object and asks it to load itself from str.
	// Answers whatever the instance-side fromString: primitive answers.
	method(#class) fromString: str
	{
		| addr |
		addr := self new.
		^addr fromString: str
	}
}
// Base socket class. The low-level operations are primitives supplied by
// the 'sck' module.
class Socket(Object) from 'sck'
{
	// 'handle' has to stay the first field of this object because modules
	// such as sck rely on that position in the internal representation.
	// -1 means that no handle is held.
	var(#get) handle := -1.

	// TODO: generate these family and type from the C header
	pooldic Family
	{
		INET := 2,
		INET6 := 10
	}

	pooldic Type
	{
		STREAM := 1,
		DGRAM := 2
	}

	method(#primitive) open(family, type, proto).

	// the open primitive mapped again under a different name for strict
	// internal use. it is meant for wrapping an accepted socket in server
	// sockets.
	method(#primitive) __open(handle).

	method(#primitive) _close.
	method(#primitive) bind: addr.
	method(#primitive) _listen: backlog.
	method(#primitive) _accept: addr.
	method(#primitive) _connect: addr.
	method(#primitive) _socketError.

	method(#primitive) _readBytesInto: bytes.
	method(#primitive) _readBytesInto: bytes startingAt: offset for: length.
	method(#primitive) _writeBytesFrom: bytes.
	method(#primitive) _writeBytesFrom: bytes startingAt: offset for: length.

	// plain instantiation is prohibited. use family:type: or __with_handle:.
	method(#class) new { self messageProhibited: #new }
	method(#class) new: size { self messageProhibited: #new: }

	// Wraps an existing handle in a new socket object.
	method(#class) __with_handle: handle
	{
		| sck |
		//#self addToBeFinalized.
		// 'new' is prohibited on this class, so the superclass 'new' is used.
		sck := super new.
		^sck __open(handle)
	}

	// Creates a new socket of the given family and type with protocol 0.
	method(#class) family: family type: type
	{
		| sck |
		//#self addToBeFinalized.
		// 'new' is prohibited on this class, so the superclass 'new' is used.
		sck := super new.
		^sck open(family, type, 0)
	}

	// Closes the handle if one is held, marks the socket as closed and then
	// invokes the onSocketClosed hook. Nothing but the first debug dump
	// happens when no handle is held.
	method close
	{
		('CLOSING Socket HANDLE ' & (self.handle asString)) dump.

		if (self.handle < 0) { ^self }.

		('REALLY CLOSING Socket HANDLE ' & (self.handle asString)) dump.
		self _close.
		self.handle := -1.
		self onSocketClosed.
	}

	// Hook invoked by close after the handle has been released.
	method onSocketClosed
	{
		// nothing to do by default.
	}
}
// Socket whose connect/read/write calls wait on a semaphore group until
// the operation can make progress or the optional timeout fires.
class SyncSocket(Socket)
{
	var iosem, tmoutsem, sg.
	var tmoutsecs, tmoutnsecs.

	// plain instantiation is prohibited. the factory methods
	// __with_handle: and family:type: are inherited from Socket.
	method(#class) new { self messageProhibited: #new }
	method(#class) new: size { self messageProhibited: #new: }

	// Creates the I/O semaphore and the timeout semaphore and places both
	// in one semaphore group so that a single wait covers both events.
	method initialize
	{
		super initialize.

		self.iosem := Semaphore new.
		self.tmoutsem := Semaphore new.

		self.sg := SemaphoreGroup new.
		self.sg addSemaphore: self.iosem.
		self.sg addSemaphore: self.tmoutsem.
	}

	method beWatched
	{
		// intentionally empty. this socket doesn't want to be watched.
	}

	// Sets the timeout, in whole seconds, applied to each subsequent wait.
	method timeout: secs
	{
		self.tmoutsecs := secs.
		self.tmoutnsecs := 0.
	}

	// Waits until the handle is reported ready for input.
	// Signals an Exception ('timed out') when the timeout semaphore is the
	// one returned by the group wait.
	method __wait_for_input
	{
		| fired |

		// arm the timer only when a timeout has been configured
		if (self.tmoutsecs notNil) { self.tmoutsem signalAfterSecs: self.tmoutsecs nanosecs: self.tmoutnsecs }.
		self.iosem signalOnInput: self.handle.

		fired := self.sg wait.

		// disarm both sources before looking at which one fired
		self.iosem unsignal.
		if (self.tmoutsecs notNil) { self.tmoutsem unsignal }.

		if (fired == self.tmoutsem) { Exception signal: 'timed out' }.
	}

	// Waits until the handle is reported ready for output.
	// Signals an Exception ('timed out') when the timeout semaphore is the
	// one returned by the group wait.
	method __wait_for_output
	{
		| fired |

		// arm the timer only when a timeout has been configured
		if (self.tmoutsecs notNil) { self.tmoutsem signalAfterSecs: self.tmoutsecs nanosecs: self.tmoutnsecs }.
		self.iosem signalOnOutput: self.handle.

		fired := self.sg wait.

		// disarm both sources before looking at which one fired
		self.iosem unsignal.
		if (self.tmoutsecs notNil) { self.tmoutsem unsignal }.

		if (fired == self.tmoutsem) { Exception signal: 'timed out' }.
	}

	// Connects to addr, waiting until the connection attempt is finished.
	// Signals an Exception when the socket reports a positive error code.
	method connect: addr
	{
		| soerr |

		// a non-negative result means that no waiting is needed.
		// (the original note says the primitive throws an exception upon
		// failure.)
		if ((super _connect: addr) >= 0) { ^self }.

		// connection in progress. wait for writability and check the socket
		// error until it reports 0.
		do
		{
			self __wait_for_output.
			soerr := self _socketError.
			if (soerr > 0)
			{
				Exception signal: ('unable to connect - error ' & soerr asString).
			}.
		}
		while (soerr ~~ 0).
	}

	// Reads into bytes, waiting for input whenever the primitive answers a
	// negative value. Answers the non-negative result of the primitive.
	method readBytesInto: bytes
	{
		| got |
		do
		{
			got := super _readBytesInto: bytes.
			if (got >= 0) { ^got }.
			self __wait_for_input.
		}
		while (true).
	}

	// Same as readBytesInto: but restricted to the given offset and length.
	method readBytesInto: bytes startingAt: offset for: length
	{
		| got |
		do
		{
			got := super _readBytesInto: bytes startingAt: offset for: length.
			if (got >= 0) { ^got }.
			self __wait_for_input.
		}
		while (true).
	}

	// Writes from bytes, waiting for output readiness whenever the primitive
	// answers a negative value. Answers the non-negative result of the
	// primitive.
	method writeBytesFrom: bytes
	{
		| put |
		do
		{
			put := super _writeBytesFrom: bytes.
			if (put >= 0) { ^put }.
			self __wait_for_output.
		}
		while (true).
	}

	// Same as writeBytesFrom: but restricted to the given offset and length.
	method writeBytesFrom: bytes startingAt: offset for: length
	{
		| put |
		do
		{
			put := super _writeBytesFrom: bytes startingAt: offset for: length.
			if (put >= 0) { ^put }.
			self __wait_for_output.
		}
		while (true).
	}
}
// Socket whose I/O events are delivered through semaphore signal actions
// registered with the current process.
class AsyncSocket(Socket)
{
	// the part of the last write request that has not been written yet
	var pending_bytes, pending_offset, pending_length.
	var outreadysem, outdonesem, inreadysem.

	/* -------------------
	hooks a subclass may override
	  onSocketClosed   (declared in Socket)
	  onSocketDataIn
	  onSocketDataOut
	-------------------- */

	// Creates the three semaphores and attaches their signal actions.
	method initialize
	{
		super initialize.

		self.outdonesem := Semaphore new.
		self.outreadysem := Semaphore new.
		self.inreadysem := Semaphore new.

		// a write request has been written completely
		self.outdonesem signalAction: [ :sem |
			self onSocketDataOut.
			self.outreadysem unsignal.
		].

		// the handle can take more output. push out the pending bytes.
		self.outreadysem signalAction: [ :sem |
			| nbytes pos rem |

			pos := self.pending_offset.
			rem := self.pending_length.

			while (rem > 0)
			{
				nbytes := self _writeBytesFrom: self.pending_bytes startingAt: pos for: rem.
				if (nbytes <= -1) { break }.
				pos := pos + nbytes.
				rem := rem - nbytes.
			}.

			if (rem <= 0)
			{
				self.pending_bytes := nil.
				self.pending_offset := 0.
				self.pending_length := 0.
				self.outdonesem signal.
			}
			else
			{
				// remember how far this round got. otherwise the next
				// round would start from the old offset and send the bytes
				// written above a second time.
				self.pending_offset := pos.
				self.pending_length := rem.
			}.
		].

		// the handle has input available
		self.inreadysem signalAction: [ :sem |
			self onSocketDataIn.
		].
	}

/*
	method finalize
	{
'SOCKET FINALIZED...............' dump.
		self close.
	}
*/

	// Detaches and drops the three semaphores, then closes the socket via
	// the superclass.
	method close
	{
		if (self.outdonesem notNil)
		{
			self.outdonesem unsignal.
			if (self.outdonesem _group notNil) { thisProcess removeAsyncSemaphore: self.outdonesem }.
			self.outdonesem := nil.
		}.

		if (self.outreadysem notNil)
		{
			self.outreadysem unsignal.
			if (self.outreadysem _group notNil) { thisProcess removeAsyncSemaphore: self.outreadysem }.
			self.outreadysem := nil.
		}.

		if (self.inreadysem notNil)
		{
			self.inreadysem unsignal.
			if (self.inreadysem _group notNil) { thisProcess removeAsyncSemaphore: self.inreadysem }.
			self.inreadysem := nil.
		}.

		^super close.
	}

	// Registers the input and output-done semaphores with the current
	// process and requests input notification for the handle.
	method beWatched
	{
		thisProcess addAsyncSemaphore: self.inreadysem.
		self.inreadysem signalOnInput: self.handle.
		thisProcess addAsyncSemaphore: self.outdonesem.
	}

	// Reads into bytes without waiting. Answers the result of the primitive.
	method readBytesInto: bytes
	{
		^super _readBytesInto: bytes.
	}

	// Same as readBytesInto: but restricted to the given offset and length.
	method readBytesInto: bytes startingAt: offset for: length
	{
		^super _readBytesInto: bytes startingAt: offset for: length.
	}

	// Writes length bytes of bytes starting at offset.
	//
	// When everything is written right away, outdonesem is signaled and
	// length is answered. Otherwise the unwritten remainder is recorded as
	// pending and output notification is requested so that the outreadysem
	// action can continue; no explicit value is returned on that path.
	//
	// Signals an Exception ('Not allowed to write again') when outreadysem
	// already belongs to a semaphore group.
	// NOTE(review): presumably that means an earlier write is still
	// pending - confirm how outreadysem leaves its group.
	method writeBytesFrom: bytes startingAt: offset for: length
	{
		| n pos rem |

		if (self.outreadysem _group notNil)
		{
			Exception signal: 'Not allowed to write again'.
		}.

		// n >= 0: written
		// n <= -1: tolerable error (e.g. EAGAIN)
		// exception: fatal error

		pos := offset.
		rem := length.

		while (rem > 0)
		{
			n := self _writeBytesFrom: bytes startingAt: pos for: rem.
			if (n <= -1) { break }.
			rem := rem - n.
			pos := pos + n.
		}.

		if (rem <= 0)
		{
			self.outdonesem signal.
			^length
		}.

		// keep the remainder for the outreadysem action
		self.pending_bytes := bytes.
		self.pending_offset := pos.
		self.pending_length := rem.

		thisProcess addAsyncSemaphore: self.outreadysem.
		self.outreadysem signalOnOutput: self.handle.
	}

	// Writes all of bytes. See writeBytesFrom:startingAt:for:.
	method writeBytesFrom: bytes
	{
		^self writeBytesFrom: bytes startingAt: 0 for: (bytes size)
	}

	//method onSocketClosed
	//{
	//}

	// Hook invoked by the inreadysem action. Does nothing by default.
	method onSocketDataIn
	{
	}

	// Hook invoked by the outdonesem action. Does nothing by default.
	method onSocketDataOut
	{
	}
}
// Asynchronous socket for the connecting side. The outcome of a connect
// attempt is reported through the onSocketConnected: hook.
class AsyncClientSocket(AsyncSocket)
{
	var(#get) connectedEventAction.
	var connsem.

	// Creates the connection semaphore and attaches its signal action.
	method initialize
	{
		super initialize.

		self.connsem := Semaphore new.

		self.connsem signalAction: [ :sem |
			| soerr |
			soerr := self _socketError.
			if (soerr >= 0)
			{
				// finalize connection if not in progress
				sem unsignal.
				thisProcess removeAsyncSemaphore: sem.

				// report the outcome. start watching only upon success.
				self onSocketConnected: (soerr == 0).
				if (soerr == 0) { self beWatched }.
			}.
			/* HOW TO HANDLE TIMEOUT? */
		].
	}

	// Detaches and drops the connection semaphore, then closes the socket
	// via the superclass.
	method close
	{
		if (self.connsem notNil)
		{
			self.connsem unsignal.
			if (self.connsem _group notNil) { thisProcess removeAsyncSemaphore: self.connsem }.
			self.connsem := nil.
		}.

		^super close
	}

	// Starts connecting to target.
	//
	// When the primitive answers a negative value, the connection semaphore
	// is registered and its action reports the outcome later. Otherwise
	// onSocketConnected: is invoked with true right away and the socket
	// starts being watched.
	method connect: target
	{
		if ((self _connect: target) <= -1)
		{
			thisProcess addAsyncSemaphore: self.connsem.
			self.connsem signalOnOutput: self.handle.
		}
		else
		{
			// connected immediately
'IMMEDIATELY CONNECTED.....' dump.
			self onSocketConnected: true.

			thisProcess addAsyncSemaphore: self.inreadysem.
			self.inreadysem signalOnInput: self.handle.
			thisProcess addAsyncSemaphore: self.outdonesem.
		}
	}

	// Hook invoked when a connect attempt has finished. connected is true
	// when the connection has been established and false otherwise.
	// This is the selector sent by connect: and by the connection semaphore
	// action. It does nothing by default; a subclass may override it.
	method onSocketConnected: connected
	{
		// do nothing special. the subclass may override this method.
	}

	// Argument-less variant kept for backward compatibility. Nothing in
	// this class sends it.
	method onSocketConnected
	{
		// do nothing special. the subclass may override this method.
	}
}
// Asynchronous listening socket. Each input event on the handle triggers
// an accept, and the accepted socket is passed to onSocketAccepted:from:.
class AsyncServerSocket(AsyncSocket)
{
	// Replaces the input action installed by the superclass with one that
	// accepts a connection.
	method initialize
	{
'Server Socket initialize...........' dump.
		super initialize.

		self.inreadysem signalAction: [ :sem |
			| peer fd accepted |

			peer := SocketAddress new.
			fd := self _accept: peer.

			//if (fd >= 0)
			if (fd notNil)
			{
				// wrap the new handle, start watching it and hand it over
				accepted := (self acceptedSocketClass) __with_handle: fd.
				accepted beWatched.
				self onSocketAccepted: accepted from: peer.
			}.
		].
	}

	// Detaches and drops the input semaphore, then closes the socket via
	// the superclass.
	method close
	{
		| sem |

'CLOSING SERVER SOCEKT.... ' dump.
		sem := self.inreadysem.
		if (sem notNil)
		{
			sem unsignal.
			if (sem _group notNil) { thisProcess removeAsyncSemaphore: sem }.
			self.inreadysem := nil.
		}.

		^super close.
	}

	// Puts the socket into the listening state and then requests input
	// notification for the handle. Answers the result of the _listen:
	// primitive.
	method listen: backlog
	{
		| rc |

		// _listen: is invoked first and the handle is added to the
		// multiplexer afterwards. the original note mentions a spurious
		// hangup event observed in linux with epoll in the level trigger
		// mode when these steps are ordered differently.
		// NOTE(review): the original note names the opposite order as the
		// problematic one - confirm which order triggers the event.
		// the alternative order was:
		//   self.inreadysem signalOnInput: self.handle.
		//   thisProcess addAsyncSemaphore: self.inreadysem.
		//   self _listen: backlog.
		rc := self _listen: backlog.
		self.inreadysem signalOnInput: self.handle.
		thisProcess addAsyncSemaphore: self.inreadysem.
		^rc.
	}

	// Hook invoked for each accepted connection.
	// The default closes the accepted client socket immediately; a subclass
	// must override this to keep the connection.
	method onSocketAccepted: clisck from: cliaddr
	{
		clisck close.
	}

	// Answers the class used to wrap an accepted handle.
	method acceptedSocketClass
	{
		^AsyncSocket
	}
}
/*
2018-05-21 17:14:16 +00:00
class ListenerSocket(Socket)
{
var inreadysem.
method initialize
{
'Server Socket initialize...........' dump.
super initialize.
self.inreadysem signalAction: [ :sem |
| cliaddr clisck cliact fd |
cliaddr := SocketAddress new.
fd := self _accept: cliaddr.
//if (fd >= 0)
2018-05-21 17:14:16 +00:00
if (fd notNil)
{
clisck := (self acceptedSocketClass) __with_handle: fd.
2018-05-21 17:14:16 +00:00
sg addSemaphore: self.inreadysem.
self.inreadysem signalOnInput: self.handle.
self onSocketAccepted: clisck from: cliaddr.
}.
].
}
method close
{
'CLOSING SERVER SOCEKT.... ' dump.
| sg |
if (self.inreadysem notNil)
{
self.inreadysem unsignal.
sg := self.inreadysem _group.
if (sg notNil) { sg removeSemaphore: self.inreadysem }.
self.inreadysem := nil.
}.
^super close.
}
method listen: backlog
{
| n |
// If listen is called before the socket handle is
// added to the multiplexer, a spurious hangup event might
// be generated. At least, such behavior was observed
// in linux with epoll in the level trigger mode.
// self.inreadysem signalOnInput: self.handle.
// thisProcess addAsyncSemaphore: self.inreadysem.
// self _listen: backlog.
2018-05-21 17:14:16 +00:00
n := self _listen: backlog.
self.inreadysem signalOnInput: self.handle.
sg addemaphore: self.inreadysem.
^n.
}
method accept
{
}
method onSocketAccepted: clisck from: cliaddr
{
// close the accepted client socket immediately.
// a subclass must override this to avoid it.
2018-05-21 17:14:16 +00:00
clisck close.
}
method acceptedSocketClass
{
^Socket
}
2018-05-21 17:14:16 +00:00
2018-05-02 16:36:56 +00:00
}
*/