diff --git a/client.go b/client.go index c5d0ca8..ba35a09 100644 --- a/client.go +++ b/client.go @@ -85,12 +85,22 @@ type Client struct { } } +type ClientConnState int + +const ( + CLIENT_CONN_CONNECTING ClientConnState = iota + CLIENT_CONN_CONNECTED + CLIENT_CONN_DISCONNECTING + CLIENT_CONN_DISCONNECTED +) + // client connection to server type ClientConn struct { cli *Client cfg ClientConfigActive Id ConnId Sid string // id rendered in string + State ClientConnState local_addr string remote_addr string @@ -515,12 +525,8 @@ func (r *ClientRoute) ReportEvent(pts_id PeerId, event_type PACKET_KIND, event_d // but the server must be able to handle this case as invalid route. var ok bool _, ok = event_data.(*RouteDesc) - if !ok { - r.cts.cli.log.Write(r.cts.Sid, LOG_ERROR, "Protocol error - invalid data in route_started event(%d)", r.Id) - r.ReqStop() - } else { - r.ReqStop() - } + if !ok { r.cts.cli.log.Write(r.cts.Sid, LOG_ERROR, "Protocol error - invalid data in route_started event(%d)", r.Id) } + r.ReqStop() case PACKET_KIND_PEER_STARTED: var ok bool @@ -528,9 +534,9 @@ func (r *ClientRoute) ReportEvent(pts_id PeerId, event_type PACKET_KIND, event_d pd, ok = event_data.(*PeerDesc) if !ok { - r.cts.cli.log.Write(r.cts.Sid, LOG_ERROR, + r.cts.cli.log.Write(r.cts.Sid, LOG_WARN, "Protocol error - invalid data in peer_started event(%d,%d)", r.Id, pts_id) - r.ReqStop() + // ignore it. don't want to delete the whole route } else { if r.cts.cli.ptc_limit > 0 && int(r.cts.cli.stats.peers.Load()) >= r.cts.cli.ptc_limit { r.cts.cli.log.Write(r.cts.Sid, LOG_ERROR, @@ -561,7 +567,7 @@ func (r *ClientRoute) ReportEvent(pts_id PeerId, event_type PACKET_KIND, event_d if !ok { r.cts.cli.log.Write(r.cts.Sid, LOG_ERROR, "Protocol error - invalid data in peer_aborted event(%d,%d)", r.Id, pts_id) - r.ReqStop() + ptc.ReqStop() } else { err = r.DisconnectFromPeer(ptc) if err != nil { @@ -758,7 +764,9 @@ func (cts *ClientConn) RemoveClientRoute(route *ClientRoute) error { } delete(cts.route_map, route.Id) cts.cli.stats.routes.Add(-1) - if cts.cli.route_persister != nil { cts.cli.route_persister.Delete(cts, r) } + if cts.cli.route_persister != nil { + cts.cli.route_persister.Delete(cts, r) + } cts.route_mtx.Unlock() cts.cli.log.Write(cts.Sid, LOG_INFO, "Removed route(%d,%s)", route.Id, route.PeerAddr) @@ -914,6 +922,7 @@ func (cts *ClientConn) RunTask(wg *sync.WaitGroup) { defer wg.Done() // arrange to call at the end of this function start_over: + cts.State = CLIENT_CONN_CONNECTING cts.cfg.Index = (cts.cfg.Index + 1) % len(cts.cfg.ServerAddrs) cts.cli.log.Write(cts.Sid, LOG_INFO, "Connecting to server[%d] %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index]) if cts.cli.rpctlscfg == nil { @@ -968,6 +977,8 @@ start_over: cts.cli.log.Write(cts.Sid, LOG_INFO, "Got packet stream from server[%d] %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index]) + cts.State = CLIENT_CONN_CONNECTED + cts.psc = &GuardedPacketStreamClient{Hodu_PacketStreamClient: psc} if len(cts.cfg.Routes) > 0 { @@ -1140,6 +1151,7 @@ start_over: done: cts.cli.log.Write(cts.Sid, LOG_INFO, "Disconnected from server[%d] %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index]) + cts.State = CLIENT_CONN_DISCONNECTED req_stop_and_wait_for_termination: //cts.RemoveClientRoutes() // this isn't needed as each task removes itself from cts upon its termination @@ -1151,10 +1163,13 @@ wait_for_termination: return reconnect_to_server: + cts.State = CLIENT_CONN_DISCONNECTING + if cts.conn != nil { cts.cli.log.Write(cts.Sid, LOG_INFO, "Disconnecting from server[%d] %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index]) } cts.disconnect_from_server() + cts.State = CLIENT_CONN_DISCONNECTED // wait for 2 seconds slpctx, cancel_sleep = context.WithTimeout(cts.cli.ctx, 2 * time.Second) @@ -1165,7 +1180,7 @@ reconnect_to_server: goto req_stop_and_wait_for_termination case <-cts.stop_chan: // this signal indicates that ReqStop() has been called - // so jumt to the waiting label + // so jump to the waiting label cancel_sleep() goto wait_for_termination case <-slpctx.Done():