enhancing the client to accept multipl rpc server addresses

This commit is contained in:
2024-12-08 16:06:18 +09:00
parent 3d19576905
commit 7479cc0f3a
3 changed files with 31 additions and 36 deletions

View File

@ -29,12 +29,13 @@ type ClientPeerCancelFuncMap = map[uint32]context.CancelFunc
// --------------------------------------------------------------------
type ClientConfig struct {
ServerAddr string
ServerAddrs []string
PeerAddrs []string
}
type ClientConfigActive struct {
Id uint32
Index int
ClientConfig
}
@ -44,7 +45,6 @@ type Client struct {
ctltlscfg *tls.Config
rpctlscfg *tls.Config
ext_mtx sync.Mutex
ext_svcs []Service
@ -705,6 +705,9 @@ func (cts *ClientConn) disconnect_from_server() {
// immediately after the start_over lable in it.
// if it's called from ReqStop(), we don't really
// need to care about it.
cts.local_addr = ""
cts.remote_addr = ""
}
}
@ -728,7 +731,8 @@ func (cts *ClientConn) RunTask(wg *sync.WaitGroup) {
defer wg.Done() // arrange to call at the end of this function
start_over:
cts.cli.log.Write(cts.sid, LOG_INFO, "Connecting to server %s", cts.cfg.ServerAddr)
cts.cfg.Index = (cts.cfg.Index + 1) % len(cts.cfg.ServerAddrs)
cts.cli.log.Write(cts.sid, LOG_INFO, "Connecting to server %s", cts.cfg.ServerAddrs[cts.cfg.Index])
if cts.cli.rpctlscfg == nil {
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
// TODO: can we have other authority for non-tls?
@ -740,9 +744,9 @@ start_over:
// set the http2 :authority header with tls server name defined.
if cts.cli.rpctlscfg.ServerName != "" { opts = append(opts, grpc.WithAuthority(cts.cli.rpctlscfg.ServerName)) }
}
cts.conn, err = grpc.NewClient(cts.cfg.ServerAddr, opts...)
cts.conn, err = grpc.NewClient(cts.cfg.ServerAddrs[cts.cfg.Index], opts...)
if err != nil {
cts.cli.log.Write(cts.sid, LOG_ERROR, "Failed to make client to server %s - %s", cts.cfg.ServerAddr, err.Error())
cts.cli.log.Write(cts.sid, LOG_ERROR, "Failed to make client to server %s - %s", cts.cfg.ServerAddrs[cts.cfg.Index], err.Error())
goto reconnect_to_server
}
cts.hdc = NewHoduClient(cts.conn)
@ -756,17 +760,17 @@ start_over:
c_seed.Flags = 0
s_seed, err = cts.hdc.GetSeed(cts.cli.ctx, &c_seed)
if err != nil {
cts.cli.log.Write(cts.sid, LOG_ERROR, "Failed to get seed from server %s - %s", cts.cfg.ServerAddr, err.Error())
cts.cli.log.Write(cts.sid, LOG_ERROR, "Failed to get seed from server[%d] %s - %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index], err.Error())
goto reconnect_to_server
}
cts.s_seed = *s_seed
cts.c_seed = c_seed
cts.cli.log.Write(cts.sid, LOG_INFO, "Got seed from server %s - ver=%#x", cts.cfg.ServerAddr, cts.s_seed.Version)
cts.cli.log.Write(cts.sid, LOG_INFO, "Got seed from server[%d] %s - ver=%#x", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index], cts.s_seed.Version)
psc, err = cts.hdc.PacketStream(cts.cli.ctx)
if err != nil {
cts.cli.log.Write(cts.sid, LOG_ERROR, "Failed to get packet stream from server %s - %s", cts.cfg.ServerAddr, err.Error())
cts.cli.log.Write(cts.sid, LOG_ERROR, "Failed to get packet stream from server[%d] %s - %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index], err.Error())
goto reconnect_to_server
}
@ -776,7 +780,7 @@ start_over:
cts.local_addr = p.LocalAddr.String()
}
cts.cli.log.Write(cts.sid, LOG_INFO, "Got packet stream from server %s", cts.cfg.ServerAddr)
cts.cli.log.Write(cts.sid, LOG_INFO, "Got packet stream from server[%d] %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index])
cts.psc = &GuardedPacketStreamClient{Hodu_PacketStreamClient: psc}
@ -785,7 +789,7 @@ start_over:
// let's add routes to the client-side peers if given
err = cts.AddClientRoutes(cts.cfg.PeerAddrs)
if err != nil {
cts.cli.log.Write(cts.sid, LOG_ERROR, "Failed to add routes to server %s for %v - %s", cts.cfg.ServerAddr, cts.cfg.PeerAddrs, err.Error())
cts.cli.log.Write(cts.sid, LOG_ERROR, "Failed to add routes to server[%s] %s for %v - %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index], cts.cfg.PeerAddrs, err.Error())
goto done
}
}
@ -811,7 +815,7 @@ start_over:
if status.Code(err) == codes.Canceled || errors.Is(err, net.ErrClosed) {
goto reconnect_to_server
} else {
cts.cli.log.Write(cts.sid, LOG_INFO, "Failed to receive packet form server %s - %s", cts.remote_addr, err.Error())
cts.cli.log.Write(cts.sid, LOG_INFO, "Failed to receive packet from %s - %s", cts.remote_addr, err.Error())
goto reconnect_to_server
}
}
@ -944,7 +948,7 @@ start_over:
}
done:
cts.cli.log.Write(cts.sid, LOG_INFO, "Disconnected from server %s", cts.cfg.ServerAddr)
cts.cli.log.Write(cts.sid, LOG_INFO, "Disconnected from server[%d] %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index])
//cts.RemoveClientRoutes() // this isn't needed as each task removes itself from cts upon its termination
cts.ReqStop()
@ -1056,11 +1060,6 @@ func (c *Client) AddNewClientConn(cfg *ClientConfig) (*ClientConn, error) {
cts = NewClientConn(c, cfg)
c.cts_mtx.Lock()
_, ok = c.cts_map_by_addr[cfg.ServerAddr]
if ok {
c.cts_mtx.Unlock()
return nil, fmt.Errorf("existing server - %s", cfg.ServerAddr)
}
id = rand.Uint32()
for {
@ -1072,12 +1071,11 @@ func (c *Client) AddNewClientConn(cfg *ClientConfig) (*ClientConn, error) {
cts.cfg.Id = id // store it again in the active configuration for easy access via control channel
cts.sid = fmt.Sprintf("%d", id) // id in string used for logging
c.cts_map_by_addr[cfg.ServerAddr] = cts
c.cts_map[id] = cts
c.stats.conns.Add(1)
c.cts_mtx.Unlock()
c.log.Write ("", LOG_INFO, "Added client connection(%d) to %s", cts.id, cfg.ServerAddr)
c.log.Write ("", LOG_INFO, "Added client connection(%d) to %v", cts.id, cfg.ServerAddrs)
return cts, nil
}
@ -1124,12 +1122,11 @@ func (c *Client) RemoveClientConn(cts *ClientConn) error {
return fmt.Errorf("conflicting connection id - %d", cts.id)
}
delete(c.cts_map_by_addr, cts.cfg.ServerAddr)
delete(c.cts_map, cts.id)
c.stats.conns.Store(int64(len(c.cts_map)))
c.cts_mtx.Unlock()
c.log.Write ("", LOG_INFO, "Removed client connection(%d) to %s", cts.id, cts.cfg.ServerAddr)
c.log.Write ("", LOG_INFO, "Removed client connection(%d) to %v", cts.id, cts.cfg.ServerAddrs)
cts.ReqStop()
return nil
@ -1149,12 +1146,11 @@ func (c *Client) RemoveClientConnById(conn_id uint32) error {
// NOTE: removal by id doesn't perform identity check
delete(c.cts_map_by_addr, cts.cfg.ServerAddr)
delete(c.cts_map, cts.id)
c.stats.conns.Store(int64(len(c.cts_map)))
c.cts_mtx.Unlock()
c.log.Write ("", LOG_INFO, "Removed client connection(%d) to %s", cts.id, cts.cfg.ServerAddr)
c.log.Write ("", LOG_INFO, "Removed client connection(%d) to %v", cts.id, cts.cfg.ServerAddrs)
cts.ReqStop()
return nil
}
@ -1295,7 +1291,7 @@ func (c *Client) start_service(data interface{}) (*ClientConn, error) {
cts, err = c.AddNewClientConn(cfg)
if err != nil {
err = fmt.Errorf("unable to add server connection structure to %s - %s", cfg.ServerAddr, err.Error())
err = fmt.Errorf("unable to add server connection structure to %v - %s", cfg.ServerAddrs, err.Error())
return nil, err
}
@ -1313,7 +1309,7 @@ func (c *Client) StartService(data interface{}) {
if err != nil {
c.log.Write("", LOG_ERROR, "Failed to start service - %s", err.Error())
} else {
c.log.Write("", LOG_INFO, "Started service for %s [%d]", cts.cfg.ServerAddr, cts.cfg.Id)
c.log.Write("", LOG_INFO, "Started service for %v [%d]", cts.cfg.ServerAddrs, cts.cfg.Id)
}
}