Compare commits
No commits in common. "464a550c68c89b523bc629556028623681e0a4df" and "db48395b131cce18ebdde2648a77258c24b2d680" have entirely different histories.
464a550c68
...
db48395b13
68
client.go
68
client.go
@ -1075,10 +1075,6 @@ func (c *Client) AddNewClientConn(cfg *ClientConfig) (*ClientConn, error) {
|
|||||||
var ok bool
|
var ok bool
|
||||||
var id uint32
|
var id uint32
|
||||||
|
|
||||||
if len(cfg.ServerAddrs) <= 0 {
|
|
||||||
return nil, fmt.Errorf("no server rpc address specified")
|
|
||||||
}
|
|
||||||
|
|
||||||
cts = NewClientConn(c, cfg)
|
cts = NewClientConn(c, cfg)
|
||||||
|
|
||||||
c.cts_mtx.Lock()
|
c.cts_mtx.Lock()
|
||||||
@ -1268,38 +1264,20 @@ func (c *Client) RunCtlTask(wg *sync.WaitGroup) {
|
|||||||
// by creating the listener explicitly.
|
// by creating the listener explicitly.
|
||||||
// err = cs.ListenAndServe()
|
// err = cs.ListenAndServe()
|
||||||
// err = cs.ListenAndServeTLS("", "") // c.tlscfg must provide a certificate and a key
|
// err = cs.ListenAndServeTLS("", "") // c.tlscfg must provide a certificate and a key
|
||||||
|
l, err = net.Listen(tcp_addr_str_class(cs.Addr), cs.Addr)
|
||||||
//cs.shuttingDown(), as the name indicates, is not expoosed by the net/http
|
if err == nil {
|
||||||
//so I have to use my own indicator to check if it's been shutdown..
|
if c.ctltlscfg == nil {
|
||||||
//
|
err = cs.Serve(l)
|
||||||
if c.stop_req.Load() == false {
|
} else {
|
||||||
// this guard has a flaw in that the stop request can be made
|
err = cs.ServeTLS(l, "", "") // c.ctltlscfg must provide a certificate and a key
|
||||||
// between the check above and net.Listen() below.
|
|
||||||
l, err = net.Listen(tcp_addr_str_class(cs.Addr), cs.Addr)
|
|
||||||
if err == nil {
|
|
||||||
if c.stop_req.Load() == false {
|
|
||||||
// check it again to make the guard slightly more stable
|
|
||||||
// although it's still possible that the stop request is made
|
|
||||||
// after Listen()
|
|
||||||
if c.ctltlscfg == nil {
|
|
||||||
err = cs.Serve(l)
|
|
||||||
} else {
|
|
||||||
err = cs.ServeTLS(l, "", "") // c.ctltlscfg must provide a certificate and a key
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
err = fmt.Errorf("stop requested")
|
|
||||||
}
|
|
||||||
l.Close()
|
|
||||||
}
|
}
|
||||||
} else {
|
l.Close()
|
||||||
err = fmt.Errorf("stop requested")
|
|
||||||
}
|
}
|
||||||
if errors.Is(err, http.ErrServerClosed) {
|
if errors.Is(err, http.ErrServerClosed) {
|
||||||
c.log.Write("", LOG_INFO, "Control channel[%d] ended", i)
|
c.log.Write("", LOG_INFO, "Control channel[%d] ended", i)
|
||||||
} else {
|
} else {
|
||||||
c.log.Write("", LOG_ERROR, "Control channel[%d] error - %s", i, err.Error())
|
c.log.Write("", LOG_ERROR, "Control channel[%d] error - %s", i, err.Error())
|
||||||
}
|
}
|
||||||
|
|
||||||
l_wg.Done()
|
l_wg.Done()
|
||||||
}(idx, ctl)
|
}(idx, ctl)
|
||||||
}
|
}
|
||||||
@ -1317,9 +1295,17 @@ func (c *Client) RunTask(wg *sync.WaitGroup) {
|
|||||||
// so no call to wg.Done()
|
// so no call to wg.Done()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) start_service(cfg *ClientConfig) (*ClientConn, error) {
|
func (c *Client) start_service(data interface{}) (*ClientConn, error) {
|
||||||
var cts *ClientConn
|
var cts *ClientConn
|
||||||
var err error
|
var err error
|
||||||
|
var cfg *ClientConfig
|
||||||
|
var ok bool
|
||||||
|
|
||||||
|
cfg, ok = data.(*ClientConfig)
|
||||||
|
if !ok {
|
||||||
|
err = fmt.Errorf("invalid configuration given")
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
cts, err = c.AddNewClientConn(cfg)
|
cts, err = c.AddNewClientConn(cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -1334,24 +1320,14 @@ func (c *Client) start_service(cfg *ClientConfig) (*ClientConn, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (c *Client) StartService(data interface{}) {
|
func (c *Client) StartService(data interface{}) {
|
||||||
var cfg *ClientConfig
|
var cts *ClientConn
|
||||||
var ok bool
|
var err error
|
||||||
|
|
||||||
cfg, ok = data.(*ClientConfig)
|
cts, err = c.start_service(data)
|
||||||
if !ok {
|
if err != nil {
|
||||||
c.log.Write("", LOG_ERROR, "Failed to start service - invalid configuration - %v", data)
|
c.log.Write("", LOG_ERROR, "Failed to start service - %s", err.Error())
|
||||||
} else {
|
} else {
|
||||||
var cts *ClientConn
|
c.log.Write("", LOG_INFO, "Started service for %v [%d]", cts.cfg.ServerAddrs, cts.cfg.Id)
|
||||||
var err error
|
|
||||||
|
|
||||||
if len(cfg.ServerAddrs) > 0 {
|
|
||||||
cts, err = c.start_service(cfg)
|
|
||||||
if err != nil {
|
|
||||||
c.log.Write("", LOG_ERROR, "Failed to start service - %s", err.Error())
|
|
||||||
} else {
|
|
||||||
c.log.Write("", LOG_INFO, "Started service for %v [%d]", cts.cfg.ServerAddrs, cts.cfg.Id)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -234,11 +234,14 @@ func client_main(ctl_addrs []string, rpc_addrs []string, peer_addrs []string, cf
|
|||||||
log_mask = log_strings_to_mask(cfg.APP.LogMask)
|
log_mask = log_strings_to_mask(cfg.APP.LogMask)
|
||||||
}
|
}
|
||||||
|
|
||||||
// unlke the server, we allow the client to start with no rpc address.
|
if len(rpc_addrs) <= 0 {
|
||||||
// no check if len(rpc_addrs) <= 0 is mdde here.
|
return fmt.Errorf("no rpc server address specified")
|
||||||
|
}
|
||||||
|
|
||||||
cc.ServerAddrs = rpc_addrs
|
cc.ServerAddrs = rpc_addrs
|
||||||
cc.PeerAddrs = peer_addrs
|
cc.PeerAddrs = peer_addrs
|
||||||
|
|
||||||
|
// TODO: Change out field depending on cfg.APP.LogFile
|
||||||
logger = &AppLogger{id: "client", out: os.Stderr, mask: log_mask}
|
logger = &AppLogger{id: "client", out: os.Stderr, mask: log_mask}
|
||||||
c = hodu.NewClient(
|
c = hodu.NewClient(
|
||||||
context.Background(),
|
context.Background(),
|
||||||
|
29
server.go
29
server.go
@ -1020,26 +1020,17 @@ func (s *Server) RunCtlTask(wg *sync.WaitGroup) {
|
|||||||
|
|
||||||
s.log.Write("", LOG_INFO, "Control channel[%d] started on %s", i, s.ctl_addr[i])
|
s.log.Write("", LOG_INFO, "Control channel[%d] started on %s", i, s.ctl_addr[i])
|
||||||
|
|
||||||
|
// defeat hard-coded "tcp" in ListenAndServe() and ListenAndServeTLS()
|
||||||
if s.stop_req.Load() == false {
|
// err = cs.ListenAndServe()
|
||||||
// defeat hard-coded "tcp" in ListenAndServe() and ListenAndServeTLS()
|
// err = cs.ListenAndServeTLS("", "")
|
||||||
// err = cs.ListenAndServe()
|
l, err = net.Listen(tcp_addr_str_class(cs.Addr), cs.Addr)
|
||||||
// err = cs.ListenAndServeTLS("", "")
|
if err == nil {
|
||||||
l, err = net.Listen(tcp_addr_str_class(cs.Addr), cs.Addr)
|
if s.ctltlscfg == nil {
|
||||||
if err == nil {
|
err = cs.Serve(l)
|
||||||
if s.stop_req.Load() == false {
|
} else {
|
||||||
if s.ctltlscfg == nil {
|
err = cs.ServeTLS(l, "", "") // s.ctltlscfg must provide a certificate and a key
|
||||||
err = cs.Serve(l)
|
|
||||||
} else {
|
|
||||||
err = cs.ServeTLS(l, "", "") // s.ctltlscfg must provide a certificate and a key
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
err = fmt.Errorf("stop requested")
|
|
||||||
}
|
|
||||||
l.Close()
|
|
||||||
}
|
}
|
||||||
} else {
|
l.Close()
|
||||||
err = fmt.Errorf("stop requested")
|
|
||||||
}
|
}
|
||||||
if errors.Is(err, http.ErrServerClosed) {
|
if errors.Is(err, http.ErrServerClosed) {
|
||||||
s.log.Write("", LOG_INFO, "Control channel[%d] ended", i)
|
s.log.Write("", LOG_INFO, "Control channel[%d] ended", i)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user