From f6de948a5a2c69cfcacd3c2d77676c28bf53d680 Mon Sep 17 00:00:00 2001 From: hyung-hwan Date: Sat, 30 Nov 2024 02:53:47 +0900 Subject: [PATCH] changed to store the target server address string as given by the caller and not resolve the string --- client-ctl.go | 65 +++++++++++++++------------ client.go | 120 +++++++++++++++++++++++--------------------------- 2 files changed, 91 insertions(+), 94 deletions(-) diff --git a/client-ctl.go b/client-ctl.go index 15e8804..7e39dc6 100644 --- a/client-ctl.go +++ b/client-ctl.go @@ -4,7 +4,6 @@ import "encoding/json" import "net/http" import "strconv" - /* * POST GET PUT DELETE * /servers - create new server list all servers bulk update delete all servers @@ -24,8 +23,16 @@ type json_errmsg struct { Text string `json:"error-text"` } -type json_in_peer_addrs struct { - PeerAddrs []string `json:"peer-addrs"` +type json_in_client_conn struct { + ServerAddr string `json:"server-addr"` +} + +type json_in_client_route struct { + ClientPeerAddr string `json:"client-peer-addr"` +} + +type json_out_client_conn_id struct { + Id uint32 `json:"id"` } type json_out_client_conn struct { @@ -34,6 +41,10 @@ type json_out_client_conn struct { Routes []json_out_client_route `json:"routes"` } +type json_out_client_route_id struct { + Id uint32 `json:"id"` +} + type json_out_client_route struct { Id uint32 `json:"id"` ClientPeerAddr string `json:"client-peer-addr"` @@ -94,11 +105,11 @@ func (ctl *client_ctl_client_conns) ServeHTTP(w http.ResponseWriter, req *http.R for _, r = range cts.route_map { jsp = append(jsp, json_out_client_route{ Id: r.id, - ClientPeerAddr: r.peer_addr.String(), + ClientPeerAddr: r.peer_addr, ServerPeerListenAddr: r.server_peer_listen_addr.String(), }) } - js = append(js, json_out_client_conn{Id: cts.id, ServerAddr: cts.saddr.String(), Routes: jsp}) + js = append(js, json_out_client_conn{Id: cts.id, ServerAddr: cts.cfg.ServerAddr, Routes: jsp}) cts.route_mtx.Unlock() } c.cts_mtx.Unlock() @@ -108,7 +119,7 @@ func (ctl *client_ctl_client_conns) ServeHTTP(w http.ResponseWriter, req *http.R case http.MethodPost: // add a new server connection - var s ClientCtlParamServer + var s json_in_client_conn var cc ClientConfig var cts *ClientConn @@ -118,14 +129,14 @@ func (ctl *client_ctl_client_conns) ServeHTTP(w http.ResponseWriter, req *http.R goto done } cc.ServerAddr = s.ServerAddr - cc.PeerAddrs = s.PeerAddrs + //cc.PeerAddrs = s.PeerAddrs cts, err = c.start_service(&cc) // TODO: this can be blocking. do we have to resolve addresses before calling this? also not good because resolution succeed or fail at each attempt. however ok as ServeHTTP itself is in a goroutine? if err != nil { status_code = http.StatusInternalServerError; w.WriteHeader(status_code) if err = je.Encode(json_errmsg{Text: err.Error()}); err != nil { goto oops } } else { status_code = http.StatusCreated; w.WriteHeader(status_code) - if err = je.Encode(cts.cfg); err != nil { goto oops } + if err = je.Encode(json_out_client_conn_id{Id: cts.id}); err != nil { goto oops } } case http.MethodDelete: @@ -194,18 +205,18 @@ func (ctl *client_ctl_client_conns_id) ServeHTTP(w http.ResponseWriter, req *htt for _, r = range cts.route_map { jsp = append(jsp, json_out_client_route{ Id: r.id, - ClientPeerAddr: r.peer_addr.String(), + ClientPeerAddr: r.peer_addr, ServerPeerListenAddr: r.server_peer_listen_addr.String(), }) } - js = &json_out_client_conn{Id: cts.id, ServerAddr: cts.saddr.String(), Routes: jsp} + js = &json_out_client_conn{Id: cts.id, ServerAddr: cts.cfg.ServerAddr, Routes: jsp} cts.route_mtx.Unlock() status_code = http.StatusOK; w.WriteHeader(status_code) if err = je.Encode(js); err != nil { goto oops } case http.MethodDelete: - /* + /* TODO err = c.RemoveClientConnById(uint32(conn_nid)) if err != nil { status_code = http.StatusNotFound; w.WriteHeader(status_code) @@ -264,13 +275,12 @@ func (ctl *client_ctl_client_conns_id_routes) ServeHTTP(w http.ResponseWriter, r var r *ClientRoute var jsp []json_out_client_route - jsp = make([]json_out_client_route, 0) cts.route_mtx.Lock() for _, r = range cts.route_map { jsp = append(jsp, json_out_client_route{ Id: r.id, - ClientPeerAddr: r.peer_addr.String(), + ClientPeerAddr: r.peer_addr, ServerPeerListenAddr: r.server_peer_listen_addr.String(), }) } @@ -280,29 +290,28 @@ func (ctl *client_ctl_client_conns_id_routes) ServeHTTP(w http.ResponseWriter, r if err = je.Encode(jsp); err != nil { goto oops } case http.MethodPost: - var pa json_in_peer_addrs - var cts *ClientConn + var jcr json_in_client_route + var r *ClientRoute - err = json.NewDecoder(req.Body).Decode(&pa) + err = json.NewDecoder(req.Body).Decode(&jcr) if err != nil { status_code = http.StatusBadRequest; w.WriteHeader(status_code) goto done } - cts = c.FindClientConnById(uint32(conn_nid)) - if cts == nil { - status_code = http.StatusNotFound; w.WriteHeader(status_code) - if err = je.Encode(json_errmsg{Text: "wrong connection id - " + conn_id}); err != nil { goto oops } + r, err = cts.AddNewClientRoute(jcr.ClientPeerAddr, ROUTE_PROTO_TCP) // TODO: configurable protocol + if err != nil { + status_code = http.StatusInternalServerError; w.WriteHeader(status_code) + if err = je.Encode(json_errmsg{Text: err.Error()}); err != nil { goto oops } } else { - err = cts.AddClientRoutes(pa.PeerAddrs) - if err != nil { - status_code = http.StatusInternalServerError; w.WriteHeader(status_code) - if err = je.Encode(json_errmsg{Text: err.Error()}); err != nil { goto oops } - } else { - status_code = http.StatusCreated; w.WriteHeader(status_code) - } + status_code = http.StatusCreated; w.WriteHeader(status_code) + if err = je.Encode(json_out_client_route_id{Id: r.id}); err != nil { goto oops } } + case http.MethodDelete: + cts.RemoveClientRoutes() + status_code = http.StatusNoContent; w.WriteHeader(status_code) + default: status_code = http.StatusBadRequest; w.WriteHeader(status_code) } @@ -369,7 +378,7 @@ func (ctl *client_ctl_client_conns_id_routes_id) ServeHTTP(w http.ResponseWriter } err = je.Encode(json_out_client_route{ Id: r.id, - ClientPeerAddr: r.peer_addr.String(), + ClientPeerAddr: r.peer_addr, ServerPeerListenAddr: r.server_peer_listen_addr.String(), }) if err != nil { goto oops } diff --git a/client.go b/client.go index be6240f..b82ffe1 100644 --- a/client.go +++ b/client.go @@ -18,7 +18,7 @@ import "google.golang.org/grpc/status" type PacketStreamClient grpc.BidiStreamingClient[Packet, Packet] -type ClientConnMap = map[net.Addr]*ClientConn +type ClientConnMap = map[string]*ClientConn type ClientConnMapById = map[uint32]*ClientConn type ClientPeerConnMap = map[uint32]*ClientPeerConn type ClientRouteMap = map[uint32]*ClientRoute @@ -73,7 +73,7 @@ type ClientPeerConn struct { type ClientConn struct { cli *Client cfg ClientConfigActive - saddr *net.TCPAddr // server address that is connected to + //saddr *net.TCPAddr // server address that is connected to id uint32 lid string @@ -95,7 +95,7 @@ type ClientConn struct { type ClientRoute struct { cts *ClientConn id uint32 - peer_addr *net.TCPAddr + peer_addr string server_peer_listen_addr *net.TCPAddr proto ROUTE_PROTO @@ -108,11 +108,6 @@ type ClientRoute struct { stop_chan chan bool } -type ClientCtlParamServer struct { - ServerAddr string `json:"server-addr"` - PeerAddrs []string `json:"peer-addrs"` -} - type GuardedPacketStreamClient struct { mtx sync.Mutex //psc Hodu_PacketStreamClient @@ -137,7 +132,7 @@ func (g *GuardedPacketStreamClient) Context() context.Context { }*/ // -------------------------------------------------------------------- -func NewClientRoute(cts *ClientConn, id uint32, addr *net.TCPAddr, proto ROUTE_PROTO) *ClientRoute { +func NewClientRoute(cts *ClientConn, id uint32, addr string, proto ROUTE_PROTO) *ClientRoute { var r ClientRoute r.cts = cts @@ -159,10 +154,10 @@ func (r *ClientRoute) RunTask(wg *sync.WaitGroup) { // most useful works are triggered by ReportEvent() and done by ConnectToPeer() defer wg.Done() - r.cts.cli.log.Write("", LOG_DEBUG, "Sending route-start for id=%d peer=%s to %s", r.id, r.peer_addr.String(), r.cts.saddr.String()) - err = r.cts.psc.Send(MakeRouteStartPacket(r.id, r.proto, r.peer_addr.String())) + r.cts.cli.log.Write("", LOG_DEBUG, "Sending route-start for id=%d peer=%s to %s", r.id, r.peer_addr, r.cts.cfg.ServerAddr) + err = r.cts.psc.Send(MakeRouteStartPacket(r.id, r.proto, r.peer_addr)) if err != nil { - r.cts.cli.log.Write("", LOG_DEBUG, "Failed to Send route-start for id=%d peer=%s to %s", r.id, r.peer_addr.String(), r.cts.saddr.String()) + r.cts.cli.log.Write("", LOG_DEBUG, "Failed to Send route-start for id=%d peer=%s to %s", r.id, r.peer_addr, r.cts.cfg.ServerAddr) goto done } @@ -178,8 +173,8 @@ done: r.ReqStop() r.ptc_wg.Wait() // wait for all peer tasks are finished - r.cts.cli.log.Write("", LOG_DEBUG, "Sending route-stop for id=%d peer=%s to %s", r.id, r.peer_addr.String(), r.cts.saddr.String()) - r.cts.psc.Send(MakeRouteStopPacket(r.id, r.proto, r.peer_addr.String())) + r.cts.cli.log.Write("", LOG_DEBUG, "Sending route-stop for id=%d peer=%s to %s", r.id, r.peer_addr, r.cts.cfg.ServerAddr) + r.cts.psc.Send(MakeRouteStopPacket(r.id, r.proto, r.peer_addr)) r.cts.RemoveClientRoute(r) fmt.Printf("*** End fo Client Roue Task\n") } @@ -207,7 +202,7 @@ func (r *ClientRoute) ConnectToPeer(pts_id uint32, wg *sync.WaitGroup) { defer wg.Done() -// TODO: make timeuot value configurable +// TODO: make timeout value configurable // TODO: fire the cancellation function upon stop request??? ctx, cancel = context.WithTimeout(r.cts.cli.ctx, 10 * time.Second) r.ptc_mtx.Lock() @@ -215,7 +210,7 @@ func (r *ClientRoute) ConnectToPeer(pts_id uint32, wg *sync.WaitGroup) { r.ptc_mtx.Unlock() d.LocalAddr = nil // TOOD: use this if local address is specified - conn, err = d.DialContext(ctx, "tcp", r.peer_addr.String()) + conn, err = d.DialContext(ctx, "tcp", r.peer_addr) r.ptc_mtx.Lock() delete(r.ptc_cancel_map, pts_id) @@ -223,7 +218,7 @@ func (r *ClientRoute) ConnectToPeer(pts_id uint32, wg *sync.WaitGroup) { if err != nil { // TODO: make send peer started failure mesage? - fmt.Printf("failed to connect to %s - %s\n", r.peer_addr.String(), err.Error()) + fmt.Printf("failed to connect to %s - %s\n", r.peer_addr, err.Error()) goto peer_aborted } @@ -370,12 +365,11 @@ fmt.Printf("GOT PEER EOF. REMEMBER EOF\n") } // -------------------------------------------------------------------- -func NewClientConn(c *Client, addr *net.TCPAddr, cfg *ClientConfig) *ClientConn { +func NewClientConn(c *Client, cfg *ClientConfig) *ClientConn { var cts ClientConn cts.cli = c cts.route_map = make(ClientRouteMap) - cts.saddr = addr cts.cfg.ClientConfig = *cfg cts.stop_req.Store(false) cts.stop_chan = make(chan bool, 8) @@ -386,7 +380,7 @@ func NewClientConn(c *Client, addr *net.TCPAddr, cfg *ClientConfig) *ClientConn return &cts } -func (cts *ClientConn) AddNewClientRoute(addr *net.TCPAddr, proto ROUTE_PROTO) (*ClientRoute, error) { +func (cts *ClientConn) AddNewClientRoute(addr string, proto ROUTE_PROTO) (*ClientRoute, error) { var r *ClientRoute var id uint32 var ok bool @@ -414,6 +408,17 @@ fmt.Printf("added client route.... %d -> %d\n", id, len(cts.route_map)) return r, nil } +func (cts *ClientConn) RemoveClientRoutes() { + var r *ClientRoute + + cts.route_mtx.Lock() + for _, r = range cts.route_map { + delete(cts.route_map, r.id) + r.ReqStop() + } + cts.route_mtx.Unlock() +} + func (cts *ClientConn) RemoveClientRoute(route *ClientRoute) error { var r *ClientRoute var ok bool @@ -469,25 +474,12 @@ func (cts *ClientConn) FindClientRouteById(route_id uint32) *ClientRoute { func (cts *ClientConn) AddClientRoutes(peer_addrs []string) error { var v string - var addr *net.TCPAddr - var proto ROUTE_PROTO var err error for _, v = range peer_addrs { - addr, err = net.ResolveTCPAddr(NET_TYPE_TCP, v) // Make this interruptable + _, err = cts.AddNewClientRoute(v, ROUTE_PROTO_TCP) if err != nil { - return fmt.Errorf("unable to resovle %s - %s", v, err.Error()) - } - - if addr.IP.To4() != nil { - proto = ROUTE_PROTO_TCP4 - } else { - proto = ROUTE_PROTO_TCP6 - } - - _, err = cts.AddNewClientRoute(addr, proto) - if err != nil { - return fmt.Errorf("unable to add client route for %s - %s", addr, err.Error()) + return fmt.Errorf("unable to add client route for %s - %s", v, err.Error()) } } @@ -531,10 +523,18 @@ func (cts *ClientConn) RunTask(wg *sync.WaitGroup) { defer wg.Done() // arrange to call at the end of this function start_over: - cts.cli.log.Write(cts.lid, LOG_INFO, "Connecting to server %s", cts.saddr.String()) - cts.conn, err = grpc.NewClient(cts.saddr.String(), grpc.WithTransportCredentials(insecure.NewCredentials())) +/* + cts.saddr, err = net.ResolveTCPAddr(NET_TYPE_TCP, cts.cfg.ServerAddr) // TODO: make this interruptable... if err != nil { - cts.cli.log.Write(cts.lid, LOG_ERROR, "Failed to make client to server %s - %s", cts.saddr.String(), err.Error()) + err = fmt.Errorf("unresolavable address %s - %s", cts.saddr, err.Error()) + goto reconnect_to_server + } +*/ + + cts.cli.log.Write(cts.lid, LOG_INFO, "Connecting to server %s", cts.cfg.ServerAddr) + cts.conn, err = grpc.NewClient(cts.cfg.ServerAddr, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + cts.cli.log.Write(cts.lid, LOG_ERROR, "Failed to make client to server %s - %s", cts.cfg.ServerAddr, err.Error()) goto reconnect_to_server } cts.hdc = NewHoduClient(cts.conn) @@ -548,21 +548,21 @@ start_over: c_seed.Flags = 0 s_seed, err = cts.hdc.GetSeed(cts.cli.ctx, &c_seed) if err != nil { - cts.cli.log.Write(cts.lid, LOG_ERROR, "Failed to get seed from server %s - %s", cts.saddr.String(), err.Error()) + cts.cli.log.Write(cts.lid, LOG_ERROR, "Failed to get seed from server %s - %s", cts.cfg.ServerAddr, err.Error()) goto reconnect_to_server } cts.s_seed = *s_seed cts.c_seed = c_seed - cts.cli.log.Write(cts.lid, LOG_INFO, "Got seed from server %s - ver=%#x", cts.saddr.String(), cts.s_seed.Version) + cts.cli.log.Write(cts.lid, LOG_INFO, "Got seed from server %s - ver=%#x", cts.cfg.ServerAddr, cts.s_seed.Version) psc, err = cts.hdc.PacketStream(cts.cli.ctx) if err != nil { - cts.cli.log.Write(cts.lid, LOG_ERROR, "Failed to get packet stream from server %s - %s", cts.saddr.String(), err.Error()) + cts.cli.log.Write(cts.lid, LOG_ERROR, "Failed to get packet stream from server %s - %s", cts.cfg.ServerAddr, err.Error()) goto reconnect_to_server } - cts.cli.log.Write(cts.lid, LOG_INFO, "Got packet stream from server %s", cts.saddr.String()) + cts.cli.log.Write(cts.lid, LOG_INFO, "Got packet stream from server %s", cts.cfg.ServerAddr) cts.psc = &GuardedPacketStreamClient{Hodu_PacketStreamClient: psc} @@ -570,7 +570,7 @@ start_over: // let's add routes to the client-side peers. err = cts.AddClientRoutes(cts.cfg.PeerAddrs) if err != nil { - cts.cli.log.Write(cts.lid, LOG_INFO, "Failed to add routes to server %s for %v - %s", cts.saddr.String(), cts.cfg.PeerAddrs, err.Error()) + cts.cli.log.Write(cts.lid, LOG_INFO, "Failed to add routes to server %s for %v - %s", cts.cfg.ServerAddr, cts.cfg.PeerAddrs, err.Error()) goto done } @@ -597,7 +597,7 @@ fmt.Printf("context doine... error - %s\n", cts.cli.ctx.Err().Error()) if status.Code(err) == codes.Canceled || errors.Is(err, net.ErrClosed) { goto reconnect_to_server } else { - cts.cli.log.Write(cts.lid, LOG_INFO, "Failed to receive packet form server %s - %s", cts.saddr.String(), err.Error()) + cts.cli.log.Write(cts.lid, LOG_INFO, "Failed to receive packet form server %s - %s", cts.cfg.ServerAddr, err.Error()) goto reconnect_to_server } } @@ -703,7 +703,7 @@ fmt.Printf("context doine... error - %s\n", cts.cli.ctx.Err().Error()) } done: - cts.cli.log.Write("", LOG_INFO, "Disconnected from server %s", cts.saddr.String()) + cts.cli.log.Write("", LOG_INFO, "Disconnected from server %s", cts.cfg.ServerAddr) //cts.RemoveClientRoutes() cts.ReqStop() wait_for_termination: @@ -790,19 +790,19 @@ func NewClient(ctx context.Context, listen_on string, logger Logger, tlscfg *tls return &c } -func (c *Client) AddNewClientConn(addr *net.TCPAddr, cfg *ClientConfig) (*ClientConn, error) { +func (c *Client) AddNewClientConn(cfg *ClientConfig) (*ClientConn, error) { var cts *ClientConn var ok bool var id uint32 - cts = NewClientConn(c, addr, cfg) + cts = NewClientConn(c, cfg) c.cts_mtx.Lock() defer c.cts_mtx.Unlock() - _, ok = c.cts_map[addr] + _, ok = c.cts_map[cfg.ServerAddr] if ok { - return nil, fmt.Errorf("existing server - %s", addr.String()) + return nil, fmt.Errorf("existing server - %s", cfg.ServerAddr) } id = rand.Uint32() @@ -813,9 +813,9 @@ func (c *Client) AddNewClientConn(addr *net.TCPAddr, cfg *ClientConfig) (*Client } cts.id = id cts.cfg.Id = id // store it again in the active configuration for easy access via control channel - cts.lid = fmt.Sprintf("%d", id) + cts.lid = fmt.Sprintf("%d", id) // id in string used for logging - c.cts_map[addr] = cts + c.cts_map[cfg.ServerAddr] = cts c.cts_map_by_id[id] = cts fmt.Printf("ADD total servers %d\n", len(c.cts_map)) return cts, nil @@ -823,9 +823,9 @@ fmt.Printf("ADD total servers %d\n", len(c.cts_map)) func (c *Client) RemoveClientConn(cts *ClientConn) { c.cts_mtx.Lock() - delete(c.cts_map, cts.saddr) + delete(c.cts_map, cts.cfg.ServerAddr) delete(c.cts_map_by_id, cts.id) -fmt.Printf("REMOVEDDDDDD CONNECTION FROM %s total servers %d\n", cts.saddr, len(c.cts_map)) +fmt.Printf("REMOVEDDDDDD CONNECTION FROM %s total servers %d\n", cts.cfg.ServerAddr, len(c.cts_map)) c.cts_mtx.Unlock() } @@ -886,7 +886,6 @@ func (c *Client) RunTask(wg *sync.WaitGroup) { } func (c *Client) start_service(data interface{}) (*ClientConn, error) { - var saddr *net.TCPAddr var cts *ClientConn var err error var cfg *ClientConfig @@ -898,18 +897,7 @@ func (c *Client) start_service(data interface{}) (*ClientConn, error) { return nil, err } - if len(cfg.PeerAddrs) < 0 || len(cfg.PeerAddrs) > int(^uint16(0)) { // TODO: change this check... not really right... - err = fmt.Errorf("invalid number of peer addresses given to server connection to %s", cfg.ServerAddr) - return nil, err - } - - saddr, err = net.ResolveTCPAddr(NET_TYPE_TCP, cfg.ServerAddr) // TODO: make this interruptable... - if err != nil { - err = fmt.Errorf("unresolavable address %s - %s", cfg.ServerAddr, err.Error()) - return nil, err - } - - cts, err = c.AddNewClientConn(saddr, cfg) + cts, err = c.AddNewClientConn(cfg) if err != nil { err = fmt.Errorf("unable to add server connection structure to %s - %s", cfg.ServerAddr, err.Error()) return nil, err