diff --git a/client.go b/client.go index f1e3300..d5bd391 100644 --- a/client.go +++ b/client.go @@ -65,14 +65,14 @@ type ClientPeerConn struct { // client connection to server type ServerConn struct { - cli *Client - cfg *ClientConfig - saddr *net.TCPAddr // server address that is connected to + cli *Client + cfg *ClientConfig + saddr *net.TCPAddr // server address that is connected to - conn *grpc.ClientConn // grpc connection to the server - hdc HoduClient - psc Hodu_PacketStreamClient // grpc stream - psc_mtx sync.Mutex + conn *grpc.ClientConn // grpc connection to the server + hdc HoduClient + psc *GuardedPacketStreamClient // guarded grpc stream + psc_mtx sync.Mutex route_mtx sync.Mutex route_map ClientRouteMap @@ -103,6 +103,29 @@ type ClientCtlParamServer struct { PeerAddrs []string `json:"peer-addrs"` } +type GuardedPacketStreamClient struct { + mtx sync.Mutex + //psc Hodu_PacketStreamClient + Hodu_PacketStreamClient +} + +// ------------------------------------ + +func (g *GuardedPacketStreamClient) Send(data *Packet) error { + g.mtx.Lock() + defer g.mtx.Unlock() + //return g.psc.Send(data) + return g.Hodu_PacketStreamClient.Send(data) +} + +/*func (g *GuardedPacketStreamClient) Recv() (*Packet, error) { + return g.psc.Recv() +} + +func (g *GuardedPacketStreamClient) Context() context.Context { + return g.psc.Context() +}*/ + // -------------------------------------------------------------------- func NewClientRoute(cts *ServerConn, id uint32, addr *net.TCPAddr, proto ROUTE_PROTO) *ClientRoute { var r ClientRoute @@ -368,7 +391,8 @@ fmt.Printf ("Connecting GRPC to [%s]\n", cts.saddr.String()) cts.conn = conn cts.hdc = hdc - cts.psc = psc + //cts.psc = &GuardedPacketStreamClient{psc: psc} + cts.psc = &GuardedPacketStreamClient{Hodu_PacketStreamClient: psc} // the connection structure to a server is ready. // let's add routes to the client-side peers. diff --git a/hodu.proto b/hodu.proto index effb602..ada4976 100644 --- a/hodu.proto +++ b/hodu.proto @@ -40,7 +40,8 @@ enum PACKET_KIND { ROUTE_STOPPED = 5; PEER_STARTED = 6; PEER_STOPPED = 7; - PEER_DATA = 8; + PEER_EOF = 8; + PEER_DATA = 9; }; message Packet { diff --git a/packet.go b/packet.go index 6fe27c6..2dec5b1 100644 --- a/packet.go +++ b/packet.go @@ -33,15 +33,8 @@ func MakePeerStoppedPacket(route_id uint32, pts_id uint32) *Packet { }} } -func MakePtcStartedPacket(route_id uint32, pts_id uint32) *Packet { - // the connection from the client to a peer has been established - return &Packet{Kind: PACKET_KIND_PEER_STARTED, - U: &Packet_Peer{Peer: &PeerDesc{RouteId: route_id, PeerId: pts_id}}, - } -} - -func MakePtcStoppedPacket(route_id uint32, pts_id uint32) *Packet { - return &Packet{Kind: PACKET_KIND_PEER_STOPPED, +func MakePeerEofPacket(route_id uint32, pts_id uint32) *Packet { + return &Packet{Kind: PACKET_KIND_PEER_EOF, U: &Packet_Peer{Peer: &PeerDesc{RouteId: route_id, PeerId: pts_id}, }} } diff --git a/s-peer.go b/s-peer.go index 8fa94f6..4144875 100644 --- a/s-peer.go +++ b/s-peer.go @@ -1,8 +1,10 @@ package main +import "errors" import "fmt" import "io" import "net" +import "sync" import "sync/atomic" import "time" @@ -12,6 +14,7 @@ type ServerPeerConn struct { cts *ClientConn conn *net.TCPConn stop_req atomic.Bool + stop_chan chan bool client_peer_status_chan chan bool client_peer_opened_received atomic.Bool client_peer_closed_received atomic.Bool @@ -24,6 +27,7 @@ func NewServerPeerConn(r *ServerRoute, c *net.TCPConn, id uint32) (*ServerPeerCo spc.conn = c spc.conn_id = id spc.stop_req.Store(false) + spc.stop_chan = make(chan bool, 1) spc.client_peer_status_chan = make(chan bool, 16) spc.client_peer_opened_received.Store(false) spc.client_peer_closed_received.Store(false) @@ -31,16 +35,17 @@ func NewServerPeerConn(r *ServerRoute, c *net.TCPConn, id uint32) (*ServerPeerCo return &spc } -func (spc *ServerPeerConn) RunTask() error { - var pss Hodu_PacketStreamServer +func (spc *ServerPeerConn) RunTask(wg *sync.WaitGroup) { + var pss *GuardedPacketStreamServer var n int var buf [4096]byte var tmr *time.Timer var status bool var err error = nil + defer wg.Done() + pss = spc.route.cts.pss -//TODO: this needs to be guarded err = pss.Send(MakePeerStartedPacket(spc.route.id, spc.conn_id)) if err != nil { // TODO: include route id and conn id in the error message @@ -65,24 +70,27 @@ wait_for_started: tmr.Stop() goto done - /*case <- spc->ctx->Done(): + case <- spc.stop_chan: tmr.Stop() - goto done*/ + goto done } } tmr.Stop() for { -fmt.Printf("******************* TRYING TO READ...\n") n, err = spc.conn.Read(buf[:]) if err != nil { - if err != io.EOF { + if !errors.Is(err, io.EOF) { fmt.Printf("read error - %s\n", err.Error()) + goto done } - goto done + if pss.Send(MakePeerStoppedPacket(spc.route.id, spc.conn_id)) != nil { + fmt.Printf("unable to report data - %s\n", err.Error()) + goto done + } + goto wait_for_stopped } -// TODO: this needs to be guarded err = pss.Send(MakePeerDataPacket(spc.route.id, spc.conn_id, buf[:n])) if err != nil { // TODO: include route id and conn id in the error message @@ -91,22 +99,33 @@ fmt.Printf("******************* TRYING TO READ...\n") } } +wait_for_stopped: + //if spc.client_peer_open { + for { + select { + case status = <- spc.client_peer_status_chan: // something not right... may use a different channel for closing... + goto done + case <- spc.stop_chan: + goto done + } + } + //} + done: // TODO: inform the client to close peer connection.. fmt.Printf("SPC really ending..................\n") spc.ReqStop() spc.route.RemoveServerPeerConn(spc) //spc.cts.wg.Done() - return err } func (spc *ServerPeerConn) ReqStop() { if spc.stop_req.CompareAndSwap(false, true) { - var pss Hodu_PacketStreamServer + var pss *GuardedPacketStreamServer var err error pss = spc.route.cts.pss - + spc.stop_chan <- true if spc.client_peer_opened_received.CompareAndSwap(false, true) { spc.client_peer_status_chan <- false } diff --git a/server.go b/server.go index e23d516..226c52d 100644 --- a/server.go +++ b/server.go @@ -1,7 +1,5 @@ package main -//import "bufio" -//import "bytes" import "context" import "crypto/tls" import "errors" @@ -11,7 +9,6 @@ import "math/rand" import "net" import "os" import "os/signal" -//import "strings" import "sync" import "sync/atomic" import "syscall" @@ -31,13 +28,15 @@ type ServerRouteMap = map[uint32]*ServerRoute type Server struct { tlscfg *tls.Config + wg sync.WaitGroup + stop_req atomic.Bool + l []*net.TCPListener // central listener l_wg sync.WaitGroup cts_mtx sync.Mutex cts_map ClientConnMap - wg sync.WaitGroup - stop_req atomic.Bool + cts_wg sync.WaitGroup gs *grpc.Server UnimplementedHoduServer @@ -48,16 +47,16 @@ type Server struct { type ClientConn struct { svr *Server caddr net.Addr // client address that created this structure - pss Hodu_PacketStreamServer + pss *GuardedPacketStreamServer cw_mtx sync.Mutex route_mtx sync.Mutex - routes ServerRouteMap - //route_wg sync.WaitGroup + route_map ServerRouteMap + route_wg sync.WaitGroup wg sync.WaitGroup stop_req atomic.Bool - greeted bool + stop_chan chan bool } type ServerRoute struct { @@ -71,17 +70,70 @@ type ServerRoute struct { pts_limit int pts_last_id uint32 pts_wg sync.WaitGroup + stop_req atomic.Bool +} + +type GuardedPacketStreamServer struct { + mtx sync.Mutex + //pss Hodu_PacketStreamServer + Hodu_PacketStreamServer // let's embed it to avoid reimplement Recv() and Context() } // ------------------------------------ +func (g *GuardedPacketStreamServer) Send(data *Packet) error { + // while Recv() on a stream is called from the same gorountine all the time, + // Send() is called from multiple places. let's guard it as grpc-go + // doesn't provide concurrency safety in this case. + // https://github.com/grpc/grpc-go/blob/master/Documentation/concurrency.md + g.mtx.Lock() + defer g.mtx.Unlock() + return g.Hodu_PacketStreamServer.Send(data) +} + +/* +func (g *GuardedPacketStreamServer) Recv() (*Packet, error) { + return g.pss.Recv() +} + +func (g *GuardedPacketStreamServer) Context() context.Context { + return g.pss.Context() +}*/ + +// ------------------------------------ + +func NewServerRoute(cts *ClientConn, id uint32, proto ROUTE_PROTO) (*ServerRoute, error) { + var r ServerRoute + var l *net.TCPListener + var laddr *net.TCPAddr + var err error + + l, laddr, err = cts.make_route_listener(proto); + if err != nil { + return nil, err + } + + r.cts = cts + r.id = id + r.l = l + r.laddr = laddr + r.pts_limit = PTS_LIMIT + r.pts_map = make(ServerPeerConnMap) + r.pts_last_id = 0 + r.stop_req.Store(false) + + return &r, nil; +} + func (r *ServerRoute) AddNewServerPeerConn(c *net.TCPConn) (*ServerPeerConn, error) { var pts *ServerPeerConn var ok bool var start_id uint32 +fmt.Printf ("XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX\n") r.pts_mtx.Lock() defer r.pts_mtx.Unlock() +fmt.Printf ("XXXXXXXXXXXXXXXXXXXXXXXXBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB\n") if len(r.pts_map) >= r.pts_limit { return nil, fmt.Errorf("peer-to-server connection table full") @@ -100,6 +152,7 @@ func (r *ServerRoute) AddNewServerPeerConn(c *net.TCPConn) (*ServerPeerConn, err } } +fmt.Printf ("Creaing new Server Peer Conn\n") pts = NewServerPeerConn(r, c, r.pts_last_id) r.pts_map[pts.conn_id] = pts r.pts_last_id++ @@ -113,18 +166,17 @@ func (r *ServerRoute) RemoveServerPeerConn(pts *ServerPeerConn) { r.pts_mtx.Unlock() } -// ------------------------------------ -func (r *ServerRoute) RunTask() { +func (r *ServerRoute) RunTask(wg *sync.WaitGroup) { var err error var conn *net.TCPConn var pts *ServerPeerConn + defer wg.Done() + for { +fmt.Printf ("**** Ready to Acept server side peer connection\n") conn, err = r.l.AcceptTCP() if err != nil { - // TODO: logging - //if strings.Contains(err.Error(), "use of closed network connection") { - //if err == net.ErrClosed { if errors.Is(err, net.ErrClosed) { fmt.Printf("[%s,%d] END OF TASK...[%#v] [%#v]\n", r.cts.caddr.String(), r.id, err, net.ErrClosed) } else { @@ -133,30 +185,39 @@ func (r *ServerRoute) RunTask() { break } +fmt.Printf ("**** Adding server peer connection server side peer connection\n") pts, err = r.AddNewServerPeerConn(conn) if err != nil { // TODO: logging fmt.Printf("YYYYYYYY - %s\n", err.Error()) conn.Close() } else { - fmt.Printf("STARTED NEW SERVER PEER STAK\n") +fmt.Printf("STARTINGNEW SERVER PEER TASK\n") r.pts_wg.Add(1) - go pts.RunTask() + go pts.RunTask(&r.pts_wg) } } - r.l.Close() // don't care about double close. it could have been closed in StopTask - r.pts_wg.Wait() -// cts.l_wg.Done() -// TODO:inform that the job is done? + r.l.Close() // don't care about double close. it could have been closed in ReqStop +fmt.Printf ("*** wariting for all pts to finish..\n") + r.pts_wg.Wait() +fmt.Printf ("*** waited for all pts to finish..\n") } -func (r *ServerRoute) StopTask() { -fmt.Printf ("stoppping taak..\n") - // TODO: all pts stop... - r.l.Close(); -// TODO: wait?? +func (r *ServerRoute) ReqStop() { + fmt.Printf ("requesting to stop route taak..\n") + + if r.stop_req.CompareAndSwap(false, true) { + var pts *ServerPeerConn + + for _, pts = range r.pts_map { + pts.ReqStop() + } + + r.l.Close(); + } + fmt.Printf ("requiested to stopp route taak..\n") } func (r *ServerRoute) ReportEvent (pts_id uint32, event_type PACKET_KIND, event_data []byte) error { @@ -166,6 +227,7 @@ func (r *ServerRoute) ReportEvent (pts_id uint32, event_type PACKET_KIND, event_ r.pts_mtx.Lock() spc, ok = r.pts_map[pts_id] if !ok { + r.pts_mtx.Unlock(); return fmt.Errorf("non-existent peer id - %u", pts_id) } r.pts_mtx.Unlock(); @@ -214,34 +276,12 @@ func (cts *ClientConn) make_route_listener(proto ROUTE_PROTO) (*net.TCPListener, return nil, nil, err } -func NewServerRoute(cts *ClientConn, id uint32, proto ROUTE_PROTO) (*ServerRoute, error) { - var r ServerRoute - var l *net.TCPListener - var laddr *net.TCPAddr - var err error - - l, laddr, err = cts.make_route_listener(proto); - if err != nil { - return nil, err - } - - r.cts = cts - r.id = id - r.l = l - r.laddr = laddr - r.pts_limit = PTS_LIMIT - r.pts_map = make(ServerPeerConnMap) - r.pts_last_id = 0 - - return &r, nil; -} - func (cts *ClientConn) AddNewServerRoute(route_id uint32, proto ROUTE_PROTO) (*ServerRoute, error) { var r *ServerRoute var err error cts.route_mtx.Lock() - if cts.routes[route_id] != nil { + if cts.route_map[route_id] != nil { cts.route_mtx.Unlock() return nil, fmt.Errorf ("existent route id - %d", route_id) } @@ -250,10 +290,11 @@ func (cts *ClientConn) AddNewServerRoute(route_id uint32, proto ROUTE_PROTO) (*S cts.route_mtx.Unlock() return nil, err } - cts.routes[route_id] = r; + cts.route_map[route_id] = r; cts.route_mtx.Unlock() - go r.RunTask() + cts.route_wg.Add(1) + go r.RunTask(&cts.route_wg) return r, nil } @@ -262,15 +303,15 @@ func (cts *ClientConn) RemoveServerRoute (route_id uint32) error { var ok bool cts.route_mtx.Lock() - r, ok = cts.routes[route_id] + r, ok = cts.route_map[route_id] if (!ok) { cts.route_mtx.Unlock() return fmt.Errorf ("non-existent route id - %d", route_id) } - delete(cts.routes, route_id) + delete(cts.route_map, route_id) cts.route_mtx.Unlock() - r.StopTask() // TODO: make this unblocking or blocking? + r.ReqStop() // TODO: make this unblocking or blocking? return nil; } @@ -279,7 +320,7 @@ func (cts *ClientConn) ReportEvent (route_id uint32, pts_id uint32, event_type P var ok bool cts.route_mtx.Lock() - r, ok = cts.routes[route_id] + r, ok = cts.route_map[route_id] if (!ok) { cts.route_mtx.Unlock() return fmt.Errorf ("non-existent route id - %d", route_id) @@ -289,14 +330,166 @@ func (cts *ClientConn) ReportEvent (route_id uint32, pts_id uint32, event_type P return r.ReportEvent(pts_id, event_type, event_data) } +func (cts *ClientConn) receive_from_stream () { + var pkt *Packet + var err error + + //for { + pkt, err = cts.pss.Recv() + if errors.Is(err, io.EOF) { + // return will close stream from server side +// TODO: clean up route_map and server-side peers releated to the client connection 'cts' +fmt.Printf ("grpd stream ended\n") + goto done + } + if err != nil { + //log.Printf("receive error %v", err) + fmt.Printf ("grpc stream error - %s\n", err.Error()) + goto done + } + + switch pkt.Kind { + case PACKET_KIND_ROUTE_START: + var x *Packet_Route + //var t *ServerRoute + var ok bool + x, ok = pkt.U.(*Packet_Route) + if ok { + var r* ServerRoute + fmt.Printf ("ADDED SERVER ROUTE FOR CLEINT PEER %s\n", x.Route.AddrStr) + r, err = cts.AddNewServerRoute(x.Route.RouteId, x.Route.Proto) + if err != nil { + // TODO: Send Error Response... + } else { + err = cts.pss.Send(MakeRouteStartedPacket(r.id, x.Route.Proto, r.laddr.String())) + if err != nil { + // TODO: + } + } + } else { + // TODO: send invalid request... or simply keep quiet? + } + + case PACKET_KIND_ROUTE_STOP: + var x *Packet_Route + var ok bool + x, ok = pkt.U.(*Packet_Route) + if ok { + err = cts.RemoveServerRoute(x.Route.RouteId); // TODO: this must be unblocking. otherwide, other route_map will get blocked... + if err != nil { + // TODO: Send Error Response... + } else { + err = cts.pss.Send(MakeRouteStoppedPacket(x.Route.RouteId, x.Route.Proto)) + if err != nil { + // TODO: + } + } + } else { + // TODO: send invalid request... or simply keep quiet? + } + + case PACKET_KIND_PEER_STARTED: + // the connection from the client to a peer has been established + var x *Packet_Peer + var ok bool + x, ok = pkt.U.(*Packet_Peer) + if ok { + err = cts.ReportEvent(x.Peer.RouteId, x.Peer.PeerId, PACKET_KIND_PEER_STARTED, nil) + if err != nil { + // TODO: + fmt.Printf ("Failed to report PEER_STARTED Event") + } else { + // TODO: + } + } else { + // TODO + } + + case PACKET_KIND_PEER_STOPPED: + // the connection from the client to a peer has been established + var x *Packet_Peer + var ok bool + x, ok = pkt.U.(*Packet_Peer) + if ok { + err = cts.ReportEvent(x.Peer.RouteId, x.Peer.PeerId, PACKET_KIND_PEER_STOPPED, nil) + if err != nil { + // TODO: + fmt.Printf ("Failed to report PEER_STOPPED Event") + } else { + // TODO: + } + } else { + // TODO + } + + case PACKET_KIND_PEER_DATA: + // the connection from the client to a peer has been established + var x *Packet_Data + var ok bool + x, ok = pkt.U.(*Packet_Data) + if ok { + err = cts.ReportEvent(x.Data.RouteId, x.Data.PeerId, PACKET_KIND_PEER_DATA, x.Data.Data) + if err != nil { + // TODO: + fmt.Printf ("Failed to report PEER_DATA Event") + } else { + // TODO: + } + } else { + // TODO + } + } + //} + +done: + fmt.Printf ("************ stream receiver finished....\n") +} + +func (cts *ClientConn) RunTask(wg *sync.WaitGroup) { + var strm *GuardedPacketStreamServer + var ctx context.Context + + defer wg.Done() + + + strm = cts.pss + ctx = strm.Context() + + //go cts.receive_from_stream() + + for { + // exit if context is done + // or continue + select { + case <-ctx.Done(): // the stream context is done +fmt.Printf("grpd server done - %s\n", ctx.Err().Error()) + goto done + + case <- cts.stop_chan: + goto done + + default: + // no other case is ready. + // without the default case, the select construct would block + } + cts.receive_from_stream() + } + +done: +fmt.Printf ("^^^^^^^^^^^^^^^^^ waiting for reoute_wg...\n") + cts.route_wg.Wait() +fmt.Printf ("^^^^^^^^^^^^^^^^^ waited for reoute_wg...\n") +} + func (cts *ClientConn) ReqStop() { if cts.stop_req.CompareAndSwap(false, true) { var r *ServerRoute - for _, r = range cts.routes { - r.StopTask() + for _, r = range cts.route_map { + r.ReqStop() } + cts.stop_chan <- true //cts.c.Close() // close the accepted connection from the client } } @@ -339,7 +532,6 @@ func (s *Server) PacketStream(strm Hodu_PacketStreamServer) error { var ctx context.Context var p *peer.Peer var ok bool - var pkt *Packet var err error var cts *ClientConn @@ -354,116 +546,11 @@ func (s *Server) PacketStream(strm Hodu_PacketStreamServer) error { return fmt.Errorf("unable to add client %s - %s", p.Addr.String(), err.Error()) } - for { - // exit if context is done - // or continue - select { - case <-ctx.Done(): - return ctx.Err() - default: - // no other case is ready. - // without the default case, the select construct would block - } - - pkt, err = strm.Recv() - if errors.Is(err, io.EOF) { - // return will close stream from server side - return nil - } - if err != nil { - //log.Printf("receive error %v", err) - continue - } - - switch pkt.Kind { - case PACKET_KIND_ROUTE_START: - var x *Packet_Route - //var t *ServerRoute - var ok bool - x, ok = pkt.U.(*Packet_Route) - if ok { - var r* ServerRoute - fmt.Printf ("ADDED SERVER ROUTE FOR CLEINT PEER %s\n", x.Route.AddrStr) - r, err = cts.AddNewServerRoute(x.Route.RouteId, x.Route.Proto) - if err != nil { - // TODO: Send Error Response... - } else { - err = strm.Send(MakeRouteStartedPacket(r.id, x.Route.Proto, r.laddr.String())) - if err != nil { - // TODO: - } - } - } else { - // TODO: send invalid request... or simply keep quiet? - } - - case PACKET_KIND_ROUTE_STOP: - var x *Packet_Route - var ok bool - x, ok = pkt.U.(*Packet_Route) - if ok { - err = cts.RemoveServerRoute(x.Route.RouteId); // TODO: this must be unblocking. otherwide, other routes will get blocked... - if err != nil { - // TODO: Send Error Response... - } else { - err = strm.Send(MakeRouteStoppedPacket(x.Route.RouteId, x.Route.Proto)) - if err != nil { - // TODO: - } - } - } else { - // TODO: send invalid request... or simply keep quiet? - } - - case PACKET_KIND_PEER_STARTED: - // the connection from the client to a peer has been established - var x *Packet_Peer - var ok bool - x, ok = pkt.U.(*Packet_Peer) - if ok { - err = cts.ReportEvent(x.Peer.RouteId, x.Peer.PeerId, PACKET_KIND_PEER_STARTED, nil) - if err != nil { - // TODO: - } else { - // TODO: - } - } else { - // TODO - } - - case PACKET_KIND_PEER_STOPPED: - // the connection from the client to a peer has been established - var x *Packet_Peer - var ok bool - x, ok = pkt.U.(*Packet_Peer) - if ok { - err = cts.ReportEvent(x.Peer.RouteId, x.Peer.PeerId, PACKET_KIND_PEER_STOPPED, nil) - if err != nil { - // TODO: - } else { - // TODO: - } - } else { - // TODO - } - - case PACKET_KIND_PEER_DATA: - // the connection from the client to a peer has been established - var x *Packet_Data - var ok bool - x, ok = pkt.U.(*Packet_Data) - if ok { - err = cts.ReportEvent(x.Data.RouteId, x.Data.PeerId, PACKET_KIND_PEER_DATA, x.Data.Data) - if err != nil { - // TODO: - } else { - // TODO: - } - } else { - // TODO - } - } - } + // Don't detached the cts task as a go-routine as this function + // is invoked as a go-routine by the grpc server. + s.cts_wg.Add(1) + cts.RunTask(&s.cts_wg) + return nil } // ------------------------------------ @@ -628,10 +715,12 @@ oops: return nil, err } -func (s *Server) run_grpc_server(idx int) error { +func (s *Server) run_grpc_server(idx int, wg *sync.WaitGroup) error { var l *net.TCPListener var err error + defer wg.Done(); + l = s.l[idx] fmt.Printf ("serving grpc on %d listener\n", idx) // it seems to be safe to call a single grpc server on differnt listening sockets multiple times @@ -641,21 +730,21 @@ func (s *Server) run_grpc_server(idx int) error { fmt.Printf ("XXXXXXXXXXXXXXXXXXXXXXXXXxx %s\n", err.Error()); } - s.l_wg.Done(); return nil } -func (s *Server) MainLoop() { +func (s *Server) RunTask(wg *sync.WaitGroup) { var idx int - defer s.wg.Done() + defer wg.Done() for idx, _ = range s.l { s.l_wg.Add(1) - go s.run_grpc_server(idx) + go s.run_grpc_server(idx, &s.l_wg) } - s.l_wg.Wait(); + s.l_wg.Wait() + s.cts_wg.Wait() s.ReqStop() syscall.Kill(syscall.Getpid(), syscall.SIGTERM) } @@ -684,12 +773,13 @@ func (s *Server) AddNewClientConn(addr net.Addr, pss Hodu_PacketStreamServer) (* var ok bool cts.svr = s - cts.routes = make(ServerRouteMap) + cts.route_map = make(ServerRouteMap) cts.caddr = addr - cts.pss = pss + //cts.pss = &GuardedPacketStreamServer{pss: pss} + cts.pss = &GuardedPacketStreamServer{Hodu_PacketStreamServer: pss} cts.stop_req.Store(false) - cts.greeted = false + cts.stop_chan = make(chan bool, 1) s.cts_mtx.Lock() defer s.cts_mtx.Unlock() @@ -785,8 +875,9 @@ func server_main(laddrs []string) error { s.wg.Add(1) go s.handle_os_signals() + s.wg.Add(1) - go s.MainLoop() // this is blocking. ReqStop() will be called from a signal handler + go s.RunTask(&s.wg) // this is blocking. ReqStop() will be called from a signal handler s.wg.Wait() return nil