Compare commits
No commits in common. "feedff3f0493cfeb27cf414639010b78148c98b1" and "3d19576905cd520fda74a59742d950db021092e9" have entirely different histories.
feedff3f04
...
3d19576905
@ -26,7 +26,7 @@ type json_errmsg struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type json_in_client_conn struct {
|
type json_in_client_conn struct {
|
||||||
ServerAddrs []string `json:"server-addrs"`
|
ServerAddr string `json:"server-addr"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type json_in_client_route struct {
|
type json_in_client_route struct {
|
||||||
@ -41,8 +41,7 @@ type json_out_client_conn_id struct {
|
|||||||
|
|
||||||
type json_out_client_conn struct {
|
type json_out_client_conn struct {
|
||||||
Id uint32 `json:"id"`
|
Id uint32 `json:"id"`
|
||||||
ReqServerAddrs []string `json:"req-server-addrs"` // server addresses requested. may include a domain name
|
ReqServerAddr string `json:"req-server-addr"` // server address requested. may be a domain name
|
||||||
CurrentServerIndex int `json:"current-server-index"`
|
|
||||||
ServerAddr string `json:"server-addr"` // actual server address
|
ServerAddr string `json:"server-addr"` // actual server address
|
||||||
ClientAddr string `json:"client-addr"`
|
ClientAddr string `json:"client-addr"`
|
||||||
Routes []json_out_client_route `json:"routes"`
|
Routes []json_out_client_route `json:"routes"`
|
||||||
@ -151,8 +150,7 @@ func (ctl *client_ctl_client_conns) ServeHTTP(w http.ResponseWriter, req *http.R
|
|||||||
}
|
}
|
||||||
js = append(js, json_out_client_conn{
|
js = append(js, json_out_client_conn{
|
||||||
Id: cts.id,
|
Id: cts.id,
|
||||||
ReqServerAddrs: cts.cfg.ServerAddrs,
|
ReqServerAddr: cts.cfg.ServerAddr,
|
||||||
CurrentServerIndex: cts.cfg.Index,
|
|
||||||
ServerAddr: cts.remote_addr,
|
ServerAddr: cts.remote_addr,
|
||||||
ClientAddr: cts.local_addr,
|
ClientAddr: cts.local_addr,
|
||||||
Routes: jsp,
|
Routes: jsp,
|
||||||
@ -176,12 +174,12 @@ func (ctl *client_ctl_client_conns) ServeHTTP(w http.ResponseWriter, req *http.R
|
|||||||
var cts *ClientConn
|
var cts *ClientConn
|
||||||
|
|
||||||
err = json.NewDecoder(req.Body).Decode(&s)
|
err = json.NewDecoder(req.Body).Decode(&s)
|
||||||
if err != nil || len(s.ServerAddrs) <= 0 {
|
if err != nil || s.ServerAddr == "" {
|
||||||
status_code = http.StatusBadRequest; w.WriteHeader(status_code)
|
status_code = http.StatusBadRequest; w.WriteHeader(status_code)
|
||||||
goto done
|
goto done
|
||||||
}
|
}
|
||||||
|
|
||||||
cc.ServerAddrs = s.ServerAddrs
|
cc.ServerAddr = s.ServerAddr
|
||||||
//cc.PeerAddrs = s.PeerAddrs
|
//cc.PeerAddrs = s.PeerAddrs
|
||||||
cts, err = c.start_service(&cc) // TODO: this can be blocking. do we have to resolve addresses before calling this? also not good because resolution succeed or fail at each attempt. however ok as ServeHTTP itself is in a goroutine?
|
cts, err = c.start_service(&cc) // TODO: this can be blocking. do we have to resolve addresses before calling this? also not good because resolution succeed or fail at each attempt. however ok as ServeHTTP itself is in a goroutine?
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -269,8 +267,7 @@ func (ctl *client_ctl_client_conns_id) ServeHTTP(w http.ResponseWriter, req *htt
|
|||||||
}
|
}
|
||||||
js = &json_out_client_conn{
|
js = &json_out_client_conn{
|
||||||
Id: cts.id,
|
Id: cts.id,
|
||||||
ReqServerAddrs: cts.cfg.ServerAddrs,
|
ReqServerAddr: cts.cfg.ServerAddr,
|
||||||
CurrentServerIndex: cts.cfg.Index,
|
|
||||||
ServerAddr: cts.local_addr,
|
ServerAddr: cts.local_addr,
|
||||||
ClientAddr: cts.remote_addr,
|
ClientAddr: cts.remote_addr,
|
||||||
Routes: jsp,
|
Routes: jsp,
|
||||||
|
82
client.go
82
client.go
@ -21,6 +21,7 @@ import "google.golang.org/grpc/status"
|
|||||||
|
|
||||||
type PacketStreamClient grpc.BidiStreamingClient[Packet, Packet]
|
type PacketStreamClient grpc.BidiStreamingClient[Packet, Packet]
|
||||||
|
|
||||||
|
type ClientConnMapByAddr = map[string]*ClientConn
|
||||||
type ClientConnMap = map[uint32]*ClientConn
|
type ClientConnMap = map[uint32]*ClientConn
|
||||||
type ClientPeerConnMap = map[uint32]*ClientPeerConn
|
type ClientPeerConnMap = map[uint32]*ClientPeerConn
|
||||||
type ClientRouteMap = map[uint32]*ClientRoute
|
type ClientRouteMap = map[uint32]*ClientRoute
|
||||||
@ -28,15 +29,12 @@ type ClientPeerCancelFuncMap = map[uint32]context.CancelFunc
|
|||||||
|
|
||||||
// --------------------------------------------------------------------
|
// --------------------------------------------------------------------
|
||||||
type ClientConfig struct {
|
type ClientConfig struct {
|
||||||
ServerAddrs []string
|
ServerAddr string
|
||||||
PeerAddrs []string
|
PeerAddrs []string
|
||||||
ServerSeedTimeout int
|
|
||||||
ServerAuthority string // http2 :authority header
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type ClientConfigActive struct {
|
type ClientConfigActive struct {
|
||||||
Id uint32
|
Id uint32
|
||||||
Index int
|
|
||||||
ClientConfig
|
ClientConfig
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -46,6 +44,7 @@ type Client struct {
|
|||||||
ctltlscfg *tls.Config
|
ctltlscfg *tls.Config
|
||||||
rpctlscfg *tls.Config
|
rpctlscfg *tls.Config
|
||||||
|
|
||||||
|
|
||||||
ext_mtx sync.Mutex
|
ext_mtx sync.Mutex
|
||||||
ext_svcs []Service
|
ext_svcs []Service
|
||||||
|
|
||||||
@ -55,6 +54,7 @@ type Client struct {
|
|||||||
ctl []*http.Server // control server
|
ctl []*http.Server // control server
|
||||||
|
|
||||||
cts_mtx sync.Mutex
|
cts_mtx sync.Mutex
|
||||||
|
cts_map_by_addr ClientConnMapByAddr
|
||||||
cts_map ClientConnMap
|
cts_map ClientConnMap
|
||||||
|
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
@ -705,9 +705,6 @@ func (cts *ClientConn) disconnect_from_server() {
|
|||||||
// immediately after the start_over lable in it.
|
// immediately after the start_over lable in it.
|
||||||
// if it's called from ReqStop(), we don't really
|
// if it's called from ReqStop(), we don't really
|
||||||
// need to care about it.
|
// need to care about it.
|
||||||
|
|
||||||
cts.local_addr = ""
|
|
||||||
cts.remote_addr = ""
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -718,20 +715,6 @@ func (cts *ClientConn) ReqStop() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func timed_interceptor(tmout_sec int) grpc.UnaryClientInterceptor {
|
|
||||||
// The client calls GetSeed() as the first call to the server.
|
|
||||||
// To simulate a kind of connect timeout to the server and find out an unresponsive server,
|
|
||||||
// Place a unary intercepter that places a new context with a timeout on the GetSeed() call.
|
|
||||||
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
|
|
||||||
var cancel context.CancelFunc
|
|
||||||
if tmout_sec > 0 && method == Hodu_GetSeed_FullMethodName {
|
|
||||||
ctx, cancel = context.WithTimeout(ctx, time.Duration(tmout_sec) * time.Second)
|
|
||||||
defer cancel()
|
|
||||||
}
|
|
||||||
return invoker(ctx, method, req, reply, cc, opts...)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (cts *ClientConn) RunTask(wg *sync.WaitGroup) {
|
func (cts *ClientConn) RunTask(wg *sync.WaitGroup) {
|
||||||
var psc PacketStreamClient
|
var psc PacketStreamClient
|
||||||
var slpctx context.Context
|
var slpctx context.Context
|
||||||
@ -745,27 +728,21 @@ func (cts *ClientConn) RunTask(wg *sync.WaitGroup) {
|
|||||||
defer wg.Done() // arrange to call at the end of this function
|
defer wg.Done() // arrange to call at the end of this function
|
||||||
|
|
||||||
start_over:
|
start_over:
|
||||||
cts.cfg.Index = (cts.cfg.Index + 1) % len(cts.cfg.ServerAddrs)
|
cts.cli.log.Write(cts.sid, LOG_INFO, "Connecting to server %s", cts.cfg.ServerAddr)
|
||||||
cts.cli.log.Write(cts.sid, LOG_INFO, "Connecting to server[%d] %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index])
|
|
||||||
if cts.cli.rpctlscfg == nil {
|
if cts.cli.rpctlscfg == nil {
|
||||||
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
|
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
|
||||||
if cts.cfg.ServerAuthority != "" { opts = append(opts, grpc.WithAuthority(cts.cfg.ServerAuthority)) }
|
// TODO: can we have other authority for non-tls?
|
||||||
|
// if normal configuration has authority, use it (non-tls side)
|
||||||
|
// if notmal configuration doesn't have authori, tls has server name, use tls server name (tls side)
|
||||||
} else {
|
} else {
|
||||||
opts = append(opts, grpc.WithTransportCredentials(credentials.NewTLS(cts.cli.rpctlscfg)))
|
opts = append(opts, grpc.WithTransportCredentials(credentials.NewTLS(cts.cli.rpctlscfg)))
|
||||||
// set the http2 :authority header with tls server name defined.
|
|
||||||
if cts.cfg.ServerAuthority != "" {
|
|
||||||
opts = append(opts, grpc.WithAuthority(cts.cfg.ServerAuthority))
|
|
||||||
} else if cts.cli.rpctlscfg.ServerName != "" {
|
|
||||||
opts = append(opts, grpc.WithAuthority(cts.cli.rpctlscfg.ServerName))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if cts.cfg.ServerSeedTimeout > 0 {
|
|
||||||
opts = append(opts, grpc.WithUnaryInterceptor(timed_interceptor(cts.cfg.ServerSeedTimeout)))
|
|
||||||
}
|
|
||||||
|
|
||||||
cts.conn, err = grpc.NewClient(cts.cfg.ServerAddrs[cts.cfg.Index], opts...)
|
// set the http2 :authority header with tls server name defined.
|
||||||
|
if cts.cli.rpctlscfg.ServerName != "" { opts = append(opts, grpc.WithAuthority(cts.cli.rpctlscfg.ServerName)) }
|
||||||
|
}
|
||||||
|
cts.conn, err = grpc.NewClient(cts.cfg.ServerAddr, opts...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cts.cli.log.Write(cts.sid, LOG_ERROR, "Failed to make client to server[%d] %s - %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index], err.Error())
|
cts.cli.log.Write(cts.sid, LOG_ERROR, "Failed to make client to server %s - %s", cts.cfg.ServerAddr, err.Error())
|
||||||
goto reconnect_to_server
|
goto reconnect_to_server
|
||||||
}
|
}
|
||||||
cts.hdc = NewHoduClient(cts.conn)
|
cts.hdc = NewHoduClient(cts.conn)
|
||||||
@ -779,17 +756,17 @@ start_over:
|
|||||||
c_seed.Flags = 0
|
c_seed.Flags = 0
|
||||||
s_seed, err = cts.hdc.GetSeed(cts.cli.ctx, &c_seed)
|
s_seed, err = cts.hdc.GetSeed(cts.cli.ctx, &c_seed)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cts.cli.log.Write(cts.sid, LOG_ERROR, "Failed to get seed from server[%d] %s - %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index], err.Error())
|
cts.cli.log.Write(cts.sid, LOG_ERROR, "Failed to get seed from server %s - %s", cts.cfg.ServerAddr, err.Error())
|
||||||
goto reconnect_to_server
|
goto reconnect_to_server
|
||||||
}
|
}
|
||||||
cts.s_seed = *s_seed
|
cts.s_seed = *s_seed
|
||||||
cts.c_seed = c_seed
|
cts.c_seed = c_seed
|
||||||
|
|
||||||
cts.cli.log.Write(cts.sid, LOG_INFO, "Got seed from server[%d] %s - ver=%#x", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index], cts.s_seed.Version)
|
cts.cli.log.Write(cts.sid, LOG_INFO, "Got seed from server %s - ver=%#x", cts.cfg.ServerAddr, cts.s_seed.Version)
|
||||||
|
|
||||||
psc, err = cts.hdc.PacketStream(cts.cli.ctx)
|
psc, err = cts.hdc.PacketStream(cts.cli.ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cts.cli.log.Write(cts.sid, LOG_ERROR, "Failed to get packet stream from server[%d] %s - %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index], err.Error())
|
cts.cli.log.Write(cts.sid, LOG_ERROR, "Failed to get packet stream from server %s - %s", cts.cfg.ServerAddr, err.Error())
|
||||||
goto reconnect_to_server
|
goto reconnect_to_server
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -799,7 +776,7 @@ start_over:
|
|||||||
cts.local_addr = p.LocalAddr.String()
|
cts.local_addr = p.LocalAddr.String()
|
||||||
}
|
}
|
||||||
|
|
||||||
cts.cli.log.Write(cts.sid, LOG_INFO, "Got packet stream from server[%d] %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index])
|
cts.cli.log.Write(cts.sid, LOG_INFO, "Got packet stream from server %s", cts.cfg.ServerAddr)
|
||||||
|
|
||||||
cts.psc = &GuardedPacketStreamClient{Hodu_PacketStreamClient: psc}
|
cts.psc = &GuardedPacketStreamClient{Hodu_PacketStreamClient: psc}
|
||||||
|
|
||||||
@ -808,7 +785,7 @@ start_over:
|
|||||||
// let's add routes to the client-side peers if given
|
// let's add routes to the client-side peers if given
|
||||||
err = cts.AddClientRoutes(cts.cfg.PeerAddrs)
|
err = cts.AddClientRoutes(cts.cfg.PeerAddrs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cts.cli.log.Write(cts.sid, LOG_ERROR, "Failed to add routes to server[%s] %s for %v - %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index], cts.cfg.PeerAddrs, err.Error())
|
cts.cli.log.Write(cts.sid, LOG_ERROR, "Failed to add routes to server %s for %v - %s", cts.cfg.ServerAddr, cts.cfg.PeerAddrs, err.Error())
|
||||||
goto done
|
goto done
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -834,7 +811,7 @@ start_over:
|
|||||||
if status.Code(err) == codes.Canceled || errors.Is(err, net.ErrClosed) {
|
if status.Code(err) == codes.Canceled || errors.Is(err, net.ErrClosed) {
|
||||||
goto reconnect_to_server
|
goto reconnect_to_server
|
||||||
} else {
|
} else {
|
||||||
cts.cli.log.Write(cts.sid, LOG_INFO, "Failed to receive packet from %s - %s", cts.remote_addr, err.Error())
|
cts.cli.log.Write(cts.sid, LOG_INFO, "Failed to receive packet form server %s - %s", cts.remote_addr, err.Error())
|
||||||
goto reconnect_to_server
|
goto reconnect_to_server
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -967,7 +944,7 @@ start_over:
|
|||||||
}
|
}
|
||||||
|
|
||||||
done:
|
done:
|
||||||
cts.cli.log.Write(cts.sid, LOG_INFO, "Disconnected from server[%d] %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index])
|
cts.cli.log.Write(cts.sid, LOG_INFO, "Disconnected from server %s", cts.cfg.ServerAddr)
|
||||||
//cts.RemoveClientRoutes() // this isn't needed as each task removes itself from cts upon its termination
|
//cts.RemoveClientRoutes() // this isn't needed as each task removes itself from cts upon its termination
|
||||||
cts.ReqStop()
|
cts.ReqStop()
|
||||||
|
|
||||||
@ -1032,6 +1009,7 @@ func NewClient(ctx context.Context, ctl_addrs []string, logger Logger, ctl_prefi
|
|||||||
c.ctltlscfg = ctltlscfg
|
c.ctltlscfg = ctltlscfg
|
||||||
c.rpctlscfg = rpctlscfg
|
c.rpctlscfg = rpctlscfg
|
||||||
c.ext_svcs = make([]Service, 0, 1)
|
c.ext_svcs = make([]Service, 0, 1)
|
||||||
|
c.cts_map_by_addr = make(ClientConnMapByAddr)
|
||||||
c.cts_map = make(ClientConnMap)
|
c.cts_map = make(ClientConnMap)
|
||||||
c.stop_req.Store(false)
|
c.stop_req.Store(false)
|
||||||
c.stop_chan = make(chan bool, 8)
|
c.stop_chan = make(chan bool, 8)
|
||||||
@ -1078,6 +1056,11 @@ func (c *Client) AddNewClientConn(cfg *ClientConfig) (*ClientConn, error) {
|
|||||||
cts = NewClientConn(c, cfg)
|
cts = NewClientConn(c, cfg)
|
||||||
|
|
||||||
c.cts_mtx.Lock()
|
c.cts_mtx.Lock()
|
||||||
|
_, ok = c.cts_map_by_addr[cfg.ServerAddr]
|
||||||
|
if ok {
|
||||||
|
c.cts_mtx.Unlock()
|
||||||
|
return nil, fmt.Errorf("existing server - %s", cfg.ServerAddr)
|
||||||
|
}
|
||||||
|
|
||||||
id = rand.Uint32()
|
id = rand.Uint32()
|
||||||
for {
|
for {
|
||||||
@ -1089,11 +1072,12 @@ func (c *Client) AddNewClientConn(cfg *ClientConfig) (*ClientConn, error) {
|
|||||||
cts.cfg.Id = id // store it again in the active configuration for easy access via control channel
|
cts.cfg.Id = id // store it again in the active configuration for easy access via control channel
|
||||||
cts.sid = fmt.Sprintf("%d", id) // id in string used for logging
|
cts.sid = fmt.Sprintf("%d", id) // id in string used for logging
|
||||||
|
|
||||||
|
c.cts_map_by_addr[cfg.ServerAddr] = cts
|
||||||
c.cts_map[id] = cts
|
c.cts_map[id] = cts
|
||||||
c.stats.conns.Add(1)
|
c.stats.conns.Add(1)
|
||||||
c.cts_mtx.Unlock()
|
c.cts_mtx.Unlock()
|
||||||
|
|
||||||
c.log.Write ("", LOG_INFO, "Added client connection(%d) to %v", cts.id, cfg.ServerAddrs)
|
c.log.Write ("", LOG_INFO, "Added client connection(%d) to %s", cts.id, cfg.ServerAddr)
|
||||||
return cts, nil
|
return cts, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1140,11 +1124,12 @@ func (c *Client) RemoveClientConn(cts *ClientConn) error {
|
|||||||
return fmt.Errorf("conflicting connection id - %d", cts.id)
|
return fmt.Errorf("conflicting connection id - %d", cts.id)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
delete(c.cts_map_by_addr, cts.cfg.ServerAddr)
|
||||||
delete(c.cts_map, cts.id)
|
delete(c.cts_map, cts.id)
|
||||||
c.stats.conns.Store(int64(len(c.cts_map)))
|
c.stats.conns.Store(int64(len(c.cts_map)))
|
||||||
c.cts_mtx.Unlock()
|
c.cts_mtx.Unlock()
|
||||||
|
|
||||||
c.log.Write ("", LOG_INFO, "Removed client connection(%d) to %v", cts.id, cts.cfg.ServerAddrs)
|
c.log.Write ("", LOG_INFO, "Removed client connection(%d) to %s", cts.id, cts.cfg.ServerAddr)
|
||||||
|
|
||||||
cts.ReqStop()
|
cts.ReqStop()
|
||||||
return nil
|
return nil
|
||||||
@ -1164,11 +1149,12 @@ func (c *Client) RemoveClientConnById(conn_id uint32) error {
|
|||||||
|
|
||||||
// NOTE: removal by id doesn't perform identity check
|
// NOTE: removal by id doesn't perform identity check
|
||||||
|
|
||||||
|
delete(c.cts_map_by_addr, cts.cfg.ServerAddr)
|
||||||
delete(c.cts_map, cts.id)
|
delete(c.cts_map, cts.id)
|
||||||
c.stats.conns.Store(int64(len(c.cts_map)))
|
c.stats.conns.Store(int64(len(c.cts_map)))
|
||||||
c.cts_mtx.Unlock()
|
c.cts_mtx.Unlock()
|
||||||
|
|
||||||
c.log.Write ("", LOG_INFO, "Removed client connection(%d) to %v", cts.id, cts.cfg.ServerAddrs)
|
c.log.Write ("", LOG_INFO, "Removed client connection(%d) to %s", cts.id, cts.cfg.ServerAddr)
|
||||||
cts.ReqStop()
|
cts.ReqStop()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
@ -1309,7 +1295,7 @@ func (c *Client) start_service(data interface{}) (*ClientConn, error) {
|
|||||||
|
|
||||||
cts, err = c.AddNewClientConn(cfg)
|
cts, err = c.AddNewClientConn(cfg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
err = fmt.Errorf("unable to add server connection structure to %v - %s", cfg.ServerAddrs, err.Error())
|
err = fmt.Errorf("unable to add server connection structure to %s - %s", cfg.ServerAddr, err.Error())
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1327,7 +1313,7 @@ func (c *Client) StartService(data interface{}) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
c.log.Write("", LOG_ERROR, "Failed to start service - %s", err.Error())
|
c.log.Write("", LOG_ERROR, "Failed to start service - %s", err.Error())
|
||||||
} else {
|
} else {
|
||||||
c.log.Write("", LOG_INFO, "Started service for %v [%d]", cts.cfg.ServerAddrs, cts.cfg.Id)
|
c.log.Write("", LOG_INFO, "Started service for %s [%d]", cts.cfg.ServerAddr, cts.cfg.Id)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -51,9 +51,8 @@ type RPCServiceConfig struct { // rpc server-side configuration
|
|||||||
}
|
}
|
||||||
|
|
||||||
type RPCEndpointConfig struct { // rpc client-side configuration
|
type RPCEndpointConfig struct { // rpc client-side configuration
|
||||||
Authority string `yaml:"authority"`
|
Authority string `yaml:"authority"`
|
||||||
Addrs []string `yaml:"addresses"`
|
Addrs []string `yaml:"addresses"`
|
||||||
SeedTimeout int `yaml:"seed-timeout"`
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type ServerConfig struct {
|
type ServerConfig struct {
|
||||||
|
@ -214,6 +214,10 @@ func client_main(ctl_addrs []string, rpc_addrs []string, peer_addrs []string, cf
|
|||||||
|
|
||||||
if len(rpc_addrs) <= 0 {
|
if len(rpc_addrs) <= 0 {
|
||||||
return fmt.Errorf("no rpc server address specified")
|
return fmt.Errorf("no rpc server address specified")
|
||||||
|
} else if len(rpc_addrs) > 1 {
|
||||||
|
// TODO: instead of returning an error here,
|
||||||
|
// support multiple endpoint addresses. round-robin or something to a working server?
|
||||||
|
return fmt.Errorf("too many rpc server addresses specified")
|
||||||
}
|
}
|
||||||
|
|
||||||
c = hodu.NewClient(
|
c = hodu.NewClient(
|
||||||
@ -224,10 +228,8 @@ func client_main(ctl_addrs []string, rpc_addrs []string, peer_addrs []string, cf
|
|||||||
ctltlscfg,
|
ctltlscfg,
|
||||||
rpctlscfg)
|
rpctlscfg)
|
||||||
|
|
||||||
cc.ServerAddrs = rpc_addrs
|
cc.ServerAddr = rpc_addrs[0]
|
||||||
cc.PeerAddrs = peer_addrs
|
cc.PeerAddrs = peer_addrs
|
||||||
cc.ServerSeedTimeout = cfg.RPC.Endpoint.SeedTimeout
|
|
||||||
cc.ServerAuthority = cfg.RPC.Endpoint.Authority
|
|
||||||
|
|
||||||
c.StartService(&cc)
|
c.StartService(&cc)
|
||||||
c.StartCtlService() // control channel
|
c.StartCtlService() // control channel
|
||||||
|
Loading…
x
Reference in New Issue
Block a user