From d1673088796a42c47d6721415273b55e13de20cb Mon Sep 17 00:00:00 2001 From: hyung-hwan Date: Tue, 3 Dec 2024 11:52:46 +0900 Subject: [PATCH] update to support multiple control server addresses --- client-ctl.go | 12 +++- client.go | 73 +++++++++++++++-------- cmd/main.go | 37 ++++++------ server-ctl.go | 18 ++++-- server-peer.go | 4 +- server.go | 158 ++++++++++++++++++++++++++++--------------------- 6 files changed, 181 insertions(+), 121 deletions(-) diff --git a/client-ctl.go b/client-ctl.go index 1aa0278..72b654a 100644 --- a/client-ctl.go +++ b/client-ctl.go @@ -37,7 +37,8 @@ type json_out_client_conn_id struct { type json_out_client_conn struct { Id uint32 `json:"id"` - ServerAddr string `json:"server-addr"` + ReqServerAddr string `json:"req-server-addr"` // server address requested. may be a domain name + ServerAddr string `json:"server-addr"` // actual server address ClientAddr string `json:"client-addr"` Routes []json_out_client_route `json:"routes"` } @@ -126,6 +127,7 @@ func (ctl *client_ctl_client_conns) ServeHTTP(w http.ResponseWriter, req *http.R } js = append(js, json_out_client_conn{ Id: cts.id, + ReqServerAddr: cts.cfg.ServerAddr, ServerAddr: cts.remote_addr, ClientAddr: cts.local_addr, Routes: jsp, @@ -233,7 +235,13 @@ func (ctl *client_ctl_client_conns_id) ServeHTTP(w http.ResponseWriter, req *htt ServerPeerListenAddr: r.server_peer_listen_addr.String(), }) } - js = &json_out_client_conn{Id: cts.id, ServerAddr: cts.local_addr, ClientAddr: cts.remote_addr, Routes: jsp} + js = &json_out_client_conn{ + Id: cts.id, + ReqServerAddr: cts.cfg.ServerAddr, + ServerAddr: cts.local_addr, + ClientAddr: cts.remote_addr, + Routes: jsp, + } cts.route_mtx.Unlock() status_code = http.StatusOK; w.WriteHeader(status_code) diff --git a/client.go b/client.go index f1b2bde..fa2ada2 100644 --- a/client.go +++ b/client.go @@ -40,12 +40,15 @@ type Client struct { ctx context.Context ctx_cancel context.CancelFunc tlscfg *tls.Config - ctl_prefix string + ext_mtx sync.Mutex ext_svcs []Service + + ctl_addr []string + ctl_prefix string ctl_mux *http.ServeMux - ctl *http.Server // control server + ctl []*http.Server // control server cts_mtx sync.Mutex cts_map_by_addr ClientConnMapByAddr @@ -63,7 +66,7 @@ type ClientConn struct { cli *Client cfg ClientConfigActive id uint32 - lid string + sid string // id rendered in string local_addr string remote_addr string @@ -621,10 +624,10 @@ func (cts *ClientConn) RunTask(wg *sync.WaitGroup) { defer wg.Done() // arrange to call at the end of this function start_over: - cts.cli.log.Write(cts.lid, LOG_INFO, "Connecting to server %s", cts.cfg.ServerAddr) + cts.cli.log.Write(cts.sid, LOG_INFO, "Connecting to server %s", cts.cfg.ServerAddr) cts.conn, err = grpc.NewClient(cts.cfg.ServerAddr, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { - cts.cli.log.Write(cts.lid, LOG_ERROR, "Failed to make client to server %s - %s", cts.cfg.ServerAddr, err.Error()) + cts.cli.log.Write(cts.sid, LOG_ERROR, "Failed to make client to server %s - %s", cts.cfg.ServerAddr, err.Error()) goto reconnect_to_server } cts.hdc = NewHoduClient(cts.conn) @@ -638,17 +641,17 @@ start_over: c_seed.Flags = 0 s_seed, err = cts.hdc.GetSeed(cts.cli.ctx, &c_seed) if err != nil { - cts.cli.log.Write(cts.lid, LOG_ERROR, "Failed to get seed from server %s - %s", cts.cfg.ServerAddr, err.Error()) + cts.cli.log.Write(cts.sid, LOG_ERROR, "Failed to get seed from server %s - %s", cts.cfg.ServerAddr, err.Error()) goto reconnect_to_server } cts.s_seed = *s_seed cts.c_seed = c_seed - cts.cli.log.Write(cts.lid, LOG_INFO, "Got seed from server %s - ver=%#x", cts.cfg.ServerAddr, cts.s_seed.Version) + cts.cli.log.Write(cts.sid, LOG_INFO, "Got seed from server %s - ver=%#x", cts.cfg.ServerAddr, cts.s_seed.Version) psc, err = cts.hdc.PacketStream(cts.cli.ctx) if err != nil { - cts.cli.log.Write(cts.lid, LOG_ERROR, "Failed to get packet stream from server %s - %s", cts.cfg.ServerAddr, err.Error()) + cts.cli.log.Write(cts.sid, LOG_ERROR, "Failed to get packet stream from server %s - %s", cts.cfg.ServerAddr, err.Error()) goto reconnect_to_server } @@ -658,7 +661,7 @@ start_over: cts.local_addr = p.LocalAddr.String() } - cts.cli.log.Write(cts.lid, LOG_INFO, "Got packet stream from server %s", cts.cfg.ServerAddr) + cts.cli.log.Write(cts.sid, LOG_INFO, "Got packet stream from server %s", cts.cfg.ServerAddr) cts.psc = &GuardedPacketStreamClient{Hodu_PacketStreamClient: psc} @@ -666,7 +669,7 @@ start_over: // let's add routes to the client-side peers. err = cts.AddClientRoutes(cts.cfg.PeerAddrs) if err != nil { - cts.cli.log.Write(cts.lid, LOG_INFO, "Failed to add routes to server %s for %v - %s", cts.cfg.ServerAddr, cts.cfg.PeerAddrs, err.Error()) + cts.cli.log.Write(cts.sid, LOG_INFO, "Failed to add routes to server %s for %v - %s", cts.cfg.ServerAddr, cts.cfg.PeerAddrs, err.Error()) goto done } @@ -693,7 +696,7 @@ fmt.Printf("context doine... error - %s\n", cts.cli.ctx.Err().Error()) if status.Code(err) == codes.Canceled || errors.Is(err, net.ErrClosed) { goto reconnect_to_server } else { - cts.cli.log.Write(cts.lid, LOG_INFO, "Failed to receive packet form server %s - %s", cts.cfg.ServerAddr, err.Error()) + cts.cli.log.Write(cts.sid, LOG_INFO, "Failed to receive packet form server %s - %s", cts.cfg.ServerAddr, err.Error()) goto reconnect_to_server } } @@ -843,8 +846,9 @@ func (cts *ClientConn) ReportEvent (route_id uint32, pts_id uint32, event_type P // -------------------------------------------------------------------- -func NewClient(ctx context.Context, ctl_addr string, logger Logger, tlscfg *tls.Config) *Client { +func NewClient(ctx context.Context, ctl_addrs []string, logger Logger, tlscfg *tls.Config) *Client { var c Client + var i int c.ctx, c.ctx_cancel = context.WithCancel(ctx) c.tlscfg = tlscfg @@ -866,10 +870,15 @@ func NewClient(ctx context.Context, ctl_addr string, logger Logger, tlscfg *tls. c.ctl_mux.Handle(c.ctl_prefix + "/server-conns", &client_ctl_clients{c: &c}) c.ctl_mux.Handle(c.ctl_prefix + "/server-conns/{id}", &client_ctl_clients_id{c: &c}) - c.ctl = &http.Server{ - Addr: ctl_addr, - Handler: c.ctl_mux, - // TODO: more settings + c.ctl_addr = make([]string, len(ctl_addrs)) + c.ctl = make([]*http.Server, len(ctl_addrs)) + copy(c.ctl_addr, ctl_addrs) + for i = 0; i < len(ctl_addrs); i++ { + c.ctl[i] = &http.Server{ + Addr: ctl_addrs[i], + Handler: c.ctl_mux, + // TODO: more settings + } } return &c @@ -898,7 +907,7 @@ func (c *Client) AddNewClientConn(cfg *ClientConfig) (*ClientConn, error) { } cts.id = id cts.cfg.Id = id // store it again in the active configuration for easy access via control channel - cts.lid = fmt.Sprintf("%d", id) // id in string used for logging + cts.sid = fmt.Sprintf("%d", id) // id in string used for logging c.cts_map_by_addr[cfg.ServerAddr] = cts c.cts_map[id] = cts @@ -1035,9 +1044,10 @@ func (c *Client) FindClientPeerConnById(conn_id uint32, route_id uint32, peer_id func (c *Client) ReqStop() { if c.stop_req.CompareAndSwap(false, true) { var cts *ClientConn + var ctl *http.Server - if c.ctl != nil { - c.ctl.Shutdown(c.ctx) // to break c.ctl.ListenAndServe() + for _, ctl = range c.ctl { + ctl.Shutdown(c.ctx) // to break c.ctl.ListenAndServe() } for _, cts = range c.cts_map { @@ -1051,15 +1061,26 @@ func (c *Client) ReqStop() { func (c *Client) RunCtlTask(wg *sync.WaitGroup) { var err error + var ctl *http.Server + var idx int + var l_wg sync.WaitGroup defer wg.Done() - err = c.ctl.ListenAndServe() - if errors.Is(err, http.ErrServerClosed) { - c.log.Write("", LOG_DEBUG, "Control channel closed") - } else { - c.log.Write("", LOG_ERROR, "Control channel error - %s", err.Error()) + for idx, ctl = range c.ctl { + l_wg.Add(1) + go func(i int, cs *http.Server) { + c.log.Write ("", LOG_INFO, "Control channel[%d] started on %s", i, c.ctl_addr[i]) + err = cs.ListenAndServe() + if errors.Is(err, http.ErrServerClosed) { + c.log.Write("", LOG_DEBUG, "Control channel[%d] ended", i) + } else { + c.log.Write("", LOG_ERROR, "Control channel[%d] error - %s", i, err.Error()) + } + l_wg.Done() + }(idx, ctl) } + l_wg.Wait() } func (c *Client) StartCtlService() { @@ -1067,10 +1088,10 @@ func (c *Client) StartCtlService() { go c.RunCtlTask(&c.wg) } - func (c *Client) RunTask(wg *sync.WaitGroup) { // just a place holder to pacify the Service interface - // StartService() calls cts.RunTask() instead. + // StartService() calls cts.RunTask() instead. it is not called. + // so no call to wg.Done() } func (c *Client) start_service(data interface{}) (*ClientConn, error) { diff --git a/cmd/main.go b/cmd/main.go index 126630f..00d77a6 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -146,7 +146,7 @@ func (sh *signal_handler) WriteLog(id string, level hodu.LogLevel, fmt string, a sh.svc.WriteLog(id, level, fmt, args...) } -func server_main(ctl_addr string, laddrs []string) error { +func server_main(ctl_addrs []string, svcaddrs []string) error { var s *hodu.Server var err error var cert tls.Certificate @@ -156,7 +156,7 @@ func server_main(ctl_addr string, laddrs []string) error { return fmt.Errorf("ERROR: failed to load key pair - %s\n", err) } - s, err = hodu.NewServer(context.Background(), ctl_addr, laddrs, &AppLogger{id: "server", out: os.Stderr}, &tls.Config{Certificates: []tls.Certificate{cert}}) + s, err = hodu.NewServer(context.Background(), ctl_addrs, svcaddrs, &AppLogger{id: "server", out: os.Stderr}, &tls.Config{Certificates: []tls.Certificate{cert}}) if err != nil { return fmt.Errorf("ERROR: failed to create new server - %s", err.Error()) } @@ -171,7 +171,7 @@ func server_main(ctl_addr string, laddrs []string) error { // -------------------------------------------------------------------- -func client_main(ctl_addr string, server_addr string, peer_addrs []string) error { +func client_main(ctl_addr []string, server_addr string, peer_addrs []string) error { var c *hodu.Client var cert_pool *x509.CertPool var tlscfg *tls.Config @@ -210,18 +210,19 @@ func main() { goto wrong_usage } if strings.EqualFold(os.Args[1], "server") { - var la []string - var ctl_addr string + var rpc_addrs[] string + var ctl_addrs[] string - la = make([]string, 0) + ctl_addrs = make([]string, 0) + rpc_addrs = make([]string, 0) flgs = flag.NewFlagSet("", flag.ContinueOnError) flgs.Func("ctl-on", "specify a listening address for control channel", func(v string) error { - ctl_addr = v // TODO: support multiple addrs + ctl_addrs = append(ctl_addrs, v) return nil }) flgs.Func("rpc-on", "specify a rpc listening address", func(v string) error { - la = append(la, v) + rpc_addrs = append(rpc_addrs, v) return nil }) flgs.SetOutput(io.Discard) // prevent usage output @@ -231,29 +232,29 @@ func main() { goto wrong_usage } - if ctl_addr == "" || len(la) < 0 || flgs.NArg() > 0 { + if len(rpc_addrs) < 1 || flgs.NArg() > 0 { goto wrong_usage } - err = server_main(ctl_addr, la) + err = server_main(ctl_addrs, rpc_addrs) if err != nil { fmt.Fprintf(os.Stderr, "ERROR: server error - %s\n", err.Error()) goto oops } } else if strings.EqualFold(os.Args[1], "client") { - var rpc_addr []string - var ctl_addr[] string + var rpc_addrs []string + var ctl_addrs []string - ctl_addr = make([]string, 0) - rpc_addr = make([]string, 0) + ctl_addrs = make([]string, 0) + rpc_addrs = make([]string, 0) flgs = flag.NewFlagSet("", flag.ContinueOnError) flgs.Func("ctl-on", "specify a listening address for control channel", func(v string) error { - ctl_addr = append(ctl_addr, v) + ctl_addrs = append(ctl_addrs, v) return nil }) flgs.Func("rpc-server", "specify a rpc server address", func(v string) error { - rpc_addr = append(rpc_addr, v) + rpc_addrs = append(rpc_addrs, v) return nil }) flgs.SetOutput(io.Discard) @@ -263,10 +264,10 @@ func main() { goto wrong_usage } - if len(ctl_addr) != 1 || len(rpc_addr) != 1 || flgs.NArg() < 1 { + if len(rpc_addrs) < 1 || flgs.NArg() < 1 { goto wrong_usage } - err = client_main(ctl_addr[0], rpc_addr[0], flgs.Args()) + err = client_main(ctl_addrs, rpc_addrs[0], flgs.Args()) if err != nil { fmt.Fprintf(os.Stderr, "ERROR: client error - %s\n", err.Error()) goto oops diff --git a/server-ctl.go b/server-ctl.go index ad0c6fb..a513e06 100644 --- a/server-ctl.go +++ b/server-ctl.go @@ -56,10 +56,15 @@ func (ctl *server_ctl_server_conns) ServeHTTP(w http.ResponseWriter, req *http.R jsp = append(jsp, json_out_server_route{ Id: r.id, ClientPeerAddr: r.ptc_addr, - ServerPeerListenAddr: r.laddr.String(), + ServerPeerListenAddr: r.svcaddr.String(), }) } - js = append(js, json_out_server_conn{Id: cts.id, ClientAddr: cts.caddr.String(), ServerAddr: cts.local_addr.String(), Routes: jsp}) + js = append(js, json_out_server_conn{ + Id: cts.id, + ClientAddr: cts.raddr.String(), + ServerAddr: cts.laddr.String(), + Routes: jsp, + }) cts.route_mtx.Unlock() } s.cts_mtx.Unlock() @@ -126,10 +131,15 @@ func (ctl *server_ctl_server_conns_id) ServeHTTP(w http.ResponseWriter, req *htt jsp = append(jsp, json_out_server_route{ Id: r.id, ClientPeerAddr: r.ptc_addr, - ServerPeerListenAddr: r.laddr.String(), + ServerPeerListenAddr: r.svcaddr.String(), }) } - js = &json_out_server_conn{Id: cts.id, ClientAddr: cts.caddr.String(), ServerAddr: cts.local_addr.String(), Routes: jsp} + js = &json_out_server_conn{ + Id: cts.id, + ClientAddr: cts.raddr.String(), + ServerAddr: cts.laddr.String(), + Routes: jsp, + } cts.route_mtx.Unlock() status_code = http.StatusOK; w.WriteHeader(status_code) diff --git a/server-peer.go b/server-peer.go index c319ba5..f4153e9 100644 --- a/server-peer.go +++ b/server-peer.go @@ -180,11 +180,11 @@ fmt.Printf("******************* CCCCCCCCCCCCCCCCCCCCCCCccc\n") _, err = spc.conn.Write(event_data) if err != nil { // TODO: logging - fmt.Printf ("WARNING - failed to write data from %s to %s\n", spc.route.cts.caddr, spc.conn.RemoteAddr().String()) + fmt.Printf ("WARNING - failed to write data from %s to %s\n", spc.route.cts.raddr, spc.conn.RemoteAddr().String()) } } else { // protocol error. the client must not relay more data from the client-side peer after EOF. - fmt.Printf("WARNING - broken client - redundant data from %s to %s\n", spc.route.cts.caddr, spc.conn.RemoteAddr().String()) + fmt.Printf("WARNING - broken client - redundant data from %s to %s\n", spc.route.cts.raddr, spc.conn.RemoteAddr().String()) } default: diff --git a/server.go b/server.go index 35c3dfe..949d443 100644 --- a/server.go +++ b/server.go @@ -30,24 +30,26 @@ type Server struct { tlscfg *tls.Config wg sync.WaitGroup - ext_mtx sync.Mutex - ext_svcs []Service stop_req atomic.Bool stop_chan chan bool + ext_mtx sync.Mutex + ext_svcs []Service + + ctl_addr []string ctl_prefix string ctl_mux *http.ServeMux - ctl *http.Server // control server + ctl []*http.Server // control server - l []*net.TCPListener // main listener for grpc - l_wg sync.WaitGroup + rpc []*net.TCPListener // main listener for grpc + rpc_wg sync.WaitGroup + gs *grpc.Server cts_mtx sync.Mutex cts_map ServerConnMap cts_map_by_addr ServerConnMapByAddr cts_wg sync.WaitGroup - gs *grpc.Server log Logger UnimplementedHoduServer @@ -58,9 +60,10 @@ type Server struct { type ServerConn struct { svr *Server id uint32 - lid string // for logging - caddr net.Addr // client address that created this structure - local_addr net.Addr + sid string // for logging + + raddr net.Addr // client address that created this structure + laddr net.Addr // local address that the client is connected to pss *GuardedPacketStreamServer route_mtx sync.Mutex @@ -75,7 +78,7 @@ type ServerConn struct { type ServerRoute struct { cts *ServerConn l *net.TCPListener - laddr *net.TCPAddr + svcaddr *net.TCPAddr ptc_addr string id uint32 @@ -119,10 +122,10 @@ func (g *GuardedPacketStreamServer) Context() context.Context { func NewServerRoute(cts *ServerConn, id uint32, proto ROUTE_PROTO, ptc_addr string) (*ServerRoute, error) { var r ServerRoute var l *net.TCPListener - var laddr *net.TCPAddr + var svcaddr *net.TCPAddr var err error - l, laddr, err = cts.make_route_listener(proto) + l, svcaddr, err = cts.make_route_listener(proto) if err != nil { return nil, err } @@ -130,7 +133,7 @@ func NewServerRoute(cts *ServerConn, id uint32, proto ROUTE_PROTO, ptc_addr stri r.cts = cts r.id = id r.l = l - r.laddr = laddr + r.svcaddr = svcaddr r.ptc_addr = ptc_addr r.pts_limit = PTS_LIMIT r.pts_map = make(ServerPeerConnMap) @@ -186,7 +189,7 @@ func (r *ServerRoute) RunTask(wg *sync.WaitGroup) { var log_id string defer wg.Done() - log_id = fmt.Sprintf("%s,%d", r.cts.caddr.String(), r.id) + log_id = fmt.Sprintf("%s,%d", r.cts.raddr.String(), r.id) for { conn, err = r.l.AcceptTCP() @@ -252,7 +255,7 @@ func (r *ServerRoute) ReportEvent(pts_id uint32, event_type PACKET_KIND, event_d func (cts *ServerConn) make_route_listener(proto ROUTE_PROTO) (*net.TCPListener, *net.TCPAddr, error) { var l *net.TCPListener var err error - var laddr *net.TCPAddr + var svcaddr *net.TCPAddr var port int var tries int = 0 var nw string @@ -269,12 +272,12 @@ func (cts *ServerConn) make_route_listener(proto ROUTE_PROTO) (*net.TCPListener, for { port = rand.Intn(65535-32000+1) + 32000 - laddr, err = net.ResolveTCPAddr(nw, fmt.Sprintf(":%d", port)) + svcaddr, err = net.ResolveTCPAddr(nw, fmt.Sprintf(":%d", port)) if err == nil { - l, err = net.ListenTCP(nw, laddr) // make the binding address configurable. support multiple binding addresses??? + l, err = net.ListenTCP(nw, svcaddr) // make the binding address configurable. support multiple binding addresses??? if err == nil { fmt.Printf("listening .... on ... %d\n", port) - return l, laddr, nil + return l, svcaddr, nil } } @@ -373,11 +376,11 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) { for { pkt, err = cts.pss.Recv() if errors.Is(err, io.EOF) { - cts.svr.log.Write("", LOG_INFO, "GRPC stream closed for client %s", cts.caddr) + cts.svr.log.Write("", LOG_INFO, "GRPC stream closed for client %s", cts.raddr) goto done } if err != nil { - cts.svr.log.Write("", LOG_ERROR, "GRPC stream error for client %s - %s", cts.caddr, err.Error()) + cts.svr.log.Write("", LOG_ERROR, "GRPC stream error for client %s - %s", cts.raddr, err.Error()) goto done } @@ -391,18 +394,18 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) { r, err = cts.AddNewServerRoute(x.Route.RouteId, x.Route.Proto, x.Route.AddrStr) if err != nil { - cts.svr.log.Write("", LOG_ERROR, "Failed to add server route for client %s peer %s", cts.caddr, x.Route.AddrStr) + cts.svr.log.Write("", LOG_ERROR, "Failed to add server route for client %s peer %s", cts.raddr, x.Route.AddrStr) } else { - cts.svr.log.Write("", LOG_INFO, "Added server route(id=%d) for client %s peer %s to cts(id=%d)", r.id, cts.caddr, x.Route.AddrStr, cts.id) - err = cts.pss.Send(MakeRouteStartedPacket(r.id, x.Route.Proto, r.laddr.String())) + cts.svr.log.Write("", LOG_INFO, "Added server route(id=%d) for client %s peer %s to cts(id=%d)", r.id, cts.raddr, x.Route.AddrStr, cts.id) + err = cts.pss.Send(MakeRouteStartedPacket(r.id, x.Route.Proto, r.svcaddr.String())) if err != nil { r.ReqStop() - cts.svr.log.Write("", LOG_ERROR, "Failed to inform client %s of server route started for peer %s", cts.caddr, x.Route.AddrStr) + cts.svr.log.Write("", LOG_ERROR, "Failed to inform client %s of server route started for peer %s", cts.raddr, x.Route.AddrStr) goto done } } } else { - cts.svr.log.Write("", LOG_INFO, "Received invalid packet from %s", cts.caddr) + cts.svr.log.Write("", LOG_INFO, "Received invalid packet from %s", cts.raddr) // TODO: need to abort this client? } @@ -415,18 +418,18 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) { r, err = cts.RemoveServerRouteById(x.Route.RouteId) if err != nil { - cts.svr.log.Write("", LOG_ERROR, "Failed to delete server route(id=%d) for client %s peer %s", x.Route.RouteId, cts.caddr, x.Route.AddrStr) + cts.svr.log.Write("", LOG_ERROR, "Failed to delete server route(id=%d) for client %s peer %s", x.Route.RouteId, cts.raddr, x.Route.AddrStr) } else { - cts.svr.log.Write("", LOG_ERROR, "Deleted server route(id=%d) for client %s peer %s", x.Route.RouteId, cts.caddr, x.Route.AddrStr) + cts.svr.log.Write("", LOG_ERROR, "Deleted server route(id=%d) for client %s peer %s", x.Route.RouteId, cts.raddr, x.Route.AddrStr) err = cts.pss.Send(MakeRouteStoppedPacket(x.Route.RouteId, x.Route.Proto)) if err != nil { r.ReqStop() - cts.svr.log.Write("", LOG_ERROR, "Failed to inform client %s of server route(id=%d) stopped for peer %s", cts.caddr, x.Route.RouteId, x.Route.AddrStr) + cts.svr.log.Write("", LOG_ERROR, "Failed to inform client %s of server route(id=%d) stopped for peer %s", cts.raddr, x.Route.RouteId, x.Route.AddrStr) goto done } } } else { - cts.svr.log.Write("", LOG_INFO, "Received invalid packet from %s", cts.caddr) + cts.svr.log.Write("", LOG_INFO, "Received invalid packet from %s", cts.raddr) // TODO: need to abort this client? } @@ -698,35 +701,36 @@ func unaryInterceptor(ctx context.Context, req any, _ *grpc.UnaryServerInfo, han return m, err } -func NewServer(ctx context.Context, ctl_addr string, laddrs []string, logger Logger, tlscfg *tls.Config) (*Server, error) { +func NewServer(ctx context.Context, ctl_addrs []string, rpc_addrs []string, logger Logger, tlscfg *tls.Config) (*Server, error) { var s Server var l *net.TCPListener - var laddr *net.TCPAddr + var rpcaddr *net.TCPAddr var err error var addr string var gl *net.TCPListener + var i int var cwd string - if len(laddrs) <= 0 { + if len(rpc_addrs) <= 0 { return nil, fmt.Errorf("no server addresses provided") } s.ctx, s.ctx_cancel = context.WithCancel(ctx) s.log = logger /* create the specified number of listeners */ - s.l = make([]*net.TCPListener, 0) - for _, addr = range laddrs { - laddr, err = net.ResolveTCPAddr(NET_TYPE_TCP, addr) // Make this interruptable??? + s.rpc = make([]*net.TCPListener, 0) + for _, addr = range rpc_addrs { + rpcaddr, err = net.ResolveTCPAddr(NET_TYPE_TCP, addr) // Make this interruptable??? if err != nil { goto oops } - l, err = net.ListenTCP(NET_TYPE_TCP, laddr) + l, err = net.ListenTCP(NET_TYPE_TCP, rpcaddr) if err != nil { goto oops } - s.l = append(s.l, l) + s.rpc = append(s.rpc, l) } s.tlscfg = tlscfg @@ -758,10 +762,15 @@ func NewServer(ctx context.Context, ctl_addr string, laddrs []string, logger Log s.ctl_mux.Handle(s.ctl_prefix + "/server-conns", &server_ctl_server_conns{s: &s}) s.ctl_mux.Handle(s.ctl_prefix + "/server-conns/{conn_id}", &server_ctl_server_conns_id{s: &s}) - s.ctl = &http.Server{ - Addr: ctl_addr, - Handler: s.ctl_mux, - // TODO: more settings + s.ctl_addr = make([]string, len(ctl_addrs)) + s.ctl = make([]*http.Server, len(ctl_addrs)) + copy(s.ctl_addr, ctl_addrs) + for i = 0; i < len(ctl_addrs); i++ { + s.ctl[i] = &http.Server{ + Addr: ctl_addrs[i], + Handler: s.ctl_mux, + // TODO: more settings + } } return &s, nil @@ -772,10 +781,10 @@ oops: gl.Close() } - for _, l = range s.l { + for _, l = range s.rpc { l.Close() } - s.l = make([]*net.TCPListener, 0) + s.rpc = make([]*net.TCPListener, 0) return nil, err } @@ -785,15 +794,15 @@ func (s *Server) run_grpc_server(idx int, wg *sync.WaitGroup) error { defer wg.Done() - l = s.l[idx] + l = s.rpc[idx] // it seems to be safe to call a single grpc server on differnt listening sockets multiple times - s.log.Write("", LOG_ERROR, "Starting GRPC server listening on %s", l.Addr().String()) + s.log.Write("", LOG_ERROR, "Starting GRPC server on %s", l.Addr().String()) err = s.gs.Serve(l) if err != nil { if errors.Is(err, net.ErrClosed) { - s.log.Write("", LOG_ERROR, "GRPC server listening on %s closed", l.Addr().String()) + s.log.Write("", LOG_ERROR, "GRPC server on %s closed", l.Addr().String()) } else { - s.log.Write("", LOG_ERROR, "Error from GRPC server listening on %s - %s", l.Addr().String(), err.Error()) + s.log.Write("", LOG_ERROR, "Error from GRPC server on %s - %s", l.Addr().String(), err.Error()) } return err } @@ -806,9 +815,9 @@ func (s *Server) RunTask(wg *sync.WaitGroup) { defer wg.Done() - for idx, _ = range s.l { - s.l_wg.Add(1) - go s.run_grpc_server(idx, &s.l_wg) + for idx, _ = range s.rpc { + s.rpc_wg.Add(1) + go s.run_grpc_server(idx, &s.rpc_wg) } // most the work is done by in separate goroutines (s.run_grp_server) @@ -824,7 +833,7 @@ task_loop: s.ReqStop() - s.l_wg.Wait() + s.rpc_wg.Wait() s.log.Write("", LOG_DEBUG, "All GRPC listeners completed") s.cts_wg.Wait() @@ -836,30 +845,41 @@ task_loop: func (s *Server) RunCtlTask(wg *sync.WaitGroup) { var err error + var ctl *http.Server + var idx int + var l_wg sync.WaitGroup defer wg.Done() - err = s.ctl.ListenAndServe() - if errors.Is(err, http.ErrServerClosed) { - fmt.Printf("------------http server error - %s\n", err.Error()) - } else { - fmt.Printf("********* http server ended\n") + for idx, ctl = range s.ctl { + l_wg.Add(1) + go func(i int, cs *http.Server) { + s.log.Write ("", LOG_INFO, "Control channel[%d] started on %s", i, s.ctl_addr[i]) + err = cs.ListenAndServe() + if errors.Is(err, http.ErrServerClosed) { + s.log.Write("", LOG_DEBUG, "Control channel[%d] ended", i) + } else { + s.log.Write("", LOG_ERROR, "Control channel[%d] error - %s", i, err.Error()) + } + l_wg.Done() + }(idx, ctl) } + l_wg.Wait() } func (s *Server) ReqStop() { if s.stop_req.CompareAndSwap(false, true) { var l *net.TCPListener var cts *ServerConn + var ctl *http.Server - if s.ctl != nil { - // shutdown the control server if ever started. - s.ctl.Shutdown(s.ctx) + for _, ctl = range s.ctl { + ctl.Shutdown(s.ctx) // to break c.ctl.ListenAndServe() } //s.gs.GracefulStop() //s.gs.Stop() - for _, l = range s.l { + for _, l = range s.rpc { l.Close() } @@ -881,8 +901,8 @@ func (s *Server) AddNewServerConn(remote_addr *net.Addr, local_addr *net.Addr, p cts.svr = s cts.route_map = make(ServerRouteMap) - cts.caddr = *remote_addr - cts.local_addr = *local_addr + cts.raddr = *remote_addr + cts.laddr = *local_addr cts.pss = &GuardedPacketStreamServer{Hodu_PacketStreamServer: pss} cts.stop_req.Store(false) @@ -898,15 +918,15 @@ func (s *Server) AddNewServerConn(remote_addr *net.Addr, local_addr *net.Addr, p id++ } cts.id = id - cts.lid = fmt.Sprintf("%d", id) // id in string used for logging + cts.sid = fmt.Sprintf("%d", id) // id in string used for logging - _, ok = s.cts_map_by_addr[cts.caddr] + _, ok = s.cts_map_by_addr[cts.raddr] if ok { - return nil, fmt.Errorf("existing client - %s", cts.caddr.String()) + return nil, fmt.Errorf("existing client - %s", cts.raddr.String()) } - s.cts_map_by_addr[cts.caddr] = &cts + s.cts_map_by_addr[cts.raddr] = &cts s.cts_map[id] = &cts; - s.log.Write("", LOG_DEBUG, "Added client connection from %s", cts.caddr.String()) + s.log.Write("", LOG_DEBUG, "Added client connection from %s", cts.raddr.String()) return &cts, nil } @@ -938,7 +958,7 @@ func (s *Server) RemoveServerConn(cts *ServerConn) error { } delete(s.cts_map, cts.id) - delete(s.cts_map_by_addr, cts.caddr) + delete(s.cts_map_by_addr, cts.raddr) s.cts_mtx.Unlock() cts.ReqStop() @@ -957,7 +977,7 @@ func (s *Server) RemoveServerConnByAddr(addr net.Addr) error { return fmt.Errorf("non-existent connection address - %s", addr.String()) } delete(s.cts_map, cts.id) - delete(s.cts_map_by_addr, cts.caddr) + delete(s.cts_map_by_addr, cts.raddr) s.cts_mtx.Unlock() cts.ReqStop()