extended the tls configuration to rpc server
This commit is contained in:
35
server.go
35
server.go
@ -14,6 +14,7 @@ import "sync"
|
||||
import "sync/atomic"
|
||||
|
||||
import "google.golang.org/grpc"
|
||||
import "google.golang.org/grpc/credentials"
|
||||
//import "google.golang.org/grpc/metadata"
|
||||
import "google.golang.org/grpc/peer"
|
||||
import "google.golang.org/grpc/stats"
|
||||
@ -28,7 +29,8 @@ type ServerRouteMap = map[uint32]*ServerRoute
|
||||
type Server struct {
|
||||
ctx context.Context
|
||||
ctx_cancel context.CancelFunc
|
||||
tlscfg *tls.Config
|
||||
ctltlscfg *tls.Config
|
||||
rpctlscfg *tls.Config
|
||||
|
||||
wg sync.WaitGroup
|
||||
stop_req atomic.Bool
|
||||
@ -792,7 +794,7 @@ func unaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServ
|
||||
return v, err
|
||||
}
|
||||
|
||||
func NewServer(ctx context.Context, ctl_addrs []string, rpc_addrs []string, logger Logger, tlscfg *tls.Config) (*Server, error) {
|
||||
func NewServer(ctx context.Context, ctl_addrs []string, rpc_addrs []string, logger Logger, ctltlscfg *tls.Config, rpctlscfg *tls.Config) (*Server, error) {
|
||||
var s Server
|
||||
var l *net.TCPListener
|
||||
var rpcaddr *net.TCPAddr
|
||||
@ -824,12 +826,14 @@ func NewServer(ctx context.Context, ctl_addrs []string, rpc_addrs []string, logg
|
||||
s.rpc = append(s.rpc, l)
|
||||
}
|
||||
|
||||
s.tlscfg = tlscfg
|
||||
s.ctltlscfg = ctltlscfg
|
||||
s.rpctlscfg = rpctlscfg
|
||||
s.ext_svcs = make([]Service, 0, 1)
|
||||
s.cts_map = make(ServerConnMap)
|
||||
s.cts_map_by_addr = make(ServerConnMapByAddr)
|
||||
s.stop_chan = make(chan bool, 8)
|
||||
s.stop_req.Store(false)
|
||||
|
||||
/*
|
||||
creds, err := credentials.NewServerTLSFromFile(data.Path("x509/server_cert.pem"), data.Path("x509/server_key.pem"))
|
||||
if err != nil {
|
||||
@ -837,11 +841,20 @@ func NewServer(ctx context.Context, ctl_addrs []string, rpc_addrs []string, logg
|
||||
}
|
||||
gs = grpc.NewServer(grpc.Creds(creds))
|
||||
*/
|
||||
s.rpc_svr = grpc.NewServer(
|
||||
//grpc.UnaryInterceptor(unaryInterceptor),
|
||||
//grpc.StreamInterceptor(streamInterceptor),
|
||||
grpc.StatsHandler(&ConnCatcher{server: &s}),
|
||||
)
|
||||
if s.rpctlscfg == nil {
|
||||
s.rpc_svr = grpc.NewServer(
|
||||
//grpc.UnaryInterceptor(unaryInterceptor),
|
||||
//grpc.StreamInterceptor(streamInterceptor),
|
||||
grpc.StatsHandler(&ConnCatcher{server: &s}),
|
||||
)
|
||||
} else {
|
||||
s.rpc_svr = grpc.NewServer(
|
||||
grpc.Creds(credentials.NewTLS(s.rpctlscfg)),
|
||||
//grpc.UnaryInterceptor(unaryInterceptor),
|
||||
//grpc.StreamInterceptor(streamInterceptor),
|
||||
grpc.StatsHandler(&ConnCatcher{server: &s}),
|
||||
)
|
||||
}
|
||||
RegisterHoduServer(s.rpc_svr, &s)
|
||||
|
||||
s.ctl_prefix = "" // TODO:
|
||||
@ -862,7 +875,7 @@ func NewServer(ctx context.Context, ctl_addrs []string, rpc_addrs []string, logg
|
||||
s.ctl[i] = &http.Server{
|
||||
Addr: ctl_addrs[i],
|
||||
Handler: s.ctl_mux,
|
||||
TLSConfig: s.tlscfg,
|
||||
TLSConfig: s.ctltlscfg,
|
||||
// TODO: more settings
|
||||
}
|
||||
}
|
||||
@ -949,10 +962,10 @@ func (s *Server) RunCtlTask(wg *sync.WaitGroup) {
|
||||
l_wg.Add(1)
|
||||
go func(i int, cs *http.Server) {
|
||||
s.log.Write ("", LOG_INFO, "Control channel[%d] started on %s", i, s.ctl_addr[i])
|
||||
if s.tlscfg == nil {
|
||||
if s.ctltlscfg == nil {
|
||||
err = cs.ListenAndServe()
|
||||
} else {
|
||||
err = cs.ListenAndServeTLS("", "") // c.tlscfg must provide a certificate and a key
|
||||
err = cs.ListenAndServeTLS("", "") // c.ctltlscfg must provide a certificate and a key
|
||||
}
|
||||
if errors.Is(err, http.ErrServerClosed) {
|
||||
s.log.Write("", LOG_DEBUG, "Control channel[%d] ended", i)
|
||||
|
Reference in New Issue
Block a user