added http auth config to the client-side control channel
This commit is contained in:
54
client.go
54
client.go
@ -55,16 +55,17 @@ type ClientConfigActive struct {
|
||||
type Client struct {
|
||||
Named
|
||||
|
||||
ctx context.Context
|
||||
ctx_cancel context.CancelFunc
|
||||
ctltlscfg *tls.Config
|
||||
rpctlscfg *tls.Config
|
||||
ctx context.Context
|
||||
ctx_cancel context.CancelFunc
|
||||
|
||||
ext_mtx sync.Mutex
|
||||
ext_mtx sync.Mutex
|
||||
ext_svcs []Service
|
||||
|
||||
rpc_tls *tls.Config
|
||||
ctl_tls *tls.Config
|
||||
ctl_addr []string
|
||||
ctl_prefix string
|
||||
ctl_prefix string
|
||||
ctl_auth *HttpAuthConfig
|
||||
ctl_mux *http.ServeMux
|
||||
ctl []*http.Server // control server
|
||||
|
||||
@ -930,16 +931,16 @@ start_over:
|
||||
cts.State = CLIENT_CONN_CONNECTING
|
||||
cts.cfg.Index = (cts.cfg.Index + 1) % len(cts.cfg.ServerAddrs)
|
||||
cts.cli.log.Write(cts.Sid, LOG_INFO, "Connecting to server[%d] %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index])
|
||||
if cts.cli.rpctlscfg == nil {
|
||||
if cts.cli.rpc_tls == nil {
|
||||
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
|
||||
if cts.cfg.ServerAuthority != "" { opts = append(opts, grpc.WithAuthority(cts.cfg.ServerAuthority)) }
|
||||
} else {
|
||||
opts = append(opts, grpc.WithTransportCredentials(credentials.NewTLS(cts.cli.rpctlscfg)))
|
||||
opts = append(opts, grpc.WithTransportCredentials(credentials.NewTLS(cts.cli.rpc_tls)))
|
||||
// set the http2 :authority header with tls server name defined.
|
||||
if cts.cfg.ServerAuthority != "" {
|
||||
opts = append(opts, grpc.WithAuthority(cts.cfg.ServerAuthority))
|
||||
} else if cts.cli.rpctlscfg.ServerName != "" {
|
||||
opts = append(opts, grpc.WithAuthority(cts.cli.rpctlscfg.ServerName))
|
||||
} else if cts.cli.rpc_tls.ServerName != "" {
|
||||
opts = append(opts, grpc.WithAuthority(cts.cli.rpc_tls.ServerName))
|
||||
}
|
||||
}
|
||||
if cts.cfg.ServerSeedTmout > 0 {
|
||||
@ -1225,6 +1226,7 @@ func (hlw *client_ctl_log_writer) Write(p []byte) (n int, err error) {
|
||||
|
||||
type ClientHttpHandler interface {
|
||||
Id() string
|
||||
Authenticate(req *http.Request) (int, string)
|
||||
ServeHTTP (w http.ResponseWriter, req *http.Request) (int, error)
|
||||
}
|
||||
|
||||
@ -1234,6 +1236,7 @@ func (c *Client) wrap_http_handler(handler ClientHttpHandler) http.Handler {
|
||||
var err error
|
||||
var start_time time.Time
|
||||
var time_taken time.Duration
|
||||
var realm string
|
||||
|
||||
// this deferred function is to overcome the recovering implemenation
|
||||
// from panic done in go's http server. in that implemenation, panic
|
||||
@ -1247,11 +1250,15 @@ func (c *Client) wrap_http_handler(handler ClientHttpHandler) http.Handler {
|
||||
|
||||
start_time = time.Now()
|
||||
|
||||
// TODO: some kind of authorization, especially for ctl
|
||||
//req.BasicAuth()
|
||||
//req.Header.Get("Authorization")
|
||||
|
||||
status_code, err = handler.ServeHTTP(w, req)
|
||||
status_code, realm = handler.Authenticate(req)
|
||||
if status_code == http.StatusUnauthorized && realm != "" {
|
||||
w.Header().Set("WWW-Authenticate", fmt.Sprintf("Basic Realm=\"%s\"", realm))
|
||||
WriteEmptyRespHeader(w, status_code)
|
||||
} else if status_code == http.StatusOK {
|
||||
status_code, err = handler.ServeHTTP(w, req)
|
||||
} else {
|
||||
WriteEmptyRespHeader(w, status_code)
|
||||
}
|
||||
|
||||
// TODO: statistics by status_code and end point types.
|
||||
time_taken = time.Now().Sub(start_time)
|
||||
@ -1266,15 +1273,13 @@ func (c *Client) wrap_http_handler(handler ClientHttpHandler) http.Handler {
|
||||
})
|
||||
}
|
||||
|
||||
func NewClient(ctx context.Context, name string, logger Logger, ctl_addrs []string, ctl_prefix string, ctltlscfg *tls.Config, rpctlscfg *tls.Config, rpc_max int, peer_max int, peer_conn_tmout time.Duration) *Client {
|
||||
func NewClient(ctx context.Context, name string, logger Logger, ctl_addrs []string, ctl_prefix string, ctl_tls *tls.Config, ctl_auth *HttpAuthConfig, rpc_tls *tls.Config, rpc_max int, peer_max int, peer_conn_tmout time.Duration) *Client {
|
||||
var c Client
|
||||
var i int
|
||||
var hs_log *log.Logger
|
||||
|
||||
c.name = name
|
||||
c.ctx, c.ctx_cancel = context.WithCancel(ctx)
|
||||
c.ctltlscfg = ctltlscfg
|
||||
c.rpctlscfg = rpctlscfg
|
||||
c.ext_svcs = make([]Service, 0, 1)
|
||||
c.ptc_tmout = peer_conn_tmout
|
||||
c.ptc_limit = peer_max
|
||||
@ -1284,8 +1289,11 @@ func NewClient(ctx context.Context, name string, logger Logger, ctl_addrs []stri
|
||||
c.stop_req.Store(false)
|
||||
c.stop_chan = make(chan bool, 8)
|
||||
c.log = logger
|
||||
c.ctl_prefix = ctl_prefix
|
||||
|
||||
c.rpc_tls = rpc_tls
|
||||
c.ctl_auth = ctl_auth
|
||||
c.ctl_tls = ctl_tls
|
||||
c.ctl_prefix = ctl_prefix
|
||||
c.ctl_mux = http.NewServeMux()
|
||||
c.ctl_mux.Handle(c.ctl_prefix + "/_ctl/client-conns",
|
||||
c.wrap_http_handler(&client_ctl_client_conns{client_ctl{c: &c, id: HS_ID_CTL}}))
|
||||
@ -1303,6 +1311,8 @@ func NewClient(ctx context.Context, name string, logger Logger, ctl_addrs []stri
|
||||
c.wrap_http_handler(&client_ctl_client_conns_id_routes_id_peers_id{client_ctl{c: &c, id: HS_ID_CTL}}))
|
||||
c.ctl_mux.Handle(c.ctl_prefix + "/_ctl/stats",
|
||||
c.wrap_http_handler(&client_ctl_stats{client_ctl{c: &c, id: HS_ID_CTL}}))
|
||||
c.ctl_mux.Handle(c.ctl_prefix + "/_ctl/token",
|
||||
c.wrap_http_handler(&client_ctl_token{client_ctl{c: &c, id: HS_ID_CTL}}))
|
||||
|
||||
// TODO: make this optional. add this endpoint only if it's enabled...
|
||||
c.promreg = prometheus.NewRegistry()
|
||||
@ -1322,7 +1332,7 @@ func NewClient(ctx context.Context, name string, logger Logger, ctl_addrs []stri
|
||||
c.ctl[i] = &http.Server{
|
||||
Addr: ctl_addrs[i],
|
||||
Handler: c.ctl_mux,
|
||||
TLSConfig: c.ctltlscfg,
|
||||
TLSConfig: c.ctl_tls,
|
||||
ErrorLog: hs_log,
|
||||
// TODO: more settings
|
||||
}
|
||||
@ -1575,10 +1585,10 @@ func (c *Client) RunCtlTask(wg *sync.WaitGroup) {
|
||||
// check it again to make the guard slightly more stable
|
||||
// although it's still possible that the stop request is made
|
||||
// after Listen()
|
||||
if c.ctltlscfg == nil {
|
||||
if c.ctl_tls == nil {
|
||||
err = cs.Serve(l)
|
||||
} else {
|
||||
err = cs.ServeTLS(l, "", "") // c.ctltlscfg must provide a certificate and a key
|
||||
err = cs.ServeTLS(l, "", "") // c.ctl_tls must provide a certificate and a key
|
||||
}
|
||||
} else {
|
||||
err = fmt.Errorf("stop requested")
|
||||
|
Reference in New Issue
Block a user