From 7479cc0f3aefeb687ad3940ef210645e04a424e3 Mon Sep 17 00:00:00 2001 From: hyung-hwan Date: Sun, 8 Dec 2024 16:06:18 +0900 Subject: [PATCH] enhancing the client to accept multipl rpc server addresses --- client-ctl.go | 15 +++++++++------ client.go | 46 +++++++++++++++++++++------------------------- cmd/main.go | 6 +----- 3 files changed, 31 insertions(+), 36 deletions(-) diff --git a/client-ctl.go b/client-ctl.go index 10ea733..24ef443 100644 --- a/client-ctl.go +++ b/client-ctl.go @@ -26,7 +26,7 @@ type json_errmsg struct { } type json_in_client_conn struct { - ServerAddr string `json:"server-addr"` + ServerAddrs []string `json:"server-addrs"` } type json_in_client_route struct { @@ -41,7 +41,8 @@ type json_out_client_conn_id struct { type json_out_client_conn struct { Id uint32 `json:"id"` - ReqServerAddr string `json:"req-server-addr"` // server address requested. may be a domain name + ReqServerAddrs []string `json:"req-server-addrs"` // server addresses requested. may include a domain name + CurrentServerIndex int `json:"current-server-index"` ServerAddr string `json:"server-addr"` // actual server address ClientAddr string `json:"client-addr"` Routes []json_out_client_route `json:"routes"` @@ -150,7 +151,8 @@ func (ctl *client_ctl_client_conns) ServeHTTP(w http.ResponseWriter, req *http.R } js = append(js, json_out_client_conn{ Id: cts.id, - ReqServerAddr: cts.cfg.ServerAddr, + ReqServerAddrs: cts.cfg.ServerAddrs, + CurrentServerIndex: cts.cfg.Index, ServerAddr: cts.remote_addr, ClientAddr: cts.local_addr, Routes: jsp, @@ -174,12 +176,12 @@ func (ctl *client_ctl_client_conns) ServeHTTP(w http.ResponseWriter, req *http.R var cts *ClientConn err = json.NewDecoder(req.Body).Decode(&s) - if err != nil || s.ServerAddr == "" { + if err != nil || len(s.ServerAddrs) <= 0 { status_code = http.StatusBadRequest; w.WriteHeader(status_code) goto done } - cc.ServerAddr = s.ServerAddr + cc.ServerAddrs = s.ServerAddrs //cc.PeerAddrs = s.PeerAddrs cts, err = c.start_service(&cc) // TODO: this can be blocking. do we have to resolve addresses before calling this? also not good because resolution succeed or fail at each attempt. however ok as ServeHTTP itself is in a goroutine? if err != nil { @@ -267,7 +269,8 @@ func (ctl *client_ctl_client_conns_id) ServeHTTP(w http.ResponseWriter, req *htt } js = &json_out_client_conn{ Id: cts.id, - ReqServerAddr: cts.cfg.ServerAddr, + ReqServerAddrs: cts.cfg.ServerAddrs, + CurrentServerIndex: cts.cfg.Index, ServerAddr: cts.local_addr, ClientAddr: cts.remote_addr, Routes: jsp, diff --git a/client.go b/client.go index e1a6b84..256a057 100644 --- a/client.go +++ b/client.go @@ -29,12 +29,13 @@ type ClientPeerCancelFuncMap = map[uint32]context.CancelFunc // -------------------------------------------------------------------- type ClientConfig struct { - ServerAddr string + ServerAddrs []string PeerAddrs []string } type ClientConfigActive struct { Id uint32 + Index int ClientConfig } @@ -44,7 +45,6 @@ type Client struct { ctltlscfg *tls.Config rpctlscfg *tls.Config - ext_mtx sync.Mutex ext_svcs []Service @@ -705,6 +705,9 @@ func (cts *ClientConn) disconnect_from_server() { // immediately after the start_over lable in it. // if it's called from ReqStop(), we don't really // need to care about it. + + cts.local_addr = "" + cts.remote_addr = "" } } @@ -728,7 +731,8 @@ func (cts *ClientConn) RunTask(wg *sync.WaitGroup) { defer wg.Done() // arrange to call at the end of this function start_over: - cts.cli.log.Write(cts.sid, LOG_INFO, "Connecting to server %s", cts.cfg.ServerAddr) + cts.cfg.Index = (cts.cfg.Index + 1) % len(cts.cfg.ServerAddrs) + cts.cli.log.Write(cts.sid, LOG_INFO, "Connecting to server %s", cts.cfg.ServerAddrs[cts.cfg.Index]) if cts.cli.rpctlscfg == nil { opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) // TODO: can we have other authority for non-tls? @@ -740,9 +744,9 @@ start_over: // set the http2 :authority header with tls server name defined. if cts.cli.rpctlscfg.ServerName != "" { opts = append(opts, grpc.WithAuthority(cts.cli.rpctlscfg.ServerName)) } } - cts.conn, err = grpc.NewClient(cts.cfg.ServerAddr, opts...) + cts.conn, err = grpc.NewClient(cts.cfg.ServerAddrs[cts.cfg.Index], opts...) if err != nil { - cts.cli.log.Write(cts.sid, LOG_ERROR, "Failed to make client to server %s - %s", cts.cfg.ServerAddr, err.Error()) + cts.cli.log.Write(cts.sid, LOG_ERROR, "Failed to make client to server %s - %s", cts.cfg.ServerAddrs[cts.cfg.Index], err.Error()) goto reconnect_to_server } cts.hdc = NewHoduClient(cts.conn) @@ -756,17 +760,17 @@ start_over: c_seed.Flags = 0 s_seed, err = cts.hdc.GetSeed(cts.cli.ctx, &c_seed) if err != nil { - cts.cli.log.Write(cts.sid, LOG_ERROR, "Failed to get seed from server %s - %s", cts.cfg.ServerAddr, err.Error()) + cts.cli.log.Write(cts.sid, LOG_ERROR, "Failed to get seed from server[%d] %s - %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index], err.Error()) goto reconnect_to_server } cts.s_seed = *s_seed cts.c_seed = c_seed - cts.cli.log.Write(cts.sid, LOG_INFO, "Got seed from server %s - ver=%#x", cts.cfg.ServerAddr, cts.s_seed.Version) + cts.cli.log.Write(cts.sid, LOG_INFO, "Got seed from server[%d] %s - ver=%#x", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index], cts.s_seed.Version) psc, err = cts.hdc.PacketStream(cts.cli.ctx) if err != nil { - cts.cli.log.Write(cts.sid, LOG_ERROR, "Failed to get packet stream from server %s - %s", cts.cfg.ServerAddr, err.Error()) + cts.cli.log.Write(cts.sid, LOG_ERROR, "Failed to get packet stream from server[%d] %s - %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index], err.Error()) goto reconnect_to_server } @@ -776,7 +780,7 @@ start_over: cts.local_addr = p.LocalAddr.String() } - cts.cli.log.Write(cts.sid, LOG_INFO, "Got packet stream from server %s", cts.cfg.ServerAddr) + cts.cli.log.Write(cts.sid, LOG_INFO, "Got packet stream from server[%d] %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index]) cts.psc = &GuardedPacketStreamClient{Hodu_PacketStreamClient: psc} @@ -785,7 +789,7 @@ start_over: // let's add routes to the client-side peers if given err = cts.AddClientRoutes(cts.cfg.PeerAddrs) if err != nil { - cts.cli.log.Write(cts.sid, LOG_ERROR, "Failed to add routes to server %s for %v - %s", cts.cfg.ServerAddr, cts.cfg.PeerAddrs, err.Error()) + cts.cli.log.Write(cts.sid, LOG_ERROR, "Failed to add routes to server[%s] %s for %v - %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index], cts.cfg.PeerAddrs, err.Error()) goto done } } @@ -811,7 +815,7 @@ start_over: if status.Code(err) == codes.Canceled || errors.Is(err, net.ErrClosed) { goto reconnect_to_server } else { - cts.cli.log.Write(cts.sid, LOG_INFO, "Failed to receive packet form server %s - %s", cts.remote_addr, err.Error()) + cts.cli.log.Write(cts.sid, LOG_INFO, "Failed to receive packet from %s - %s", cts.remote_addr, err.Error()) goto reconnect_to_server } } @@ -944,7 +948,7 @@ start_over: } done: - cts.cli.log.Write(cts.sid, LOG_INFO, "Disconnected from server %s", cts.cfg.ServerAddr) + cts.cli.log.Write(cts.sid, LOG_INFO, "Disconnected from server[%d] %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index]) //cts.RemoveClientRoutes() // this isn't needed as each task removes itself from cts upon its termination cts.ReqStop() @@ -1056,11 +1060,6 @@ func (c *Client) AddNewClientConn(cfg *ClientConfig) (*ClientConn, error) { cts = NewClientConn(c, cfg) c.cts_mtx.Lock() - _, ok = c.cts_map_by_addr[cfg.ServerAddr] - if ok { - c.cts_mtx.Unlock() - return nil, fmt.Errorf("existing server - %s", cfg.ServerAddr) - } id = rand.Uint32() for { @@ -1072,12 +1071,11 @@ func (c *Client) AddNewClientConn(cfg *ClientConfig) (*ClientConn, error) { cts.cfg.Id = id // store it again in the active configuration for easy access via control channel cts.sid = fmt.Sprintf("%d", id) // id in string used for logging - c.cts_map_by_addr[cfg.ServerAddr] = cts c.cts_map[id] = cts c.stats.conns.Add(1) c.cts_mtx.Unlock() - c.log.Write ("", LOG_INFO, "Added client connection(%d) to %s", cts.id, cfg.ServerAddr) + c.log.Write ("", LOG_INFO, "Added client connection(%d) to %v", cts.id, cfg.ServerAddrs) return cts, nil } @@ -1124,12 +1122,11 @@ func (c *Client) RemoveClientConn(cts *ClientConn) error { return fmt.Errorf("conflicting connection id - %d", cts.id) } - delete(c.cts_map_by_addr, cts.cfg.ServerAddr) delete(c.cts_map, cts.id) c.stats.conns.Store(int64(len(c.cts_map))) c.cts_mtx.Unlock() - c.log.Write ("", LOG_INFO, "Removed client connection(%d) to %s", cts.id, cts.cfg.ServerAddr) + c.log.Write ("", LOG_INFO, "Removed client connection(%d) to %v", cts.id, cts.cfg.ServerAddrs) cts.ReqStop() return nil @@ -1149,12 +1146,11 @@ func (c *Client) RemoveClientConnById(conn_id uint32) error { // NOTE: removal by id doesn't perform identity check - delete(c.cts_map_by_addr, cts.cfg.ServerAddr) delete(c.cts_map, cts.id) c.stats.conns.Store(int64(len(c.cts_map))) c.cts_mtx.Unlock() - c.log.Write ("", LOG_INFO, "Removed client connection(%d) to %s", cts.id, cts.cfg.ServerAddr) + c.log.Write ("", LOG_INFO, "Removed client connection(%d) to %v", cts.id, cts.cfg.ServerAddrs) cts.ReqStop() return nil } @@ -1295,7 +1291,7 @@ func (c *Client) start_service(data interface{}) (*ClientConn, error) { cts, err = c.AddNewClientConn(cfg) if err != nil { - err = fmt.Errorf("unable to add server connection structure to %s - %s", cfg.ServerAddr, err.Error()) + err = fmt.Errorf("unable to add server connection structure to %v - %s", cfg.ServerAddrs, err.Error()) return nil, err } @@ -1313,7 +1309,7 @@ func (c *Client) StartService(data interface{}) { if err != nil { c.log.Write("", LOG_ERROR, "Failed to start service - %s", err.Error()) } else { - c.log.Write("", LOG_INFO, "Started service for %s [%d]", cts.cfg.ServerAddr, cts.cfg.Id) + c.log.Write("", LOG_INFO, "Started service for %v [%d]", cts.cfg.ServerAddrs, cts.cfg.Id) } } diff --git a/cmd/main.go b/cmd/main.go index 1499870..293cfce 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -214,10 +214,6 @@ func client_main(ctl_addrs []string, rpc_addrs []string, peer_addrs []string, cf if len(rpc_addrs) <= 0 { return fmt.Errorf("no rpc server address specified") - } else if len(rpc_addrs) > 1 { - // TODO: instead of returning an error here, - // support multiple endpoint addresses. round-robin or something to a working server? - return fmt.Errorf("too many rpc server addresses specified") } c = hodu.NewClient( @@ -228,7 +224,7 @@ func client_main(ctl_addrs []string, rpc_addrs []string, peer_addrs []string, cf ctltlscfg, rpctlscfg) - cc.ServerAddr = rpc_addrs[0] + cc.ServerAddrs = rpc_addrs cc.PeerAddrs = peer_addrs c.StartService(&cc)