Compare commits
2 Commits
db48395b13
...
464a550c68
Author | SHA1 | Date | |
---|---|---|---|
464a550c68 | |||
f1c146d94f |
68
client.go
68
client.go
@ -1075,6 +1075,10 @@ func (c *Client) AddNewClientConn(cfg *ClientConfig) (*ClientConn, error) {
|
||||
var ok bool
|
||||
var id uint32
|
||||
|
||||
if len(cfg.ServerAddrs) <= 0 {
|
||||
return nil, fmt.Errorf("no server rpc address specified")
|
||||
}
|
||||
|
||||
cts = NewClientConn(c, cfg)
|
||||
|
||||
c.cts_mtx.Lock()
|
||||
@ -1264,20 +1268,38 @@ func (c *Client) RunCtlTask(wg *sync.WaitGroup) {
|
||||
// by creating the listener explicitly.
|
||||
// err = cs.ListenAndServe()
|
||||
// err = cs.ListenAndServeTLS("", "") // c.tlscfg must provide a certificate and a key
|
||||
l, err = net.Listen(tcp_addr_str_class(cs.Addr), cs.Addr)
|
||||
if err == nil {
|
||||
if c.ctltlscfg == nil {
|
||||
err = cs.Serve(l)
|
||||
} else {
|
||||
err = cs.ServeTLS(l, "", "") // c.ctltlscfg must provide a certificate and a key
|
||||
|
||||
//cs.shuttingDown(), as the name indicates, is not expoosed by the net/http
|
||||
//so I have to use my own indicator to check if it's been shutdown..
|
||||
//
|
||||
if c.stop_req.Load() == false {
|
||||
// this guard has a flaw in that the stop request can be made
|
||||
// between the check above and net.Listen() below.
|
||||
l, err = net.Listen(tcp_addr_str_class(cs.Addr), cs.Addr)
|
||||
if err == nil {
|
||||
if c.stop_req.Load() == false {
|
||||
// check it again to make the guard slightly more stable
|
||||
// although it's still possible that the stop request is made
|
||||
// after Listen()
|
||||
if c.ctltlscfg == nil {
|
||||
err = cs.Serve(l)
|
||||
} else {
|
||||
err = cs.ServeTLS(l, "", "") // c.ctltlscfg must provide a certificate and a key
|
||||
}
|
||||
} else {
|
||||
err = fmt.Errorf("stop requested")
|
||||
}
|
||||
l.Close()
|
||||
}
|
||||
l.Close()
|
||||
} else {
|
||||
err = fmt.Errorf("stop requested")
|
||||
}
|
||||
if errors.Is(err, http.ErrServerClosed) {
|
||||
c.log.Write("", LOG_INFO, "Control channel[%d] ended", i)
|
||||
} else {
|
||||
c.log.Write("", LOG_ERROR, "Control channel[%d] error - %s", i, err.Error())
|
||||
}
|
||||
|
||||
l_wg.Done()
|
||||
}(idx, ctl)
|
||||
}
|
||||
@ -1295,17 +1317,9 @@ func (c *Client) RunTask(wg *sync.WaitGroup) {
|
||||
// so no call to wg.Done()
|
||||
}
|
||||
|
||||
func (c *Client) start_service(data interface{}) (*ClientConn, error) {
|
||||
func (c *Client) start_service(cfg *ClientConfig) (*ClientConn, error) {
|
||||
var cts *ClientConn
|
||||
var err error
|
||||
var cfg *ClientConfig
|
||||
var ok bool
|
||||
|
||||
cfg, ok = data.(*ClientConfig)
|
||||
if !ok {
|
||||
err = fmt.Errorf("invalid configuration given")
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cts, err = c.AddNewClientConn(cfg)
|
||||
if err != nil {
|
||||
@ -1320,14 +1334,24 @@ func (c *Client) start_service(data interface{}) (*ClientConn, error) {
|
||||
}
|
||||
|
||||
func (c *Client) StartService(data interface{}) {
|
||||
var cts *ClientConn
|
||||
var err error
|
||||
var cfg *ClientConfig
|
||||
var ok bool
|
||||
|
||||
cts, err = c.start_service(data)
|
||||
if err != nil {
|
||||
c.log.Write("", LOG_ERROR, "Failed to start service - %s", err.Error())
|
||||
cfg, ok = data.(*ClientConfig)
|
||||
if !ok {
|
||||
c.log.Write("", LOG_ERROR, "Failed to start service - invalid configuration - %v", data)
|
||||
} else {
|
||||
c.log.Write("", LOG_INFO, "Started service for %v [%d]", cts.cfg.ServerAddrs, cts.cfg.Id)
|
||||
var cts *ClientConn
|
||||
var err error
|
||||
|
||||
if len(cfg.ServerAddrs) > 0 {
|
||||
cts, err = c.start_service(cfg)
|
||||
if err != nil {
|
||||
c.log.Write("", LOG_ERROR, "Failed to start service - %s", err.Error())
|
||||
} else {
|
||||
c.log.Write("", LOG_INFO, "Started service for %v [%d]", cts.cfg.ServerAddrs, cts.cfg.Id)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -234,14 +234,11 @@ func client_main(ctl_addrs []string, rpc_addrs []string, peer_addrs []string, cf
|
||||
log_mask = log_strings_to_mask(cfg.APP.LogMask)
|
||||
}
|
||||
|
||||
if len(rpc_addrs) <= 0 {
|
||||
return fmt.Errorf("no rpc server address specified")
|
||||
}
|
||||
|
||||
// unlke the server, we allow the client to start with no rpc address.
|
||||
// no check if len(rpc_addrs) <= 0 is mdde here.
|
||||
cc.ServerAddrs = rpc_addrs
|
||||
cc.PeerAddrs = peer_addrs
|
||||
|
||||
// TODO: Change out field depending on cfg.APP.LogFile
|
||||
logger = &AppLogger{id: "client", out: os.Stderr, mask: log_mask}
|
||||
c = hodu.NewClient(
|
||||
context.Background(),
|
||||
|
29
server.go
29
server.go
@ -1020,17 +1020,26 @@ func (s *Server) RunCtlTask(wg *sync.WaitGroup) {
|
||||
|
||||
s.log.Write("", LOG_INFO, "Control channel[%d] started on %s", i, s.ctl_addr[i])
|
||||
|
||||
// defeat hard-coded "tcp" in ListenAndServe() and ListenAndServeTLS()
|
||||
// err = cs.ListenAndServe()
|
||||
// err = cs.ListenAndServeTLS("", "")
|
||||
l, err = net.Listen(tcp_addr_str_class(cs.Addr), cs.Addr)
|
||||
if err == nil {
|
||||
if s.ctltlscfg == nil {
|
||||
err = cs.Serve(l)
|
||||
} else {
|
||||
err = cs.ServeTLS(l, "", "") // s.ctltlscfg must provide a certificate and a key
|
||||
|
||||
if s.stop_req.Load() == false {
|
||||
// defeat hard-coded "tcp" in ListenAndServe() and ListenAndServeTLS()
|
||||
// err = cs.ListenAndServe()
|
||||
// err = cs.ListenAndServeTLS("", "")
|
||||
l, err = net.Listen(tcp_addr_str_class(cs.Addr), cs.Addr)
|
||||
if err == nil {
|
||||
if s.stop_req.Load() == false {
|
||||
if s.ctltlscfg == nil {
|
||||
err = cs.Serve(l)
|
||||
} else {
|
||||
err = cs.ServeTLS(l, "", "") // s.ctltlscfg must provide a certificate and a key
|
||||
}
|
||||
} else {
|
||||
err = fmt.Errorf("stop requested")
|
||||
}
|
||||
l.Close()
|
||||
}
|
||||
l.Close()
|
||||
} else {
|
||||
err = fmt.Errorf("stop requested")
|
||||
}
|
||||
if errors.Is(err, http.ErrServerClosed) {
|
||||
s.log.Write("", LOG_INFO, "Control channel[%d] ended", i)
|
||||
|
Loading…
x
Reference in New Issue
Block a user