From feedff3f0493cfeb27cf414639010b78148c98b1 Mon Sep 17 00:00:00 2001 From: hyung-hwan Date: Sun, 8 Dec 2024 16:48:23 +0900 Subject: [PATCH] implemented timeout for an initial GetSeed call. Added RPC.Endpoint.Authority --- client.go | 38 ++++++++++++++++++++++++++++---------- cmd/config.go | 5 +++-- cmd/main.go | 2 ++ 3 files changed, 33 insertions(+), 12 deletions(-) diff --git a/client.go b/client.go index 256a057..e013ad7 100644 --- a/client.go +++ b/client.go @@ -21,7 +21,6 @@ import "google.golang.org/grpc/status" type PacketStreamClient grpc.BidiStreamingClient[Packet, Packet] -type ClientConnMapByAddr = map[string]*ClientConn type ClientConnMap = map[uint32]*ClientConn type ClientPeerConnMap = map[uint32]*ClientPeerConn type ClientRouteMap = map[uint32]*ClientRoute @@ -31,6 +30,8 @@ type ClientPeerCancelFuncMap = map[uint32]context.CancelFunc type ClientConfig struct { ServerAddrs []string PeerAddrs []string + ServerSeedTimeout int + ServerAuthority string // http2 :authority header } type ClientConfigActive struct { @@ -54,7 +55,6 @@ type Client struct { ctl []*http.Server // control server cts_mtx sync.Mutex - cts_map_by_addr ClientConnMapByAddr cts_map ClientConnMap wg sync.WaitGroup @@ -718,6 +718,20 @@ func (cts *ClientConn) ReqStop() { } } +func timed_interceptor(tmout_sec int) grpc.UnaryClientInterceptor { + // The client calls GetSeed() as the first call to the server. + // To simulate a kind of connect timeout to the server and find out an unresponsive server, + // Place a unary intercepter that places a new context with a timeout on the GetSeed() call. + return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + var cancel context.CancelFunc + if tmout_sec > 0 && method == Hodu_GetSeed_FullMethodName { + ctx, cancel = context.WithTimeout(ctx, time.Duration(tmout_sec) * time.Second) + defer cancel() + } + return invoker(ctx, method, req, reply, cc, opts...) + } +} + func (cts *ClientConn) RunTask(wg *sync.WaitGroup) { var psc PacketStreamClient var slpctx context.Context @@ -732,21 +746,26 @@ func (cts *ClientConn) RunTask(wg *sync.WaitGroup) { start_over: cts.cfg.Index = (cts.cfg.Index + 1) % len(cts.cfg.ServerAddrs) - cts.cli.log.Write(cts.sid, LOG_INFO, "Connecting to server %s", cts.cfg.ServerAddrs[cts.cfg.Index]) + cts.cli.log.Write(cts.sid, LOG_INFO, "Connecting to server[%d] %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index]) if cts.cli.rpctlscfg == nil { opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) - // TODO: can we have other authority for non-tls? - // if normal configuration has authority, use it (non-tls side) - // if notmal configuration doesn't have authori, tls has server name, use tls server name (tls side) + if cts.cfg.ServerAuthority != "" { opts = append(opts, grpc.WithAuthority(cts.cfg.ServerAuthority)) } } else { opts = append(opts, grpc.WithTransportCredentials(credentials.NewTLS(cts.cli.rpctlscfg))) - // set the http2 :authority header with tls server name defined. - if cts.cli.rpctlscfg.ServerName != "" { opts = append(opts, grpc.WithAuthority(cts.cli.rpctlscfg.ServerName)) } + if cts.cfg.ServerAuthority != "" { + opts = append(opts, grpc.WithAuthority(cts.cfg.ServerAuthority)) + } else if cts.cli.rpctlscfg.ServerName != "" { + opts = append(opts, grpc.WithAuthority(cts.cli.rpctlscfg.ServerName)) + } } + if cts.cfg.ServerSeedTimeout > 0 { + opts = append(opts, grpc.WithUnaryInterceptor(timed_interceptor(cts.cfg.ServerSeedTimeout))) + } + cts.conn, err = grpc.NewClient(cts.cfg.ServerAddrs[cts.cfg.Index], opts...) if err != nil { - cts.cli.log.Write(cts.sid, LOG_ERROR, "Failed to make client to server %s - %s", cts.cfg.ServerAddrs[cts.cfg.Index], err.Error()) + cts.cli.log.Write(cts.sid, LOG_ERROR, "Failed to make client to server[%d] %s - %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index], err.Error()) goto reconnect_to_server } cts.hdc = NewHoduClient(cts.conn) @@ -1013,7 +1032,6 @@ func NewClient(ctx context.Context, ctl_addrs []string, logger Logger, ctl_prefi c.ctltlscfg = ctltlscfg c.rpctlscfg = rpctlscfg c.ext_svcs = make([]Service, 0, 1) - c.cts_map_by_addr = make(ClientConnMapByAddr) c.cts_map = make(ClientConnMap) c.stop_req.Store(false) c.stop_chan = make(chan bool, 8) diff --git a/cmd/config.go b/cmd/config.go index 860c0f1..0d5a008 100644 --- a/cmd/config.go +++ b/cmd/config.go @@ -51,8 +51,9 @@ type RPCServiceConfig struct { // rpc server-side configuration } type RPCEndpointConfig struct { // rpc client-side configuration - Authority string `yaml:"authority"` - Addrs []string `yaml:"addresses"` + Authority string `yaml:"authority"` + Addrs []string `yaml:"addresses"` + SeedTimeout int `yaml:"seed-timeout"` } type ServerConfig struct { diff --git a/cmd/main.go b/cmd/main.go index 293cfce..9737aee 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -226,6 +226,8 @@ func client_main(ctl_addrs []string, rpc_addrs []string, peer_addrs []string, cf cc.ServerAddrs = rpc_addrs cc.PeerAddrs = peer_addrs + cc.ServerSeedTimeout = cfg.RPC.Endpoint.SeedTimeout + cc.ServerAuthority = cfg.RPC.Endpoint.Authority c.StartService(&cc) c.StartCtlService() // control channel