enhanced the reconnect logic in client.go

hyung-hwan 2025-02-19 10:38:23 +09:00
parent 2b3a841299
commit b5c1ae2a73
4 changed files with 34 additions and 24 deletions

client.go

@@ -73,8 +73,8 @@ type ClientConnNoticeHandler interface {
 type Client struct {
     Named
-    ctx         context.Context
-    ctx_cancel  context.CancelFunc
+    Ctx         context.Context
+    CtxCancel   context.CancelFunc
     ext_mtx     sync.Mutex
     ext_svcs    []Service
@@ -438,7 +438,7 @@ func (r *ClientRoute) ConnectToPeer(pts_id PeerId, route_option RouteOption, pts
     tmout = time.Duration(r.cts.C.ptc_tmout)
     if tmout <= 0 { tmout = 5 * time.Second} // TODO: make this configurable...
-    waitctx, cancel_wait = context.WithTimeout(r.cts.C.ctx, tmout)
+    waitctx, cancel_wait = context.WithTimeout(r.cts.C.Ctx, tmout)
     r.ptc_mtx.Lock()
     r.ptc_cancel_map[pts_id] = cancel_wait
     r.ptc_mtx.Unlock()
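
The hunk above touches a pattern worth noting: the timeout-bounded wait context is registered in ptc_cancel_map so another goroutine can abort a pending peer connection early. Below is a minimal self-contained sketch of that registry pattern, assuming nothing beyond the standard library; the names waiter, waitFor, and abort are illustrative, not from this codebase.

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// waiter keeps one cancel function per id so that a pending,
// timeout-bounded wait can be aborted from another goroutine.
type waiter struct {
    mtx     sync.Mutex
    cancels map[int]context.CancelFunc
}

func (w *waiter) waitFor(parent context.Context, id int, tmout time.Duration) error {
    waitctx, cancel_wait := context.WithTimeout(parent, tmout)
    w.mtx.Lock()
    w.cancels[id] = cancel_wait
    w.mtx.Unlock()

    <-waitctx.Done() // stands in for real completion signaling

    w.mtx.Lock()
    delete(w.cancels, id)
    w.mtx.Unlock()
    cancel_wait()
    return waitctx.Err()
}

// abort cancels the wait registered under id, if any.
func (w *waiter) abort(id int) {
    w.mtx.Lock()
    if cancel, ok := w.cancels[id]; ok { cancel() }
    w.mtx.Unlock()
}

func main() {
    var w = waiter{cancels: make(map[int]context.CancelFunc)}
    go func() {
        time.Sleep(100 * time.Millisecond)
        w.abort(7)
    }()
    fmt.Println(w.waitFor(context.Background(), 7, 5*time.Second)) // context.Canceled
}

Deleting the map entry and calling cancel on the wait's own exit path, as ConnectToPeer does with cancel_wait, keeps the map from leaking entries for waits that finish normally.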
@@ -978,7 +978,7 @@ start_over:
     // there is nothing to do much about it for now.
     c_seed.Version = HODU_RPC_VERSION
     c_seed.Flags = 0
-    s_seed, err = cts.hdc.GetSeed(cts.C.ctx, &c_seed)
+    s_seed, err = cts.hdc.GetSeed(cts.C.Ctx, &c_seed)
     if err != nil {
         cts.C.log.Write(cts.Sid, LOG_ERROR, "Failed to get seed from server[%d] %s - %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index], err.Error())
         goto reconnect_to_server
@@ -989,7 +989,7 @@ start_over:
     cts.C.log.Write(cts.Sid, LOG_INFO, "Got seed from server[%d] %s - ver=%#x", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index], cts.s_seed.Version)
-    psc, err = cts.hdc.PacketStream(cts.C.ctx)
+    psc, err = cts.hdc.PacketStream(cts.C.Ctx)
     if err != nil {
         cts.C.log.Write(cts.Sid, LOG_ERROR, "Failed to get packet stream from server[%d] %s - %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index], err.Error())
         goto reconnect_to_server
@@ -1026,8 +1026,8 @@ start_over:
         var pkt *Packet
         select {
-        case <-cts.C.ctx.Done():
-            // need to log cts.C.ctx.Err().Error()?
+        case <-cts.C.Ctx.Done():
+            // need to log cts.C.Ctx.Err().Error()?
             goto done
         case <-cts.stop_chan:
@@ -1211,10 +1211,10 @@ reconnect_to_server:
     cts.State = CLIENT_CONN_DISCONNECTED
     // wait for 2 seconds
-    slpctx, cancel_sleep = context.WithTimeout(cts.C.ctx, 2 * time.Second)
+    slpctx, cancel_sleep = context.WithTimeout(cts.C.Ctx, 2 * time.Second)
     select {
-    case <-cts.C.ctx.Done():
-        // need to log cts.C.ctx.Err().Error()?
+    case <-cts.C.Ctx.Done():
+        // need to log cts.C.Ctx.Err().Error()?
         cancel_sleep()
         goto req_stop_and_wait_for_termination
     case <-cts.stop_chan:
@@ -1223,7 +1223,17 @@ reconnect_to_server:
         cancel_sleep()
         goto wait_for_termination
     case <-slpctx.Done():
-        // do nothing
+        select {
+        case <-cts.C.Ctx.Done():
+            // non-blocking check if the parent context of the sleep context
+            // has been terminated too. if so, this is a normal termination case.
+            // this check seems redundant, but the go runtime doesn't seem to
+            // guarantee the order of event selection when cts.C.Ctx.Done() and
+            // slpctx.Done() are both completed.
+            goto req_stop_and_wait_for_termination
+        default:
+            // do nothing. go on
+        }
     }
     cancel_sleep()
     goto start_over // and reconnect
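
The nested select added in this hunk addresses a real property of Go's select statement: when more than one case is ready, the runtime picks among them pseudo-randomly. After the reconnect sleep, canceling the parent context also completes slpctx.Done(), so either branch can win; re-checking the parent with a non-blocking default distinguishes "timer expired" from "parent canceled". A minimal sketch of the same pattern, with illustrative names:

package main

import (
    "context"
    "fmt"
    "time"
)

// sleepUnlessCanceled sleeps for d under parent. It returns false if parent
// was canceled and true if the full duration elapsed. Because canceling
// parent also completes slpctx.Done(), the slpctx branch re-checks parent
// before treating the wakeup as a normal timer expiry.
func sleepUnlessCanceled(parent context.Context, d time.Duration) bool {
    slpctx, cancel_sleep := context.WithTimeout(parent, d)
    defer cancel_sleep()

    select {
    case <-parent.Done():
        return false
    case <-slpctx.Done():
        select {
        case <-parent.Done():
            // both channels were ready and the runtime happened to pick
            // slpctx; this is still a cancellation, not a completed sleep.
            return false
        default:
            return true // the timer genuinely expired; go on
        }
    }
}

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    go func() { time.Sleep(50 * time.Millisecond); cancel() }()
    fmt.Println(sleepUnlessCanceled(ctx, 2*time.Second)) // false
}

An equivalent check would be slpctx.Err() == context.DeadlineExceeded, which is true only when the sleep context ended by timeout rather than by propagated cancellation.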
@@ -1324,7 +1334,7 @@ func NewClient(ctx context.Context, name string, logger Logger, cfg *ClientConfi
     var hs_log *log.Logger
     c.name = name
-    c.ctx, c.ctx_cancel = context.WithCancel(ctx)
+    c.Ctx, c.CtxCancel = context.WithCancel(ctx)
     c.ext_svcs = make([]Service, 0, 1)
     c.ptc_tmout = cfg.PeerConnTmout
     c.ptc_limit = cfg.PeerConnMax
@@ -1604,9 +1614,9 @@ func (c *Client) ReqStop() {
     var cts *ClientConn
     var ctl *http.Server
-    c.ctx_cancel()
+    c.CtxCancel()
     for _, ctl = range c.ctl {
-        ctl.Shutdown(c.ctx) // to break c.ctl.ListenAndServe()
+        ctl.Shutdown(c.Ctx) // to break c.ctl.ListenAndServe()
     }
     c.cts_mtx.Lock()
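
Note the ordering in ReqStop: c.CtxCancel() runs before ctl.Shutdown(c.Ctx), so Shutdown receives an already-canceled context. That still closes the listener, which breaks ListenAndServe, but skips the graceful drain: if connections are still active, Shutdown gives up immediately and returns ctx.Err(). A standalone sketch of that shutdown shape, using only the standard library (addresses and timings are demo choices):

package main

import (
    "context"
    "log"
    "net/http"
    "time"
)

func main() {
    ctx, cancel_ctx := context.WithCancel(context.Background())
    hs := &http.Server{Addr: "127.0.0.1:0"}

    go func() {
        // ListenAndServe returns http.ErrServerClosed once Shutdown runs.
        if err := hs.ListenAndServe(); err != http.ErrServerClosed {
            log.Print(err)
        }
    }()

    time.Sleep(100 * time.Millisecond) // demo only: let the listener start

    cancel_ctx() // cancel the umbrella context first, as ReqStop does
    // Shutdown closes the listener either way; with an already-canceled
    // context it abandons the wait for active connections at once and
    // returns context.Canceled if any are still open.
    if err := hs.Shutdown(ctx); err != nil {
        log.Printf("shutdown: %v", err)
    }
}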

server-peer.go

@@ -69,7 +69,7 @@ func (spc *ServerPeerConn) RunTask(wg *sync.WaitGroup) {
     // set up a timer to set waiting duration until the connection is
     // actually established on the client side and it's informed...
-    waitctx, cancel_wait = context.WithTimeout(spc.route.Cts.S.ctx, 5 * time.Second) // TODO: make this configurable
+    waitctx, cancel_wait = context.WithTimeout(spc.route.Cts.S.Ctx, 5 * time.Second) // TODO: make this configurable
 wait_for_started:
     for {
         select {
server-pxy.go

@@ -374,7 +374,7 @@ func (pxy *server_proxy_http_main) ServeHTTP(w http.ResponseWriter, req *http.Re
     }
     */
     addr = svc_addr_to_dst_addr(pi.SvcAddr)
-    transport, err = pxy.addr_to_transport(s.ctx, addr)
+    transport, err = pxy.addr_to_transport(s.Ctx, addr)
     if err != nil {
         status_code = WriteEmptyRespHeader(w, http.StatusBadGateway)
         goto oops
@@ -383,7 +383,7 @@ func (pxy *server_proxy_http_main) ServeHTTP(w http.ResponseWriter, req *http.Re
     s.log.Write(pxy.id, LOG_INFO, "[%s] %s %s -> %+v", req.RemoteAddr, req.Method, req.URL.String(), proxy_url)
-    proxy_req, err = http.NewRequestWithContext(s.ctx, req.Method, proxy_url.String(), req.Body)
+    proxy_req, err = http.NewRequestWithContext(s.Ctx, req.Method, proxy_url.String(), req.Body)
     if err != nil {
         status_code = WriteEmptyRespHeader(w, http.StatusInternalServerError)
         goto oops
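
Binding the outbound proxy request to s.Ctx via http.NewRequestWithContext is what lets the server-wide cancellation in ReqStop (the "http.Client in server_proxy_http_main" case mentioned in the last file below) abort in-flight upstream requests. A minimal sketch of a context-bound outbound request; fetchWithLifetime and the target URL are illustrative:

package main

import (
    "context"
    "fmt"
    "io"
    "net/http"
)

// fetchWithLifetime issues a GET tied to lifetime: canceling lifetime aborts
// the request even while the response body is still being read.
func fetchWithLifetime(lifetime context.Context, url string) (int, error) {
    req, err := http.NewRequestWithContext(lifetime, http.MethodGet, url, nil)
    if err != nil {
        return 0, err
    }
    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return 0, err // includes context.Canceled once lifetime is canceled
    }
    defer resp.Body.Close()
    body, err := io.ReadAll(resp.Body)
    return len(body), err
}

func main() {
    lifetime, cancel := context.WithCancel(context.Background())
    defer cancel()
    n, err := fetchWithLifetime(lifetime, "http://example.com/")
    fmt.Println(n, err)
}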

server.go

@@ -70,8 +70,8 @@ type Server struct {
     Named
     cfg         *ServerConfig
-    ctx         context.Context
-    ctx_cancel  context.CancelFunc
+    Ctx         context.Context
+    CtxCancel   context.CancelFunc
     wg          sync.WaitGroup
     stop_req    atomic.Bool
@@ -1066,7 +1066,7 @@ func NewServer(ctx context.Context, name string, logger Logger, cfg *ServerConfi
         return nil, fmt.Errorf("no server addresses provided")
     }
-    s.ctx, s.ctx_cancel = context.WithCancel(ctx)
+    s.Ctx, s.CtxCancel = context.WithCancel(ctx)
     s.name = name
     s.log = logger
     /* create the specified number of listeners */
@@ -1471,18 +1471,18 @@ func (s *Server) ReqStop() {
     // call cancellation function before anything else
     // to break sub-tasks relying on this server context.
     // for example, http.Client in server_proxy_http_main
-    s.ctx_cancel()
+    s.CtxCancel()
     for _, hs = range s.ctl {
-        hs.Shutdown(s.ctx) // to break s.ctl.Serve()
+        hs.Shutdown(s.Ctx) // to break s.ctl.Serve()
     }
     for _, hs = range s.pxy {
-        hs.Shutdown(s.ctx) // to break s.pxy.Serve()
+        hs.Shutdown(s.Ctx) // to break s.pxy.Serve()
     }
     for _, hs = range s.wpx {
-        hs.Shutdown(s.ctx) // to break s.wpx.Serve()
+        hs.Shutdown(s.Ctx) // to break s.wpx.Serve()
     }
     //s.rpc_svr.GracefulStop()