changed to store the target server address string as given by the caller and not resolve the string
This commit is contained in:
120
client.go
120
client.go
@ -18,7 +18,7 @@ import "google.golang.org/grpc/status"
|
||||
|
||||
type PacketStreamClient grpc.BidiStreamingClient[Packet, Packet]
|
||||
|
||||
type ClientConnMap = map[net.Addr]*ClientConn
|
||||
type ClientConnMap = map[string]*ClientConn
|
||||
type ClientConnMapById = map[uint32]*ClientConn
|
||||
type ClientPeerConnMap = map[uint32]*ClientPeerConn
|
||||
type ClientRouteMap = map[uint32]*ClientRoute
|
||||
@ -73,7 +73,7 @@ type ClientPeerConn struct {
|
||||
type ClientConn struct {
|
||||
cli *Client
|
||||
cfg ClientConfigActive
|
||||
saddr *net.TCPAddr // server address that is connected to
|
||||
//saddr *net.TCPAddr // server address that is connected to
|
||||
id uint32
|
||||
lid string
|
||||
|
||||
@ -95,7 +95,7 @@ type ClientConn struct {
|
||||
type ClientRoute struct {
|
||||
cts *ClientConn
|
||||
id uint32
|
||||
peer_addr *net.TCPAddr
|
||||
peer_addr string
|
||||
server_peer_listen_addr *net.TCPAddr
|
||||
proto ROUTE_PROTO
|
||||
|
||||
@ -108,11 +108,6 @@ type ClientRoute struct {
|
||||
stop_chan chan bool
|
||||
}
|
||||
|
||||
type ClientCtlParamServer struct {
|
||||
ServerAddr string `json:"server-addr"`
|
||||
PeerAddrs []string `json:"peer-addrs"`
|
||||
}
|
||||
|
||||
type GuardedPacketStreamClient struct {
|
||||
mtx sync.Mutex
|
||||
//psc Hodu_PacketStreamClient
|
||||
@ -137,7 +132,7 @@ func (g *GuardedPacketStreamClient) Context() context.Context {
|
||||
}*/
|
||||
|
||||
// --------------------------------------------------------------------
|
||||
func NewClientRoute(cts *ClientConn, id uint32, addr *net.TCPAddr, proto ROUTE_PROTO) *ClientRoute {
|
||||
func NewClientRoute(cts *ClientConn, id uint32, addr string, proto ROUTE_PROTO) *ClientRoute {
|
||||
var r ClientRoute
|
||||
|
||||
r.cts = cts
|
||||
@ -159,10 +154,10 @@ func (r *ClientRoute) RunTask(wg *sync.WaitGroup) {
|
||||
// most useful works are triggered by ReportEvent() and done by ConnectToPeer()
|
||||
defer wg.Done()
|
||||
|
||||
r.cts.cli.log.Write("", LOG_DEBUG, "Sending route-start for id=%d peer=%s to %s", r.id, r.peer_addr.String(), r.cts.saddr.String())
|
||||
err = r.cts.psc.Send(MakeRouteStartPacket(r.id, r.proto, r.peer_addr.String()))
|
||||
r.cts.cli.log.Write("", LOG_DEBUG, "Sending route-start for id=%d peer=%s to %s", r.id, r.peer_addr, r.cts.cfg.ServerAddr)
|
||||
err = r.cts.psc.Send(MakeRouteStartPacket(r.id, r.proto, r.peer_addr))
|
||||
if err != nil {
|
||||
r.cts.cli.log.Write("", LOG_DEBUG, "Failed to Send route-start for id=%d peer=%s to %s", r.id, r.peer_addr.String(), r.cts.saddr.String())
|
||||
r.cts.cli.log.Write("", LOG_DEBUG, "Failed to Send route-start for id=%d peer=%s to %s", r.id, r.peer_addr, r.cts.cfg.ServerAddr)
|
||||
goto done
|
||||
}
|
||||
|
||||
@ -178,8 +173,8 @@ done:
|
||||
r.ReqStop()
|
||||
r.ptc_wg.Wait() // wait for all peer tasks are finished
|
||||
|
||||
r.cts.cli.log.Write("", LOG_DEBUG, "Sending route-stop for id=%d peer=%s to %s", r.id, r.peer_addr.String(), r.cts.saddr.String())
|
||||
r.cts.psc.Send(MakeRouteStopPacket(r.id, r.proto, r.peer_addr.String()))
|
||||
r.cts.cli.log.Write("", LOG_DEBUG, "Sending route-stop for id=%d peer=%s to %s", r.id, r.peer_addr, r.cts.cfg.ServerAddr)
|
||||
r.cts.psc.Send(MakeRouteStopPacket(r.id, r.proto, r.peer_addr))
|
||||
r.cts.RemoveClientRoute(r)
|
||||
fmt.Printf("*** End fo Client Roue Task\n")
|
||||
}
|
||||
@ -207,7 +202,7 @@ func (r *ClientRoute) ConnectToPeer(pts_id uint32, wg *sync.WaitGroup) {
|
||||
|
||||
defer wg.Done()
|
||||
|
||||
// TODO: make timeuot value configurable
|
||||
// TODO: make timeout value configurable
|
||||
// TODO: fire the cancellation function upon stop request???
|
||||
ctx, cancel = context.WithTimeout(r.cts.cli.ctx, 10 * time.Second)
|
||||
r.ptc_mtx.Lock()
|
||||
@ -215,7 +210,7 @@ func (r *ClientRoute) ConnectToPeer(pts_id uint32, wg *sync.WaitGroup) {
|
||||
r.ptc_mtx.Unlock()
|
||||
|
||||
d.LocalAddr = nil // TOOD: use this if local address is specified
|
||||
conn, err = d.DialContext(ctx, "tcp", r.peer_addr.String())
|
||||
conn, err = d.DialContext(ctx, "tcp", r.peer_addr)
|
||||
|
||||
r.ptc_mtx.Lock()
|
||||
delete(r.ptc_cancel_map, pts_id)
|
||||
@ -223,7 +218,7 @@ func (r *ClientRoute) ConnectToPeer(pts_id uint32, wg *sync.WaitGroup) {
|
||||
|
||||
if err != nil {
|
||||
// TODO: make send peer started failure mesage?
|
||||
fmt.Printf("failed to connect to %s - %s\n", r.peer_addr.String(), err.Error())
|
||||
fmt.Printf("failed to connect to %s - %s\n", r.peer_addr, err.Error())
|
||||
goto peer_aborted
|
||||
}
|
||||
|
||||
@ -370,12 +365,11 @@ fmt.Printf("GOT PEER EOF. REMEMBER EOF\n")
|
||||
}
|
||||
|
||||
// --------------------------------------------------------------------
|
||||
func NewClientConn(c *Client, addr *net.TCPAddr, cfg *ClientConfig) *ClientConn {
|
||||
func NewClientConn(c *Client, cfg *ClientConfig) *ClientConn {
|
||||
var cts ClientConn
|
||||
|
||||
cts.cli = c
|
||||
cts.route_map = make(ClientRouteMap)
|
||||
cts.saddr = addr
|
||||
cts.cfg.ClientConfig = *cfg
|
||||
cts.stop_req.Store(false)
|
||||
cts.stop_chan = make(chan bool, 8)
|
||||
@ -386,7 +380,7 @@ func NewClientConn(c *Client, addr *net.TCPAddr, cfg *ClientConfig) *ClientConn
|
||||
return &cts
|
||||
}
|
||||
|
||||
func (cts *ClientConn) AddNewClientRoute(addr *net.TCPAddr, proto ROUTE_PROTO) (*ClientRoute, error) {
|
||||
func (cts *ClientConn) AddNewClientRoute(addr string, proto ROUTE_PROTO) (*ClientRoute, error) {
|
||||
var r *ClientRoute
|
||||
var id uint32
|
||||
var ok bool
|
||||
@ -414,6 +408,17 @@ fmt.Printf("added client route.... %d -> %d\n", id, len(cts.route_map))
|
||||
return r, nil
|
||||
}
|
||||
|
||||
func (cts *ClientConn) RemoveClientRoutes() {
|
||||
var r *ClientRoute
|
||||
|
||||
cts.route_mtx.Lock()
|
||||
for _, r = range cts.route_map {
|
||||
delete(cts.route_map, r.id)
|
||||
r.ReqStop()
|
||||
}
|
||||
cts.route_mtx.Unlock()
|
||||
}
|
||||
|
||||
func (cts *ClientConn) RemoveClientRoute(route *ClientRoute) error {
|
||||
var r *ClientRoute
|
||||
var ok bool
|
||||
@ -469,25 +474,12 @@ func (cts *ClientConn) FindClientRouteById(route_id uint32) *ClientRoute {
|
||||
|
||||
func (cts *ClientConn) AddClientRoutes(peer_addrs []string) error {
|
||||
var v string
|
||||
var addr *net.TCPAddr
|
||||
var proto ROUTE_PROTO
|
||||
var err error
|
||||
|
||||
for _, v = range peer_addrs {
|
||||
addr, err = net.ResolveTCPAddr(NET_TYPE_TCP, v) // Make this interruptable
|
||||
_, err = cts.AddNewClientRoute(v, ROUTE_PROTO_TCP)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to resovle %s - %s", v, err.Error())
|
||||
}
|
||||
|
||||
if addr.IP.To4() != nil {
|
||||
proto = ROUTE_PROTO_TCP4
|
||||
} else {
|
||||
proto = ROUTE_PROTO_TCP6
|
||||
}
|
||||
|
||||
_, err = cts.AddNewClientRoute(addr, proto)
|
||||
if err != nil {
|
||||
return fmt.Errorf("unable to add client route for %s - %s", addr, err.Error())
|
||||
return fmt.Errorf("unable to add client route for %s - %s", v, err.Error())
|
||||
}
|
||||
}
|
||||
|
||||
@ -531,10 +523,18 @@ func (cts *ClientConn) RunTask(wg *sync.WaitGroup) {
|
||||
defer wg.Done() // arrange to call at the end of this function
|
||||
|
||||
start_over:
|
||||
cts.cli.log.Write(cts.lid, LOG_INFO, "Connecting to server %s", cts.saddr.String())
|
||||
cts.conn, err = grpc.NewClient(cts.saddr.String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
|
||||
/*
|
||||
cts.saddr, err = net.ResolveTCPAddr(NET_TYPE_TCP, cts.cfg.ServerAddr) // TODO: make this interruptable...
|
||||
if err != nil {
|
||||
cts.cli.log.Write(cts.lid, LOG_ERROR, "Failed to make client to server %s - %s", cts.saddr.String(), err.Error())
|
||||
err = fmt.Errorf("unresolavable address %s - %s", cts.saddr, err.Error())
|
||||
goto reconnect_to_server
|
||||
}
|
||||
*/
|
||||
|
||||
cts.cli.log.Write(cts.lid, LOG_INFO, "Connecting to server %s", cts.cfg.ServerAddr)
|
||||
cts.conn, err = grpc.NewClient(cts.cfg.ServerAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
|
||||
if err != nil {
|
||||
cts.cli.log.Write(cts.lid, LOG_ERROR, "Failed to make client to server %s - %s", cts.cfg.ServerAddr, err.Error())
|
||||
goto reconnect_to_server
|
||||
}
|
||||
cts.hdc = NewHoduClient(cts.conn)
|
||||
@ -548,21 +548,21 @@ start_over:
|
||||
c_seed.Flags = 0
|
||||
s_seed, err = cts.hdc.GetSeed(cts.cli.ctx, &c_seed)
|
||||
if err != nil {
|
||||
cts.cli.log.Write(cts.lid, LOG_ERROR, "Failed to get seed from server %s - %s", cts.saddr.String(), err.Error())
|
||||
cts.cli.log.Write(cts.lid, LOG_ERROR, "Failed to get seed from server %s - %s", cts.cfg.ServerAddr, err.Error())
|
||||
goto reconnect_to_server
|
||||
}
|
||||
cts.s_seed = *s_seed
|
||||
cts.c_seed = c_seed
|
||||
|
||||
cts.cli.log.Write(cts.lid, LOG_INFO, "Got seed from server %s - ver=%#x", cts.saddr.String(), cts.s_seed.Version)
|
||||
cts.cli.log.Write(cts.lid, LOG_INFO, "Got seed from server %s - ver=%#x", cts.cfg.ServerAddr, cts.s_seed.Version)
|
||||
|
||||
psc, err = cts.hdc.PacketStream(cts.cli.ctx)
|
||||
if err != nil {
|
||||
cts.cli.log.Write(cts.lid, LOG_ERROR, "Failed to get packet stream from server %s - %s", cts.saddr.String(), err.Error())
|
||||
cts.cli.log.Write(cts.lid, LOG_ERROR, "Failed to get packet stream from server %s - %s", cts.cfg.ServerAddr, err.Error())
|
||||
goto reconnect_to_server
|
||||
}
|
||||
|
||||
cts.cli.log.Write(cts.lid, LOG_INFO, "Got packet stream from server %s", cts.saddr.String())
|
||||
cts.cli.log.Write(cts.lid, LOG_INFO, "Got packet stream from server %s", cts.cfg.ServerAddr)
|
||||
|
||||
cts.psc = &GuardedPacketStreamClient{Hodu_PacketStreamClient: psc}
|
||||
|
||||
@ -570,7 +570,7 @@ start_over:
|
||||
// let's add routes to the client-side peers.
|
||||
err = cts.AddClientRoutes(cts.cfg.PeerAddrs)
|
||||
if err != nil {
|
||||
cts.cli.log.Write(cts.lid, LOG_INFO, "Failed to add routes to server %s for %v - %s", cts.saddr.String(), cts.cfg.PeerAddrs, err.Error())
|
||||
cts.cli.log.Write(cts.lid, LOG_INFO, "Failed to add routes to server %s for %v - %s", cts.cfg.ServerAddr, cts.cfg.PeerAddrs, err.Error())
|
||||
goto done
|
||||
}
|
||||
|
||||
@ -597,7 +597,7 @@ fmt.Printf("context doine... error - %s\n", cts.cli.ctx.Err().Error())
|
||||
if status.Code(err) == codes.Canceled || errors.Is(err, net.ErrClosed) {
|
||||
goto reconnect_to_server
|
||||
} else {
|
||||
cts.cli.log.Write(cts.lid, LOG_INFO, "Failed to receive packet form server %s - %s", cts.saddr.String(), err.Error())
|
||||
cts.cli.log.Write(cts.lid, LOG_INFO, "Failed to receive packet form server %s - %s", cts.cfg.ServerAddr, err.Error())
|
||||
goto reconnect_to_server
|
||||
}
|
||||
}
|
||||
@ -703,7 +703,7 @@ fmt.Printf("context doine... error - %s\n", cts.cli.ctx.Err().Error())
|
||||
}
|
||||
|
||||
done:
|
||||
cts.cli.log.Write("", LOG_INFO, "Disconnected from server %s", cts.saddr.String())
|
||||
cts.cli.log.Write("", LOG_INFO, "Disconnected from server %s", cts.cfg.ServerAddr)
|
||||
//cts.RemoveClientRoutes()
|
||||
cts.ReqStop()
|
||||
wait_for_termination:
|
||||
@ -790,19 +790,19 @@ func NewClient(ctx context.Context, listen_on string, logger Logger, tlscfg *tls
|
||||
return &c
|
||||
}
|
||||
|
||||
func (c *Client) AddNewClientConn(addr *net.TCPAddr, cfg *ClientConfig) (*ClientConn, error) {
|
||||
func (c *Client) AddNewClientConn(cfg *ClientConfig) (*ClientConn, error) {
|
||||
var cts *ClientConn
|
||||
var ok bool
|
||||
var id uint32
|
||||
|
||||
cts = NewClientConn(c, addr, cfg)
|
||||
cts = NewClientConn(c, cfg)
|
||||
|
||||
c.cts_mtx.Lock()
|
||||
defer c.cts_mtx.Unlock()
|
||||
|
||||
_, ok = c.cts_map[addr]
|
||||
_, ok = c.cts_map[cfg.ServerAddr]
|
||||
if ok {
|
||||
return nil, fmt.Errorf("existing server - %s", addr.String())
|
||||
return nil, fmt.Errorf("existing server - %s", cfg.ServerAddr)
|
||||
}
|
||||
|
||||
id = rand.Uint32()
|
||||
@ -813,9 +813,9 @@ func (c *Client) AddNewClientConn(addr *net.TCPAddr, cfg *ClientConfig) (*Client
|
||||
}
|
||||
cts.id = id
|
||||
cts.cfg.Id = id // store it again in the active configuration for easy access via control channel
|
||||
cts.lid = fmt.Sprintf("%d", id)
|
||||
cts.lid = fmt.Sprintf("%d", id) // id in string used for logging
|
||||
|
||||
c.cts_map[addr] = cts
|
||||
c.cts_map[cfg.ServerAddr] = cts
|
||||
c.cts_map_by_id[id] = cts
|
||||
fmt.Printf("ADD total servers %d\n", len(c.cts_map))
|
||||
return cts, nil
|
||||
@ -823,9 +823,9 @@ fmt.Printf("ADD total servers %d\n", len(c.cts_map))
|
||||
|
||||
func (c *Client) RemoveClientConn(cts *ClientConn) {
|
||||
c.cts_mtx.Lock()
|
||||
delete(c.cts_map, cts.saddr)
|
||||
delete(c.cts_map, cts.cfg.ServerAddr)
|
||||
delete(c.cts_map_by_id, cts.id)
|
||||
fmt.Printf("REMOVEDDDDDD CONNECTION FROM %s total servers %d\n", cts.saddr, len(c.cts_map))
|
||||
fmt.Printf("REMOVEDDDDDD CONNECTION FROM %s total servers %d\n", cts.cfg.ServerAddr, len(c.cts_map))
|
||||
c.cts_mtx.Unlock()
|
||||
}
|
||||
|
||||
@ -886,7 +886,6 @@ func (c *Client) RunTask(wg *sync.WaitGroup) {
|
||||
}
|
||||
|
||||
func (c *Client) start_service(data interface{}) (*ClientConn, error) {
|
||||
var saddr *net.TCPAddr
|
||||
var cts *ClientConn
|
||||
var err error
|
||||
var cfg *ClientConfig
|
||||
@ -898,18 +897,7 @@ func (c *Client) start_service(data interface{}) (*ClientConn, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if len(cfg.PeerAddrs) < 0 || len(cfg.PeerAddrs) > int(^uint16(0)) { // TODO: change this check... not really right...
|
||||
err = fmt.Errorf("invalid number of peer addresses given to server connection to %s", cfg.ServerAddr)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
saddr, err = net.ResolveTCPAddr(NET_TYPE_TCP, cfg.ServerAddr) // TODO: make this interruptable...
|
||||
if err != nil {
|
||||
err = fmt.Errorf("unresolavable address %s - %s", cfg.ServerAddr, err.Error())
|
||||
return nil, err
|
||||
}
|
||||
|
||||
cts, err = c.AddNewClientConn(saddr, cfg)
|
||||
cts, err = c.AddNewClientConn(cfg)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("unable to add server connection structure to %s - %s", cfg.ServerAddr, err.Error())
|
||||
return nil, err
|
||||
|
Reference in New Issue
Block a user