diff --git a/client.go b/client.go index cadfbd4..aeafb74 100644 --- a/client.go +++ b/client.go @@ -73,8 +73,8 @@ type ClientConnNoticeHandler interface { type Client struct { Named - ctx context.Context - ctx_cancel context.CancelFunc + Ctx context.Context + CtxCancel context.CancelFunc ext_mtx sync.Mutex ext_svcs []Service @@ -438,7 +438,7 @@ func (r *ClientRoute) ConnectToPeer(pts_id PeerId, route_option RouteOption, pts tmout = time.Duration(r.cts.C.ptc_tmout) if tmout <= 0 { tmout = 5 * time.Second} // TODO: make this configurable... - waitctx, cancel_wait = context.WithTimeout(r.cts.C.ctx, tmout) + waitctx, cancel_wait = context.WithTimeout(r.cts.C.Ctx, tmout) r.ptc_mtx.Lock() r.ptc_cancel_map[pts_id] = cancel_wait r.ptc_mtx.Unlock() @@ -978,7 +978,7 @@ start_over: // there is nothing to do much about it for now. c_seed.Version = HODU_RPC_VERSION c_seed.Flags = 0 - s_seed, err = cts.hdc.GetSeed(cts.C.ctx, &c_seed) + s_seed, err = cts.hdc.GetSeed(cts.C.Ctx, &c_seed) if err != nil { cts.C.log.Write(cts.Sid, LOG_ERROR, "Failed to get seed from server[%d] %s - %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index], err.Error()) goto reconnect_to_server @@ -989,7 +989,7 @@ start_over: cts.C.log.Write(cts.Sid, LOG_INFO, "Got seed from server[%d] %s - ver=%#x", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index], cts.s_seed.Version) - psc, err = cts.hdc.PacketStream(cts.C.ctx) + psc, err = cts.hdc.PacketStream(cts.C.Ctx) if err != nil { cts.C.log.Write(cts.Sid, LOG_ERROR, "Failed to get packet stream from server[%d] %s - %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index], err.Error()) goto reconnect_to_server @@ -1026,8 +1026,8 @@ start_over: var pkt *Packet select { - case <-cts.C.ctx.Done(): - // need to log cts.C.ctx.Err().Error()? + case <-cts.C.Ctx.Done(): + // need to log cts.C.Ctx.Err().Error()? goto done case <-cts.stop_chan: @@ -1211,10 +1211,10 @@ reconnect_to_server: cts.State = CLIENT_CONN_DISCONNECTED // wait for 2 seconds - slpctx, cancel_sleep = context.WithTimeout(cts.C.ctx, 2 * time.Second) + slpctx, cancel_sleep = context.WithTimeout(cts.C.Ctx, 2 * time.Second) select { - case <-cts.C.ctx.Done(): - // need to log cts.C.ctx.Err().Error()? + case <-cts.C.Ctx.Done(): + // need to log cts.C.Ctx.Err().Error()? cancel_sleep() goto req_stop_and_wait_for_termination case <-cts.stop_chan: @@ -1223,7 +1223,17 @@ reconnect_to_server: cancel_sleep() goto wait_for_termination case <-slpctx.Done(): - // do nothing + select { + case <- cts.C.Ctx.Done(): + // non-blocking check if the parent context of the sleep context is + // terminated too. if so, this is normal termination case. + // this check seem redundant but the go-runtime doesn't seem to guarantee + // the order of event selection whtn cts.C.Ctx.Done() and slpctx.Done() + // are both completed. + goto req_stop_and_wait_for_termination + default: + // do nothing. go on + } } cancel_sleep() goto start_over // and reconnect @@ -1324,7 +1334,7 @@ func NewClient(ctx context.Context, name string, logger Logger, cfg *ClientConfi var hs_log *log.Logger c.name = name - c.ctx, c.ctx_cancel = context.WithCancel(ctx) + c.Ctx, c.CtxCancel = context.WithCancel(ctx) c.ext_svcs = make([]Service, 0, 1) c.ptc_tmout = cfg.PeerConnTmout c.ptc_limit = cfg.PeerConnMax @@ -1604,9 +1614,9 @@ func (c *Client) ReqStop() { var cts *ClientConn var ctl *http.Server - c.ctx_cancel() + c.CtxCancel() for _, ctl = range c.ctl { - ctl.Shutdown(c.ctx) // to break c.ctl.ListenAndServe() + ctl.Shutdown(c.Ctx) // to break c.ctl.ListenAndServe() } c.cts_mtx.Lock() diff --git a/server-peer.go b/server-peer.go index 9859af3..99e63a3 100644 --- a/server-peer.go +++ b/server-peer.go @@ -69,7 +69,7 @@ func (spc *ServerPeerConn) RunTask(wg *sync.WaitGroup) { // set up a timer to set waiting duration until the connection is // actually established on the client side and it's informed... - waitctx, cancel_wait = context.WithTimeout(spc.route.Cts.S.ctx, 5 * time.Second) // TODO: make this configurable + waitctx, cancel_wait = context.WithTimeout(spc.route.Cts.S.Ctx, 5 * time.Second) // TODO: make this configurable wait_for_started: for { select { diff --git a/server-proxy.go b/server-proxy.go index f10e02a..3c88b59 100644 --- a/server-proxy.go +++ b/server-proxy.go @@ -374,7 +374,7 @@ func (pxy *server_proxy_http_main) ServeHTTP(w http.ResponseWriter, req *http.Re } */ addr = svc_addr_to_dst_addr(pi.SvcAddr) - transport, err = pxy.addr_to_transport(s.ctx, addr) + transport, err = pxy.addr_to_transport(s.Ctx, addr) if err != nil { status_code = WriteEmptyRespHeader(w, http.StatusBadGateway) goto oops @@ -383,7 +383,7 @@ func (pxy *server_proxy_http_main) ServeHTTP(w http.ResponseWriter, req *http.Re s.log.Write(pxy.id, LOG_INFO, "[%s] %s %s -> %+v", req.RemoteAddr, req.Method, req.URL.String(), proxy_url) - proxy_req, err = http.NewRequestWithContext(s.ctx, req.Method, proxy_url.String(), req.Body) + proxy_req, err = http.NewRequestWithContext(s.Ctx, req.Method, proxy_url.String(), req.Body) if err != nil { status_code = WriteEmptyRespHeader(w, http.StatusInternalServerError) goto oops diff --git a/server.go b/server.go index e8f05fd..af52de0 100644 --- a/server.go +++ b/server.go @@ -70,8 +70,8 @@ type Server struct { Named cfg *ServerConfig - ctx context.Context - ctx_cancel context.CancelFunc + Ctx context.Context + CtxCancel context.CancelFunc wg sync.WaitGroup stop_req atomic.Bool @@ -1066,7 +1066,7 @@ func NewServer(ctx context.Context, name string, logger Logger, cfg *ServerConfi return nil, fmt.Errorf("no server addresses provided") } - s.ctx, s.ctx_cancel = context.WithCancel(ctx) + s.Ctx, s.CtxCancel = context.WithCancel(ctx) s.name = name s.log = logger /* create the specified number of listeners */ @@ -1471,18 +1471,18 @@ func (s *Server) ReqStop() { // call cancellation function before anything else // to break sub-tasks relying on this server context. // for example, http.Client in server_proxy_http_main - s.ctx_cancel() + s.CtxCancel() for _, hs = range s.ctl { - hs.Shutdown(s.ctx) // to break s.ctl.Serve() + hs.Shutdown(s.Ctx) // to break s.ctl.Serve() } for _, hs = range s.pxy { - hs.Shutdown(s.ctx) // to break s.pxy.Serve() + hs.Shutdown(s.Ctx) // to break s.pxy.Serve() } for _, hs = range s.wpx { - hs.Shutdown(s.ctx) // to break s.wpx.Serve() + hs.Shutdown(s.Ctx) // to break s.wpx.Serve() } //s.rpc_svr.GracefulStop()