moved some waiting loops to goroutines to avoid race conditions
This commit is contained in:
parent
dee3711dd4
commit
deb6f7b05a
64
client.go
64
client.go
@ -431,42 +431,48 @@ func (r *ClientRoute) RunTask(wg *sync.WaitGroup) {
|
|||||||
r.Id, r.PeerAddr, r.ServerPeerOption, r.ReqServerPeerSvcAddr, r.ReqServerPeerSvcNet, r.cts.remote_addr_p)
|
r.Id, r.PeerAddr, r.ServerPeerOption, r.ReqServerPeerSvcAddr, r.ReqServerPeerSvcNet, r.cts.remote_addr_p)
|
||||||
}
|
}
|
||||||
|
|
||||||
r.lifetime_mtx.Lock()
|
r.ptc_wg.Add(1) // increment counter here
|
||||||
if r.Lifetime > 0 {
|
|
||||||
r.LifetimeStart = time.Now()
|
|
||||||
r.lifetime_timer = time.NewTimer(r.Lifetime)
|
|
||||||
}
|
|
||||||
r.lifetime_mtx.Unlock()
|
|
||||||
|
|
||||||
main_loop:
|
go func() { // and run the waiting loop in a goroutine to give a positive counter to r.ptc_wg.Wait()
|
||||||
for {
|
r.lifetime_mtx.Lock()
|
||||||
if r.lifetime_timer != nil {
|
if r.Lifetime > 0 {
|
||||||
select {
|
r.LifetimeStart = time.Now()
|
||||||
case <-r.stop_chan:
|
r.lifetime_timer = time.NewTimer(r.Lifetime)
|
||||||
break main_loop
|
}
|
||||||
|
r.lifetime_mtx.Unlock()
|
||||||
|
|
||||||
case <-r.lifetime_timer.C:
|
waiting_loop:
|
||||||
r.cts.C.log.Write(r.cts.Sid, LOG_INFO, "route(%d,%s) reached end of lifetime(%v)",
|
for {
|
||||||
r.Id, r.PeerAddr, r.Lifetime)
|
if r.lifetime_timer != nil {
|
||||||
break main_loop
|
select {
|
||||||
}
|
case <-r.stop_chan:
|
||||||
} else {
|
break waiting_loop
|
||||||
select {
|
|
||||||
case <-r.stop_chan:
|
case <-r.lifetime_timer.C:
|
||||||
break main_loop
|
r.cts.C.log.Write(r.cts.Sid, LOG_INFO, "route(%d,%s) reached end of lifetime(%v)",
|
||||||
|
r.Id, r.PeerAddr, r.Lifetime)
|
||||||
|
break waiting_loop
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
select {
|
||||||
|
case <-r.stop_chan:
|
||||||
|
break waiting_loop
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
r.lifetime_mtx.Lock()
|
r.lifetime_mtx.Lock()
|
||||||
if r.lifetime_timer != nil {
|
if r.lifetime_timer != nil {
|
||||||
r.lifetime_timer.Stop()
|
r.lifetime_timer.Stop()
|
||||||
r.lifetime_timer = nil
|
r.lifetime_timer = nil
|
||||||
}
|
}
|
||||||
r.lifetime_mtx.Unlock()
|
r.lifetime_mtx.Unlock()
|
||||||
|
|
||||||
|
r.ReqStop() // just in case
|
||||||
|
r.ptc_wg.Done()
|
||||||
|
}()
|
||||||
|
|
||||||
done:
|
done:
|
||||||
r.ReqStop()
|
|
||||||
r.ptc_wg.Wait() // wait for all peer tasks are finished
|
r.ptc_wg.Wait() // wait for all peer tasks are finished
|
||||||
|
|
||||||
err = r.cts.psc.Send(MakeRouteStopPacket(r.Id, r.ServerPeerOption, r.PeerAddr, r.PeerName, r.ReqServerPeerSvcAddr, r.ReqServerPeerSvcNet))
|
err = r.cts.psc.Send(MakeRouteStopPacket(r.Id, r.ServerPeerOption, r.PeerAddr, r.PeerName, r.ReqServerPeerSvcAddr, r.ReqServerPeerSvcNet))
|
||||||
|
42
server.go
42
server.go
@ -907,29 +907,33 @@ func (cts *ServerConn) RunTask(wg *sync.WaitGroup) {
|
|||||||
// Increment the wait count here before the loop begins
|
// Increment the wait count here before the loop begins
|
||||||
cts.route_wg.Add(1)
|
cts.route_wg.Add(1)
|
||||||
|
|
||||||
for {
|
// start the loop inside a goroutine so that route_wg counter
|
||||||
// exit if context is done
|
// is likely to be greater than 1 what Wait() is called.
|
||||||
// or continue
|
go func() {
|
||||||
select {
|
waiting_loop:
|
||||||
case <-ctx.Done(): // the stream context is done
|
for {
|
||||||
cts.S.log.Write(cts.Sid, LOG_INFO, "RPC stream done - %s", ctx.Err().Error())
|
// exit if context is done or continue
|
||||||
goto done
|
select {
|
||||||
|
case <-ctx.Done(): // the stream context is done
|
||||||
|
cts.S.log.Write(cts.Sid, LOG_INFO, "RPC stream done - %s", ctx.Err().Error())
|
||||||
|
break waiting_loop
|
||||||
|
|
||||||
case <- cts.stop_chan:
|
case <- cts.stop_chan:
|
||||||
// get out of the loop to eventually to exit from
|
// get out of the loop to eventually to exit from
|
||||||
// this handler to let the main grpc server to
|
// this handler to let the main grpc server to
|
||||||
// close this specific client connection.
|
// close this specific client connection.
|
||||||
goto done
|
break waiting_loop
|
||||||
|
|
||||||
//default:
|
//default:
|
||||||
// no other case is ready.
|
// no other case is ready.
|
||||||
// without the default case, the select construct would block
|
// without the default case, the select construct would block
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
done:
|
cts.ReqStop() // just in case
|
||||||
cts.ReqStop() // just in case
|
cts.route_wg.Done()
|
||||||
cts.route_wg.Done()
|
}()
|
||||||
|
|
||||||
cts.route_wg.Wait()
|
cts.route_wg.Wait()
|
||||||
cts.S.FireConnEvent(SERVER_EVENT_CONN_STOPPED, cts)
|
cts.S.FireConnEvent(SERVER_EVENT_CONN_STOPPED, cts)
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user