implemented timeout for an initial GetSeed call.
Added RPC.Endpoint.Authority
This commit is contained in:
parent
7479cc0f3a
commit
feedff3f04
38
client.go
38
client.go
@ -21,7 +21,6 @@ import "google.golang.org/grpc/status"
|
||||
|
||||
type PacketStreamClient grpc.BidiStreamingClient[Packet, Packet]
|
||||
|
||||
type ClientConnMapByAddr = map[string]*ClientConn
|
||||
type ClientConnMap = map[uint32]*ClientConn
|
||||
type ClientPeerConnMap = map[uint32]*ClientPeerConn
|
||||
type ClientRouteMap = map[uint32]*ClientRoute
|
||||
@ -31,6 +30,8 @@ type ClientPeerCancelFuncMap = map[uint32]context.CancelFunc
|
||||
type ClientConfig struct {
|
||||
ServerAddrs []string
|
||||
PeerAddrs []string
|
||||
ServerSeedTimeout int
|
||||
ServerAuthority string // http2 :authority header
|
||||
}
|
||||
|
||||
type ClientConfigActive struct {
|
||||
@ -54,7 +55,6 @@ type Client struct {
|
||||
ctl []*http.Server // control server
|
||||
|
||||
cts_mtx sync.Mutex
|
||||
cts_map_by_addr ClientConnMapByAddr
|
||||
cts_map ClientConnMap
|
||||
|
||||
wg sync.WaitGroup
|
||||
@ -718,6 +718,20 @@ func (cts *ClientConn) ReqStop() {
|
||||
}
|
||||
}
|
||||
|
||||
func timed_interceptor(tmout_sec int) grpc.UnaryClientInterceptor {
|
||||
// The client calls GetSeed() as the first call to the server.
|
||||
// To simulate a kind of connect timeout to the server and find out an unresponsive server,
|
||||
// Place a unary intercepter that places a new context with a timeout on the GetSeed() call.
|
||||
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
|
||||
var cancel context.CancelFunc
|
||||
if tmout_sec > 0 && method == Hodu_GetSeed_FullMethodName {
|
||||
ctx, cancel = context.WithTimeout(ctx, time.Duration(tmout_sec) * time.Second)
|
||||
defer cancel()
|
||||
}
|
||||
return invoker(ctx, method, req, reply, cc, opts...)
|
||||
}
|
||||
}
|
||||
|
||||
func (cts *ClientConn) RunTask(wg *sync.WaitGroup) {
|
||||
var psc PacketStreamClient
|
||||
var slpctx context.Context
|
||||
@ -732,21 +746,26 @@ func (cts *ClientConn) RunTask(wg *sync.WaitGroup) {
|
||||
|
||||
start_over:
|
||||
cts.cfg.Index = (cts.cfg.Index + 1) % len(cts.cfg.ServerAddrs)
|
||||
cts.cli.log.Write(cts.sid, LOG_INFO, "Connecting to server %s", cts.cfg.ServerAddrs[cts.cfg.Index])
|
||||
cts.cli.log.Write(cts.sid, LOG_INFO, "Connecting to server[%d] %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index])
|
||||
if cts.cli.rpctlscfg == nil {
|
||||
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
|
||||
// TODO: can we have other authority for non-tls?
|
||||
// if normal configuration has authority, use it (non-tls side)
|
||||
// if notmal configuration doesn't have authori, tls has server name, use tls server name (tls side)
|
||||
if cts.cfg.ServerAuthority != "" { opts = append(opts, grpc.WithAuthority(cts.cfg.ServerAuthority)) }
|
||||
} else {
|
||||
opts = append(opts, grpc.WithTransportCredentials(credentials.NewTLS(cts.cli.rpctlscfg)))
|
||||
|
||||
// set the http2 :authority header with tls server name defined.
|
||||
if cts.cli.rpctlscfg.ServerName != "" { opts = append(opts, grpc.WithAuthority(cts.cli.rpctlscfg.ServerName)) }
|
||||
if cts.cfg.ServerAuthority != "" {
|
||||
opts = append(opts, grpc.WithAuthority(cts.cfg.ServerAuthority))
|
||||
} else if cts.cli.rpctlscfg.ServerName != "" {
|
||||
opts = append(opts, grpc.WithAuthority(cts.cli.rpctlscfg.ServerName))
|
||||
}
|
||||
}
|
||||
if cts.cfg.ServerSeedTimeout > 0 {
|
||||
opts = append(opts, grpc.WithUnaryInterceptor(timed_interceptor(cts.cfg.ServerSeedTimeout)))
|
||||
}
|
||||
|
||||
cts.conn, err = grpc.NewClient(cts.cfg.ServerAddrs[cts.cfg.Index], opts...)
|
||||
if err != nil {
|
||||
cts.cli.log.Write(cts.sid, LOG_ERROR, "Failed to make client to server %s - %s", cts.cfg.ServerAddrs[cts.cfg.Index], err.Error())
|
||||
cts.cli.log.Write(cts.sid, LOG_ERROR, "Failed to make client to server[%d] %s - %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index], err.Error())
|
||||
goto reconnect_to_server
|
||||
}
|
||||
cts.hdc = NewHoduClient(cts.conn)
|
||||
@ -1013,7 +1032,6 @@ func NewClient(ctx context.Context, ctl_addrs []string, logger Logger, ctl_prefi
|
||||
c.ctltlscfg = ctltlscfg
|
||||
c.rpctlscfg = rpctlscfg
|
||||
c.ext_svcs = make([]Service, 0, 1)
|
||||
c.cts_map_by_addr = make(ClientConnMapByAddr)
|
||||
c.cts_map = make(ClientConnMap)
|
||||
c.stop_req.Store(false)
|
||||
c.stop_chan = make(chan bool, 8)
|
||||
|
@ -51,8 +51,9 @@ type RPCServiceConfig struct { // rpc server-side configuration
|
||||
}
|
||||
|
||||
type RPCEndpointConfig struct { // rpc client-side configuration
|
||||
Authority string `yaml:"authority"`
|
||||
Addrs []string `yaml:"addresses"`
|
||||
Authority string `yaml:"authority"`
|
||||
Addrs []string `yaml:"addresses"`
|
||||
SeedTimeout int `yaml:"seed-timeout"`
|
||||
}
|
||||
|
||||
type ServerConfig struct {
|
||||
|
@ -226,6 +226,8 @@ func client_main(ctl_addrs []string, rpc_addrs []string, peer_addrs []string, cf
|
||||
|
||||
cc.ServerAddrs = rpc_addrs
|
||||
cc.PeerAddrs = peer_addrs
|
||||
cc.ServerSeedTimeout = cfg.RPC.Endpoint.SeedTimeout
|
||||
cc.ServerAuthority = cfg.RPC.Endpoint.Authority
|
||||
|
||||
c.StartService(&cc)
|
||||
c.StartCtlService() // control channel
|
||||
|
Loading…
x
Reference in New Issue
Block a user