extended configuration to accept rpc service addresses and endpoint addresses
This commit is contained in:
parent
87597ad698
commit
3d19576905
16
client.go
16
client.go
@ -723,20 +723,24 @@ func (cts *ClientConn) RunTask(wg *sync.WaitGroup) {
|
|||||||
var p *peer.Peer
|
var p *peer.Peer
|
||||||
var ok bool
|
var ok bool
|
||||||
var err error
|
var err error
|
||||||
|
var opts []grpc.DialOption
|
||||||
|
|
||||||
defer wg.Done() // arrange to call at the end of this function
|
defer wg.Done() // arrange to call at the end of this function
|
||||||
|
|
||||||
start_over:
|
start_over:
|
||||||
cts.cli.log.Write(cts.sid, LOG_INFO, "Connecting to server %s", cts.cfg.ServerAddr)
|
cts.cli.log.Write(cts.sid, LOG_INFO, "Connecting to server %s", cts.cfg.ServerAddr)
|
||||||
if cts.cli.rpctlscfg == nil {
|
if cts.cli.rpctlscfg == nil {
|
||||||
cts.conn, err = grpc.NewClient(
|
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
|
||||||
cts.cfg.ServerAddr,
|
// TODO: can we have other authority for non-tls?
|
||||||
grpc.WithTransportCredentials(insecure.NewCredentials()))
|
// if normal configuration has authority, use it (non-tls side)
|
||||||
|
// if notmal configuration doesn't have authori, tls has server name, use tls server name (tls side)
|
||||||
} else {
|
} else {
|
||||||
cts.conn, err = grpc.NewClient(
|
opts = append(opts, grpc.WithTransportCredentials(credentials.NewTLS(cts.cli.rpctlscfg)))
|
||||||
cts.cfg.ServerAddr,
|
|
||||||
grpc.WithTransportCredentials(credentials.NewTLS(cts.cli.rpctlscfg)))
|
// set the http2 :authority header with tls server name defined.
|
||||||
|
if cts.cli.rpctlscfg.ServerName != "" { opts = append(opts, grpc.WithAuthority(cts.cli.rpctlscfg.ServerName)) }
|
||||||
}
|
}
|
||||||
|
cts.conn, err = grpc.NewClient(cts.cfg.ServerAddr, opts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cts.cli.log.Write(cts.sid, LOG_ERROR, "Failed to make client to server %s - %s", cts.cfg.ServerAddr, err.Error())
|
cts.cli.log.Write(cts.sid, LOG_ERROR, "Failed to make client to server %s - %s", cts.cfg.ServerAddr, err.Error())
|
||||||
goto reconnect_to_server
|
goto reconnect_to_server
|
||||||
|
@ -40,29 +40,40 @@ type ClientTLSConfig struct {
|
|||||||
ServerName string `yaml:"server-name"`
|
ServerName string `yaml:"server-name"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type ServiceConfig struct {
|
type CTLServiceConfig struct {
|
||||||
Prefix string `yaml:"prefix"`
|
Prefix string `yaml:"prefix"` // url prefix for control channel endpoints
|
||||||
Addrs []string `yaml:"addresses"`
|
Addrs []string `yaml:"addresses"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type RPCServiceConfig struct { // rpc server-side configuration
|
||||||
|
Addrs []string `yaml:"addresses"`
|
||||||
|
MaxConns int `yaml:"max-connections"` // TODO: implement this item
|
||||||
|
}
|
||||||
|
|
||||||
|
type RPCEndpointConfig struct { // rpc client-side configuration
|
||||||
|
Authority string `yaml:"authority"`
|
||||||
|
Addrs []string `yaml:"addresses"`
|
||||||
|
}
|
||||||
|
|
||||||
type ServerConfig struct {
|
type ServerConfig struct {
|
||||||
CTL struct {
|
CTL struct {
|
||||||
Service ServiceConfig `yaml:"service"`
|
Service CTLServiceConfig `yaml:"service"`
|
||||||
TLS ServerTLSConfig `yaml:"tls"`
|
TLS ServerTLSConfig `yaml:"tls"`
|
||||||
} `yaml:"ctl"`
|
} `yaml:"ctl"`
|
||||||
|
|
||||||
RPC struct {
|
RPC struct {
|
||||||
|
Service RPCServiceConfig `yaml:"service"`
|
||||||
TLS ServerTLSConfig `yaml:"tls"`
|
TLS ServerTLSConfig `yaml:"tls"`
|
||||||
ServiceAddrs []string `yaml:"service-addrs"`
|
|
||||||
} `yaml:"rpc"`
|
} `yaml:"rpc"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type ClientConfig struct {
|
type ClientConfig struct {
|
||||||
CTL struct {
|
CTL struct {
|
||||||
Service ServiceConfig `yaml:"endpoint"`
|
Service CTLServiceConfig `yaml:"service"`
|
||||||
TLS ServerTLSConfig `yaml:"tls"`
|
TLS ServerTLSConfig `yaml:"tls"`
|
||||||
} `yaml:"ctl"`
|
} `yaml:"ctl"`
|
||||||
RPC struct {
|
RPC struct {
|
||||||
|
Endpoint RPCEndpointConfig `yaml:"endpoint"`
|
||||||
TLS ClientTLSConfig `yaml:"tls"`
|
TLS ClientTLSConfig `yaml:"tls"`
|
||||||
} `yaml:"rpc"`
|
} `yaml:"rpc"`
|
||||||
}
|
}
|
||||||
|
40
cmd/main.go
40
cmd/main.go
@ -141,7 +141,7 @@ func (sh *signal_handler) WriteLog(id string, level hodu.LogLevel, fmt string, a
|
|||||||
|
|
||||||
// --------------------------------------------------------------------
|
// --------------------------------------------------------------------
|
||||||
|
|
||||||
func server_main(ctl_addrs []string, svcaddrs []string, cfg *ServerConfig) error {
|
func server_main(ctl_addrs []string, rpc_addrs []string, cfg *ServerConfig) error {
|
||||||
var s *hodu.Server
|
var s *hodu.Server
|
||||||
var ctltlscfg *tls.Config
|
var ctltlscfg *tls.Config
|
||||||
var rpctlscfg *tls.Config
|
var rpctlscfg *tls.Config
|
||||||
@ -158,10 +158,21 @@ func server_main(ctl_addrs []string, svcaddrs []string, cfg *ServerConfig) error
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(ctl_addrs) <= 0 {
|
||||||
|
ctl_addrs = cfg.CTL.Service.Addrs
|
||||||
|
}
|
||||||
|
|
||||||
|
if (len(rpc_addrs) <= 0) {
|
||||||
|
rpc_addrs = cfg.RPC.Service.Addrs
|
||||||
|
}
|
||||||
|
if (len(rpc_addrs) <= 0) {
|
||||||
|
return fmt.Errorf("no rpc service addresses specified")
|
||||||
|
}
|
||||||
|
|
||||||
s, err = hodu.NewServer(
|
s, err = hodu.NewServer(
|
||||||
context.Background(),
|
context.Background(),
|
||||||
ctl_addrs,
|
ctl_addrs,
|
||||||
svcaddrs,
|
rpc_addrs,
|
||||||
&AppLogger{id: "server", out: os.Stderr},
|
&AppLogger{id: "server", out: os.Stderr},
|
||||||
cfg.CTL.Service.Prefix,
|
cfg.CTL.Service.Prefix,
|
||||||
ctltlscfg,
|
ctltlscfg,
|
||||||
@ -180,7 +191,7 @@ func server_main(ctl_addrs []string, svcaddrs []string, cfg *ServerConfig) error
|
|||||||
|
|
||||||
// --------------------------------------------------------------------
|
// --------------------------------------------------------------------
|
||||||
|
|
||||||
func client_main(ctl_addrs []string, server_addr string, peer_addrs []string, cfg *ClientConfig) error {
|
func client_main(ctl_addrs []string, rpc_addrs []string, peer_addrs []string, cfg *ClientConfig) error {
|
||||||
var c *hodu.Client
|
var c *hodu.Client
|
||||||
var ctltlscfg *tls.Config
|
var ctltlscfg *tls.Config
|
||||||
var rpctlscfg *tls.Config
|
var rpctlscfg *tls.Config
|
||||||
@ -198,6 +209,17 @@ func client_main(ctl_addrs []string, server_addr string, peer_addrs []string, cf
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if len(ctl_addrs) <= 0 { ctl_addrs = cfg.CTL.Service.Addrs }
|
||||||
|
if len(rpc_addrs) <= 0 { rpc_addrs = cfg.RPC.Endpoint.Addrs }
|
||||||
|
|
||||||
|
if len(rpc_addrs) <= 0 {
|
||||||
|
return fmt.Errorf("no rpc server address specified")
|
||||||
|
} else if len(rpc_addrs) > 1 {
|
||||||
|
// TODO: instead of returning an error here,
|
||||||
|
// support multiple endpoint addresses. round-robin or something to a working server?
|
||||||
|
return fmt.Errorf("too many rpc server addresses specified")
|
||||||
|
}
|
||||||
|
|
||||||
c = hodu.NewClient(
|
c = hodu.NewClient(
|
||||||
context.Background(),
|
context.Background(),
|
||||||
ctl_addrs,
|
ctl_addrs,
|
||||||
@ -206,7 +228,7 @@ func client_main(ctl_addrs []string, server_addr string, peer_addrs []string, cf
|
|||||||
ctltlscfg,
|
ctltlscfg,
|
||||||
rpctlscfg)
|
rpctlscfg)
|
||||||
|
|
||||||
cc.ServerAddr = server_addr
|
cc.ServerAddr = rpc_addrs[0]
|
||||||
cc.PeerAddrs = peer_addrs
|
cc.PeerAddrs = peer_addrs
|
||||||
|
|
||||||
c.StartService(&cc)
|
c.StartService(&cc)
|
||||||
@ -252,7 +274,7 @@ func main() {
|
|||||||
goto wrong_usage
|
goto wrong_usage
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(rpc_addrs) <= 0 || flgs.NArg() > 0 { goto wrong_usage }
|
if flgs.NArg() > 0 { goto wrong_usage }
|
||||||
|
|
||||||
if (cfgfile != "") {
|
if (cfgfile != "") {
|
||||||
cfg, err = load_server_config(cfgfile)
|
cfg, err = load_server_config(cfgfile)
|
||||||
@ -262,8 +284,6 @@ func main() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(ctl_addrs) <= 0 { ctl_addrs = cfg.CTL.Service.Addrs }
|
|
||||||
|
|
||||||
err = server_main(ctl_addrs, rpc_addrs, cfg)
|
err = server_main(ctl_addrs, rpc_addrs, cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Fprintf(os.Stderr, "ERROR: server error - %s\n", err.Error())
|
fmt.Fprintf(os.Stderr, "ERROR: server error - %s\n", err.Error())
|
||||||
@ -298,8 +318,6 @@ func main() {
|
|||||||
goto wrong_usage
|
goto wrong_usage
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(rpc_addrs) <= 0 { goto wrong_usage }
|
|
||||||
|
|
||||||
if (cfgfile != "") {
|
if (cfgfile != "") {
|
||||||
cfg, err = load_client_config(cfgfile)
|
cfg, err = load_client_config(cfgfile)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -308,9 +326,7 @@ func main() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(ctl_addrs) < 1 { ctl_addrs = cfg.CTL.Service.Addrs }
|
err = client_main(ctl_addrs, rpc_addrs, flgs.Args(), cfg)
|
||||||
|
|
||||||
err = client_main(ctl_addrs, rpc_addrs[0], flgs.Args(), cfg)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Fprintf(os.Stderr, "ERROR: client error - %s\n", err.Error())
|
fmt.Fprintf(os.Stderr, "ERROR: client error - %s\n", err.Error())
|
||||||
goto oops
|
goto oops
|
||||||
|
21
server.go
21
server.go
@ -828,6 +828,7 @@ func NewServer(ctx context.Context, ctl_addrs []string, rpc_addrs []string, logg
|
|||||||
var i int
|
var i int
|
||||||
var cwd string
|
var cwd string
|
||||||
var hs_log *log.Logger
|
var hs_log *log.Logger
|
||||||
|
var opts []grpc.ServerOption
|
||||||
|
|
||||||
if len(rpc_addrs) <= 0 {
|
if len(rpc_addrs) <= 0 {
|
||||||
return nil, fmt.Errorf("no server addresses provided")
|
return nil, fmt.Errorf("no server addresses provided")
|
||||||
@ -866,20 +867,12 @@ func NewServer(ctx context.Context, ctl_addrs []string, rpc_addrs []string, logg
|
|||||||
}
|
}
|
||||||
gs = grpc.NewServer(grpc.Creds(creds))
|
gs = grpc.NewServer(grpc.Creds(creds))
|
||||||
*/
|
*/
|
||||||
if s.rpctlscfg == nil {
|
|
||||||
s.rpc_svr = grpc.NewServer(
|
opts = append(opts, grpc.StatsHandler(&ConnCatcher{server: &s}))
|
||||||
//grpc.UnaryInterceptor(unaryInterceptor),
|
if s.rpctlscfg != nil { opts = append(opts, grpc.Creds(credentials.NewTLS(s.rpctlscfg))) }
|
||||||
//grpc.StreamInterceptor(streamInterceptor),
|
//opts = append(opts, grpc.UnaryInterceptor(unaryInterceptor))
|
||||||
grpc.StatsHandler(&ConnCatcher{server: &s}),
|
//opts = append(opts, grpc.StreamInterceptor(streamInterceptor))
|
||||||
)
|
s.rpc_svr = grpc.NewServer(opts...)
|
||||||
} else {
|
|
||||||
s.rpc_svr = grpc.NewServer(
|
|
||||||
grpc.Creds(credentials.NewTLS(s.rpctlscfg)),
|
|
||||||
//grpc.UnaryInterceptor(unaryInterceptor),
|
|
||||||
//grpc.StreamInterceptor(streamInterceptor),
|
|
||||||
grpc.StatsHandler(&ConnCatcher{server: &s}),
|
|
||||||
)
|
|
||||||
}
|
|
||||||
RegisterHoduServer(s.rpc_svr, &s)
|
RegisterHoduServer(s.rpc_svr, &s)
|
||||||
|
|
||||||
s.ctl_prefix = ctl_prefix
|
s.ctl_prefix = ctl_prefix
|
||||||
|
Loading…
x
Reference in New Issue
Block a user