more async socket code

This commit is contained in:
hyunghwan.chung 2018-05-02 16:36:56 +00:00
parent acb71f521c
commit ff1d47dd15
6 changed files with 109 additions and 56 deletions

View File

@ -355,6 +355,7 @@ pdfdir = @pdfdir@
prefix = @prefix@ prefix = @prefix@
program_transform_name = @program_transform_name@ program_transform_name = @program_transform_name@
psdir = @psdir@ psdir = @psdir@
runstatedir = @runstatedir@
sbindir = @sbindir@ sbindir = @sbindir@
sharedstatedir = @sharedstatedir@ sharedstatedir = @sharedstatedir@
srcdir = @srcdir@ srcdir = @srcdir@
@ -585,7 +586,7 @@ distdir: $(DISTFILES)
! -type d ! -perm -444 -exec $(install_sh) -c -m a+r {} {} \; \ ! -type d ! -perm -444 -exec $(install_sh) -c -m a+r {} {} \; \
|| chmod -R a+r "$(distdir)" || chmod -R a+r "$(distdir)"
dist-gzip: distdir dist-gzip: distdir
tardir=$(distdir) && $(am__tar) | eval GZIP= gzip $(GZIP_ENV) -c >$(distdir).tar.gz tardir=$(distdir) && $(am__tar) | GZIP=$(GZIP_ENV) gzip -c >$(distdir).tar.gz
$(am__post_remove_distdir) $(am__post_remove_distdir)
dist-bzip2: distdir dist-bzip2: distdir
@ -611,7 +612,7 @@ dist-shar: distdir
@echo WARNING: "Support for shar distribution archives is" \ @echo WARNING: "Support for shar distribution archives is" \
"deprecated." >&2 "deprecated." >&2
@echo WARNING: "It will be removed altogether in Automake 2.0" >&2 @echo WARNING: "It will be removed altogether in Automake 2.0" >&2
shar $(distdir) | eval GZIP= gzip $(GZIP_ENV) -c >$(distdir).shar.gz shar $(distdir) | GZIP=$(GZIP_ENV) gzip -c >$(distdir).shar.gz
$(am__post_remove_distdir) $(am__post_remove_distdir)
dist-zip: distdir dist-zip: distdir
@ -629,7 +630,7 @@ dist dist-all:
distcheck: dist distcheck: dist
case '$(DIST_ARCHIVES)' in \ case '$(DIST_ARCHIVES)' in \
*.tar.gz*) \ *.tar.gz*) \
eval GZIP= gzip $(GZIP_ENV) -dc $(distdir).tar.gz | $(am__untar) ;;\ GZIP=$(GZIP_ENV) gzip -dc $(distdir).tar.gz | $(am__untar) ;;\
*.tar.bz2*) \ *.tar.bz2*) \
bzip2 -dc $(distdir).tar.bz2 | $(am__untar) ;;\ bzip2 -dc $(distdir).tar.bz2 | $(am__untar) ;;\
*.tar.lz*) \ *.tar.lz*) \
@ -639,7 +640,7 @@ distcheck: dist
*.tar.Z*) \ *.tar.Z*) \
uncompress -c $(distdir).tar.Z | $(am__untar) ;;\ uncompress -c $(distdir).tar.Z | $(am__untar) ;;\
*.shar.gz*) \ *.shar.gz*) \
eval GZIP= gzip $(GZIP_ENV) -dc $(distdir).shar.gz | unshar ;;\ GZIP=$(GZIP_ENV) gzip -dc $(distdir).shar.gz | unshar ;;\
*.zip*) \ *.zip*) \
unzip $(distdir).zip ;;\ unzip $(distdir).zip ;;\
esac esac

14
moo/configure vendored
View File

@ -789,6 +789,7 @@ infodir
docdir docdir
oldincludedir oldincludedir
includedir includedir
runstatedir
localstatedir localstatedir
sharedstatedir sharedstatedir
sysconfdir sysconfdir
@ -885,6 +886,7 @@ datadir='${datarootdir}'
sysconfdir='${prefix}/etc' sysconfdir='${prefix}/etc'
sharedstatedir='${prefix}/com' sharedstatedir='${prefix}/com'
localstatedir='${prefix}/var' localstatedir='${prefix}/var'
runstatedir='${localstatedir}/run'
includedir='${prefix}/include' includedir='${prefix}/include'
oldincludedir='/usr/include' oldincludedir='/usr/include'
docdir='${datarootdir}/doc/${PACKAGE_TARNAME}' docdir='${datarootdir}/doc/${PACKAGE_TARNAME}'
@ -1137,6 +1139,15 @@ do
| -silent | --silent | --silen | --sile | --sil) | -silent | --silent | --silen | --sile | --sil)
silent=yes ;; silent=yes ;;
-runstatedir | --runstatedir | --runstatedi | --runstated \
| --runstate | --runstat | --runsta | --runst | --runs \
| --run | --ru | --r)
ac_prev=runstatedir ;;
-runstatedir=* | --runstatedir=* | --runstatedi=* | --runstated=* \
| --runstate=* | --runstat=* | --runsta=* | --runst=* | --runs=* \
| --run=* | --ru=* | --r=*)
runstatedir=$ac_optarg ;;
-sbindir | --sbindir | --sbindi | --sbind | --sbin | --sbi | --sb) -sbindir | --sbindir | --sbindi | --sbind | --sbin | --sbi | --sb)
ac_prev=sbindir ;; ac_prev=sbindir ;;
-sbindir=* | --sbindir=* | --sbindi=* | --sbind=* | --sbin=* \ -sbindir=* | --sbindir=* | --sbindi=* | --sbind=* | --sbin=* \
@ -1274,7 +1285,7 @@ fi
for ac_var in exec_prefix prefix bindir sbindir libexecdir datarootdir \ for ac_var in exec_prefix prefix bindir sbindir libexecdir datarootdir \
datadir sysconfdir sharedstatedir localstatedir includedir \ datadir sysconfdir sharedstatedir localstatedir includedir \
oldincludedir docdir infodir htmldir dvidir pdfdir psdir \ oldincludedir docdir infodir htmldir dvidir pdfdir psdir \
libdir localedir mandir libdir localedir mandir runstatedir
do do
eval ac_val=\$$ac_var eval ac_val=\$$ac_var
# Remove trailing slashes. # Remove trailing slashes.
@ -1427,6 +1438,7 @@ Fine tuning of the installation directories:
--sysconfdir=DIR read-only single-machine data [PREFIX/etc] --sysconfdir=DIR read-only single-machine data [PREFIX/etc]
--sharedstatedir=DIR modifiable architecture-independent data [PREFIX/com] --sharedstatedir=DIR modifiable architecture-independent data [PREFIX/com]
--localstatedir=DIR modifiable single-machine data [PREFIX/var] --localstatedir=DIR modifiable single-machine data [PREFIX/var]
--runstatedir=DIR modifiable per-process data [LOCALSTATEDIR/run]
--libdir=DIR object code libraries [EPREFIX/lib] --libdir=DIR object code libraries [EPREFIX/lib]
--includedir=DIR C header files [PREFIX/include] --includedir=DIR C header files [PREFIX/include]
--oldincludedir=DIR C header files for non-gcc [/usr/include] --oldincludedir=DIR C header files for non-gcc [/usr/include]

View File

@ -279,9 +279,9 @@ class Socket(AsyncHandle) from 'sck'
{ {
var eventActions. var eventActions.
var pending_bytes, pending_offset, pending_length. var pending_bytes, pending_offset, pending_length.
var outreadysem, outdonesem, inreadysem. var outreadysem, outdonesem, inreadysem, connsem.
method(#primitive) _open(domain, type, proto). method(#primitive) open(domain, type, proto).
method(#primitive) _close. method(#primitive) _close.
method(#primitive) bind: addr. method(#primitive) bind: addr.
method(#primitive) _listen: backlog. method(#primitive) _listen: backlog.
@ -332,13 +332,14 @@ extend Socket
self.outdonesem := Semaphore new. self.outdonesem := Semaphore new.
self.outreadysem := Semaphore new. self.outreadysem := Semaphore new.
self.inreadysem := Semaphore new. self.inreadysem := Semaphore new.
self.connsem := nil.
self.outdonesem signalAction: [ :xsem | self.outdonesem signalAction: [ :sem |
(self.eventActions at: Socket.EventType.DATA_OUT) value: self. (self.eventActions at: Socket.EventType.DATA_OUT) value: self.
System unsignal: self.outreadysem. System unsignal: self.outreadysem.
]. ].
self.outreadysem signalAction: [ :xsem | self.outreadysem signalAction: [ :sem |
| nwritten | | nwritten |
nwritten := self _writeBytes: self.pending_bytes offset: self.pending_offset length: self.pending_length. nwritten := self _writeBytes: self.pending_bytes offset: self.pending_offset length: self.pending_length.
if (nwritten >= 0) if (nwritten >= 0)
@ -350,44 +351,59 @@ extend Socket
} }
]. ].
self.inreadysem signalAction: [ :ysem | self.inreadysem signalAction: [ :sem |
(self.eventActions at: Socket.EventType.DATA_IN) value: self. (self.eventActions at: Socket.EventType.DATA_IN) value: self.
]. ].
} }
method open(domain, type, proto) method finalize
{ {
| sck | 'SOCKET FINALIZED...............' dump.
sck := self _open(domain, type, proto). self close.
if (self.handle >= 0)
{
System addAsyncSemaphore: self.outdonesem.
System addAsyncSemaphore: self.outreadysem.
}.
^sck
} }
method close method close
{ {
'Socket close' dump. 'Socket close' dump.
System removeAsyncSemaphore: self.outdonesem.
System removeAsyncSemaphore: self.outreadysem. if (self.outdonesem notNil)
{
System unsignal: self.outdonesem.
if (self.outdonesem _group notNil) { System removeAsyncSemaphore: self.outdonesem }.
self.outdonesem := nil.
}.
if (self.outreadysem notNil)
{
System unsignal: self.outreadysem.
if (self.outreadysem _group notNil) { System removeAsyncSemaphore: self.outreadysem }.
self.outreadysem := nil.
}.
if (self.connsem notNil)
{
System unsignal: self.connsem.
if (self.connsem _group notNil) { System removeAsyncSemaphore: self.connsem }.
self.connsem := nil.
}.
if (self.inreadysem notNil)
{
System unsignal: self.inreadysem.
if (self.inreadysem _group notNil) { System removeAsyncSemaphore: self.inreadysem }.
self.inreadysem := nil.
}.
^super close. ^super close.
} }
### method listen: backlog do: acceptBlock
### {
### self.inputAction := acceptBlock.
### self watchInput.
### ^self _listen: backlog.
### }
method onEvent: event_type do: action_block method onEvent: event_type do: action_block
{ {
self.eventActions at: event_type put: action_block. self.eventActions at: event_type put: action_block.
} }
method connect: target method connect: target
{ {
| sem | | sem |
@ -395,49 +411,40 @@ extend Socket
{ {
sem := Semaphore new. sem := Semaphore new.
sem signalAction: [ :xsem | sem signalAction: [ :xsem |
| soerr dra | | soerr |
soerr := self _socketError. soerr := self _socketError.
if (soerr >= 0) if (soerr >= 0)
{ {
## finalize connection if not in progress ## finalize connection if not in progress
'CHECKING FOR CONNECTION.....' dump.
System unsignal: xsem. System unsignal: xsem.
System removeAsyncSemaphore: xsem. System removeAsyncSemaphore: xsem.
'CHECKING FOR CONNECTION.....' dump.
(self.eventActions at: Socket.EventType.CONNECTED) value: self value: (soerr == 0). (self.eventActions at: Socket.EventType.CONNECTED) value: self value: (soerr == 0).
if (soerr == 0) if (soerr == 0)
{ {
if ((self.eventActions at: Socket.EventType.DATA_IN) notNil) System addAsyncSemaphore: self.inreadysem.
{ System signal: self.inreadysem onInput: self.handle.
xsem signalAction: [ :ysem | System addAsyncSemaphore: self.outdonesem.
(self.eventActions at: Socket.EventType.DATA_IN) value: self.
].
System addAsyncSemaphore: xsem.
System signal: xsem onInput: self.handle.
}.
}. }.
}. }.
(* HOW TO HANDLE TIMEOUT? *) (* HOW TO HANDLE TIMEOUT? *)
]. ].
System addAsyncSemaphore: sem. self.connsem := sem.
System signal: sem onOutput: self.handle. System addAsyncSemaphore: self.connsem.
System signal: self.connsem onOutput: self.handle.
} }
else else
{ {
## connected immediately ## connected immediately
'IMMEDIATELY CONNECTED.....' dump. 'IMMEDIATELY CONNECTED.....' dump.
(self.eventActions at: Socket.EventType.CONNECTED) value: self value: true. (self.eventActions at: Socket.EventType.CONNECTED) value: self value: true.
if ((self.eventActions at: Socket.EventType.DATA_IN) notNil)
{ System addAsyncSemaphore: self.inreadysem.
sem := Semaphore new. System signal: self.inreadysem onInput: self.handle.
sem signalAction: [ :xsem | System addAsyncSemaphore: self.outdonesem.
(self.eventActions at: Socket.EventType.DATA_IN) value: self.
].
System addAsyncSemaphore: sem.
System signal: sem onInput: self.handle.
}.
} }
} }
@ -445,6 +452,11 @@ extend Socket
{ {
| n | | n |
if (self.outreadysem _group notNil)
{
Exception signal: 'Not allowed to write again'.
}.
## n >= 0: written ## n >= 0: written
## n <= -1: tolerable error (e.g. EAGAIN) ## n <= -1: tolerable error (e.g. EAGAIN)
## exception: fatal error ## exception: fatal error
@ -463,11 +475,31 @@ extend Socket
self.pending_offset := offset. self.pending_offset := offset.
self.pending_length := length. self.pending_length := length.
System addAsyncSemaphore: self.outreadysem.
System signal: self.outreadysem onOutput: self.handle. System signal: self.outreadysem onOutput: self.handle.
} }
} }
class ServerSocket(Socket)
{
method initialize
{
super initialize.
self.inreadysem := [ :sem |
| xxx |
System accept: xxx.
].
}
method listen: backlog
{
System addAsyncSemaphore: self.inreadysem.
System signal: self.inreadysem onInput: self.handle.
^self _listen: backlog.
}
}
class MyObject(Object) class MyObject(Object)
{ {
method(#class) main method(#class) main
@ -479,12 +511,18 @@ class MyObject(Object)
[ [
buf := ByteArray new: 128. buf := ByteArray new: 128.
s := Socket domain: Socket.Domain.INET type: Socket.Type.STREAM. s := Socket domain: Socket.Domain.INET type: Socket.Type.STREAM.
s2 := ServerSocket domain: Socket.Domain.INET type: Socket.Type.STREAM.
s2 onEvent: Socket.EventType.DATA_IN do: [:sck |
### accept...
].
s onEvent: Socket.EventType.CONNECTED do: [ :sck :state | s onEvent: Socket.EventType.CONNECTED do: [ :sck :state |
if (state) if (state)
{ {
'AAAAAAAA' dump. 'AAAAAAAA' dump.
s writeBytes: #[ $a, $b, $c ] s writeBytes: #[ $a, $b, $c ].
s writeBytes: #[ $d, $e, $f ].
} }
else else
{ {
@ -496,7 +534,8 @@ class MyObject(Object)
nbytes := s readBytes: buf. nbytes := s readBytes: buf.
if (nbytes == 0) if (nbytes == 0)
{ {
sck close sck close.
s := nil.
}. }.
('Got ' & (nbytes asString)) dump. ('Got ' & (nbytes asString)) dump.
buf dump. buf dump.
@ -517,7 +556,6 @@ class MyObject(Object)
ensure: ensure:
[ [
if (s notNil) { s close }. if (s notNil) { s close }.
if (s2 notNil) { s2 close }.
] ]
] on: Exception do: [:ex | ('Exception - ' & ex messageText) dump ]. ] on: Exception do: [:ex | ('Exception - ' & ex messageText) dump ].

View File

@ -400,6 +400,7 @@ pdfdir = @pdfdir@
prefix = @prefix@ prefix = @prefix@
program_transform_name = @program_transform_name@ program_transform_name = @program_transform_name@
psdir = @psdir@ psdir = @psdir@
runstatedir = @runstatedir@
sbindir = @sbindir@ sbindir = @sbindir@
sharedstatedir = @sharedstatedir@ sharedstatedir = @sharedstatedir@
srcdir = @srcdir@ srcdir = @srcdir@

View File

@ -2486,7 +2486,7 @@ static moo_pfrc_t pf_semaphore_group_remove_semaphore (moo_t* moo, moo_ooi_t nar
} }
/* it doesn't belong to a group or belongs to a different group */ /* it doesn't belong to a group or belongs to a different group */
moo_seterrbfmt (moo, MOO_EPERM, "not alloed to remove a semaphore from a non-owning group"); moo_seterrbfmt (moo, MOO_EPERM, "not allowed to remove a semaphore from a non-owning group");
return MOO_PF_FAILURE; return MOO_PF_FAILURE;
} }

View File

@ -424,6 +424,7 @@ pdfdir = @pdfdir@
prefix = @prefix@ prefix = @prefix@
program_transform_name = @program_transform_name@ program_transform_name = @program_transform_name@
psdir = @psdir@ psdir = @psdir@
runstatedir = @runstatedir@
sbindir = @sbindir@ sbindir = @sbindir@
sharedstatedir = @sharedstatedir@ sharedstatedir = @sharedstatedir@
srcdir = @srcdir@ srcdir = @srcdir@