added the keepalive support from the client side
This commit is contained in:
52
client.go
52
client.go
@@ -29,6 +29,7 @@ import "google.golang.org/grpc"
|
||||
import "google.golang.org/grpc/codes"
|
||||
import "google.golang.org/grpc/credentials"
|
||||
import "google.golang.org/grpc/credentials/insecure"
|
||||
import "google.golang.org/grpc/keepalive"
|
||||
import "google.golang.org/grpc/peer"
|
||||
import "google.golang.org/grpc/status"
|
||||
|
||||
@@ -58,11 +59,13 @@ type ClientRouteConfig struct {
|
||||
}
|
||||
|
||||
type ClientConnConfig struct {
|
||||
ServerAddrs []string
|
||||
Routes []ClientRouteConfig
|
||||
ServerAddrs []string
|
||||
Routes []ClientRouteConfig
|
||||
ServerPingIntvl time.Duration
|
||||
ServerPingTmout time.Duration
|
||||
ServerSeedTmout time.Duration
|
||||
ServerAuthority string // http2 :authority header
|
||||
ClientToken string
|
||||
ClientToken string
|
||||
CloseOnConnErrorEvent bool
|
||||
}
|
||||
|
||||
@@ -89,6 +92,11 @@ type ClientConfig struct {
|
||||
|
||||
Token string // to send to the server for identification
|
||||
|
||||
// default values
|
||||
RpcPingIntvl time.Duration
|
||||
RpcPingTmout time.Duration
|
||||
RpcSeedTmout time.Duration
|
||||
|
||||
// default target for rpx
|
||||
RpxTargetAddr string
|
||||
RpxTargetTls *tls.Config
|
||||
@@ -174,6 +182,9 @@ type Client struct {
|
||||
|
||||
pty_user string
|
||||
pty_shell string
|
||||
rpc_ping_intvl time.Duration
|
||||
rpc_ping_tmout time.Duration
|
||||
rpc_seed_tmout time.Duration
|
||||
xterm_html string
|
||||
}
|
||||
|
||||
@@ -1310,6 +1321,9 @@ func (cts *ClientConn) RunTask(wg *sync.WaitGroup) {
|
||||
var p *peer.Peer
|
||||
var ok bool
|
||||
var err error
|
||||
var ping_intvl time.Duration
|
||||
var ping_tmout time.Duration
|
||||
var seed_tmout time.Duration
|
||||
var opts []grpc.DialOption
|
||||
|
||||
defer wg.Done() // arrange to call at the end of this function
|
||||
@@ -1319,9 +1333,29 @@ func (cts *ClientConn) RunTask(wg *sync.WaitGroup) {
|
||||
start_over:
|
||||
cts.State.Store(CLIENT_CONN_CONNECTING)
|
||||
cts.cfg.Index = (cts.cfg.Index + 1) % len(cts.cfg.ServerAddrs)
|
||||
cts.C.log.Write(cts.Sid, LOG_INFO, "Connecting to server[%d] %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index])
|
||||
|
||||
opts = []grpc.DialOption{ }
|
||||
ping_intvl = cts.cfg.ServerPingIntvl
|
||||
if ping_intvl <= 0 {
|
||||
ping_intvl = cts.C.rpc_ping_intvl
|
||||
// the server's minimum policy defaults to 5 minutes.
|
||||
// the client's default interval is 10s. i want it to be 10m instead.
|
||||
if ping_intvl <= 0 { ping_intvl = 10 * time.Minute }
|
||||
}
|
||||
ping_tmout = cts.cfg.ServerPingTmout
|
||||
if ping_tmout <= 0 { ping_tmout = cts.C.rpc_ping_tmout }
|
||||
seed_tmout = cts.cfg.ServerSeedTmout
|
||||
if seed_tmout <= 0 { seed_tmout = cts.C.rpc_seed_tmout }
|
||||
|
||||
cts.C.log.Write(cts.Sid, LOG_INFO, "Connecting to server[%d] %s ping_intvl[%v] ping_tmout[%v] seed_tmout[%v]", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index], ping_intvl, ping_tmout, seed_tmout)
|
||||
|
||||
opts = []grpc.DialOption{}
|
||||
|
||||
opts = append(opts, grpc.WithKeepaliveParams(keepalive.ClientParameters{
|
||||
Time: ping_intvl,
|
||||
Timeout: ping_tmout,
|
||||
PermitWithoutStream: true,
|
||||
}))
|
||||
|
||||
if cts.C.rpc_tls == nil {
|
||||
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
|
||||
if cts.cfg.ServerAuthority != "" { opts = append(opts, grpc.WithAuthority(cts.cfg.ServerAuthority)) }
|
||||
@@ -1334,8 +1368,8 @@ start_over:
|
||||
opts = append(opts, grpc.WithAuthority(cts.C.rpc_tls.ServerName))
|
||||
}
|
||||
}
|
||||
if cts.cfg.ServerSeedTmout > 0 {
|
||||
opts = append(opts, grpc.WithUnaryInterceptor(timed_interceptor(cts.cfg.ServerSeedTmout)))
|
||||
if seed_tmout > 0 {
|
||||
opts = append(opts, grpc.WithUnaryInterceptor(timed_interceptor(seed_tmout)))
|
||||
}
|
||||
|
||||
cts.conn, err = grpc.NewClient(cts.cfg.ServerAddrs[cts.cfg.Index], opts...)
|
||||
@@ -2270,6 +2304,10 @@ func NewClient(ctx context.Context, name string, logger Logger, cfg *ClientConfi
|
||||
c.stop_chan = make(chan bool, 8)
|
||||
c.bulletin = NewBulletin[*ClientEvent](&c, 1024)
|
||||
|
||||
c.rpc_ping_intvl = cfg.RpcPingIntvl
|
||||
c.rpc_ping_tmout = cfg.RpcPingTmout
|
||||
c.rpc_seed_tmout = cfg.RpcSeedTmout
|
||||
|
||||
c.rpc_tls = cfg.RpcTls
|
||||
c.rpx_target_addr = cfg.RpxTargetAddr
|
||||
c.rpx_target_addr_org = cfg.RpxTargetAddr
|
||||
|
||||
@@ -92,9 +92,11 @@ type RPCServiceConfig struct { // rpc server-side configuration
|
||||
}
|
||||
|
||||
type RPCEndpointConfig struct { // rpc client-side configuration
|
||||
Authority string `yaml:"authority"`
|
||||
Addrs []string `yaml:"addresses"`
|
||||
SeedTmout time.Duration `yaml:"seed-timeout"`
|
||||
Authority string `yaml:"authority"`
|
||||
Addrs []string `yaml:"addresses"`
|
||||
PingIntvl time.Duration `yaml:"ping-interval"`
|
||||
PingTmout time.Duration `yaml:"ping-timeout"`
|
||||
SeedTmout time.Duration `yaml:"seed-timeout"`
|
||||
}
|
||||
|
||||
type ServerAppConfig struct {
|
||||
@@ -122,6 +124,9 @@ type ClientAppConfig struct {
|
||||
TokenFile string `yaml:"token-file"`
|
||||
PtyUser string `yaml:"pty-user"`
|
||||
PtyShell string `yaml:"pty-shell"`
|
||||
RpcPingIntvl time.Duration `yaml:"rpc-ping-interval"`
|
||||
RpcPingTmout time.Duration `yaml:"rpc-ping-timeout"`
|
||||
RpcSeedTmout time.Duration `yaml:"rpc-seed-timeout"`
|
||||
XtermHtmlFile string `yaml:"xterm-html-file"`
|
||||
}
|
||||
|
||||
|
||||
@@ -297,6 +297,8 @@ func client_main(ctl_addrs []string, rpc_addrs []string, route_configs []string,
|
||||
config.CtlAuth, err = make_http_auth_config(&cfg.CTL.Service.Auth)
|
||||
if err != nil { return err }
|
||||
|
||||
cc.ServerPingIntvl = cfg.RPC.Endpoint.PingIntvl
|
||||
cc.ServerPingTmout = cfg.RPC.Endpoint.PingTmout
|
||||
cc.ServerSeedTmout = cfg.RPC.Endpoint.SeedTmout
|
||||
cc.ServerAuthority = cfg.RPC.Endpoint.Authority
|
||||
pty_user = cfg.APP.PtyUser
|
||||
@@ -306,6 +308,10 @@ func client_main(ctl_addrs []string, rpc_addrs []string, route_configs []string,
|
||||
config.PeerConnMax = cfg.APP.MaxPeers
|
||||
config.PeerConnTmout = cfg.APP.PeerConnTmout
|
||||
|
||||
config.RpcPingIntvl = cfg.APP.RpcPingIntvl // app-level default
|
||||
config.RpcPingTmout = cfg.APP.RpcPingTmout // app-level default
|
||||
config.RpcSeedTmout = cfg.APP.RpcSeedTmout // app-level default
|
||||
|
||||
if cfg.APP.TokenText != "" {
|
||||
config.Token = cfg.APP.TokenText
|
||||
} else if cfg.APP.TokenFile != "" {
|
||||
|
||||
Reference in New Issue
Block a user