added the State field to the ClientConn structure
This commit is contained in:
parent
90305e3eed
commit
810356efe5
37
client.go
37
client.go
@ -85,12 +85,22 @@ type Client struct {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type ClientConnState int
|
||||||
|
|
||||||
|
const (
|
||||||
|
CLIENT_CONN_CONNECTING ClientConnState = iota
|
||||||
|
CLIENT_CONN_CONNECTED
|
||||||
|
CLIENT_CONN_DISCONNECTING
|
||||||
|
CLIENT_CONN_DISCONNECTED
|
||||||
|
)
|
||||||
|
|
||||||
// client connection to server
|
// client connection to server
|
||||||
type ClientConn struct {
|
type ClientConn struct {
|
||||||
cli *Client
|
cli *Client
|
||||||
cfg ClientConfigActive
|
cfg ClientConfigActive
|
||||||
Id ConnId
|
Id ConnId
|
||||||
Sid string // id rendered in string
|
Sid string // id rendered in string
|
||||||
|
State ClientConnState
|
||||||
|
|
||||||
local_addr string
|
local_addr string
|
||||||
remote_addr string
|
remote_addr string
|
||||||
@ -515,12 +525,8 @@ func (r *ClientRoute) ReportEvent(pts_id PeerId, event_type PACKET_KIND, event_d
|
|||||||
// but the server must be able to handle this case as invalid route.
|
// but the server must be able to handle this case as invalid route.
|
||||||
var ok bool
|
var ok bool
|
||||||
_, ok = event_data.(*RouteDesc)
|
_, ok = event_data.(*RouteDesc)
|
||||||
if !ok {
|
if !ok { r.cts.cli.log.Write(r.cts.Sid, LOG_ERROR, "Protocol error - invalid data in route_started event(%d)", r.Id) }
|
||||||
r.cts.cli.log.Write(r.cts.Sid, LOG_ERROR, "Protocol error - invalid data in route_started event(%d)", r.Id)
|
r.ReqStop()
|
||||||
r.ReqStop()
|
|
||||||
} else {
|
|
||||||
r.ReqStop()
|
|
||||||
}
|
|
||||||
|
|
||||||
case PACKET_KIND_PEER_STARTED:
|
case PACKET_KIND_PEER_STARTED:
|
||||||
var ok bool
|
var ok bool
|
||||||
@ -528,9 +534,9 @@ func (r *ClientRoute) ReportEvent(pts_id PeerId, event_type PACKET_KIND, event_d
|
|||||||
|
|
||||||
pd, ok = event_data.(*PeerDesc)
|
pd, ok = event_data.(*PeerDesc)
|
||||||
if !ok {
|
if !ok {
|
||||||
r.cts.cli.log.Write(r.cts.Sid, LOG_ERROR,
|
r.cts.cli.log.Write(r.cts.Sid, LOG_WARN,
|
||||||
"Protocol error - invalid data in peer_started event(%d,%d)", r.Id, pts_id)
|
"Protocol error - invalid data in peer_started event(%d,%d)", r.Id, pts_id)
|
||||||
r.ReqStop()
|
// ignore it. don't want to delete the whole route
|
||||||
} else {
|
} else {
|
||||||
if r.cts.cli.ptc_limit > 0 && int(r.cts.cli.stats.peers.Load()) >= r.cts.cli.ptc_limit {
|
if r.cts.cli.ptc_limit > 0 && int(r.cts.cli.stats.peers.Load()) >= r.cts.cli.ptc_limit {
|
||||||
r.cts.cli.log.Write(r.cts.Sid, LOG_ERROR,
|
r.cts.cli.log.Write(r.cts.Sid, LOG_ERROR,
|
||||||
@ -561,7 +567,7 @@ func (r *ClientRoute) ReportEvent(pts_id PeerId, event_type PACKET_KIND, event_d
|
|||||||
if !ok {
|
if !ok {
|
||||||
r.cts.cli.log.Write(r.cts.Sid, LOG_ERROR,
|
r.cts.cli.log.Write(r.cts.Sid, LOG_ERROR,
|
||||||
"Protocol error - invalid data in peer_aborted event(%d,%d)", r.Id, pts_id)
|
"Protocol error - invalid data in peer_aborted event(%d,%d)", r.Id, pts_id)
|
||||||
r.ReqStop()
|
ptc.ReqStop()
|
||||||
} else {
|
} else {
|
||||||
err = r.DisconnectFromPeer(ptc)
|
err = r.DisconnectFromPeer(ptc)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -758,7 +764,9 @@ func (cts *ClientConn) RemoveClientRoute(route *ClientRoute) error {
|
|||||||
}
|
}
|
||||||
delete(cts.route_map, route.Id)
|
delete(cts.route_map, route.Id)
|
||||||
cts.cli.stats.routes.Add(-1)
|
cts.cli.stats.routes.Add(-1)
|
||||||
if cts.cli.route_persister != nil { cts.cli.route_persister.Delete(cts, r) }
|
if cts.cli.route_persister != nil {
|
||||||
|
cts.cli.route_persister.Delete(cts, r)
|
||||||
|
}
|
||||||
cts.route_mtx.Unlock()
|
cts.route_mtx.Unlock()
|
||||||
|
|
||||||
cts.cli.log.Write(cts.Sid, LOG_INFO, "Removed route(%d,%s)", route.Id, route.PeerAddr)
|
cts.cli.log.Write(cts.Sid, LOG_INFO, "Removed route(%d,%s)", route.Id, route.PeerAddr)
|
||||||
@ -914,6 +922,7 @@ func (cts *ClientConn) RunTask(wg *sync.WaitGroup) {
|
|||||||
defer wg.Done() // arrange to call at the end of this function
|
defer wg.Done() // arrange to call at the end of this function
|
||||||
|
|
||||||
start_over:
|
start_over:
|
||||||
|
cts.State = CLIENT_CONN_CONNECTING
|
||||||
cts.cfg.Index = (cts.cfg.Index + 1) % len(cts.cfg.ServerAddrs)
|
cts.cfg.Index = (cts.cfg.Index + 1) % len(cts.cfg.ServerAddrs)
|
||||||
cts.cli.log.Write(cts.Sid, LOG_INFO, "Connecting to server[%d] %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index])
|
cts.cli.log.Write(cts.Sid, LOG_INFO, "Connecting to server[%d] %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index])
|
||||||
if cts.cli.rpctlscfg == nil {
|
if cts.cli.rpctlscfg == nil {
|
||||||
@ -968,6 +977,8 @@ start_over:
|
|||||||
|
|
||||||
cts.cli.log.Write(cts.Sid, LOG_INFO, "Got packet stream from server[%d] %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index])
|
cts.cli.log.Write(cts.Sid, LOG_INFO, "Got packet stream from server[%d] %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index])
|
||||||
|
|
||||||
|
cts.State = CLIENT_CONN_CONNECTED
|
||||||
|
|
||||||
cts.psc = &GuardedPacketStreamClient{Hodu_PacketStreamClient: psc}
|
cts.psc = &GuardedPacketStreamClient{Hodu_PacketStreamClient: psc}
|
||||||
|
|
||||||
if len(cts.cfg.Routes) > 0 {
|
if len(cts.cfg.Routes) > 0 {
|
||||||
@ -1140,6 +1151,7 @@ start_over:
|
|||||||
|
|
||||||
done:
|
done:
|
||||||
cts.cli.log.Write(cts.Sid, LOG_INFO, "Disconnected from server[%d] %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index])
|
cts.cli.log.Write(cts.Sid, LOG_INFO, "Disconnected from server[%d] %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index])
|
||||||
|
cts.State = CLIENT_CONN_DISCONNECTED
|
||||||
|
|
||||||
req_stop_and_wait_for_termination:
|
req_stop_and_wait_for_termination:
|
||||||
//cts.RemoveClientRoutes() // this isn't needed as each task removes itself from cts upon its termination
|
//cts.RemoveClientRoutes() // this isn't needed as each task removes itself from cts upon its termination
|
||||||
@ -1151,10 +1163,13 @@ wait_for_termination:
|
|||||||
return
|
return
|
||||||
|
|
||||||
reconnect_to_server:
|
reconnect_to_server:
|
||||||
|
cts.State = CLIENT_CONN_DISCONNECTING
|
||||||
|
|
||||||
if cts.conn != nil {
|
if cts.conn != nil {
|
||||||
cts.cli.log.Write(cts.Sid, LOG_INFO, "Disconnecting from server[%d] %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index])
|
cts.cli.log.Write(cts.Sid, LOG_INFO, "Disconnecting from server[%d] %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index])
|
||||||
}
|
}
|
||||||
cts.disconnect_from_server()
|
cts.disconnect_from_server()
|
||||||
|
cts.State = CLIENT_CONN_DISCONNECTED
|
||||||
|
|
||||||
// wait for 2 seconds
|
// wait for 2 seconds
|
||||||
slpctx, cancel_sleep = context.WithTimeout(cts.cli.ctx, 2 * time.Second)
|
slpctx, cancel_sleep = context.WithTimeout(cts.cli.ctx, 2 * time.Second)
|
||||||
@ -1165,7 +1180,7 @@ reconnect_to_server:
|
|||||||
goto req_stop_and_wait_for_termination
|
goto req_stop_and_wait_for_termination
|
||||||
case <-cts.stop_chan:
|
case <-cts.stop_chan:
|
||||||
// this signal indicates that ReqStop() has been called
|
// this signal indicates that ReqStop() has been called
|
||||||
// so jumt to the waiting label
|
// so jump to the waiting label
|
||||||
cancel_sleep()
|
cancel_sleep()
|
||||||
goto wait_for_termination
|
goto wait_for_termination
|
||||||
case <-slpctx.Done():
|
case <-slpctx.Done():
|
||||||
|
Loading…
x
Reference in New Issue
Block a user