|
|
|
@ -21,7 +21,6 @@ import "google.golang.org/grpc/status"
|
|
|
|
|
|
|
|
|
|
type PacketStreamClient grpc.BidiStreamingClient[Packet, Packet]
|
|
|
|
|
|
|
|
|
|
type ClientConnMapByAddr = map[string]*ClientConn
|
|
|
|
|
type ClientConnMap = map[uint32]*ClientConn
|
|
|
|
|
type ClientPeerConnMap = map[uint32]*ClientPeerConn
|
|
|
|
|
type ClientRouteMap = map[uint32]*ClientRoute
|
|
|
|
@ -29,12 +28,15 @@ type ClientPeerCancelFuncMap = map[uint32]context.CancelFunc
|
|
|
|
|
|
|
|
|
|
// --------------------------------------------------------------------
|
|
|
|
|
type ClientConfig struct {
|
|
|
|
|
ServerAddr string
|
|
|
|
|
ServerAddrs []string
|
|
|
|
|
PeerAddrs []string
|
|
|
|
|
ServerSeedTimeout int
|
|
|
|
|
ServerAuthority string // http2 :authority header
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
type ClientConfigActive struct {
|
|
|
|
|
Id uint32
|
|
|
|
|
Index int
|
|
|
|
|
ClientConfig
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -44,7 +46,6 @@ type Client struct {
|
|
|
|
|
ctltlscfg *tls.Config
|
|
|
|
|
rpctlscfg *tls.Config
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
ext_mtx sync.Mutex
|
|
|
|
|
ext_svcs []Service
|
|
|
|
|
|
|
|
|
@ -54,7 +55,6 @@ type Client struct {
|
|
|
|
|
ctl []*http.Server // control server
|
|
|
|
|
|
|
|
|
|
cts_mtx sync.Mutex
|
|
|
|
|
cts_map_by_addr ClientConnMapByAddr
|
|
|
|
|
cts_map ClientConnMap
|
|
|
|
|
|
|
|
|
|
wg sync.WaitGroup
|
|
|
|
@ -705,6 +705,9 @@ func (cts *ClientConn) disconnect_from_server() {
|
|
|
|
|
// immediately after the start_over lable in it.
|
|
|
|
|
// if it's called from ReqStop(), we don't really
|
|
|
|
|
// need to care about it.
|
|
|
|
|
|
|
|
|
|
cts.local_addr = ""
|
|
|
|
|
cts.remote_addr = ""
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -715,6 +718,20 @@ func (cts *ClientConn) ReqStop() {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func timed_interceptor(tmout_sec int) grpc.UnaryClientInterceptor {
|
|
|
|
|
// The client calls GetSeed() as the first call to the server.
|
|
|
|
|
// To simulate a kind of connect timeout to the server and find out an unresponsive server,
|
|
|
|
|
// Place a unary intercepter that places a new context with a timeout on the GetSeed() call.
|
|
|
|
|
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
|
|
|
|
|
var cancel context.CancelFunc
|
|
|
|
|
if tmout_sec > 0 && method == Hodu_GetSeed_FullMethodName {
|
|
|
|
|
ctx, cancel = context.WithTimeout(ctx, time.Duration(tmout_sec) * time.Second)
|
|
|
|
|
defer cancel()
|
|
|
|
|
}
|
|
|
|
|
return invoker(ctx, method, req, reply, cc, opts...)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func (cts *ClientConn) RunTask(wg *sync.WaitGroup) {
|
|
|
|
|
var psc PacketStreamClient
|
|
|
|
|
var slpctx context.Context
|
|
|
|
@ -728,21 +745,27 @@ func (cts *ClientConn) RunTask(wg *sync.WaitGroup) {
|
|
|
|
|
defer wg.Done() // arrange to call at the end of this function
|
|
|
|
|
|
|
|
|
|
start_over:
|
|
|
|
|
cts.cli.log.Write(cts.sid, LOG_INFO, "Connecting to server %s", cts.cfg.ServerAddr)
|
|
|
|
|
cts.cfg.Index = (cts.cfg.Index + 1) % len(cts.cfg.ServerAddrs)
|
|
|
|
|
cts.cli.log.Write(cts.sid, LOG_INFO, "Connecting to server[%d] %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index])
|
|
|
|
|
if cts.cli.rpctlscfg == nil {
|
|
|
|
|
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
|
|
|
|
|
// TODO: can we have other authority for non-tls?
|
|
|
|
|
// if normal configuration has authority, use it (non-tls side)
|
|
|
|
|
// if notmal configuration doesn't have authori, tls has server name, use tls server name (tls side)
|
|
|
|
|
if cts.cfg.ServerAuthority != "" { opts = append(opts, grpc.WithAuthority(cts.cfg.ServerAuthority)) }
|
|
|
|
|
} else {
|
|
|
|
|
opts = append(opts, grpc.WithTransportCredentials(credentials.NewTLS(cts.cli.rpctlscfg)))
|
|
|
|
|
|
|
|
|
|
// set the http2 :authority header with tls server name defined.
|
|
|
|
|
if cts.cli.rpctlscfg.ServerName != "" { opts = append(opts, grpc.WithAuthority(cts.cli.rpctlscfg.ServerName)) }
|
|
|
|
|
if cts.cfg.ServerAuthority != "" {
|
|
|
|
|
opts = append(opts, grpc.WithAuthority(cts.cfg.ServerAuthority))
|
|
|
|
|
} else if cts.cli.rpctlscfg.ServerName != "" {
|
|
|
|
|
opts = append(opts, grpc.WithAuthority(cts.cli.rpctlscfg.ServerName))
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
cts.conn, err = grpc.NewClient(cts.cfg.ServerAddr, opts...)
|
|
|
|
|
if cts.cfg.ServerSeedTimeout > 0 {
|
|
|
|
|
opts = append(opts, grpc.WithUnaryInterceptor(timed_interceptor(cts.cfg.ServerSeedTimeout)))
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
cts.conn, err = grpc.NewClient(cts.cfg.ServerAddrs[cts.cfg.Index], opts...)
|
|
|
|
|
if err != nil {
|
|
|
|
|
cts.cli.log.Write(cts.sid, LOG_ERROR, "Failed to make client to server %s - %s", cts.cfg.ServerAddr, err.Error())
|
|
|
|
|
cts.cli.log.Write(cts.sid, LOG_ERROR, "Failed to make client to server[%d] %s - %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index], err.Error())
|
|
|
|
|
goto reconnect_to_server
|
|
|
|
|
}
|
|
|
|
|
cts.hdc = NewHoduClient(cts.conn)
|
|
|
|
@ -756,17 +779,17 @@ start_over:
|
|
|
|
|
c_seed.Flags = 0
|
|
|
|
|
s_seed, err = cts.hdc.GetSeed(cts.cli.ctx, &c_seed)
|
|
|
|
|
if err != nil {
|
|
|
|
|
cts.cli.log.Write(cts.sid, LOG_ERROR, "Failed to get seed from server %s - %s", cts.cfg.ServerAddr, err.Error())
|
|
|
|
|
cts.cli.log.Write(cts.sid, LOG_ERROR, "Failed to get seed from server[%d] %s - %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index], err.Error())
|
|
|
|
|
goto reconnect_to_server
|
|
|
|
|
}
|
|
|
|
|
cts.s_seed = *s_seed
|
|
|
|
|
cts.c_seed = c_seed
|
|
|
|
|
|
|
|
|
|
cts.cli.log.Write(cts.sid, LOG_INFO, "Got seed from server %s - ver=%#x", cts.cfg.ServerAddr, cts.s_seed.Version)
|
|
|
|
|
cts.cli.log.Write(cts.sid, LOG_INFO, "Got seed from server[%d] %s - ver=%#x", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index], cts.s_seed.Version)
|
|
|
|
|
|
|
|
|
|
psc, err = cts.hdc.PacketStream(cts.cli.ctx)
|
|
|
|
|
if err != nil {
|
|
|
|
|
cts.cli.log.Write(cts.sid, LOG_ERROR, "Failed to get packet stream from server %s - %s", cts.cfg.ServerAddr, err.Error())
|
|
|
|
|
cts.cli.log.Write(cts.sid, LOG_ERROR, "Failed to get packet stream from server[%d] %s - %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index], err.Error())
|
|
|
|
|
goto reconnect_to_server
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -776,7 +799,7 @@ start_over:
|
|
|
|
|
cts.local_addr = p.LocalAddr.String()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
cts.cli.log.Write(cts.sid, LOG_INFO, "Got packet stream from server %s", cts.cfg.ServerAddr)
|
|
|
|
|
cts.cli.log.Write(cts.sid, LOG_INFO, "Got packet stream from server[%d] %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index])
|
|
|
|
|
|
|
|
|
|
cts.psc = &GuardedPacketStreamClient{Hodu_PacketStreamClient: psc}
|
|
|
|
|
|
|
|
|
@ -785,7 +808,7 @@ start_over:
|
|
|
|
|
// let's add routes to the client-side peers if given
|
|
|
|
|
err = cts.AddClientRoutes(cts.cfg.PeerAddrs)
|
|
|
|
|
if err != nil {
|
|
|
|
|
cts.cli.log.Write(cts.sid, LOG_ERROR, "Failed to add routes to server %s for %v - %s", cts.cfg.ServerAddr, cts.cfg.PeerAddrs, err.Error())
|
|
|
|
|
cts.cli.log.Write(cts.sid, LOG_ERROR, "Failed to add routes to server[%s] %s for %v - %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index], cts.cfg.PeerAddrs, err.Error())
|
|
|
|
|
goto done
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -811,7 +834,7 @@ start_over:
|
|
|
|
|
if status.Code(err) == codes.Canceled || errors.Is(err, net.ErrClosed) {
|
|
|
|
|
goto reconnect_to_server
|
|
|
|
|
} else {
|
|
|
|
|
cts.cli.log.Write(cts.sid, LOG_INFO, "Failed to receive packet form server %s - %s", cts.remote_addr, err.Error())
|
|
|
|
|
cts.cli.log.Write(cts.sid, LOG_INFO, "Failed to receive packet from %s - %s", cts.remote_addr, err.Error())
|
|
|
|
|
goto reconnect_to_server
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
@ -944,7 +967,7 @@ start_over:
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
done:
|
|
|
|
|
cts.cli.log.Write(cts.sid, LOG_INFO, "Disconnected from server %s", cts.cfg.ServerAddr)
|
|
|
|
|
cts.cli.log.Write(cts.sid, LOG_INFO, "Disconnected from server[%d] %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index])
|
|
|
|
|
//cts.RemoveClientRoutes() // this isn't needed as each task removes itself from cts upon its termination
|
|
|
|
|
cts.ReqStop()
|
|
|
|
|
|
|
|
|
@ -1009,7 +1032,6 @@ func NewClient(ctx context.Context, ctl_addrs []string, logger Logger, ctl_prefi
|
|
|
|
|
c.ctltlscfg = ctltlscfg
|
|
|
|
|
c.rpctlscfg = rpctlscfg
|
|
|
|
|
c.ext_svcs = make([]Service, 0, 1)
|
|
|
|
|
c.cts_map_by_addr = make(ClientConnMapByAddr)
|
|
|
|
|
c.cts_map = make(ClientConnMap)
|
|
|
|
|
c.stop_req.Store(false)
|
|
|
|
|
c.stop_chan = make(chan bool, 8)
|
|
|
|
@ -1056,11 +1078,6 @@ func (c *Client) AddNewClientConn(cfg *ClientConfig) (*ClientConn, error) {
|
|
|
|
|
cts = NewClientConn(c, cfg)
|
|
|
|
|
|
|
|
|
|
c.cts_mtx.Lock()
|
|
|
|
|
_, ok = c.cts_map_by_addr[cfg.ServerAddr]
|
|
|
|
|
if ok {
|
|
|
|
|
c.cts_mtx.Unlock()
|
|
|
|
|
return nil, fmt.Errorf("existing server - %s", cfg.ServerAddr)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
id = rand.Uint32()
|
|
|
|
|
for {
|
|
|
|
@ -1072,12 +1089,11 @@ func (c *Client) AddNewClientConn(cfg *ClientConfig) (*ClientConn, error) {
|
|
|
|
|
cts.cfg.Id = id // store it again in the active configuration for easy access via control channel
|
|
|
|
|
cts.sid = fmt.Sprintf("%d", id) // id in string used for logging
|
|
|
|
|
|
|
|
|
|
c.cts_map_by_addr[cfg.ServerAddr] = cts
|
|
|
|
|
c.cts_map[id] = cts
|
|
|
|
|
c.stats.conns.Add(1)
|
|
|
|
|
c.cts_mtx.Unlock()
|
|
|
|
|
|
|
|
|
|
c.log.Write ("", LOG_INFO, "Added client connection(%d) to %s", cts.id, cfg.ServerAddr)
|
|
|
|
|
c.log.Write ("", LOG_INFO, "Added client connection(%d) to %v", cts.id, cfg.ServerAddrs)
|
|
|
|
|
return cts, nil
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -1124,12 +1140,11 @@ func (c *Client) RemoveClientConn(cts *ClientConn) error {
|
|
|
|
|
return fmt.Errorf("conflicting connection id - %d", cts.id)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
delete(c.cts_map_by_addr, cts.cfg.ServerAddr)
|
|
|
|
|
delete(c.cts_map, cts.id)
|
|
|
|
|
c.stats.conns.Store(int64(len(c.cts_map)))
|
|
|
|
|
c.cts_mtx.Unlock()
|
|
|
|
|
|
|
|
|
|
c.log.Write ("", LOG_INFO, "Removed client connection(%d) to %s", cts.id, cts.cfg.ServerAddr)
|
|
|
|
|
c.log.Write ("", LOG_INFO, "Removed client connection(%d) to %v", cts.id, cts.cfg.ServerAddrs)
|
|
|
|
|
|
|
|
|
|
cts.ReqStop()
|
|
|
|
|
return nil
|
|
|
|
@ -1149,12 +1164,11 @@ func (c *Client) RemoveClientConnById(conn_id uint32) error {
|
|
|
|
|
|
|
|
|
|
// NOTE: removal by id doesn't perform identity check
|
|
|
|
|
|
|
|
|
|
delete(c.cts_map_by_addr, cts.cfg.ServerAddr)
|
|
|
|
|
delete(c.cts_map, cts.id)
|
|
|
|
|
c.stats.conns.Store(int64(len(c.cts_map)))
|
|
|
|
|
c.cts_mtx.Unlock()
|
|
|
|
|
|
|
|
|
|
c.log.Write ("", LOG_INFO, "Removed client connection(%d) to %s", cts.id, cts.cfg.ServerAddr)
|
|
|
|
|
c.log.Write ("", LOG_INFO, "Removed client connection(%d) to %v", cts.id, cts.cfg.ServerAddrs)
|
|
|
|
|
cts.ReqStop()
|
|
|
|
|
return nil
|
|
|
|
|
}
|
|
|
|
@ -1295,7 +1309,7 @@ func (c *Client) start_service(data interface{}) (*ClientConn, error) {
|
|
|
|
|
|
|
|
|
|
cts, err = c.AddNewClientConn(cfg)
|
|
|
|
|
if err != nil {
|
|
|
|
|
err = fmt.Errorf("unable to add server connection structure to %s - %s", cfg.ServerAddr, err.Error())
|
|
|
|
|
err = fmt.Errorf("unable to add server connection structure to %v - %s", cfg.ServerAddrs, err.Error())
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
@ -1313,7 +1327,7 @@ func (c *Client) StartService(data interface{}) {
|
|
|
|
|
if err != nil {
|
|
|
|
|
c.log.Write("", LOG_ERROR, "Failed to start service - %s", err.Error())
|
|
|
|
|
} else {
|
|
|
|
|
c.log.Write("", LOG_INFO, "Started service for %s [%d]", cts.cfg.ServerAddr, cts.cfg.Id)
|
|
|
|
|
c.log.Write("", LOG_INFO, "Started service for %v [%d]", cts.cfg.ServerAddrs, cts.cfg.Id)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|