Compare commits
2 Commits
db48395b13
...
464a550c68
Author | SHA1 | Date | |
---|---|---|---|
464a550c68 | |||
f1c146d94f |
44
client.go
44
client.go
@ -1075,6 +1075,10 @@ func (c *Client) AddNewClientConn(cfg *ClientConfig) (*ClientConn, error) {
|
|||||||
var ok bool
|
var ok bool
|
||||||
var id uint32
|
var id uint32
|
||||||
|
|
||||||
|
if len(cfg.ServerAddrs) <= 0 {
|
||||||
|
return nil, fmt.Errorf("no server rpc address specified")
|
||||||
|
}
|
||||||
|
|
||||||
cts = NewClientConn(c, cfg)
|
cts = NewClientConn(c, cfg)
|
||||||
|
|
||||||
c.cts_mtx.Lock()
|
c.cts_mtx.Lock()
|
||||||
@ -1264,20 +1268,38 @@ func (c *Client) RunCtlTask(wg *sync.WaitGroup) {
|
|||||||
// by creating the listener explicitly.
|
// by creating the listener explicitly.
|
||||||
// err = cs.ListenAndServe()
|
// err = cs.ListenAndServe()
|
||||||
// err = cs.ListenAndServeTLS("", "") // c.tlscfg must provide a certificate and a key
|
// err = cs.ListenAndServeTLS("", "") // c.tlscfg must provide a certificate and a key
|
||||||
|
|
||||||
|
//cs.shuttingDown(), as the name indicates, is not expoosed by the net/http
|
||||||
|
//so I have to use my own indicator to check if it's been shutdown..
|
||||||
|
//
|
||||||
|
if c.stop_req.Load() == false {
|
||||||
|
// this guard has a flaw in that the stop request can be made
|
||||||
|
// between the check above and net.Listen() below.
|
||||||
l, err = net.Listen(tcp_addr_str_class(cs.Addr), cs.Addr)
|
l, err = net.Listen(tcp_addr_str_class(cs.Addr), cs.Addr)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
if c.stop_req.Load() == false {
|
||||||
|
// check it again to make the guard slightly more stable
|
||||||
|
// although it's still possible that the stop request is made
|
||||||
|
// after Listen()
|
||||||
if c.ctltlscfg == nil {
|
if c.ctltlscfg == nil {
|
||||||
err = cs.Serve(l)
|
err = cs.Serve(l)
|
||||||
} else {
|
} else {
|
||||||
err = cs.ServeTLS(l, "", "") // c.ctltlscfg must provide a certificate and a key
|
err = cs.ServeTLS(l, "", "") // c.ctltlscfg must provide a certificate and a key
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
err = fmt.Errorf("stop requested")
|
||||||
|
}
|
||||||
l.Close()
|
l.Close()
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
err = fmt.Errorf("stop requested")
|
||||||
|
}
|
||||||
if errors.Is(err, http.ErrServerClosed) {
|
if errors.Is(err, http.ErrServerClosed) {
|
||||||
c.log.Write("", LOG_INFO, "Control channel[%d] ended", i)
|
c.log.Write("", LOG_INFO, "Control channel[%d] ended", i)
|
||||||
} else {
|
} else {
|
||||||
c.log.Write("", LOG_ERROR, "Control channel[%d] error - %s", i, err.Error())
|
c.log.Write("", LOG_ERROR, "Control channel[%d] error - %s", i, err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
l_wg.Done()
|
l_wg.Done()
|
||||||
}(idx, ctl)
|
}(idx, ctl)
|
||||||
}
|
}
|
||||||
@ -1295,17 +1317,9 @@ func (c *Client) RunTask(wg *sync.WaitGroup) {
|
|||||||
// so no call to wg.Done()
|
// so no call to wg.Done()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) start_service(data interface{}) (*ClientConn, error) {
|
func (c *Client) start_service(cfg *ClientConfig) (*ClientConn, error) {
|
||||||
var cts *ClientConn
|
var cts *ClientConn
|
||||||
var err error
|
var err error
|
||||||
var cfg *ClientConfig
|
|
||||||
var ok bool
|
|
||||||
|
|
||||||
cfg, ok = data.(*ClientConfig)
|
|
||||||
if !ok {
|
|
||||||
err = fmt.Errorf("invalid configuration given")
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
cts, err = c.AddNewClientConn(cfg)
|
cts, err = c.AddNewClientConn(cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -1320,15 +1334,25 @@ func (c *Client) start_service(data interface{}) (*ClientConn, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) StartService(data interface{}) {
|
func (c *Client) StartService(data interface{}) {
|
||||||
|
var cfg *ClientConfig
|
||||||
|
var ok bool
|
||||||
|
|
||||||
|
cfg, ok = data.(*ClientConfig)
|
||||||
|
if !ok {
|
||||||
|
c.log.Write("", LOG_ERROR, "Failed to start service - invalid configuration - %v", data)
|
||||||
|
} else {
|
||||||
var cts *ClientConn
|
var cts *ClientConn
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
cts, err = c.start_service(data)
|
if len(cfg.ServerAddrs) > 0 {
|
||||||
|
cts, err = c.start_service(cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
c.log.Write("", LOG_ERROR, "Failed to start service - %s", err.Error())
|
c.log.Write("", LOG_ERROR, "Failed to start service - %s", err.Error())
|
||||||
} else {
|
} else {
|
||||||
c.log.Write("", LOG_INFO, "Started service for %v [%d]", cts.cfg.ServerAddrs, cts.cfg.Id)
|
c.log.Write("", LOG_INFO, "Started service for %v [%d]", cts.cfg.ServerAddrs, cts.cfg.Id)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) StartExtService(svc Service, data interface{}) {
|
func (c *Client) StartExtService(svc Service, data interface{}) {
|
||||||
|
@ -234,14 +234,11 @@ func client_main(ctl_addrs []string, rpc_addrs []string, peer_addrs []string, cf
|
|||||||
log_mask = log_strings_to_mask(cfg.APP.LogMask)
|
log_mask = log_strings_to_mask(cfg.APP.LogMask)
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(rpc_addrs) <= 0 {
|
// unlke the server, we allow the client to start with no rpc address.
|
||||||
return fmt.Errorf("no rpc server address specified")
|
// no check if len(rpc_addrs) <= 0 is mdde here.
|
||||||
}
|
|
||||||
|
|
||||||
cc.ServerAddrs = rpc_addrs
|
cc.ServerAddrs = rpc_addrs
|
||||||
cc.PeerAddrs = peer_addrs
|
cc.PeerAddrs = peer_addrs
|
||||||
|
|
||||||
// TODO: Change out field depending on cfg.APP.LogFile
|
|
||||||
logger = &AppLogger{id: "client", out: os.Stderr, mask: log_mask}
|
logger = &AppLogger{id: "client", out: os.Stderr, mask: log_mask}
|
||||||
c = hodu.NewClient(
|
c = hodu.NewClient(
|
||||||
context.Background(),
|
context.Background(),
|
||||||
|
@ -1020,18 +1020,27 @@ func (s *Server) RunCtlTask(wg *sync.WaitGroup) {
|
|||||||
|
|
||||||
s.log.Write("", LOG_INFO, "Control channel[%d] started on %s", i, s.ctl_addr[i])
|
s.log.Write("", LOG_INFO, "Control channel[%d] started on %s", i, s.ctl_addr[i])
|
||||||
|
|
||||||
|
|
||||||
|
if s.stop_req.Load() == false {
|
||||||
// defeat hard-coded "tcp" in ListenAndServe() and ListenAndServeTLS()
|
// defeat hard-coded "tcp" in ListenAndServe() and ListenAndServeTLS()
|
||||||
// err = cs.ListenAndServe()
|
// err = cs.ListenAndServe()
|
||||||
// err = cs.ListenAndServeTLS("", "")
|
// err = cs.ListenAndServeTLS("", "")
|
||||||
l, err = net.Listen(tcp_addr_str_class(cs.Addr), cs.Addr)
|
l, err = net.Listen(tcp_addr_str_class(cs.Addr), cs.Addr)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
if s.stop_req.Load() == false {
|
||||||
if s.ctltlscfg == nil {
|
if s.ctltlscfg == nil {
|
||||||
err = cs.Serve(l)
|
err = cs.Serve(l)
|
||||||
} else {
|
} else {
|
||||||
err = cs.ServeTLS(l, "", "") // s.ctltlscfg must provide a certificate and a key
|
err = cs.ServeTLS(l, "", "") // s.ctltlscfg must provide a certificate and a key
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
err = fmt.Errorf("stop requested")
|
||||||
|
}
|
||||||
l.Close()
|
l.Close()
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
err = fmt.Errorf("stop requested")
|
||||||
|
}
|
||||||
if errors.Is(err, http.ErrServerClosed) {
|
if errors.Is(err, http.ErrServerClosed) {
|
||||||
s.log.Write("", LOG_INFO, "Control channel[%d] ended", i)
|
s.log.Write("", LOG_INFO, "Control channel[%d] ended", i)
|
||||||
} else {
|
} else {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user