Compare commits

..

No commits in common. "464a550c68c89b523bc629556028623681e0a4df" and "db48395b131cce18ebdde2648a77258c24b2d680" have entirely different histories.

3 changed files with 37 additions and 67 deletions

View File

@ -1075,10 +1075,6 @@ func (c *Client) AddNewClientConn(cfg *ClientConfig) (*ClientConn, error) {
var ok bool var ok bool
var id uint32 var id uint32
if len(cfg.ServerAddrs) <= 0 {
return nil, fmt.Errorf("no server rpc address specified")
}
cts = NewClientConn(c, cfg) cts = NewClientConn(c, cfg)
c.cts_mtx.Lock() c.cts_mtx.Lock()
@ -1268,38 +1264,20 @@ func (c *Client) RunCtlTask(wg *sync.WaitGroup) {
// by creating the listener explicitly. // by creating the listener explicitly.
// err = cs.ListenAndServe() // err = cs.ListenAndServe()
// err = cs.ListenAndServeTLS("", "") // c.tlscfg must provide a certificate and a key // err = cs.ListenAndServeTLS("", "") // c.tlscfg must provide a certificate and a key
//cs.shuttingDown(), as the name indicates, is not expoosed by the net/http
//so I have to use my own indicator to check if it's been shutdown..
//
if c.stop_req.Load() == false {
// this guard has a flaw in that the stop request can be made
// between the check above and net.Listen() below.
l, err = net.Listen(tcp_addr_str_class(cs.Addr), cs.Addr) l, err = net.Listen(tcp_addr_str_class(cs.Addr), cs.Addr)
if err == nil { if err == nil {
if c.stop_req.Load() == false {
// check it again to make the guard slightly more stable
// although it's still possible that the stop request is made
// after Listen()
if c.ctltlscfg == nil { if c.ctltlscfg == nil {
err = cs.Serve(l) err = cs.Serve(l)
} else { } else {
err = cs.ServeTLS(l, "", "") // c.ctltlscfg must provide a certificate and a key err = cs.ServeTLS(l, "", "") // c.ctltlscfg must provide a certificate and a key
} }
} else {
err = fmt.Errorf("stop requested")
}
l.Close() l.Close()
} }
} else {
err = fmt.Errorf("stop requested")
}
if errors.Is(err, http.ErrServerClosed) { if errors.Is(err, http.ErrServerClosed) {
c.log.Write("", LOG_INFO, "Control channel[%d] ended", i) c.log.Write("", LOG_INFO, "Control channel[%d] ended", i)
} else { } else {
c.log.Write("", LOG_ERROR, "Control channel[%d] error - %s", i, err.Error()) c.log.Write("", LOG_ERROR, "Control channel[%d] error - %s", i, err.Error())
} }
l_wg.Done() l_wg.Done()
}(idx, ctl) }(idx, ctl)
} }
@ -1317,9 +1295,17 @@ func (c *Client) RunTask(wg *sync.WaitGroup) {
// so no call to wg.Done() // so no call to wg.Done()
} }
func (c *Client) start_service(cfg *ClientConfig) (*ClientConn, error) { func (c *Client) start_service(data interface{}) (*ClientConn, error) {
var cts *ClientConn var cts *ClientConn
var err error var err error
var cfg *ClientConfig
var ok bool
cfg, ok = data.(*ClientConfig)
if !ok {
err = fmt.Errorf("invalid configuration given")
return nil, err
}
cts, err = c.AddNewClientConn(cfg) cts, err = c.AddNewClientConn(cfg)
if err != nil { if err != nil {
@ -1334,25 +1320,15 @@ func (c *Client) start_service(cfg *ClientConfig) (*ClientConn, error) {
} }
func (c *Client) StartService(data interface{}) { func (c *Client) StartService(data interface{}) {
var cfg *ClientConfig
var ok bool
cfg, ok = data.(*ClientConfig)
if !ok {
c.log.Write("", LOG_ERROR, "Failed to start service - invalid configuration - %v", data)
} else {
var cts *ClientConn var cts *ClientConn
var err error var err error
if len(cfg.ServerAddrs) > 0 { cts, err = c.start_service(data)
cts, err = c.start_service(cfg)
if err != nil { if err != nil {
c.log.Write("", LOG_ERROR, "Failed to start service - %s", err.Error()) c.log.Write("", LOG_ERROR, "Failed to start service - %s", err.Error())
} else { } else {
c.log.Write("", LOG_INFO, "Started service for %v [%d]", cts.cfg.ServerAddrs, cts.cfg.Id) c.log.Write("", LOG_INFO, "Started service for %v [%d]", cts.cfg.ServerAddrs, cts.cfg.Id)
} }
}
}
} }
func (c *Client) StartExtService(svc Service, data interface{}) { func (c *Client) StartExtService(svc Service, data interface{}) {

View File

@ -234,11 +234,14 @@ func client_main(ctl_addrs []string, rpc_addrs []string, peer_addrs []string, cf
log_mask = log_strings_to_mask(cfg.APP.LogMask) log_mask = log_strings_to_mask(cfg.APP.LogMask)
} }
// unlke the server, we allow the client to start with no rpc address. if len(rpc_addrs) <= 0 {
// no check if len(rpc_addrs) <= 0 is mdde here. return fmt.Errorf("no rpc server address specified")
}
cc.ServerAddrs = rpc_addrs cc.ServerAddrs = rpc_addrs
cc.PeerAddrs = peer_addrs cc.PeerAddrs = peer_addrs
// TODO: Change out field depending on cfg.APP.LogFile
logger = &AppLogger{id: "client", out: os.Stderr, mask: log_mask} logger = &AppLogger{id: "client", out: os.Stderr, mask: log_mask}
c = hodu.NewClient( c = hodu.NewClient(
context.Background(), context.Background(),

View File

@ -1020,27 +1020,18 @@ func (s *Server) RunCtlTask(wg *sync.WaitGroup) {
s.log.Write("", LOG_INFO, "Control channel[%d] started on %s", i, s.ctl_addr[i]) s.log.Write("", LOG_INFO, "Control channel[%d] started on %s", i, s.ctl_addr[i])
if s.stop_req.Load() == false {
// defeat hard-coded "tcp" in ListenAndServe() and ListenAndServeTLS() // defeat hard-coded "tcp" in ListenAndServe() and ListenAndServeTLS()
// err = cs.ListenAndServe() // err = cs.ListenAndServe()
// err = cs.ListenAndServeTLS("", "") // err = cs.ListenAndServeTLS("", "")
l, err = net.Listen(tcp_addr_str_class(cs.Addr), cs.Addr) l, err = net.Listen(tcp_addr_str_class(cs.Addr), cs.Addr)
if err == nil { if err == nil {
if s.stop_req.Load() == false {
if s.ctltlscfg == nil { if s.ctltlscfg == nil {
err = cs.Serve(l) err = cs.Serve(l)
} else { } else {
err = cs.ServeTLS(l, "", "") // s.ctltlscfg must provide a certificate and a key err = cs.ServeTLS(l, "", "") // s.ctltlscfg must provide a certificate and a key
} }
} else {
err = fmt.Errorf("stop requested")
}
l.Close() l.Close()
} }
} else {
err = fmt.Errorf("stop requested")
}
if errors.Is(err, http.ErrServerClosed) { if errors.Is(err, http.ErrServerClosed) {
s.log.Write("", LOG_INFO, "Control channel[%d] ended", i) s.log.Write("", LOG_INFO, "Control channel[%d] ended", i)
} else { } else {