Compare commits

..

2 Commits

3 changed files with 67 additions and 37 deletions

View File

@ -1075,6 +1075,10 @@ func (c *Client) AddNewClientConn(cfg *ClientConfig) (*ClientConn, error) {
var ok bool
var id uint32
if len(cfg.ServerAddrs) <= 0 {
return nil, fmt.Errorf("no server rpc address specified")
}
cts = NewClientConn(c, cfg)
c.cts_mtx.Lock()
@ -1264,20 +1268,38 @@ func (c *Client) RunCtlTask(wg *sync.WaitGroup) {
// by creating the listener explicitly.
// err = cs.ListenAndServe()
// err = cs.ListenAndServeTLS("", "") // c.tlscfg must provide a certificate and a key
l, err = net.Listen(tcp_addr_str_class(cs.Addr), cs.Addr)
if err == nil {
if c.ctltlscfg == nil {
err = cs.Serve(l)
} else {
err = cs.ServeTLS(l, "", "") // c.ctltlscfg must provide a certificate and a key
//cs.shuttingDown(), as the name indicates, is not expoosed by the net/http
//so I have to use my own indicator to check if it's been shutdown..
//
if c.stop_req.Load() == false {
// this guard has a flaw in that the stop request can be made
// between the check above and net.Listen() below.
l, err = net.Listen(tcp_addr_str_class(cs.Addr), cs.Addr)
if err == nil {
if c.stop_req.Load() == false {
// check it again to make the guard slightly more stable
// although it's still possible that the stop request is made
// after Listen()
if c.ctltlscfg == nil {
err = cs.Serve(l)
} else {
err = cs.ServeTLS(l, "", "") // c.ctltlscfg must provide a certificate and a key
}
} else {
err = fmt.Errorf("stop requested")
}
l.Close()
}
l.Close()
} else {
err = fmt.Errorf("stop requested")
}
if errors.Is(err, http.ErrServerClosed) {
c.log.Write("", LOG_INFO, "Control channel[%d] ended", i)
} else {
c.log.Write("", LOG_ERROR, "Control channel[%d] error - %s", i, err.Error())
}
l_wg.Done()
}(idx, ctl)
}
@ -1295,17 +1317,9 @@ func (c *Client) RunTask(wg *sync.WaitGroup) {
// so no call to wg.Done()
}
func (c *Client) start_service(data interface{}) (*ClientConn, error) {
func (c *Client) start_service(cfg *ClientConfig) (*ClientConn, error) {
var cts *ClientConn
var err error
var cfg *ClientConfig
var ok bool
cfg, ok = data.(*ClientConfig)
if !ok {
err = fmt.Errorf("invalid configuration given")
return nil, err
}
cts, err = c.AddNewClientConn(cfg)
if err != nil {
@ -1320,14 +1334,24 @@ func (c *Client) start_service(data interface{}) (*ClientConn, error) {
}
func (c *Client) StartService(data interface{}) {
var cts *ClientConn
var err error
var cfg *ClientConfig
var ok bool
cts, err = c.start_service(data)
if err != nil {
c.log.Write("", LOG_ERROR, "Failed to start service - %s", err.Error())
cfg, ok = data.(*ClientConfig)
if !ok {
c.log.Write("", LOG_ERROR, "Failed to start service - invalid configuration - %v", data)
} else {
c.log.Write("", LOG_INFO, "Started service for %v [%d]", cts.cfg.ServerAddrs, cts.cfg.Id)
var cts *ClientConn
var err error
if len(cfg.ServerAddrs) > 0 {
cts, err = c.start_service(cfg)
if err != nil {
c.log.Write("", LOG_ERROR, "Failed to start service - %s", err.Error())
} else {
c.log.Write("", LOG_INFO, "Started service for %v [%d]", cts.cfg.ServerAddrs, cts.cfg.Id)
}
}
}
}

View File

@ -234,14 +234,11 @@ func client_main(ctl_addrs []string, rpc_addrs []string, peer_addrs []string, cf
log_mask = log_strings_to_mask(cfg.APP.LogMask)
}
if len(rpc_addrs) <= 0 {
return fmt.Errorf("no rpc server address specified")
}
// unlke the server, we allow the client to start with no rpc address.
// no check if len(rpc_addrs) <= 0 is mdde here.
cc.ServerAddrs = rpc_addrs
cc.PeerAddrs = peer_addrs
// TODO: Change out field depending on cfg.APP.LogFile
logger = &AppLogger{id: "client", out: os.Stderr, mask: log_mask}
c = hodu.NewClient(
context.Background(),

View File

@ -1020,17 +1020,26 @@ func (s *Server) RunCtlTask(wg *sync.WaitGroup) {
s.log.Write("", LOG_INFO, "Control channel[%d] started on %s", i, s.ctl_addr[i])
// defeat hard-coded "tcp" in ListenAndServe() and ListenAndServeTLS()
// err = cs.ListenAndServe()
// err = cs.ListenAndServeTLS("", "")
l, err = net.Listen(tcp_addr_str_class(cs.Addr), cs.Addr)
if err == nil {
if s.ctltlscfg == nil {
err = cs.Serve(l)
} else {
err = cs.ServeTLS(l, "", "") // s.ctltlscfg must provide a certificate and a key
if s.stop_req.Load() == false {
// defeat hard-coded "tcp" in ListenAndServe() and ListenAndServeTLS()
// err = cs.ListenAndServe()
// err = cs.ListenAndServeTLS("", "")
l, err = net.Listen(tcp_addr_str_class(cs.Addr), cs.Addr)
if err == nil {
if s.stop_req.Load() == false {
if s.ctltlscfg == nil {
err = cs.Serve(l)
} else {
err = cs.ServeTLS(l, "", "") // s.ctltlscfg must provide a certificate and a key
}
} else {
err = fmt.Errorf("stop requested")
}
l.Close()
}
l.Close()
} else {
err = fmt.Errorf("stop requested")
}
if errors.Is(err, http.ErrServerClosed) {
s.log.Write("", LOG_INFO, "Control channel[%d] ended", i)