diff --git a/server.go b/server.go index 8da2100..772a0e2 100644 --- a/server.go +++ b/server.go @@ -90,7 +90,7 @@ type ServerEventConnAdded struct { } type ServerEventConnDeleted struct { - Conn ConnId `json:"conn-id:"` + Conn ConnId `json:"conn-id"` } type ServerEventRouteAdded struct { @@ -651,7 +651,7 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) { defer wg.Done() - for { + for cts.stop_req.Load() == false { pkt, err = cts.pss.Recv() if errors.Is(err, io.EOF) { cts.S.log.Write(cts.Sid, LOG_INFO, "RPC stream closed for client %s", cts.RemoteAddr) @@ -891,10 +891,15 @@ func (cts *ServerConn) RunTask(wg *sync.WaitGroup) { // function be the channel waiter only. // increment on the wait group is for the caller to wait for // these detached goroutines to finish. - //wg.Add(1) - //go cts.receive_from_stream(wg) + wg.Add(1) + go cts.receive_from_stream(wg) + + // Add(1) to cts.route_wg is mostly performed inside a goroutine + // cts.receive_from_stream() invoked just above. + // this can potentially cause a race condition where cts.route_wg + // is still zero when Wait() is called far below. + // Increment the wait count here before the loop begins cts.route_wg.Add(1) - go cts.receive_from_stream(&cts.route_wg) for { // exit if context is done @@ -918,6 +923,7 @@ func (cts *ServerConn) RunTask(wg *sync.WaitGroup) { done: cts.ReqStop() // just in case + cts.route_wg.Done() cts.route_wg.Wait() cts.S.bulletin.Enqueue( &ServerEvent{