added more async socket code
This commit is contained in:
		@ -258,7 +258,7 @@ class AsyncHandle(Object)
 | 
			
		||||
 | 
			
		||||
class Socket(AsyncHandle) from 'sck'
 | 
			
		||||
{
 | 
			
		||||
	var eventActions.
 | 
			
		||||
	var(#get) eventActions.
 | 
			
		||||
	var pending_bytes, pending_offset, pending_length.
 | 
			
		||||
	var outreadysem, outdonesem, inreadysem.
 | 
			
		||||
 | 
			
		||||
@ -266,7 +266,7 @@ class Socket(AsyncHandle) from 'sck'
 | 
			
		||||
	method(#primitive) _close.
 | 
			
		||||
	method(#primitive) bind: addr.
 | 
			
		||||
	method(#primitive) _listen: backlog.
 | 
			
		||||
	method(#primitive) accept: addr.
 | 
			
		||||
	method(#primitive) _accept: addr.
 | 
			
		||||
	method(#primitive) _connect: addr.
 | 
			
		||||
	method(#primitive) _socketError.
 | 
			
		||||
 | 
			
		||||
@ -308,7 +308,7 @@ extend Socket
 | 
			
		||||
	method initialize
 | 
			
		||||
	{
 | 
			
		||||
		super initialize.
 | 
			
		||||
		self.eventActions := #(nil nil nil).
 | 
			
		||||
		self.eventActions := %(nil, nil, nil).
 | 
			
		||||
 | 
			
		||||
		self.outdonesem := Semaphore new.
 | 
			
		||||
		self.outreadysem := Semaphore new.
 | 
			
		||||
@ -406,6 +406,17 @@ extend Socket
 | 
			
		||||
		System signal: self.outreadysem onOutput: self.handle.
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
	method beWatched
 | 
			
		||||
	{
 | 
			
		||||
		System addAsyncSemaphore: self.inreadysem.
 | 
			
		||||
		System signal: self.inreadysem onInput: self.handle.
 | 
			
		||||
		System addAsyncSemaphore: self.outdonesem.
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	method beUnwatched
 | 
			
		||||
	{
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
class ClientSocket(Socket)
 | 
			
		||||
@ -432,9 +443,7 @@ class ClientSocket(Socket)
 | 
			
		||||
 | 
			
		||||
				if (soerr == 0)
 | 
			
		||||
				{
 | 
			
		||||
					System addAsyncSemaphore: self.inreadysem.
 | 
			
		||||
					System signal: self.inreadysem onInput: self.handle.
 | 
			
		||||
					System addAsyncSemaphore: self.outdonesem.
 | 
			
		||||
					self beWatched
 | 
			
		||||
				}.
 | 
			
		||||
			}.
 | 
			
		||||
			(* HOW TO HANDLE TIMEOUT? *)
 | 
			
		||||
@ -477,13 +486,44 @@ class ServerSocket(Socket)
 | 
			
		||||
{
 | 
			
		||||
	method initialize
 | 
			
		||||
	{
 | 
			
		||||
'Server Socket initialize...........' dump.
 | 
			
		||||
		super initialize.
 | 
			
		||||
		self.inreadysem := [ :sem |
 | 
			
		||||
			| xxx |
 | 
			
		||||
			System accept: xxx.
 | 
			
		||||
 | 
			
		||||
		self.inreadysem signalAction: [ :sem |
 | 
			
		||||
			| cliaddr clisck cliact |
 | 
			
		||||
			cliaddr := SocketAddress new.
 | 
			
		||||
			clisck := self _accept: cliaddr.
 | 
			
		||||
			if (clisck notError)
 | 
			
		||||
			{
 | 
			
		||||
				## the _accept method doesn't invoke the initialize method.
 | 
			
		||||
				## i should invoke it manually here.
 | 
			
		||||
				clisck initialize.
 | 
			
		||||
 | 
			
		||||
				cliact := self.eventActions at: Socket.EventType.CONNECTED.
 | 
			
		||||
				if (cliact notNil) 
 | 
			
		||||
				{ 
 | 
			
		||||
					cliact value: self value: clisck (* value: cliaddr *).
 | 
			
		||||
					clisck beWatched.
 | 
			
		||||
				}
 | 
			
		||||
				else { clisck close }.
 | 
			
		||||
	
 | 
			
		||||
			}.
 | 
			
		||||
		].
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	method close
 | 
			
		||||
	{
 | 
			
		||||
'CLOSING SERVER SOCEKT.... ' dump.
 | 
			
		||||
		if (self.inreadysem notNil)
 | 
			
		||||
		{
 | 
			
		||||
			System unsignal: self.inreadysem.
 | 
			
		||||
			if (self.inreadysem _group notNil) { System removeAsyncSemaphore: self.inreadysem }.
 | 
			
		||||
			self.inreadysem := nil.
 | 
			
		||||
		}.
 | 
			
		||||
 | 
			
		||||
		^super close.
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	method listen: backlog
 | 
			
		||||
	{
 | 
			
		||||
		System addAsyncSemaphore: self.inreadysem.
 | 
			
		||||
@ -505,14 +545,36 @@ class MyObject(Object)
 | 
			
		||||
				s := ClientSocket domain: Socket.Domain.INET type: Socket.Type.STREAM.
 | 
			
		||||
				s2 := ServerSocket domain: Socket.Domain.INET type: Socket.Type.STREAM.
 | 
			
		||||
		
 | 
			
		||||
				s2 onEvent: Socket.EventType.DATA_IN do: [:sck |
 | 
			
		||||
					### accept...
 | 
			
		||||
				s2 onEvent: Socket.EventType.CONNECTED do: [ :sck :clisck |
 | 
			
		||||
'SERVER ACCEPTED new client' dump.
 | 
			
		||||
					clisck onEvent: Socket.EventType.DATA_IN do: [ :csck |
 | 
			
		||||
						| nbytes |
 | 
			
		||||
						nbytes := csck readBytes: buf. 
 | 
			
		||||
						if (nbytes == 0)
 | 
			
		||||
						{
 | 
			
		||||
							csck close.
 | 
			
		||||
						}.
 | 
			
		||||
						('Got ' & (nbytes asString)) dump.
 | 
			
		||||
						
 | 
			
		||||
						if (nbytes > 0) 
 | 
			
		||||
						{ 
 | 
			
		||||
					
 | 
			
		||||
							buf dump.
 | 
			
		||||
							csck writeBytes: buf offset: 0 length: nbytes.
 | 
			
		||||
						}.
 | 
			
		||||
					].
 | 
			
		||||
					clisck onEvent: Socket.EventType.DATA_OUT do: [ :csck |
 | 
			
		||||
						##csck writeBytes: #[ $a, $b, C'\n' ].
 | 
			
		||||
					].
 | 
			
		||||
					###clisck close.
 | 
			
		||||
				].
 | 
			
		||||
 | 
			
		||||
				s2 bind: (SocketAddress fromString: '0.0.0.0:7777').
 | 
			
		||||
				s2 listen: 10.
 | 
			
		||||
 | 
			
		||||
				s onEvent: Socket.EventType.CONNECTED do: [ :sck :state |
 | 
			
		||||
					if (state)
 | 
			
		||||
					{					
 | 
			
		||||
						'AAAAAAAA' dump.
 | 
			
		||||
						s writeBytes: #[ $a, $b, $c ].
 | 
			
		||||
						s writeBytes: #[ $d, $e, $f ].
 | 
			
		||||
					}
 | 
			
		||||
@ -521,9 +583,10 @@ class MyObject(Object)
 | 
			
		||||
						'FAILED TO CONNECT' dump.
 | 
			
		||||
					}.
 | 
			
		||||
				]. 
 | 
			
		||||
 | 
			
		||||
				s onEvent: Socket.EventType.DATA_IN do: [ :sck |
 | 
			
		||||
					| nbytes |
 | 
			
		||||
					nbytes := s readBytes: buf. 
 | 
			
		||||
					nbytes := sck readBytes: buf. 
 | 
			
		||||
					if (nbytes == 0)
 | 
			
		||||
					{
 | 
			
		||||
						sck close.
 | 
			
		||||
@ -533,11 +596,10 @@ class MyObject(Object)
 | 
			
		||||
					buf dump.
 | 
			
		||||
				].
 | 
			
		||||
				s onEvent: Socket.EventType.DATA_OUT do: [ :sck |
 | 
			
		||||
					if (count < 10) { s writeBytes: #[ $a, $b, C'\n' ]. count := count + 1. }.
 | 
			
		||||
					if (count < 10) { sck writeBytes: #[ $a, $b, C'\n' ]. count := count + 1. }.
 | 
			
		||||
				].
 | 
			
		||||
 | 
			
		||||
				s connect: (SocketAddress fromString: '127.0.0.1:9999').
 | 
			
		||||
 | 
			
		||||
				while (true)
 | 
			
		||||
				{
 | 
			
		||||
					ss := System handleAsyncEvent.
 | 
			
		||||
 | 
			
		||||
		Reference in New Issue
	
	Block a user