added lifetime to client route
This commit is contained in:
45
client.go
45
client.go
@ -28,23 +28,22 @@ type ClientPeerCancelFuncMap = map[PeerId]context.CancelFunc
|
||||
|
||||
// --------------------------------------------------------------------
|
||||
type ClientRouteConfig struct {
|
||||
PeerAddr string
|
||||
PeerName string
|
||||
PeerAddr string
|
||||
PeerName string
|
||||
Option RouteOption
|
||||
ServiceAddr string // server-peer-service-addr
|
||||
ServiceNet string // server-peer-service-net
|
||||
Lifetime time.Duration
|
||||
}
|
||||
|
||||
type ClientConfig struct {
|
||||
ServerAddrs []string
|
||||
//PeerAddrs []string
|
||||
Routes []ClientRouteConfig
|
||||
ServerSeedTmout time.Duration
|
||||
ServerAuthority string // http2 :authority header
|
||||
}
|
||||
|
||||
type ClientConfigActive struct {
|
||||
Id ConnId
|
||||
Index int
|
||||
ClientConfig
|
||||
}
|
||||
@ -123,7 +122,10 @@ type ClientRoute struct {
|
||||
ptc_mtx sync.Mutex
|
||||
ptc_map ClientPeerConnMap
|
||||
ptc_cancel_map ClientPeerCancelFuncMap
|
||||
ptc_wg sync.WaitGroup
|
||||
ptc_wg sync.WaitGroup
|
||||
|
||||
lifetime time.Duration
|
||||
lifetime_timer *time.Timer
|
||||
|
||||
stop_req atomic.Bool
|
||||
stop_chan chan bool
|
||||
@ -166,7 +168,7 @@ func (g *GuardedPacketStreamClient) Context() context.Context {
|
||||
}*/
|
||||
|
||||
// --------------------------------------------------------------------
|
||||
func NewClientRoute(cts *ClientConn, id RouteId, client_peer_addr string, client_peer_name string, server_peer_svc_addr string, server_peer_svc_net string, server_peer_option RouteOption) *ClientRoute {
|
||||
func NewClientRoute(cts *ClientConn, id RouteId, client_peer_addr string, client_peer_name string, server_peer_svc_addr string, server_peer_svc_net string, server_peer_option RouteOption, lifetime time.Duration) *ClientRoute {
|
||||
var r ClientRoute
|
||||
|
||||
r.cts = cts
|
||||
@ -181,6 +183,7 @@ func NewClientRoute(cts *ClientConn, id RouteId, client_peer_addr string, client
|
||||
r.server_peer_addr = server_peer_svc_addr
|
||||
r.server_peer_net = server_peer_svc_net // permitted network for server-side peer
|
||||
r.server_peer_option = server_peer_option
|
||||
r.lifetime = lifetime
|
||||
r.stop_req.Store(false)
|
||||
r.stop_chan = make(chan bool, 8)
|
||||
|
||||
@ -262,8 +265,9 @@ func (r *ClientRoute) FindClientPeerConnById(conn_id PeerId) *ClientPeerConn {
|
||||
func (r *ClientRoute) RunTask(wg *sync.WaitGroup) {
|
||||
var err error
|
||||
|
||||
// this task on the route object isn't actually necessary.
|
||||
// this task on the route object do actual data manipulation
|
||||
// most useful works are triggered by ReportEvent() and done by ConnectToPeer()
|
||||
// it merely implements some timeout if set.
|
||||
defer wg.Done()
|
||||
|
||||
err = r.cts.psc.Send(MakeRouteStartPacket(r.id, r.server_peer_option, r.peer_addr, r.peer_name, r.server_peer_addr, r.server_peer_net))
|
||||
@ -278,14 +282,30 @@ func (r *ClientRoute) RunTask(wg *sync.WaitGroup) {
|
||||
r.id, r.peer_addr, r.server_peer_option, r.server_peer_net, r.cts.remote_addr)
|
||||
}
|
||||
|
||||
if r.lifetime > 0 { r.lifetime_timer = time.NewTimer(r.lifetime) }
|
||||
|
||||
main_loop:
|
||||
for {
|
||||
select {
|
||||
case <-r.stop_chan:
|
||||
break main_loop
|
||||
if r.lifetime_timer != nil {
|
||||
select {
|
||||
case <-r.stop_chan:
|
||||
break main_loop
|
||||
|
||||
case <-r.lifetime_timer.C:
|
||||
r.cts.cli.log.Write(r.cts.sid, LOG_INFO, "route(%d,%s,%v,%v) reached end of lifetime(%v)",
|
||||
r.id, r.peer_addr, r.server_peer_option, r.server_peer_net, r.lifetime)
|
||||
break main_loop
|
||||
}
|
||||
} else {
|
||||
select {
|
||||
case <-r.stop_chan:
|
||||
break main_loop
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if r.lifetime_timer != nil { r.lifetime_timer.Stop() }
|
||||
|
||||
done:
|
||||
r.ReqStop()
|
||||
r.ptc_wg.Wait() // wait for all peer tasks are finished
|
||||
@ -618,7 +638,7 @@ func (cts *ClientConn) AddNewClientRoute(rc *ClientRouteConfig) (*ClientRoute, e
|
||||
}
|
||||
}
|
||||
|
||||
r = NewClientRoute(cts, cts.route_next_id, rc.PeerAddr, rc.PeerName, rc.ServiceAddr, rc.ServiceNet, rc.Option)
|
||||
r = NewClientRoute(cts, cts.route_next_id, rc.PeerAddr, rc.PeerName, rc.ServiceAddr, rc.ServiceNet, rc.Option, rc.Lifetime)
|
||||
cts.route_map[r.id] = r
|
||||
cts.route_next_id++
|
||||
cts.cli.stats.routes.Add(1)
|
||||
@ -1193,7 +1213,6 @@ func (c *Client) AddNewClientConn(cfg *ClientConfig) (*ClientConn, error) {
|
||||
}
|
||||
}
|
||||
cts.id = c.cts_next_id
|
||||
cts.cfg.Id = cts.id // store it again in the active configuration for easy access via control channel
|
||||
cts.sid = fmt.Sprintf("%d", cts.id) // id in string used for logging
|
||||
|
||||
c.cts_map[cts.id] = cts
|
||||
@ -1449,7 +1468,7 @@ func (c *Client) StartService(data interface{}) {
|
||||
if err != nil {
|
||||
c.log.Write("", LOG_ERROR, "Failed to start service - %s", err.Error())
|
||||
} else {
|
||||
c.log.Write("", LOG_INFO, "Started service for %v [%d]", cts.cfg.ServerAddrs, cts.cfg.Id)
|
||||
c.log.Write("", LOG_INFO, "Started service for %v [%d]", cts.cfg.ServerAddrs, cts.id)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user