From 3d19576905cd520fda74a59742d950db021092e9 Mon Sep 17 00:00:00 2001 From: hyung-hwan Date: Sun, 8 Dec 2024 13:34:47 +0900 Subject: [PATCH] extended configuration to accept rpc service addresses and endpoint addresses --- client.go | 16 ++++++++++------ cmd/config.go | 21 ++++++++++++++++----- cmd/main.go | 40 ++++++++++++++++++++++++++++------------ server.go | 21 +++++++-------------- 4 files changed, 61 insertions(+), 37 deletions(-) diff --git a/client.go b/client.go index 80d7649..e1a6b84 100644 --- a/client.go +++ b/client.go @@ -723,20 +723,24 @@ func (cts *ClientConn) RunTask(wg *sync.WaitGroup) { var p *peer.Peer var ok bool var err error + var opts []grpc.DialOption defer wg.Done() // arrange to call at the end of this function start_over: cts.cli.log.Write(cts.sid, LOG_INFO, "Connecting to server %s", cts.cfg.ServerAddr) if cts.cli.rpctlscfg == nil { - cts.conn, err = grpc.NewClient( - cts.cfg.ServerAddr, - grpc.WithTransportCredentials(insecure.NewCredentials())) + opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) + // TODO: can we have other authority for non-tls? + // if normal configuration has authority, use it (non-tls side) + // if notmal configuration doesn't have authori, tls has server name, use tls server name (tls side) } else { - cts.conn, err = grpc.NewClient( - cts.cfg.ServerAddr, - grpc.WithTransportCredentials(credentials.NewTLS(cts.cli.rpctlscfg))) + opts = append(opts, grpc.WithTransportCredentials(credentials.NewTLS(cts.cli.rpctlscfg))) + + // set the http2 :authority header with tls server name defined. + if cts.cli.rpctlscfg.ServerName != "" { opts = append(opts, grpc.WithAuthority(cts.cli.rpctlscfg.ServerName)) } } + cts.conn, err = grpc.NewClient(cts.cfg.ServerAddr, opts...) if err != nil { cts.cli.log.Write(cts.sid, LOG_ERROR, "Failed to make client to server %s - %s", cts.cfg.ServerAddr, err.Error()) goto reconnect_to_server diff --git a/cmd/config.go b/cmd/config.go index 6ceb61e..860c0f1 100644 --- a/cmd/config.go +++ b/cmd/config.go @@ -40,29 +40,40 @@ type ClientTLSConfig struct { ServerName string `yaml:"server-name"` } -type ServiceConfig struct { - Prefix string `yaml:"prefix"` +type CTLServiceConfig struct { + Prefix string `yaml:"prefix"` // url prefix for control channel endpoints Addrs []string `yaml:"addresses"` } +type RPCServiceConfig struct { // rpc server-side configuration + Addrs []string `yaml:"addresses"` + MaxConns int `yaml:"max-connections"` // TODO: implement this item +} + +type RPCEndpointConfig struct { // rpc client-side configuration + Authority string `yaml:"authority"` + Addrs []string `yaml:"addresses"` +} + type ServerConfig struct { CTL struct { - Service ServiceConfig `yaml:"service"` + Service CTLServiceConfig `yaml:"service"` TLS ServerTLSConfig `yaml:"tls"` } `yaml:"ctl"` RPC struct { + Service RPCServiceConfig `yaml:"service"` TLS ServerTLSConfig `yaml:"tls"` - ServiceAddrs []string `yaml:"service-addrs"` } `yaml:"rpc"` } type ClientConfig struct { CTL struct { - Service ServiceConfig `yaml:"endpoint"` + Service CTLServiceConfig `yaml:"service"` TLS ServerTLSConfig `yaml:"tls"` } `yaml:"ctl"` RPC struct { + Endpoint RPCEndpointConfig `yaml:"endpoint"` TLS ClientTLSConfig `yaml:"tls"` } `yaml:"rpc"` } diff --git a/cmd/main.go b/cmd/main.go index f2f10fa..1499870 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -141,7 +141,7 @@ func (sh *signal_handler) WriteLog(id string, level hodu.LogLevel, fmt string, a // -------------------------------------------------------------------- -func server_main(ctl_addrs []string, svcaddrs []string, cfg *ServerConfig) error { +func server_main(ctl_addrs []string, rpc_addrs []string, cfg *ServerConfig) error { var s *hodu.Server var ctltlscfg *tls.Config var rpctlscfg *tls.Config @@ -158,10 +158,21 @@ func server_main(ctl_addrs []string, svcaddrs []string, cfg *ServerConfig) error } } + if len(ctl_addrs) <= 0 { + ctl_addrs = cfg.CTL.Service.Addrs + } + + if (len(rpc_addrs) <= 0) { + rpc_addrs = cfg.RPC.Service.Addrs + } + if (len(rpc_addrs) <= 0) { + return fmt.Errorf("no rpc service addresses specified") + } + s, err = hodu.NewServer( context.Background(), ctl_addrs, - svcaddrs, + rpc_addrs, &AppLogger{id: "server", out: os.Stderr}, cfg.CTL.Service.Prefix, ctltlscfg, @@ -180,7 +191,7 @@ func server_main(ctl_addrs []string, svcaddrs []string, cfg *ServerConfig) error // -------------------------------------------------------------------- -func client_main(ctl_addrs []string, server_addr string, peer_addrs []string, cfg *ClientConfig) error { +func client_main(ctl_addrs []string, rpc_addrs []string, peer_addrs []string, cfg *ClientConfig) error { var c *hodu.Client var ctltlscfg *tls.Config var rpctlscfg *tls.Config @@ -198,6 +209,17 @@ func client_main(ctl_addrs []string, server_addr string, peer_addrs []string, cf } } + if len(ctl_addrs) <= 0 { ctl_addrs = cfg.CTL.Service.Addrs } + if len(rpc_addrs) <= 0 { rpc_addrs = cfg.RPC.Endpoint.Addrs } + + if len(rpc_addrs) <= 0 { + return fmt.Errorf("no rpc server address specified") + } else if len(rpc_addrs) > 1 { + // TODO: instead of returning an error here, + // support multiple endpoint addresses. round-robin or something to a working server? + return fmt.Errorf("too many rpc server addresses specified") + } + c = hodu.NewClient( context.Background(), ctl_addrs, @@ -206,7 +228,7 @@ func client_main(ctl_addrs []string, server_addr string, peer_addrs []string, cf ctltlscfg, rpctlscfg) - cc.ServerAddr = server_addr + cc.ServerAddr = rpc_addrs[0] cc.PeerAddrs = peer_addrs c.StartService(&cc) @@ -252,7 +274,7 @@ func main() { goto wrong_usage } - if len(rpc_addrs) <= 0 || flgs.NArg() > 0 { goto wrong_usage } + if flgs.NArg() > 0 { goto wrong_usage } if (cfgfile != "") { cfg, err = load_server_config(cfgfile) @@ -262,8 +284,6 @@ func main() { } } - if len(ctl_addrs) <= 0 { ctl_addrs = cfg.CTL.Service.Addrs } - err = server_main(ctl_addrs, rpc_addrs, cfg) if err != nil { fmt.Fprintf(os.Stderr, "ERROR: server error - %s\n", err.Error()) @@ -298,8 +318,6 @@ func main() { goto wrong_usage } - if len(rpc_addrs) <= 0 { goto wrong_usage } - if (cfgfile != "") { cfg, err = load_client_config(cfgfile) if err != nil { @@ -308,9 +326,7 @@ func main() { } } - if len(ctl_addrs) < 1 { ctl_addrs = cfg.CTL.Service.Addrs } - - err = client_main(ctl_addrs, rpc_addrs[0], flgs.Args(), cfg) + err = client_main(ctl_addrs, rpc_addrs, flgs.Args(), cfg) if err != nil { fmt.Fprintf(os.Stderr, "ERROR: client error - %s\n", err.Error()) goto oops diff --git a/server.go b/server.go index f117bd3..35060a6 100644 --- a/server.go +++ b/server.go @@ -828,6 +828,7 @@ func NewServer(ctx context.Context, ctl_addrs []string, rpc_addrs []string, logg var i int var cwd string var hs_log *log.Logger + var opts []grpc.ServerOption if len(rpc_addrs) <= 0 { return nil, fmt.Errorf("no server addresses provided") @@ -866,20 +867,12 @@ func NewServer(ctx context.Context, ctl_addrs []string, rpc_addrs []string, logg } gs = grpc.NewServer(grpc.Creds(creds)) */ - if s.rpctlscfg == nil { - s.rpc_svr = grpc.NewServer( - //grpc.UnaryInterceptor(unaryInterceptor), - //grpc.StreamInterceptor(streamInterceptor), - grpc.StatsHandler(&ConnCatcher{server: &s}), - ) - } else { - s.rpc_svr = grpc.NewServer( - grpc.Creds(credentials.NewTLS(s.rpctlscfg)), - //grpc.UnaryInterceptor(unaryInterceptor), - //grpc.StreamInterceptor(streamInterceptor), - grpc.StatsHandler(&ConnCatcher{server: &s}), - ) - } + + opts = append(opts, grpc.StatsHandler(&ConnCatcher{server: &s})) + if s.rpctlscfg != nil { opts = append(opts, grpc.Creds(credentials.NewTLS(s.rpctlscfg))) } + //opts = append(opts, grpc.UnaryInterceptor(unaryInterceptor)) + //opts = append(opts, grpc.StreamInterceptor(streamInterceptor)) + s.rpc_svr = grpc.NewServer(opts...) RegisterHoduServer(s.rpc_svr, &s) s.ctl_prefix = ctl_prefix