filling impleemantion with more code

This commit is contained in:
hyung-hwan 2024-11-18 22:25:59 +09:00
parent 847f71d914
commit fa336bfb07
5 changed files with 335 additions and 207 deletions

View File

@ -71,7 +71,7 @@ type ServerConn struct {
conn *grpc.ClientConn // grpc connection to the server conn *grpc.ClientConn // grpc connection to the server
hdc HoduClient hdc HoduClient
psc Hodu_PacketStreamClient // grpc stream psc *GuardedPacketStreamClient // guarded grpc stream
psc_mtx sync.Mutex psc_mtx sync.Mutex
route_mtx sync.Mutex route_mtx sync.Mutex
@ -103,6 +103,29 @@ type ClientCtlParamServer struct {
PeerAddrs []string `json:"peer-addrs"` PeerAddrs []string `json:"peer-addrs"`
} }
type GuardedPacketStreamClient struct {
mtx sync.Mutex
//psc Hodu_PacketStreamClient
Hodu_PacketStreamClient
}
// ------------------------------------
func (g *GuardedPacketStreamClient) Send(data *Packet) error {
g.mtx.Lock()
defer g.mtx.Unlock()
//return g.psc.Send(data)
return g.Hodu_PacketStreamClient.Send(data)
}
/*func (g *GuardedPacketStreamClient) Recv() (*Packet, error) {
return g.psc.Recv()
}
func (g *GuardedPacketStreamClient) Context() context.Context {
return g.psc.Context()
}*/
// -------------------------------------------------------------------- // --------------------------------------------------------------------
func NewClientRoute(cts *ServerConn, id uint32, addr *net.TCPAddr, proto ROUTE_PROTO) *ClientRoute { func NewClientRoute(cts *ServerConn, id uint32, addr *net.TCPAddr, proto ROUTE_PROTO) *ClientRoute {
var r ClientRoute var r ClientRoute
@ -368,7 +391,8 @@ fmt.Printf ("Connecting GRPC to [%s]\n", cts.saddr.String())
cts.conn = conn cts.conn = conn
cts.hdc = hdc cts.hdc = hdc
cts.psc = psc //cts.psc = &GuardedPacketStreamClient{psc: psc}
cts.psc = &GuardedPacketStreamClient{Hodu_PacketStreamClient: psc}
// the connection structure to a server is ready. // the connection structure to a server is ready.
// let's add routes to the client-side peers. // let's add routes to the client-side peers.

View File

@ -40,7 +40,8 @@ enum PACKET_KIND {
ROUTE_STOPPED = 5; ROUTE_STOPPED = 5;
PEER_STARTED = 6; PEER_STARTED = 6;
PEER_STOPPED = 7; PEER_STOPPED = 7;
PEER_DATA = 8; PEER_EOF = 8;
PEER_DATA = 9;
}; };
message Packet { message Packet {

View File

@ -33,15 +33,8 @@ func MakePeerStoppedPacket(route_id uint32, pts_id uint32) *Packet {
}} }}
} }
func MakePtcStartedPacket(route_id uint32, pts_id uint32) *Packet { func MakePeerEofPacket(route_id uint32, pts_id uint32) *Packet {
// the connection from the client to a peer has been established return &Packet{Kind: PACKET_KIND_PEER_EOF,
return &Packet{Kind: PACKET_KIND_PEER_STARTED,
U: &Packet_Peer{Peer: &PeerDesc{RouteId: route_id, PeerId: pts_id}},
}
}
func MakePtcStoppedPacket(route_id uint32, pts_id uint32) *Packet {
return &Packet{Kind: PACKET_KIND_PEER_STOPPED,
U: &Packet_Peer{Peer: &PeerDesc{RouteId: route_id, PeerId: pts_id}, U: &Packet_Peer{Peer: &PeerDesc{RouteId: route_id, PeerId: pts_id},
}} }}
} }

View File

@ -1,8 +1,10 @@
package main package main
import "errors"
import "fmt" import "fmt"
import "io" import "io"
import "net" import "net"
import "sync"
import "sync/atomic" import "sync/atomic"
import "time" import "time"
@ -12,6 +14,7 @@ type ServerPeerConn struct {
cts *ClientConn cts *ClientConn
conn *net.TCPConn conn *net.TCPConn
stop_req atomic.Bool stop_req atomic.Bool
stop_chan chan bool
client_peer_status_chan chan bool client_peer_status_chan chan bool
client_peer_opened_received atomic.Bool client_peer_opened_received atomic.Bool
client_peer_closed_received atomic.Bool client_peer_closed_received atomic.Bool
@ -24,6 +27,7 @@ func NewServerPeerConn(r *ServerRoute, c *net.TCPConn, id uint32) (*ServerPeerCo
spc.conn = c spc.conn = c
spc.conn_id = id spc.conn_id = id
spc.stop_req.Store(false) spc.stop_req.Store(false)
spc.stop_chan = make(chan bool, 1)
spc.client_peer_status_chan = make(chan bool, 16) spc.client_peer_status_chan = make(chan bool, 16)
spc.client_peer_opened_received.Store(false) spc.client_peer_opened_received.Store(false)
spc.client_peer_closed_received.Store(false) spc.client_peer_closed_received.Store(false)
@ -31,16 +35,17 @@ func NewServerPeerConn(r *ServerRoute, c *net.TCPConn, id uint32) (*ServerPeerCo
return &spc return &spc
} }
func (spc *ServerPeerConn) RunTask() error { func (spc *ServerPeerConn) RunTask(wg *sync.WaitGroup) {
var pss Hodu_PacketStreamServer var pss *GuardedPacketStreamServer
var n int var n int
var buf [4096]byte var buf [4096]byte
var tmr *time.Timer var tmr *time.Timer
var status bool var status bool
var err error = nil var err error = nil
defer wg.Done()
pss = spc.route.cts.pss pss = spc.route.cts.pss
//TODO: this needs to be guarded
err = pss.Send(MakePeerStartedPacket(spc.route.id, spc.conn_id)) err = pss.Send(MakePeerStartedPacket(spc.route.id, spc.conn_id))
if err != nil { if err != nil {
// TODO: include route id and conn id in the error message // TODO: include route id and conn id in the error message
@ -65,24 +70,27 @@ wait_for_started:
tmr.Stop() tmr.Stop()
goto done goto done
/*case <- spc->ctx->Done(): case <- spc.stop_chan:
tmr.Stop() tmr.Stop()
goto done*/ goto done
} }
} }
tmr.Stop() tmr.Stop()
for { for {
fmt.Printf("******************* TRYING TO READ...\n")
n, err = spc.conn.Read(buf[:]) n, err = spc.conn.Read(buf[:])
if err != nil { if err != nil {
if err != io.EOF { if !errors.Is(err, io.EOF) {
fmt.Printf("read error - %s\n", err.Error()) fmt.Printf("read error - %s\n", err.Error())
}
goto done goto done
} }
if pss.Send(MakePeerStoppedPacket(spc.route.id, spc.conn_id)) != nil {
fmt.Printf("unable to report data - %s\n", err.Error())
goto done
}
goto wait_for_stopped
}
// TODO: this needs to be guarded
err = pss.Send(MakePeerDataPacket(spc.route.id, spc.conn_id, buf[:n])) err = pss.Send(MakePeerDataPacket(spc.route.id, spc.conn_id, buf[:n]))
if err != nil { if err != nil {
// TODO: include route id and conn id in the error message // TODO: include route id and conn id in the error message
@ -91,22 +99,33 @@ fmt.Printf("******************* TRYING TO READ...\n")
} }
} }
wait_for_stopped:
//if spc.client_peer_open {
for {
select {
case status = <- spc.client_peer_status_chan: // something not right... may use a different channel for closing...
goto done
case <- spc.stop_chan:
goto done
}
}
//}
done: done:
// TODO: inform the client to close peer connection.. // TODO: inform the client to close peer connection..
fmt.Printf("SPC really ending..................\n") fmt.Printf("SPC really ending..................\n")
spc.ReqStop() spc.ReqStop()
spc.route.RemoveServerPeerConn(spc) spc.route.RemoveServerPeerConn(spc)
//spc.cts.wg.Done() //spc.cts.wg.Done()
return err
} }
func (spc *ServerPeerConn) ReqStop() { func (spc *ServerPeerConn) ReqStop() {
if spc.stop_req.CompareAndSwap(false, true) { if spc.stop_req.CompareAndSwap(false, true) {
var pss Hodu_PacketStreamServer var pss *GuardedPacketStreamServer
var err error var err error
pss = spc.route.cts.pss pss = spc.route.cts.pss
spc.stop_chan <- true
if spc.client_peer_opened_received.CompareAndSwap(false, true) { if spc.client_peer_opened_received.CompareAndSwap(false, true) {
spc.client_peer_status_chan <- false spc.client_peer_status_chan <- false
} }

441
server.go
View File

@ -1,7 +1,5 @@
package main package main
//import "bufio"
//import "bytes"
import "context" import "context"
import "crypto/tls" import "crypto/tls"
import "errors" import "errors"
@ -11,7 +9,6 @@ import "math/rand"
import "net" import "net"
import "os" import "os"
import "os/signal" import "os/signal"
//import "strings"
import "sync" import "sync"
import "sync/atomic" import "sync/atomic"
import "syscall" import "syscall"
@ -31,13 +28,15 @@ type ServerRouteMap = map[uint32]*ServerRoute
type Server struct { type Server struct {
tlscfg *tls.Config tlscfg *tls.Config
wg sync.WaitGroup
stop_req atomic.Bool
l []*net.TCPListener // central listener l []*net.TCPListener // central listener
l_wg sync.WaitGroup l_wg sync.WaitGroup
cts_mtx sync.Mutex cts_mtx sync.Mutex
cts_map ClientConnMap cts_map ClientConnMap
wg sync.WaitGroup cts_wg sync.WaitGroup
stop_req atomic.Bool
gs *grpc.Server gs *grpc.Server
UnimplementedHoduServer UnimplementedHoduServer
@ -48,16 +47,16 @@ type Server struct {
type ClientConn struct { type ClientConn struct {
svr *Server svr *Server
caddr net.Addr // client address that created this structure caddr net.Addr // client address that created this structure
pss Hodu_PacketStreamServer pss *GuardedPacketStreamServer
cw_mtx sync.Mutex cw_mtx sync.Mutex
route_mtx sync.Mutex route_mtx sync.Mutex
routes ServerRouteMap route_map ServerRouteMap
//route_wg sync.WaitGroup route_wg sync.WaitGroup
wg sync.WaitGroup wg sync.WaitGroup
stop_req atomic.Bool stop_req atomic.Bool
greeted bool stop_chan chan bool
} }
type ServerRoute struct { type ServerRoute struct {
@ -71,17 +70,70 @@ type ServerRoute struct {
pts_limit int pts_limit int
pts_last_id uint32 pts_last_id uint32
pts_wg sync.WaitGroup pts_wg sync.WaitGroup
stop_req atomic.Bool
}
type GuardedPacketStreamServer struct {
mtx sync.Mutex
//pss Hodu_PacketStreamServer
Hodu_PacketStreamServer // let's embed it to avoid reimplement Recv() and Context()
} }
// ------------------------------------ // ------------------------------------
func (g *GuardedPacketStreamServer) Send(data *Packet) error {
// while Recv() on a stream is called from the same gorountine all the time,
// Send() is called from multiple places. let's guard it as grpc-go
// doesn't provide concurrency safety in this case.
// https://github.com/grpc/grpc-go/blob/master/Documentation/concurrency.md
g.mtx.Lock()
defer g.mtx.Unlock()
return g.Hodu_PacketStreamServer.Send(data)
}
/*
func (g *GuardedPacketStreamServer) Recv() (*Packet, error) {
return g.pss.Recv()
}
func (g *GuardedPacketStreamServer) Context() context.Context {
return g.pss.Context()
}*/
// ------------------------------------
func NewServerRoute(cts *ClientConn, id uint32, proto ROUTE_PROTO) (*ServerRoute, error) {
var r ServerRoute
var l *net.TCPListener
var laddr *net.TCPAddr
var err error
l, laddr, err = cts.make_route_listener(proto);
if err != nil {
return nil, err
}
r.cts = cts
r.id = id
r.l = l
r.laddr = laddr
r.pts_limit = PTS_LIMIT
r.pts_map = make(ServerPeerConnMap)
r.pts_last_id = 0
r.stop_req.Store(false)
return &r, nil;
}
func (r *ServerRoute) AddNewServerPeerConn(c *net.TCPConn) (*ServerPeerConn, error) { func (r *ServerRoute) AddNewServerPeerConn(c *net.TCPConn) (*ServerPeerConn, error) {
var pts *ServerPeerConn var pts *ServerPeerConn
var ok bool var ok bool
var start_id uint32 var start_id uint32
fmt.Printf ("XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\n")
r.pts_mtx.Lock() r.pts_mtx.Lock()
defer r.pts_mtx.Unlock() defer r.pts_mtx.Unlock()
fmt.Printf ("XXXXXXXXXXXXXXXXXXXXXXXXBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB\n")
if len(r.pts_map) >= r.pts_limit { if len(r.pts_map) >= r.pts_limit {
return nil, fmt.Errorf("peer-to-server connection table full") return nil, fmt.Errorf("peer-to-server connection table full")
@ -100,6 +152,7 @@ func (r *ServerRoute) AddNewServerPeerConn(c *net.TCPConn) (*ServerPeerConn, err
} }
} }
fmt.Printf ("Creaing new Server Peer Conn\n")
pts = NewServerPeerConn(r, c, r.pts_last_id) pts = NewServerPeerConn(r, c, r.pts_last_id)
r.pts_map[pts.conn_id] = pts r.pts_map[pts.conn_id] = pts
r.pts_last_id++ r.pts_last_id++
@ -113,18 +166,17 @@ func (r *ServerRoute) RemoveServerPeerConn(pts *ServerPeerConn) {
r.pts_mtx.Unlock() r.pts_mtx.Unlock()
} }
// ------------------------------------ func (r *ServerRoute) RunTask(wg *sync.WaitGroup) {
func (r *ServerRoute) RunTask() {
var err error var err error
var conn *net.TCPConn var conn *net.TCPConn
var pts *ServerPeerConn var pts *ServerPeerConn
defer wg.Done()
for { for {
fmt.Printf ("**** Ready to Acept server side peer connection\n")
conn, err = r.l.AcceptTCP() conn, err = r.l.AcceptTCP()
if err != nil { if err != nil {
// TODO: logging
//if strings.Contains(err.Error(), "use of closed network connection") {
//if err == net.ErrClosed {
if errors.Is(err, net.ErrClosed) { if errors.Is(err, net.ErrClosed) {
fmt.Printf("[%s,%d] END OF TASK...[%#v] [%#v]\n", r.cts.caddr.String(), r.id, err, net.ErrClosed) fmt.Printf("[%s,%d] END OF TASK...[%#v] [%#v]\n", r.cts.caddr.String(), r.id, err, net.ErrClosed)
} else { } else {
@ -133,30 +185,39 @@ func (r *ServerRoute) RunTask() {
break break
} }
fmt.Printf ("**** Adding server peer connection server side peer connection\n")
pts, err = r.AddNewServerPeerConn(conn) pts, err = r.AddNewServerPeerConn(conn)
if err != nil { if err != nil {
// TODO: logging // TODO: logging
fmt.Printf("YYYYYYYY - %s\n", err.Error()) fmt.Printf("YYYYYYYY - %s\n", err.Error())
conn.Close() conn.Close()
} else { } else {
fmt.Printf("STARTED NEW SERVER PEER STAK\n") fmt.Printf("STARTINGNEW SERVER PEER TASK\n")
r.pts_wg.Add(1) r.pts_wg.Add(1)
go pts.RunTask() go pts.RunTask(&r.pts_wg)
} }
} }
r.l.Close() // don't care about double close. it could have been closed in StopTask
r.l.Close() // don't care about double close. it could have been closed in ReqStop
fmt.Printf ("*** wariting for all pts to finish..\n")
r.pts_wg.Wait() r.pts_wg.Wait()
fmt.Printf ("*** waited for all pts to finish..\n")
// cts.l_wg.Done()
// TODO:inform that the job is done?
} }
func (r *ServerRoute) StopTask() { func (r *ServerRoute) ReqStop() {
fmt.Printf ("stoppping taak..\n") fmt.Printf ("requesting to stop route taak..\n")
// TODO: all pts stop...
if r.stop_req.CompareAndSwap(false, true) {
var pts *ServerPeerConn
for _, pts = range r.pts_map {
pts.ReqStop()
}
r.l.Close(); r.l.Close();
// TODO: wait?? }
fmt.Printf ("requiested to stopp route taak..\n")
} }
func (r *ServerRoute) ReportEvent (pts_id uint32, event_type PACKET_KIND, event_data []byte) error { func (r *ServerRoute) ReportEvent (pts_id uint32, event_type PACKET_KIND, event_data []byte) error {
@ -166,6 +227,7 @@ func (r *ServerRoute) ReportEvent (pts_id uint32, event_type PACKET_KIND, event_
r.pts_mtx.Lock() r.pts_mtx.Lock()
spc, ok = r.pts_map[pts_id] spc, ok = r.pts_map[pts_id]
if !ok { if !ok {
r.pts_mtx.Unlock();
return fmt.Errorf("non-existent peer id - %u", pts_id) return fmt.Errorf("non-existent peer id - %u", pts_id)
} }
r.pts_mtx.Unlock(); r.pts_mtx.Unlock();
@ -214,34 +276,12 @@ func (cts *ClientConn) make_route_listener(proto ROUTE_PROTO) (*net.TCPListener,
return nil, nil, err return nil, nil, err
} }
func NewServerRoute(cts *ClientConn, id uint32, proto ROUTE_PROTO) (*ServerRoute, error) {
var r ServerRoute
var l *net.TCPListener
var laddr *net.TCPAddr
var err error
l, laddr, err = cts.make_route_listener(proto);
if err != nil {
return nil, err
}
r.cts = cts
r.id = id
r.l = l
r.laddr = laddr
r.pts_limit = PTS_LIMIT
r.pts_map = make(ServerPeerConnMap)
r.pts_last_id = 0
return &r, nil;
}
func (cts *ClientConn) AddNewServerRoute(route_id uint32, proto ROUTE_PROTO) (*ServerRoute, error) { func (cts *ClientConn) AddNewServerRoute(route_id uint32, proto ROUTE_PROTO) (*ServerRoute, error) {
var r *ServerRoute var r *ServerRoute
var err error var err error
cts.route_mtx.Lock() cts.route_mtx.Lock()
if cts.routes[route_id] != nil { if cts.route_map[route_id] != nil {
cts.route_mtx.Unlock() cts.route_mtx.Unlock()
return nil, fmt.Errorf ("existent route id - %d", route_id) return nil, fmt.Errorf ("existent route id - %d", route_id)
} }
@ -250,10 +290,11 @@ func (cts *ClientConn) AddNewServerRoute(route_id uint32, proto ROUTE_PROTO) (*S
cts.route_mtx.Unlock() cts.route_mtx.Unlock()
return nil, err return nil, err
} }
cts.routes[route_id] = r; cts.route_map[route_id] = r;
cts.route_mtx.Unlock() cts.route_mtx.Unlock()
go r.RunTask() cts.route_wg.Add(1)
go r.RunTask(&cts.route_wg)
return r, nil return r, nil
} }
@ -262,15 +303,15 @@ func (cts *ClientConn) RemoveServerRoute (route_id uint32) error {
var ok bool var ok bool
cts.route_mtx.Lock() cts.route_mtx.Lock()
r, ok = cts.routes[route_id] r, ok = cts.route_map[route_id]
if (!ok) { if (!ok) {
cts.route_mtx.Unlock() cts.route_mtx.Unlock()
return fmt.Errorf ("non-existent route id - %d", route_id) return fmt.Errorf ("non-existent route id - %d", route_id)
} }
delete(cts.routes, route_id) delete(cts.route_map, route_id)
cts.route_mtx.Unlock() cts.route_mtx.Unlock()
r.StopTask() // TODO: make this unblocking or blocking? r.ReqStop() // TODO: make this unblocking or blocking?
return nil; return nil;
} }
@ -279,7 +320,7 @@ func (cts *ClientConn) ReportEvent (route_id uint32, pts_id uint32, event_type P
var ok bool var ok bool
cts.route_mtx.Lock() cts.route_mtx.Lock()
r, ok = cts.routes[route_id] r, ok = cts.route_map[route_id]
if (!ok) { if (!ok) {
cts.route_mtx.Unlock() cts.route_mtx.Unlock()
return fmt.Errorf ("non-existent route id - %d", route_id) return fmt.Errorf ("non-existent route id - %d", route_id)
@ -289,14 +330,166 @@ func (cts *ClientConn) ReportEvent (route_id uint32, pts_id uint32, event_type P
return r.ReportEvent(pts_id, event_type, event_data) return r.ReportEvent(pts_id, event_type, event_data)
} }
func (cts *ClientConn) receive_from_stream () {
var pkt *Packet
var err error
//for {
pkt, err = cts.pss.Recv()
if errors.Is(err, io.EOF) {
// return will close stream from server side
// TODO: clean up route_map and server-side peers releated to the client connection 'cts'
fmt.Printf ("grpd stream ended\n")
goto done
}
if err != nil {
//log.Printf("receive error %v", err)
fmt.Printf ("grpc stream error - %s\n", err.Error())
goto done
}
switch pkt.Kind {
case PACKET_KIND_ROUTE_START:
var x *Packet_Route
//var t *ServerRoute
var ok bool
x, ok = pkt.U.(*Packet_Route)
if ok {
var r* ServerRoute
fmt.Printf ("ADDED SERVER ROUTE FOR CLEINT PEER %s\n", x.Route.AddrStr)
r, err = cts.AddNewServerRoute(x.Route.RouteId, x.Route.Proto)
if err != nil {
// TODO: Send Error Response...
} else {
err = cts.pss.Send(MakeRouteStartedPacket(r.id, x.Route.Proto, r.laddr.String()))
if err != nil {
// TODO:
}
}
} else {
// TODO: send invalid request... or simply keep quiet?
}
case PACKET_KIND_ROUTE_STOP:
var x *Packet_Route
var ok bool
x, ok = pkt.U.(*Packet_Route)
if ok {
err = cts.RemoveServerRoute(x.Route.RouteId); // TODO: this must be unblocking. otherwide, other route_map will get blocked...
if err != nil {
// TODO: Send Error Response...
} else {
err = cts.pss.Send(MakeRouteStoppedPacket(x.Route.RouteId, x.Route.Proto))
if err != nil {
// TODO:
}
}
} else {
// TODO: send invalid request... or simply keep quiet?
}
case PACKET_KIND_PEER_STARTED:
// the connection from the client to a peer has been established
var x *Packet_Peer
var ok bool
x, ok = pkt.U.(*Packet_Peer)
if ok {
err = cts.ReportEvent(x.Peer.RouteId, x.Peer.PeerId, PACKET_KIND_PEER_STARTED, nil)
if err != nil {
// TODO:
fmt.Printf ("Failed to report PEER_STARTED Event")
} else {
// TODO:
}
} else {
// TODO
}
case PACKET_KIND_PEER_STOPPED:
// the connection from the client to a peer has been established
var x *Packet_Peer
var ok bool
x, ok = pkt.U.(*Packet_Peer)
if ok {
err = cts.ReportEvent(x.Peer.RouteId, x.Peer.PeerId, PACKET_KIND_PEER_STOPPED, nil)
if err != nil {
// TODO:
fmt.Printf ("Failed to report PEER_STOPPED Event")
} else {
// TODO:
}
} else {
// TODO
}
case PACKET_KIND_PEER_DATA:
// the connection from the client to a peer has been established
var x *Packet_Data
var ok bool
x, ok = pkt.U.(*Packet_Data)
if ok {
err = cts.ReportEvent(x.Data.RouteId, x.Data.PeerId, PACKET_KIND_PEER_DATA, x.Data.Data)
if err != nil {
// TODO:
fmt.Printf ("Failed to report PEER_DATA Event")
} else {
// TODO:
}
} else {
// TODO
}
}
//}
done:
fmt.Printf ("************ stream receiver finished....\n")
}
func (cts *ClientConn) RunTask(wg *sync.WaitGroup) {
var strm *GuardedPacketStreamServer
var ctx context.Context
defer wg.Done()
strm = cts.pss
ctx = strm.Context()
//go cts.receive_from_stream()
for {
// exit if context is done
// or continue
select {
case <-ctx.Done(): // the stream context is done
fmt.Printf("grpd server done - %s\n", ctx.Err().Error())
goto done
case <- cts.stop_chan:
goto done
default:
// no other case is ready.
// without the default case, the select construct would block
}
cts.receive_from_stream()
}
done:
fmt.Printf ("^^^^^^^^^^^^^^^^^ waiting for reoute_wg...\n")
cts.route_wg.Wait()
fmt.Printf ("^^^^^^^^^^^^^^^^^ waited for reoute_wg...\n")
}
func (cts *ClientConn) ReqStop() { func (cts *ClientConn) ReqStop() {
if cts.stop_req.CompareAndSwap(false, true) { if cts.stop_req.CompareAndSwap(false, true) {
var r *ServerRoute var r *ServerRoute
for _, r = range cts.routes { for _, r = range cts.route_map {
r.StopTask() r.ReqStop()
} }
cts.stop_chan <- true
//cts.c.Close() // close the accepted connection from the client //cts.c.Close() // close the accepted connection from the client
} }
} }
@ -339,7 +532,6 @@ func (s *Server) PacketStream(strm Hodu_PacketStreamServer) error {
var ctx context.Context var ctx context.Context
var p *peer.Peer var p *peer.Peer
var ok bool var ok bool
var pkt *Packet
var err error var err error
var cts *ClientConn var cts *ClientConn
@ -354,116 +546,11 @@ func (s *Server) PacketStream(strm Hodu_PacketStreamServer) error {
return fmt.Errorf("unable to add client %s - %s", p.Addr.String(), err.Error()) return fmt.Errorf("unable to add client %s - %s", p.Addr.String(), err.Error())
} }
for { // Don't detached the cts task as a go-routine as this function
// exit if context is done // is invoked as a go-routine by the grpc server.
// or continue s.cts_wg.Add(1)
select { cts.RunTask(&s.cts_wg)
case <-ctx.Done():
return ctx.Err()
default:
// no other case is ready.
// without the default case, the select construct would block
}
pkt, err = strm.Recv()
if errors.Is(err, io.EOF) {
// return will close stream from server side
return nil return nil
}
if err != nil {
//log.Printf("receive error %v", err)
continue
}
switch pkt.Kind {
case PACKET_KIND_ROUTE_START:
var x *Packet_Route
//var t *ServerRoute
var ok bool
x, ok = pkt.U.(*Packet_Route)
if ok {
var r* ServerRoute
fmt.Printf ("ADDED SERVER ROUTE FOR CLEINT PEER %s\n", x.Route.AddrStr)
r, err = cts.AddNewServerRoute(x.Route.RouteId, x.Route.Proto)
if err != nil {
// TODO: Send Error Response...
} else {
err = strm.Send(MakeRouteStartedPacket(r.id, x.Route.Proto, r.laddr.String()))
if err != nil {
// TODO:
}
}
} else {
// TODO: send invalid request... or simply keep quiet?
}
case PACKET_KIND_ROUTE_STOP:
var x *Packet_Route
var ok bool
x, ok = pkt.U.(*Packet_Route)
if ok {
err = cts.RemoveServerRoute(x.Route.RouteId); // TODO: this must be unblocking. otherwide, other routes will get blocked...
if err != nil {
// TODO: Send Error Response...
} else {
err = strm.Send(MakeRouteStoppedPacket(x.Route.RouteId, x.Route.Proto))
if err != nil {
// TODO:
}
}
} else {
// TODO: send invalid request... or simply keep quiet?
}
case PACKET_KIND_PEER_STARTED:
// the connection from the client to a peer has been established
var x *Packet_Peer
var ok bool
x, ok = pkt.U.(*Packet_Peer)
if ok {
err = cts.ReportEvent(x.Peer.RouteId, x.Peer.PeerId, PACKET_KIND_PEER_STARTED, nil)
if err != nil {
// TODO:
} else {
// TODO:
}
} else {
// TODO
}
case PACKET_KIND_PEER_STOPPED:
// the connection from the client to a peer has been established
var x *Packet_Peer
var ok bool
x, ok = pkt.U.(*Packet_Peer)
if ok {
err = cts.ReportEvent(x.Peer.RouteId, x.Peer.PeerId, PACKET_KIND_PEER_STOPPED, nil)
if err != nil {
// TODO:
} else {
// TODO:
}
} else {
// TODO
}
case PACKET_KIND_PEER_DATA:
// the connection from the client to a peer has been established
var x *Packet_Data
var ok bool
x, ok = pkt.U.(*Packet_Data)
if ok {
err = cts.ReportEvent(x.Data.RouteId, x.Data.PeerId, PACKET_KIND_PEER_DATA, x.Data.Data)
if err != nil {
// TODO:
} else {
// TODO:
}
} else {
// TODO
}
}
}
} }
// ------------------------------------ // ------------------------------------
@ -628,10 +715,12 @@ oops:
return nil, err return nil, err
} }
func (s *Server) run_grpc_server(idx int) error { func (s *Server) run_grpc_server(idx int, wg *sync.WaitGroup) error {
var l *net.TCPListener var l *net.TCPListener
var err error var err error
defer wg.Done();
l = s.l[idx] l = s.l[idx]
fmt.Printf ("serving grpc on %d listener\n", idx) fmt.Printf ("serving grpc on %d listener\n", idx)
// it seems to be safe to call a single grpc server on differnt listening sockets multiple times // it seems to be safe to call a single grpc server on differnt listening sockets multiple times
@ -641,21 +730,21 @@ func (s *Server) run_grpc_server(idx int) error {
fmt.Printf ("XXXXXXXXXXXXXXXXXXXXXXXXXxx %s\n", err.Error()); fmt.Printf ("XXXXXXXXXXXXXXXXXXXXXXXXXxx %s\n", err.Error());
} }
s.l_wg.Done();
return nil return nil
} }
func (s *Server) MainLoop() { func (s *Server) RunTask(wg *sync.WaitGroup) {
var idx int var idx int
defer s.wg.Done() defer wg.Done()
for idx, _ = range s.l { for idx, _ = range s.l {
s.l_wg.Add(1) s.l_wg.Add(1)
go s.run_grpc_server(idx) go s.run_grpc_server(idx, &s.l_wg)
} }
s.l_wg.Wait(); s.l_wg.Wait()
s.cts_wg.Wait()
s.ReqStop() s.ReqStop()
syscall.Kill(syscall.Getpid(), syscall.SIGTERM) syscall.Kill(syscall.Getpid(), syscall.SIGTERM)
} }
@ -684,12 +773,13 @@ func (s *Server) AddNewClientConn(addr net.Addr, pss Hodu_PacketStreamServer) (*
var ok bool var ok bool
cts.svr = s cts.svr = s
cts.routes = make(ServerRouteMap) cts.route_map = make(ServerRouteMap)
cts.caddr = addr cts.caddr = addr
cts.pss = pss //cts.pss = &GuardedPacketStreamServer{pss: pss}
cts.pss = &GuardedPacketStreamServer{Hodu_PacketStreamServer: pss}
cts.stop_req.Store(false) cts.stop_req.Store(false)
cts.greeted = false cts.stop_chan = make(chan bool, 1)
s.cts_mtx.Lock() s.cts_mtx.Lock()
defer s.cts_mtx.Unlock() defer s.cts_mtx.Unlock()
@ -785,8 +875,9 @@ func server_main(laddrs []string) error {
s.wg.Add(1) s.wg.Add(1)
go s.handle_os_signals() go s.handle_os_signals()
s.wg.Add(1) s.wg.Add(1)
go s.MainLoop() // this is blocking. ReqStop() will be called from a signal handler go s.RunTask(&s.wg) // this is blocking. ReqStop() will be called from a signal handler
s.wg.Wait() s.wg.Wait()
return nil return nil