diff --git a/client-ctl.go b/client-ctl.go index 09091fd..b8a44de 100644 --- a/client-ctl.go +++ b/client-ctl.go @@ -177,14 +177,14 @@ func (ctl *client_ctl_client_conns) ServeHTTP(w http.ResponseWriter, req *http.R cts.route_mtx.Lock() for _, r = range cts.route_map { jsp = append(jsp, json_out_client_route{ - Id: r.id, - ClientPeerAddr: r.peer_addr, - ClientPeerName: r.peer_name, + Id: r.Id, + ClientPeerAddr: r.PeerAddr, + ClientPeerName: r.PeerName, ServerPeerListenAddr: r.server_peer_listen_addr.String(), - ServerPeerNet: r.server_peer_net, - ServerPeerOption: r.server_peer_option.String(), - Lifetime: fmt.Sprintf("%.09f", r.lifetime.Seconds()), - LifetimeStart: r.lifetime_start.Unix(), + ServerPeerNet: r.ServerPeerNet, + ServerPeerOption: r.ServerPeerOption.String(), + Lifetime: fmt.Sprintf("%.09f", r.Lifetime.Seconds()), + LifetimeStart: r.LifetimeStart.Unix(), }) } js = append(js, json_out_client_conn{ @@ -290,14 +290,14 @@ func (ctl *client_ctl_client_conns_id) ServeHTTP(w http.ResponseWriter, req *htt cts.route_mtx.Lock() for _, r = range cts.route_map { jsp = append(jsp, json_out_client_route{ - Id: r.id, - ClientPeerAddr: r.peer_addr, - ClientPeerName: r.peer_name, + Id: r.Id, + ClientPeerAddr: r.PeerAddr, + ClientPeerName: r.PeerName, ServerPeerListenAddr: r.server_peer_listen_addr.String(), - ServerPeerNet: r.server_peer_net, - ServerPeerOption: r.server_peer_option.String(), - Lifetime: fmt.Sprintf("%.09f", r.lifetime.Seconds()), - LifetimeStart: r.lifetime_start.Unix(), + ServerPeerNet: r.ServerPeerNet, + ServerPeerOption: r.ServerPeerOption.String(), + Lifetime: fmt.Sprintf("%.09f", r.Lifetime.Seconds()), + LifetimeStart: r.LifetimeStart.Unix(), }) } js = &json_out_client_conn{ @@ -368,14 +368,14 @@ func (ctl *client_ctl_client_conns_id_routes) ServeHTTP(w http.ResponseWriter, r cts.route_mtx.Lock() for _, r = range cts.route_map { jsp = append(jsp, json_out_client_route{ - Id: r.id, - ClientPeerAddr: r.peer_addr, - ClientPeerName: r.peer_name, + Id: r.Id, + ClientPeerAddr: r.PeerAddr, + ClientPeerName: r.PeerName, ServerPeerListenAddr: r.server_peer_listen_addr.String(), - ServerPeerNet: r.server_peer_net, - ServerPeerOption: r.server_peer_option.String(), - Lifetime: fmt.Sprintf("%.09f", r.lifetime.Seconds()), - LifetimeStart: r.lifetime_start.Unix(), + ServerPeerNet: r.ServerPeerNet, + ServerPeerOption: r.ServerPeerOption.String(), + Lifetime: fmt.Sprintf("%.09f", r.Lifetime.Seconds()), + LifetimeStart: r.LifetimeStart.Unix(), }) } cts.route_mtx.Unlock() @@ -433,7 +433,7 @@ func (ctl *client_ctl_client_conns_id_routes) ServeHTTP(w http.ResponseWriter, r if err = je.Encode(JsonErrmsg{Text: err.Error()}); err != nil { goto oops } } else { status_code = WriteJsonRespHeader(w, http.StatusCreated) - if err = je.Encode(json_out_client_route_id{Id: r.id, CtsId: r.cts.id}); err != nil { goto oops } + if err = je.Encode(json_out_client_route_id{Id: r.Id, CtsId: r.cts.id}); err != nil { goto oops } } case http.MethodDelete: @@ -503,13 +503,13 @@ func (ctl *client_ctl_client_conns_id_routes_id) ServeHTTP(w http.ResponseWriter case http.MethodGet: status_code = WriteJsonRespHeader(w, http.StatusOK) err = je.Encode(json_out_client_route{ - Id: r.id, - ClientPeerAddr: r.peer_addr, - ClientPeerName: r.peer_name, + Id: r.Id, + ClientPeerAddr: r.PeerAddr, + ClientPeerName: r.PeerName, ServerPeerListenAddr: r.server_peer_listen_addr.String(), - ServerPeerNet: r.server_peer_net, - ServerPeerOption: r.server_peer_option.String(), - Lifetime: r.lifetime.String(), + ServerPeerNet: r.ServerPeerNet, + ServerPeerOption: r.ServerPeerOption.String(), + Lifetime: r.Lifetime.String(), }) if err != nil { goto oops } @@ -608,13 +608,13 @@ func (ctl *client_ctl_client_conns_id_routes_spsp) ServeHTTP(w http.ResponseWrit case http.MethodGet: status_code = WriteJsonRespHeader(w, http.StatusOK) err = je.Encode(json_out_client_route{ - Id: r.id, - ClientPeerAddr: r.peer_addr, - ClientPeerName: r.peer_name, + Id: r.Id, + ClientPeerAddr: r.PeerAddr, + ClientPeerName: r.PeerName, ServerPeerListenAddr: r.server_peer_listen_addr.String(), - ServerPeerNet: r.server_peer_net, - ServerPeerOption: r.server_peer_option.String(), - Lifetime: r.lifetime.String(), + ServerPeerNet: r.ServerPeerNet, + ServerPeerOption: r.ServerPeerOption.String(), + Lifetime: r.Lifetime.String(), }) if err != nil { goto oops } diff --git a/client-peer.go b/client-peer.go index e716d61..dcb30fe 100644 --- a/client-peer.go +++ b/client-peer.go @@ -33,25 +33,25 @@ func (cpc *ClientPeerConn) RunTask(wg *sync.WaitGroup) error { if errors.Is(err, io.EOF) || strings.Contains(err.Error(), "use of closed network connection") { // i hate checking this condition with strings.Contains() cpc.route.cts.cli.log.Write(cpc.route.cts.sid, LOG_INFO, "Client-side peer(%d,%d,%s,%s) closed", - cpc.route.id, cpc.conn_id, cpc.conn.RemoteAddr().String(), cpc.conn.LocalAddr().String()) + cpc.route.Id, cpc.conn_id, cpc.conn.RemoteAddr().String(), cpc.conn.LocalAddr().String()) } else { cpc.route.cts.cli.log.Write(cpc.route.cts.sid, LOG_ERROR, "Failed to read from client-side peer(%d,%d,%s,%s) - %s", - cpc.route.id, cpc.conn_id, cpc.conn.RemoteAddr().String(), cpc.conn.LocalAddr().String(), err.Error()) + cpc.route.Id, cpc.conn_id, cpc.conn.RemoteAddr().String(), cpc.conn.LocalAddr().String(), err.Error()) } break } - err = cpc.route.cts.psc.Send(MakePeerDataPacket(cpc.route.id, cpc.conn_id, buf[0:n])) + err = cpc.route.cts.psc.Send(MakePeerDataPacket(cpc.route.Id, cpc.conn_id, buf[0:n])) if err != nil { cpc.route.cts.cli.log.Write(cpc.route.cts.sid, LOG_ERROR, "Failed to write peer(%d,%d,%s,%s) data to server - %s", - cpc.route.id, cpc.conn_id, cpc.conn.RemoteAddr().String(), cpc.conn.LocalAddr().String(), err.Error()) + cpc.route.Id, cpc.conn_id, cpc.conn.RemoteAddr().String(), cpc.conn.LocalAddr().String(), err.Error()) break } } - cpc.route.cts.psc.Send(MakePeerStoppedPacket(cpc.route.id, cpc.conn_id, cpc.conn.RemoteAddr().String(), cpc.conn.LocalAddr().String())) // nothing much to do upon failure. no error check here + cpc.route.cts.psc.Send(MakePeerStoppedPacket(cpc.route.Id, cpc.conn_id, cpc.conn.RemoteAddr().String(), cpc.conn.LocalAddr().String())) // nothing much to do upon failure. no error check here cpc.ReqStop() cpc.route.RemoveClientPeerConn(cpc) return nil diff --git a/client.go b/client.go index 83cdfa3..ccdf409 100644 --- a/client.go +++ b/client.go @@ -75,6 +75,7 @@ type Client struct { stop_chan chan bool log Logger + route_persister ClientRoutePersister stats struct { conns atomic.Int64 @@ -110,23 +111,23 @@ type ClientConn struct { type ClientRoute struct { cts *ClientConn - id RouteId - peer_addr string - peer_name string - peer_option RouteOption + Id RouteId + PeerAddr string + PeerName string + PeerOption RouteOption server_peer_listen_addr *net.TCPAddr // actual service-side service address - server_peer_addr string // desired server-side service address - server_peer_net string - server_peer_option RouteOption + ServerPeerAddr string // desired server-side service address + ServerPeerNet string + ServerPeerOption RouteOption ptc_mtx sync.Mutex ptc_map ClientPeerConnMap ptc_cancel_map ClientPeerCancelFuncMap ptc_wg sync.WaitGroup - lifetime time.Duration - lifetime_start time.Time + Lifetime time.Duration + LifetimeStart time.Time lifetime_timer *time.Timer lifetime_mtx sync.Mutex @@ -153,6 +154,12 @@ type GuardedPacketStreamClient struct { Hodu_PacketStreamClient } +type ClientRoutePersister interface { + LoadAll(cts *ClientConn) + Save(cts *ClientConn, r *ClientRoute) + Delete(cts *ClientConn, r *ClientRoute) +} + // ------------------------------------ func (g *GuardedPacketStreamClient) Send(data *Packet) error { @@ -175,19 +182,19 @@ func NewClientRoute(cts *ClientConn, id RouteId, client_peer_addr string, client var r ClientRoute r.cts = cts - r.id = id + r.Id = id r.ptc_map = make(ClientPeerConnMap) r.ptc_cancel_map = make(ClientPeerCancelFuncMap) - r.peer_addr = client_peer_addr // client-side peer - r.peer_name = client_peer_name + r.PeerAddr = client_peer_addr // client-side peer + r.PeerName = client_peer_name // if the client_peer_addr is a domain name, it can't tell between tcp4 and tcp6 - r.peer_option = string_to_route_option(TcpAddrStrClass(client_peer_addr)) + r.PeerOption = string_to_route_option(TcpAddrStrClass(client_peer_addr)) - r.server_peer_addr = server_peer_svc_addr - r.server_peer_net = server_peer_svc_net // permitted network for server-side peer - r.server_peer_option = server_peer_option - r.lifetime_start = time.Now() - r.lifetime = lifetime + r.ServerPeerAddr = server_peer_svc_addr + r.ServerPeerNet = server_peer_svc_net // permitted network for server-side peer + r.ServerPeerOption = server_peer_option + r.LifetimeStart = time.Now() + r.Lifetime = lifetime r.stop_req.Store(false) r.stop_chan = make(chan bool, 8) @@ -203,7 +210,7 @@ func (r *ClientRoute) AddNewClientPeerConn(c *net.TCPConn, pts_id PeerId, pts_ra r.cts.cli.stats.peers.Add(1) r.ptc_mtx.Unlock() - r.cts.cli.log.Write(r.cts.sid, LOG_INFO, "Added client-side peer(%d,%d,%s,%s)", r.id, ptc.conn_id, ptc.conn.RemoteAddr().String(), ptc.conn.LocalAddr().String()) + r.cts.cli.log.Write(r.cts.sid, LOG_INFO, "Added client-side peer(%d,%d,%s,%s)", r.Id, ptc.conn_id, ptc.conn.RemoteAddr().String(), ptc.conn.LocalAddr().String()) return ptc, nil } @@ -225,24 +232,11 @@ func (r *ClientRoute) RemoveClientPeerConn(ptc *ClientPeerConn) error { r.cts.cli.stats.peers.Add(-1) r.ptc_mtx.Unlock() - r.cts.cli.log.Write(r.cts.sid, LOG_INFO, "Removed client-side peer(%d,%d,%s,%s)", r.id, ptc.conn_id, ptc.conn.RemoteAddr().String(), ptc.conn.LocalAddr().String()) + r.cts.cli.log.Write(r.cts.sid, LOG_INFO, "Removed client-side peer(%d,%d,%s,%s)", r.Id, ptc.conn_id, ptc.conn.RemoteAddr().String(), ptc.conn.LocalAddr().String()) ptc.ReqStop() return nil } -/*func (r *ClientRoute) RemoveAllClientPeerConns() { - var c *ClientPeerConn - - r.ptc_mtx.Lock() - defer r.ptc_mtx.Unlock() - - for _, c = range r.ptc_map { - delete(r.ptc_map, c.conn_id) - r.cts.cli.stats.peers.Add(-1) - c.ReqStop() - } -}*/ - func (r *ClientRoute) ReqStopAllClientPeerConns() { var c *ClientPeerConn @@ -259,9 +253,7 @@ func (r *ClientRoute) FindClientPeerConnById(conn_id PeerId) *ClientPeerConn { defer r.ptc_mtx.Unlock() c, ok = r.ptc_map[conn_id] - if !ok { - return nil - } + if !ok { return nil } return c } @@ -276,9 +268,10 @@ func (r *ClientRoute) ExtendLifetime(lifetime time.Duration) error { } else { var expiry time.Time r.lifetime_timer.Stop() - r.lifetime = r.lifetime + lifetime - expiry = r.lifetime_start.Add(r.lifetime) + r.Lifetime = r.Lifetime + lifetime + expiry = r.LifetimeStart.Add(r.Lifetime) r.lifetime_timer.Reset(expiry.Sub(time.Now())) + if r.cts.cli.route_persister != nil { r.cts.cli.route_persister.Save(r.cts, r) } return nil } } @@ -292,9 +285,10 @@ func (r *ClientRoute) ResetLifetime(lifetime time.Duration) error { return fmt.Errorf("prohibited operation") } else { r.lifetime_timer.Stop() - r.lifetime = lifetime - r.lifetime_start = time.Now() + r.Lifetime = lifetime + r.LifetimeStart = time.Now() r.lifetime_timer.Reset(lifetime) + if r.cts.cli.route_persister != nil { r.cts.cli.route_persister.Save(r.cts, r) } return nil } } @@ -307,22 +301,22 @@ func (r *ClientRoute) RunTask(wg *sync.WaitGroup) { // it merely implements some timeout if set. defer wg.Done() - err = r.cts.psc.Send(MakeRouteStartPacket(r.id, r.server_peer_option, r.peer_addr, r.peer_name, r.server_peer_addr, r.server_peer_net)) + err = r.cts.psc.Send(MakeRouteStartPacket(r.Id, r.ServerPeerOption, r.PeerAddr, r.PeerName, r.ServerPeerAddr, r.ServerPeerNet)) if err != nil { r.cts.cli.log.Write(r.cts.sid, LOG_DEBUG, "Failed to send route_start for route(%d,%s,%v,%v) to %s", - r.id, r.peer_addr, r.server_peer_option, r.server_peer_net, r.cts.remote_addr) + r.Id, r.PeerAddr, r.ServerPeerOption, r.ServerPeerNet, r.cts.remote_addr) goto done } else { r.cts.cli.log.Write(r.cts.sid, LOG_DEBUG, "Sent route_start for route(%d,%s,%v,%v) to %s", - r.id, r.peer_addr, r.server_peer_option, r.server_peer_net, r.cts.remote_addr) + r.Id, r.PeerAddr, r.ServerPeerOption, r.ServerPeerNet, r.cts.remote_addr) } r.lifetime_mtx.Lock() - if r.lifetime > 0 { - r.lifetime_start = time.Now() - r.lifetime_timer = time.NewTimer(r.lifetime) + if r.Lifetime > 0 { + r.LifetimeStart = time.Now() + r.lifetime_timer = time.NewTimer(r.Lifetime) } r.lifetime_mtx.Unlock() @@ -335,7 +329,7 @@ main_loop: case <-r.lifetime_timer.C: r.cts.cli.log.Write(r.cts.sid, LOG_INFO, "route(%d,%s,%v,%v) reached end of lifetime(%v)", - r.id, r.peer_addr, r.server_peer_option, r.server_peer_net, r.lifetime) + r.Id, r.PeerAddr, r.ServerPeerOption, r.ServerPeerNet, r.Lifetime) break main_loop } } else { @@ -357,15 +351,15 @@ done: r.ReqStop() r.ptc_wg.Wait() // wait for all peer tasks are finished - err = r.cts.psc.Send(MakeRouteStopPacket(r.id, r.server_peer_option, r.peer_addr, r.peer_name, r.server_peer_addr, r.server_peer_net)) + err = r.cts.psc.Send(MakeRouteStopPacket(r.Id, r.ServerPeerOption, r.PeerAddr, r.PeerName, r.ServerPeerAddr, r.ServerPeerNet)) if err != nil { r.cts.cli.log.Write(r.cts.sid, LOG_DEBUG, "Failed to route_stop for route(%d,%s,%v,%v) to %s - %s", - r.id, r.peer_addr, r.server_peer_option, r.server_peer_net, r.cts.remote_addr, err.Error()) + r.Id, r.PeerAddr, r.ServerPeerOption, r.ServerPeerNet, r.cts.remote_addr, err.Error()) } else { r.cts.cli.log.Write(r.cts.sid, LOG_DEBUG, "Sent route_stop for route(%d,%s,%v,%v) to %s", - r.id, r.peer_addr, r.server_peer_option, r.server_peer_net, r.cts.remote_addr) + r.Id, r.PeerAddr, r.ServerPeerOption, r.ServerPeerNet, r.cts.remote_addr) } r.cts.RemoveClientRoute(r) @@ -410,7 +404,7 @@ func (r *ClientRoute) ConnectToPeer(pts_id PeerId, route_option RouteOption, pts r.ptc_mtx.Unlock() d.LocalAddr = nil // TOOD: use this if local address is specified - conn, err = d.DialContext(waitctx, TcpAddrStrClass(r.peer_addr), r.peer_addr) + conn, err = d.DialContext(waitctx, TcpAddrStrClass(r.PeerAddr), r.PeerAddr) r.ptc_mtx.Lock() cancel_wait() @@ -420,7 +414,7 @@ func (r *ClientRoute) ConnectToPeer(pts_id PeerId, route_option RouteOption, pts if err != nil { r.cts.cli.log.Write(r.cts.sid, LOG_ERROR, "Failed to connect to %s for route(%d,%d,%s,%s) - %s", - r.peer_addr, r.id, pts_id, pts_raddr, pts_laddr, err.Error()) + r.PeerAddr, r.Id, pts_id, pts_raddr, pts_laddr, err.Error()) goto peer_aborted } @@ -428,7 +422,7 @@ func (r *ClientRoute) ConnectToPeer(pts_id PeerId, route_option RouteOption, pts if !ok { r.cts.cli.log.Write(r.cts.sid, LOG_ERROR, "Failed to get connection information to %s for route(%d,%d,%s,%s) - %s", - r.peer_addr, r.id, pts_id, pts_raddr, pts_laddr, err.Error()) + r.PeerAddr, r.Id, pts_id, pts_raddr, pts_laddr, err.Error()) goto peer_aborted } @@ -438,18 +432,18 @@ func (r *ClientRoute) ConnectToPeer(pts_id PeerId, route_option RouteOption, pts if err != nil { r.cts.cli.log.Write(r.cts.sid, LOG_ERROR, "Failed to add client peer %s for route(%d,%d,%s,%s) - %s", - r.peer_addr, r.id, pts_id, pts_raddr, pts_laddr, err.Error()) + r.PeerAddr, r.Id, pts_id, pts_raddr, pts_laddr, err.Error()) goto peer_aborted } // ptc.conn is equal to pts_id as assigned in r.AddNewClientPeerConn() - err = r.cts.psc.Send(MakePeerStartedPacket(r.id, ptc.conn_id, real_conn_raddr, real_conn_laddr)) + err = r.cts.psc.Send(MakePeerStartedPacket(r.Id, ptc.conn_id, real_conn_raddr, real_conn_laddr)) if err != nil { r.cts.cli.log.Write(r.cts.sid, LOG_ERROR, "Failed to send peer_start(%d,%d,%s,%s) for route(%d,%d,%s,%s) - %s", - r.id, ptc.conn_id, real_conn_raddr, real_conn_laddr, - r.id, pts_id, pts_raddr, pts_laddr, err.Error()) + r.Id, ptc.conn_id, real_conn_raddr, real_conn_laddr, + r.Id, pts_id, pts_raddr, pts_laddr, err.Error()) goto peer_aborted } @@ -459,11 +453,11 @@ func (r *ClientRoute) ConnectToPeer(pts_id PeerId, route_option RouteOption, pts peer_aborted: // real_conn_radd and real_conn_laddr may be empty depending on when the jump to here is made. - err = r.cts.psc.Send(MakePeerAbortedPacket(r.id, pts_id, real_conn_raddr, real_conn_laddr)) + err = r.cts.psc.Send(MakePeerAbortedPacket(r.Id, pts_id, real_conn_raddr, real_conn_laddr)) if err != nil { r.cts.cli.log.Write(r.cts.sid, LOG_ERROR, "Failed to send peer_aborted(%d,%d) for route(%d,%d,%s,%s) - %s", - r.id, pts_id, r.id, pts_id, pts_raddr, pts_laddr, err.Error()) + r.Id, pts_id, r.Id, pts_id, pts_raddr, pts_laddr, err.Error()) } if conn != nil { conn.Close() @@ -496,17 +490,17 @@ func (r *ClientRoute) ReportEvent(pts_id PeerId, event_type PACKET_KIND, event_d var rd *RouteDesc rd, ok = event_data.(*RouteDesc) if !ok { - r.cts.cli.log.Write(r.cts.sid, LOG_ERROR, "Protocol error - invalid data in route_started event(%d)", r.id) + r.cts.cli.log.Write(r.cts.sid, LOG_ERROR, "Protocol error - invalid data in route_started event(%d)", r.Id) r.ReqStop() } else { var addr *net.TCPAddr addr, err = net.ResolveTCPAddr(TcpAddrStrClass(rd.TargetAddrStr), rd.TargetAddrStr) if err != nil { - r.cts.cli.log.Write(r.cts.sid, LOG_ERROR, "Protocol error - invalid service address(%s) for server peer in route_started event(%d)", rd.TargetAddrStr, r.id) + r.cts.cli.log.Write(r.cts.sid, LOG_ERROR, "Protocol error - invalid service address(%s) for server peer in route_started event(%d)", rd.TargetAddrStr, r.Id) r.ReqStop() } else { r.server_peer_listen_addr = addr - r.server_peer_net = rd.ServiceNetStr + r.ServerPeerNet = rd.ServiceNetStr } } @@ -518,7 +512,7 @@ func (r *ClientRoute) ReportEvent(pts_id PeerId, event_type PACKET_KIND, event_d var ok bool _, ok = event_data.(*RouteDesc) if !ok { - r.cts.cli.log.Write(r.cts.sid, LOG_ERROR, "Protocol error - invalid data in route_started event(%d)", r.id) + r.cts.cli.log.Write(r.cts.sid, LOG_ERROR, "Protocol error - invalid data in route_started event(%d)", r.Id) r.ReqStop() } else { r.ReqStop() @@ -531,23 +525,23 @@ func (r *ClientRoute) ReportEvent(pts_id PeerId, event_type PACKET_KIND, event_d pd, ok = event_data.(*PeerDesc) if !ok { r.cts.cli.log.Write(r.cts.sid, LOG_ERROR, - "Protocol error - invalid data in peer_started event(%d,%d)", r.id, pts_id) + "Protocol error - invalid data in peer_started event(%d,%d)", r.Id, pts_id) r.ReqStop() } else { if r.cts.cli.ptc_limit > 0 && int(r.cts.cli.stats.peers.Load()) >= r.cts.cli.ptc_limit { r.cts.cli.log.Write(r.cts.sid, LOG_ERROR, "Rejecting to connect to peer(%s)for route(%d,%d) - allowed max %d", - r.peer_addr, r.id, pts_id, r.cts.cli.ptc_limit) + r.PeerAddr, r.Id, pts_id, r.cts.cli.ptc_limit) - err = r.cts.psc.Send(MakePeerAbortedPacket(r.id, pts_id, "", "")) + err = r.cts.psc.Send(MakePeerAbortedPacket(r.Id, pts_id, "", "")) if err != nil { r.cts.cli.log.Write(r.cts.sid, LOG_ERROR, "Failed to send peer_aborted(%d,%d) for route(%d,%d,%s,%s) - %s", - r.id, pts_id, r.id, pts_id, "", "", err.Error()) + r.Id, pts_id, r.Id, pts_id, "", "", err.Error()) } } else { r.ptc_wg.Add(1) - go r.ConnectToPeer(pts_id, r.peer_option, pd.RemoteAddrStr, pd.LocalAddrStr, &r.ptc_wg) + go r.ConnectToPeer(pts_id, r.PeerOption, pd.RemoteAddrStr, pd.LocalAddrStr, &r.ptc_wg) } } @@ -562,14 +556,14 @@ func (r *ClientRoute) ReportEvent(pts_id PeerId, event_type PACKET_KIND, event_d pd, ok = event_data.(*PeerDesc) if !ok { r.cts.cli.log.Write(r.cts.sid, LOG_ERROR, - "Protocol error - invalid data in peer_aborted event(%d,%d)", r.id, pts_id) + "Protocol error - invalid data in peer_aborted event(%d,%d)", r.Id, pts_id) r.ReqStop() } else { err = r.DisconnectFromPeer(ptc) if err != nil { r.cts.cli.log.Write(r.cts.sid, LOG_ERROR, "Failed to disconnect from peer(%d,%d,%s,%s) - %s", - r.id, pts_id, pd.RemoteAddrStr, pd.LocalAddrStr, err.Error()) + r.Id, pts_id, pd.RemoteAddrStr, pd.LocalAddrStr, err.Error()) ptc.ReqStop() } } @@ -587,14 +581,14 @@ func (r *ClientRoute) ReportEvent(pts_id PeerId, event_type PACKET_KIND, event_d if !ok { r.cts.cli.log.Write(r.cts.sid, LOG_ERROR, "Protocol error - invalid data in peer_stopped event(%d,%d)", - r.id, pts_id) + r.Id, pts_id) ptc.ReqStop() } else { err = r.DisconnectFromPeer(ptc) if err != nil { r.cts.cli.log.Write(r.cts.sid, LOG_WARN, "Failed to disconnect from peer(%d,%d,%s,%s) - %s", - r.id, pts_id, pd.RemoteAddrStr, pd.LocalAddrStr, err.Error()) + r.Id, pts_id, pd.RemoteAddrStr, pd.LocalAddrStr, err.Error()) ptc.ReqStop() } } @@ -611,7 +605,7 @@ func (r *ClientRoute) ReportEvent(pts_id PeerId, event_type PACKET_KIND, event_d if !ok { r.cts.cli.log.Write(r.cts.sid, LOG_ERROR, "Protocol error - invalid data in peer_eof event(%d,%d)", - r.id, pts_id) + r.Id, pts_id) ptc.ReqStop() } else { ptc.CloseWrite() @@ -630,14 +624,14 @@ func (r *ClientRoute) ReportEvent(pts_id PeerId, event_type PACKET_KIND, event_d if !ok { r.cts.cli.log.Write(r.cts.sid, LOG_ERROR, "Protocol error - invalid data in peer_data event(%d,%d)", - r.id, pts_id) + r.Id, pts_id) ptc.ReqStop() } else { _, err = ptc.conn.Write(data) if err != nil { r.cts.cli.log.Write(r.cts.sid, LOG_ERROR, "Failed to write to peer(%d,%d,%s,%s) - %s", - r.id, pts_id, ptc.conn.RemoteAddr().String(), ptc.conn.LocalAddr().String(), err.Error()) + r.Id, pts_id, ptc.conn.RemoteAddr().String(), ptc.conn.LocalAddr().String(), err.Error()) ptc.ReqStop() } } @@ -705,11 +699,12 @@ func (cts *ClientConn) AddNewClientRoute(rc *ClientRouteConfig) (*ClientRoute, e } r = NewClientRoute(cts, assigned_id, rc.PeerAddr, rc.PeerName, rc.ServiceAddr, rc.ServiceNet, rc.Option, rc.Lifetime) - cts.route_map[r.id] = r + cts.route_map[r.Id] = r cts.cli.stats.routes.Add(1) + if cts.cli.route_persister != nil { cts.cli.route_persister.Save(cts, r) } cts.route_mtx.Unlock() - cts.cli.log.Write(cts.sid, LOG_INFO, "Added route(%d,%d) %s", cts.id, r.id, r.peer_addr) + cts.cli.log.Write(cts.sid, LOG_INFO, "Added route(%d,%d) %s", cts.id, r.Id, r.PeerAddr) cts.route_wg.Add(1) go r.RunTask(&cts.route_wg) @@ -731,7 +726,7 @@ func (cts *ClientConn) RemoveAllClientRoutes() { defer cts.route_mtx.Unlock() for _, r = range cts.route_map { - delete(cts.route_map, r.id) + delete(cts.route_map, r.Id) cts.cli.stats.routes.Add(-1) r.ReqStop() } @@ -742,20 +737,21 @@ func (cts *ClientConn) RemoveClientRoute(route *ClientRoute) error { var ok bool cts.route_mtx.Lock() - r, ok = cts.route_map[route.id] + r, ok = cts.route_map[route.Id] if !ok { cts.route_mtx.Unlock() - return fmt.Errorf("non-existent route id - %d", route.id) + return fmt.Errorf("non-existent route id - %d", route.Id) } if r != route { cts.route_mtx.Unlock() - return fmt.Errorf("conflicting route id - %d", route.id) + return fmt.Errorf("conflicting route id - %d", route.Id) } - delete(cts.route_map, route.id) + delete(cts.route_map, route.Id) cts.cli.stats.routes.Add(-1) + if cts.cli.route_persister != nil { cts.cli.route_persister.Delete(cts, r) } cts.route_mtx.Unlock() - cts.cli.log.Write(cts.sid, LOG_INFO, "Removed route(%d,%s)", route.id, route.peer_addr) + cts.cli.log.Write(cts.sid, LOG_INFO, "Removed route(%d,%s)", route.Id, route.PeerAddr) r.ReqStop() return nil @@ -773,9 +769,10 @@ func (cts *ClientConn) RemoveClientRouteById(route_id RouteId) error { } delete(cts.route_map, route_id) cts.cli.stats.routes.Add(-1) + if cts.cli.route_persister != nil { cts.cli.route_persister.Delete(cts, r) } cts.route_mtx.Unlock() - cts.cli.log.Write(cts.sid, LOG_INFO, "Removed route(%d,%s)", r.id, r.peer_addr) + cts.cli.log.Write(cts.sid, LOG_INFO, "Removed route(%d,%s)", r.Id, r.PeerAddr) r.ReqStop() return nil @@ -790,10 +787,11 @@ func (cts *ClientConn) RemoveClientRouteByServerPeerSvcPortId(port_id PortId) er cts.route_mtx.Lock() for _, r = range cts.route_map { if r.server_peer_listen_addr.Port == int(port_id) { - delete(cts.route_map, r.id) + delete(cts.route_map, r.Id) cts.cli.stats.routes.Add(-1) + if cts.cli.route_persister != nil { cts.cli.route_persister.Delete(cts, r) } cts.route_mtx.Unlock() - cts.cli.log.Write(cts.sid, LOG_INFO, "Removed route(%d,%s)", r.id, r.peer_addr) + cts.cli.log.Write(cts.sid, LOG_INFO, "Removed route(%d,%s)", r.Id, r.PeerAddr) r.ReqStop() return nil } @@ -978,6 +976,8 @@ start_over: } } + if cts.cli.route_persister != nil { cts.cli.route_persister.LoadAll(cts) } + for { var pkt *Packet @@ -1640,3 +1640,7 @@ func (c *Client) WaitForTermination() { func (c *Client) WriteLog(id string, level LogLevel, fmtstr string, args ...interface{}) { c.log.Write(id, level, fmtstr, args...) } + +func (c *Client) SetRoutePersister(persister ClientRoutePersister) { + c.route_persister = persister +}