From b42586a1c666c78198cf5ef9b6149bea3ade9cde Mon Sep 17 00:00:00 2001 From: "hyunghwan.chung" Date: Mon, 21 May 2018 17:14:16 +0000 Subject: [PATCH] added some experimental code --- moo/kernel/Http.moo | 185 +++++++++++++++++++++++++++++++++++++++++- moo/kernel/Socket.moo | 96 +++++++++++++++++++++- 2 files changed, 279 insertions(+), 2 deletions(-) diff --git a/moo/kernel/Http.moo b/moo/kernel/Http.moo index cf94f90..10233c2 100644 --- a/moo/kernel/Http.moo +++ b/moo/kernel/Http.moo @@ -269,6 +269,169 @@ class HttpServer(Object) } } + +class HttpListener2(Socket) +{ + var(#get) server := nil. + var(#get) rid := -1. + var sem. + + method initialize + { + super initialize. + self.sem := Semaphore new. + self.sem signalAction: [:sem | + | cliaddr clisck cliact fd | + cliaddr := SocketAddress new. + + fd := self _accept: cliaddr. + ##if (fd >= 0) + if (fd notNil) + { + clisck := (self acceptedSocketClass) __with: fd. + clisck beWatched. + self onSocketAccepted: clisck from: cliaddr. + }. + ]. + } + +(* + method onSocketAccepted: clisck from: cliaddr + { + | rid | + + 'CLIENT accepted ..............' dump. +clisck dump. + cliaddr dump. + + if (self.server notNil) + { + server addConnection: clisck. + if (clisck isKindOf: SyncSocket) + { +'SERVICE READLLY STARTING' dump. + [clisck runService] fork. + } + }. + + } + + method acceptedSocketClass + { + ##^if (self currentAddress port == 80) { HttpSocket } else { HttpSocket }. + ^HttpSocket. + } + + method server: server rid: rid + { + self.server := server. + self.rid := rid. + } +*) + +} + + +class HttpServer2(Object) +{ + var listeners. + var connreg. + var sg. + + method initialize + { + super initialize. + self.sg := SemaphoreGroup new. + self.listeners := HttpConnReg new. + self.connreg := HttpConnReg new. + } + + method run: laddr + { + | listener sem ss | + + if (laddr class == Array) + { + laddr do: [:addr | + listener := HttpListener2 family: (addr family) type: Socket.Type.STREAM. + if ((self addListener: listener) notError) + { + listener bind: addr. + listener _listen: 128. + } + ]. + } + else + { + listener := HttpListener2 family: (laddr family) type: Socket.Type.STREAM. + if ((self addListener: listener) notError) + { + listener bind: laddr. + listener _listen: 128. + } + }. + + while (true) + { + ss := self.sg wait. + if (ss isError) { break }. + }. + } + + method close + { + self.listeners do: [:listener | + listener close. + ]. + + self.connreg do: [:conn | + conn close. + ]. + } + + method addConnection: conn + { + | rid | + rid := self.connreg add: conn. + if (rid isError) + { + 'ERROR - CANNOT REGISTER NEW CONNECTION >>>>>>>>>> ' dump. + conn close. + ^rid. + }. + +('ADD NEW CONNECTION ' & rid asString) dump. + conn server: self rid: rid. + } + + method removeConnection: conn + { + self.connreg remove: (conn rid). + conn server: nil rid: -1. + } + + method addListener: listener + { + | rid | + rid := self.listeners add: listener. + if (rid isError) + { + 'ERROR - CANNOT REGISTER NEW LISTENER >>>>>>>>>> ' dump. + listener close. + ^rid. + }. + +('ADD NEW LISTENER ' & rid asString) dump. + listener server: self rid: rid. + } + + method removeListener: listener + { + self.listeners remove: (listener rid). + listener server: nil rid: -1. + } +} + class MyObject(Object) { method(#class) start_server_socket @@ -405,7 +568,7 @@ class MyObject(Object) '----- END OF ANOTHER PROC ------' dump. } - method(#class) main + method(#class) main222 { | httpd addr | @@ -446,4 +609,24 @@ httpd connect: addr. '----- END OF MAIN ------' dump. } + + method(#class) main + { + | httpd addr | + + [ + httpd := HttpServer2 new. + [ + httpd run: %( + SocketAddress fromString: '[::]:7777', + SocketAddress fromString: '0.0.0.0:7776' + ). + ] ensure: [ + if (httpd notNil) { httpd close } + ] + + ] on: Exception do: [:ex | ('Exception - ' & ex messageText) dump]. + + '----- END OF MAIN ------' dump. + } } diff --git a/moo/kernel/Socket.moo b/moo/kernel/Socket.moo index 304c685..26b7720 100644 --- a/moo/kernel/Socket.moo +++ b/moo/kernel/Socket.moo @@ -722,6 +722,100 @@ class AsyncServerSocket(AsyncSocket) method acceptedSocketClass { - ^Socket + ^AsyncSocket } } + + +class Mux(SemaphoreGroup) +{ + method addSocket: sck + { + self addSemaphore: (sck inreadysem). + } + + method removeSocket: sck + { + } +} + +class ListenerSocket(Socket) +{ + var inreadysem. + + method initialize + { +'Server Socket initialize...........' dump. + super initialize. + + self.inreadysem signalAction: [ :sem | + | cliaddr clisck cliact fd | + cliaddr := SocketAddress new. + + fd := self _accept: cliaddr. + ##if (fd >= 0) + if (fd notNil) + { + clisck := (self acceptedSocketClass) __with: fd. + + sg addSemaphore: self.inreadysem. + self.inreadysem signalOnInput: self.handle. + + + self onSocketAccepted: clisck from: cliaddr. + }. + ]. + } + + method close + { +'CLOSING SERVER SOCEKT.... ' dump. + | sg | + if (self.inreadysem notNil) + { + self.inreadysem unsignal. + sg := self.inreadysem _group. + if (sg notNil) { sg removeSemaphore: self.inreadysem }. + self.inreadysem := nil. + }. + + ^super close. + } + + method listen: backlog + { + | n | + + ## If listen is called before the socket handle is + ## added to the multiplexer, a spurious hangup event might + ## be generated. At least, such behavior was observed + ## in linux with epoll in the level trigger mode. + ## self.inreadysem signalOnInput: self.handle. + ## thisProcess addAsyncSemaphore: self.inreadysem. + ## self _listen: backlog. + + n := self _listen: backlog. + + self.inreadysem signalOnInput: self.handle. + sg addemaphore: self.inreadysem. + + ^n. + } + + method accept + { + } + + method onSocketAccepted: clisck from: cliaddr + { + ## close the accepted client socket immediately. + ## a subclass must override this to avoid it. + clisck close. + } + + method acceptedSocketClass + { + ^Socket + } + +}