exposed some fields of the ClientRoute struct

This commit is contained in:
hyung-hwan 2025-01-18 12:58:17 +09:00
parent edda6d169b
commit e2f1d58c5e
3 changed files with 129 additions and 125 deletions

View File

@ -177,14 +177,14 @@ func (ctl *client_ctl_client_conns) ServeHTTP(w http.ResponseWriter, req *http.R
cts.route_mtx.Lock() cts.route_mtx.Lock()
for _, r = range cts.route_map { for _, r = range cts.route_map {
jsp = append(jsp, json_out_client_route{ jsp = append(jsp, json_out_client_route{
Id: r.id, Id: r.Id,
ClientPeerAddr: r.peer_addr, ClientPeerAddr: r.PeerAddr,
ClientPeerName: r.peer_name, ClientPeerName: r.PeerName,
ServerPeerListenAddr: r.server_peer_listen_addr.String(), ServerPeerListenAddr: r.server_peer_listen_addr.String(),
ServerPeerNet: r.server_peer_net, ServerPeerNet: r.ServerPeerNet,
ServerPeerOption: r.server_peer_option.String(), ServerPeerOption: r.ServerPeerOption.String(),
Lifetime: fmt.Sprintf("%.09f", r.lifetime.Seconds()), Lifetime: fmt.Sprintf("%.09f", r.Lifetime.Seconds()),
LifetimeStart: r.lifetime_start.Unix(), LifetimeStart: r.LifetimeStart.Unix(),
}) })
} }
js = append(js, json_out_client_conn{ js = append(js, json_out_client_conn{
@ -290,14 +290,14 @@ func (ctl *client_ctl_client_conns_id) ServeHTTP(w http.ResponseWriter, req *htt
cts.route_mtx.Lock() cts.route_mtx.Lock()
for _, r = range cts.route_map { for _, r = range cts.route_map {
jsp = append(jsp, json_out_client_route{ jsp = append(jsp, json_out_client_route{
Id: r.id, Id: r.Id,
ClientPeerAddr: r.peer_addr, ClientPeerAddr: r.PeerAddr,
ClientPeerName: r.peer_name, ClientPeerName: r.PeerName,
ServerPeerListenAddr: r.server_peer_listen_addr.String(), ServerPeerListenAddr: r.server_peer_listen_addr.String(),
ServerPeerNet: r.server_peer_net, ServerPeerNet: r.ServerPeerNet,
ServerPeerOption: r.server_peer_option.String(), ServerPeerOption: r.ServerPeerOption.String(),
Lifetime: fmt.Sprintf("%.09f", r.lifetime.Seconds()), Lifetime: fmt.Sprintf("%.09f", r.Lifetime.Seconds()),
LifetimeStart: r.lifetime_start.Unix(), LifetimeStart: r.LifetimeStart.Unix(),
}) })
} }
js = &json_out_client_conn{ js = &json_out_client_conn{
@ -368,14 +368,14 @@ func (ctl *client_ctl_client_conns_id_routes) ServeHTTP(w http.ResponseWriter, r
cts.route_mtx.Lock() cts.route_mtx.Lock()
for _, r = range cts.route_map { for _, r = range cts.route_map {
jsp = append(jsp, json_out_client_route{ jsp = append(jsp, json_out_client_route{
Id: r.id, Id: r.Id,
ClientPeerAddr: r.peer_addr, ClientPeerAddr: r.PeerAddr,
ClientPeerName: r.peer_name, ClientPeerName: r.PeerName,
ServerPeerListenAddr: r.server_peer_listen_addr.String(), ServerPeerListenAddr: r.server_peer_listen_addr.String(),
ServerPeerNet: r.server_peer_net, ServerPeerNet: r.ServerPeerNet,
ServerPeerOption: r.server_peer_option.String(), ServerPeerOption: r.ServerPeerOption.String(),
Lifetime: fmt.Sprintf("%.09f", r.lifetime.Seconds()), Lifetime: fmt.Sprintf("%.09f", r.Lifetime.Seconds()),
LifetimeStart: r.lifetime_start.Unix(), LifetimeStart: r.LifetimeStart.Unix(),
}) })
} }
cts.route_mtx.Unlock() cts.route_mtx.Unlock()
@ -433,7 +433,7 @@ func (ctl *client_ctl_client_conns_id_routes) ServeHTTP(w http.ResponseWriter, r
if err = je.Encode(JsonErrmsg{Text: err.Error()}); err != nil { goto oops } if err = je.Encode(JsonErrmsg{Text: err.Error()}); err != nil { goto oops }
} else { } else {
status_code = WriteJsonRespHeader(w, http.StatusCreated) status_code = WriteJsonRespHeader(w, http.StatusCreated)
if err = je.Encode(json_out_client_route_id{Id: r.id, CtsId: r.cts.id}); err != nil { goto oops } if err = je.Encode(json_out_client_route_id{Id: r.Id, CtsId: r.cts.id}); err != nil { goto oops }
} }
case http.MethodDelete: case http.MethodDelete:
@ -503,13 +503,13 @@ func (ctl *client_ctl_client_conns_id_routes_id) ServeHTTP(w http.ResponseWriter
case http.MethodGet: case http.MethodGet:
status_code = WriteJsonRespHeader(w, http.StatusOK) status_code = WriteJsonRespHeader(w, http.StatusOK)
err = je.Encode(json_out_client_route{ err = je.Encode(json_out_client_route{
Id: r.id, Id: r.Id,
ClientPeerAddr: r.peer_addr, ClientPeerAddr: r.PeerAddr,
ClientPeerName: r.peer_name, ClientPeerName: r.PeerName,
ServerPeerListenAddr: r.server_peer_listen_addr.String(), ServerPeerListenAddr: r.server_peer_listen_addr.String(),
ServerPeerNet: r.server_peer_net, ServerPeerNet: r.ServerPeerNet,
ServerPeerOption: r.server_peer_option.String(), ServerPeerOption: r.ServerPeerOption.String(),
Lifetime: r.lifetime.String(), Lifetime: r.Lifetime.String(),
}) })
if err != nil { goto oops } if err != nil { goto oops }
@ -608,13 +608,13 @@ func (ctl *client_ctl_client_conns_id_routes_spsp) ServeHTTP(w http.ResponseWrit
case http.MethodGet: case http.MethodGet:
status_code = WriteJsonRespHeader(w, http.StatusOK) status_code = WriteJsonRespHeader(w, http.StatusOK)
err = je.Encode(json_out_client_route{ err = je.Encode(json_out_client_route{
Id: r.id, Id: r.Id,
ClientPeerAddr: r.peer_addr, ClientPeerAddr: r.PeerAddr,
ClientPeerName: r.peer_name, ClientPeerName: r.PeerName,
ServerPeerListenAddr: r.server_peer_listen_addr.String(), ServerPeerListenAddr: r.server_peer_listen_addr.String(),
ServerPeerNet: r.server_peer_net, ServerPeerNet: r.ServerPeerNet,
ServerPeerOption: r.server_peer_option.String(), ServerPeerOption: r.ServerPeerOption.String(),
Lifetime: r.lifetime.String(), Lifetime: r.Lifetime.String(),
}) })
if err != nil { goto oops } if err != nil { goto oops }

View File

@ -33,25 +33,25 @@ func (cpc *ClientPeerConn) RunTask(wg *sync.WaitGroup) error {
if errors.Is(err, io.EOF) || strings.Contains(err.Error(), "use of closed network connection") { // i hate checking this condition with strings.Contains() if errors.Is(err, io.EOF) || strings.Contains(err.Error(), "use of closed network connection") { // i hate checking this condition with strings.Contains()
cpc.route.cts.cli.log.Write(cpc.route.cts.sid, LOG_INFO, cpc.route.cts.cli.log.Write(cpc.route.cts.sid, LOG_INFO,
"Client-side peer(%d,%d,%s,%s) closed", "Client-side peer(%d,%d,%s,%s) closed",
cpc.route.id, cpc.conn_id, cpc.conn.RemoteAddr().String(), cpc.conn.LocalAddr().String()) cpc.route.Id, cpc.conn_id, cpc.conn.RemoteAddr().String(), cpc.conn.LocalAddr().String())
} else { } else {
cpc.route.cts.cli.log.Write(cpc.route.cts.sid, LOG_ERROR, cpc.route.cts.cli.log.Write(cpc.route.cts.sid, LOG_ERROR,
"Failed to read from client-side peer(%d,%d,%s,%s) - %s", "Failed to read from client-side peer(%d,%d,%s,%s) - %s",
cpc.route.id, cpc.conn_id, cpc.conn.RemoteAddr().String(), cpc.conn.LocalAddr().String(), err.Error()) cpc.route.Id, cpc.conn_id, cpc.conn.RemoteAddr().String(), cpc.conn.LocalAddr().String(), err.Error())
} }
break break
} }
err = cpc.route.cts.psc.Send(MakePeerDataPacket(cpc.route.id, cpc.conn_id, buf[0:n])) err = cpc.route.cts.psc.Send(MakePeerDataPacket(cpc.route.Id, cpc.conn_id, buf[0:n]))
if err != nil { if err != nil {
cpc.route.cts.cli.log.Write(cpc.route.cts.sid, LOG_ERROR, cpc.route.cts.cli.log.Write(cpc.route.cts.sid, LOG_ERROR,
"Failed to write peer(%d,%d,%s,%s) data to server - %s", "Failed to write peer(%d,%d,%s,%s) data to server - %s",
cpc.route.id, cpc.conn_id, cpc.conn.RemoteAddr().String(), cpc.conn.LocalAddr().String(), err.Error()) cpc.route.Id, cpc.conn_id, cpc.conn.RemoteAddr().String(), cpc.conn.LocalAddr().String(), err.Error())
break break
} }
} }
cpc.route.cts.psc.Send(MakePeerStoppedPacket(cpc.route.id, cpc.conn_id, cpc.conn.RemoteAddr().String(), cpc.conn.LocalAddr().String())) // nothing much to do upon failure. no error check here cpc.route.cts.psc.Send(MakePeerStoppedPacket(cpc.route.Id, cpc.conn_id, cpc.conn.RemoteAddr().String(), cpc.conn.LocalAddr().String())) // nothing much to do upon failure. no error check here
cpc.ReqStop() cpc.ReqStop()
cpc.route.RemoveClientPeerConn(cpc) cpc.route.RemoveClientPeerConn(cpc)
return nil return nil

176
client.go
View File

@ -75,6 +75,7 @@ type Client struct {
stop_chan chan bool stop_chan chan bool
log Logger log Logger
route_persister ClientRoutePersister
stats struct { stats struct {
conns atomic.Int64 conns atomic.Int64
@ -110,23 +111,23 @@ type ClientConn struct {
type ClientRoute struct { type ClientRoute struct {
cts *ClientConn cts *ClientConn
id RouteId Id RouteId
peer_addr string PeerAddr string
peer_name string PeerName string
peer_option RouteOption PeerOption RouteOption
server_peer_listen_addr *net.TCPAddr // actual service-side service address server_peer_listen_addr *net.TCPAddr // actual service-side service address
server_peer_addr string // desired server-side service address ServerPeerAddr string // desired server-side service address
server_peer_net string ServerPeerNet string
server_peer_option RouteOption ServerPeerOption RouteOption
ptc_mtx sync.Mutex ptc_mtx sync.Mutex
ptc_map ClientPeerConnMap ptc_map ClientPeerConnMap
ptc_cancel_map ClientPeerCancelFuncMap ptc_cancel_map ClientPeerCancelFuncMap
ptc_wg sync.WaitGroup ptc_wg sync.WaitGroup
lifetime time.Duration Lifetime time.Duration
lifetime_start time.Time LifetimeStart time.Time
lifetime_timer *time.Timer lifetime_timer *time.Timer
lifetime_mtx sync.Mutex lifetime_mtx sync.Mutex
@ -153,6 +154,12 @@ type GuardedPacketStreamClient struct {
Hodu_PacketStreamClient Hodu_PacketStreamClient
} }
type ClientRoutePersister interface {
LoadAll(cts *ClientConn)
Save(cts *ClientConn, r *ClientRoute)
Delete(cts *ClientConn, r *ClientRoute)
}
// ------------------------------------ // ------------------------------------
func (g *GuardedPacketStreamClient) Send(data *Packet) error { func (g *GuardedPacketStreamClient) Send(data *Packet) error {
@ -175,19 +182,19 @@ func NewClientRoute(cts *ClientConn, id RouteId, client_peer_addr string, client
var r ClientRoute var r ClientRoute
r.cts = cts r.cts = cts
r.id = id r.Id = id
r.ptc_map = make(ClientPeerConnMap) r.ptc_map = make(ClientPeerConnMap)
r.ptc_cancel_map = make(ClientPeerCancelFuncMap) r.ptc_cancel_map = make(ClientPeerCancelFuncMap)
r.peer_addr = client_peer_addr // client-side peer r.PeerAddr = client_peer_addr // client-side peer
r.peer_name = client_peer_name r.PeerName = client_peer_name
// if the client_peer_addr is a domain name, it can't tell between tcp4 and tcp6 // if the client_peer_addr is a domain name, it can't tell between tcp4 and tcp6
r.peer_option = string_to_route_option(TcpAddrStrClass(client_peer_addr)) r.PeerOption = string_to_route_option(TcpAddrStrClass(client_peer_addr))
r.server_peer_addr = server_peer_svc_addr r.ServerPeerAddr = server_peer_svc_addr
r.server_peer_net = server_peer_svc_net // permitted network for server-side peer r.ServerPeerNet = server_peer_svc_net // permitted network for server-side peer
r.server_peer_option = server_peer_option r.ServerPeerOption = server_peer_option
r.lifetime_start = time.Now() r.LifetimeStart = time.Now()
r.lifetime = lifetime r.Lifetime = lifetime
r.stop_req.Store(false) r.stop_req.Store(false)
r.stop_chan = make(chan bool, 8) r.stop_chan = make(chan bool, 8)
@ -203,7 +210,7 @@ func (r *ClientRoute) AddNewClientPeerConn(c *net.TCPConn, pts_id PeerId, pts_ra
r.cts.cli.stats.peers.Add(1) r.cts.cli.stats.peers.Add(1)
r.ptc_mtx.Unlock() r.ptc_mtx.Unlock()
r.cts.cli.log.Write(r.cts.sid, LOG_INFO, "Added client-side peer(%d,%d,%s,%s)", r.id, ptc.conn_id, ptc.conn.RemoteAddr().String(), ptc.conn.LocalAddr().String()) r.cts.cli.log.Write(r.cts.sid, LOG_INFO, "Added client-side peer(%d,%d,%s,%s)", r.Id, ptc.conn_id, ptc.conn.RemoteAddr().String(), ptc.conn.LocalAddr().String())
return ptc, nil return ptc, nil
} }
@ -225,24 +232,11 @@ func (r *ClientRoute) RemoveClientPeerConn(ptc *ClientPeerConn) error {
r.cts.cli.stats.peers.Add(-1) r.cts.cli.stats.peers.Add(-1)
r.ptc_mtx.Unlock() r.ptc_mtx.Unlock()
r.cts.cli.log.Write(r.cts.sid, LOG_INFO, "Removed client-side peer(%d,%d,%s,%s)", r.id, ptc.conn_id, ptc.conn.RemoteAddr().String(), ptc.conn.LocalAddr().String()) r.cts.cli.log.Write(r.cts.sid, LOG_INFO, "Removed client-side peer(%d,%d,%s,%s)", r.Id, ptc.conn_id, ptc.conn.RemoteAddr().String(), ptc.conn.LocalAddr().String())
ptc.ReqStop() ptc.ReqStop()
return nil return nil
} }
/*func (r *ClientRoute) RemoveAllClientPeerConns() {
var c *ClientPeerConn
r.ptc_mtx.Lock()
defer r.ptc_mtx.Unlock()
for _, c = range r.ptc_map {
delete(r.ptc_map, c.conn_id)
r.cts.cli.stats.peers.Add(-1)
c.ReqStop()
}
}*/
func (r *ClientRoute) ReqStopAllClientPeerConns() { func (r *ClientRoute) ReqStopAllClientPeerConns() {
var c *ClientPeerConn var c *ClientPeerConn
@ -259,9 +253,7 @@ func (r *ClientRoute) FindClientPeerConnById(conn_id PeerId) *ClientPeerConn {
defer r.ptc_mtx.Unlock() defer r.ptc_mtx.Unlock()
c, ok = r.ptc_map[conn_id] c, ok = r.ptc_map[conn_id]
if !ok { if !ok { return nil }
return nil
}
return c return c
} }
@ -276,9 +268,10 @@ func (r *ClientRoute) ExtendLifetime(lifetime time.Duration) error {
} else { } else {
var expiry time.Time var expiry time.Time
r.lifetime_timer.Stop() r.lifetime_timer.Stop()
r.lifetime = r.lifetime + lifetime r.Lifetime = r.Lifetime + lifetime
expiry = r.lifetime_start.Add(r.lifetime) expiry = r.LifetimeStart.Add(r.Lifetime)
r.lifetime_timer.Reset(expiry.Sub(time.Now())) r.lifetime_timer.Reset(expiry.Sub(time.Now()))
if r.cts.cli.route_persister != nil { r.cts.cli.route_persister.Save(r.cts, r) }
return nil return nil
} }
} }
@ -292,9 +285,10 @@ func (r *ClientRoute) ResetLifetime(lifetime time.Duration) error {
return fmt.Errorf("prohibited operation") return fmt.Errorf("prohibited operation")
} else { } else {
r.lifetime_timer.Stop() r.lifetime_timer.Stop()
r.lifetime = lifetime r.Lifetime = lifetime
r.lifetime_start = time.Now() r.LifetimeStart = time.Now()
r.lifetime_timer.Reset(lifetime) r.lifetime_timer.Reset(lifetime)
if r.cts.cli.route_persister != nil { r.cts.cli.route_persister.Save(r.cts, r) }
return nil return nil
} }
} }
@ -307,22 +301,22 @@ func (r *ClientRoute) RunTask(wg *sync.WaitGroup) {
// it merely implements some timeout if set. // it merely implements some timeout if set.
defer wg.Done() defer wg.Done()
err = r.cts.psc.Send(MakeRouteStartPacket(r.id, r.server_peer_option, r.peer_addr, r.peer_name, r.server_peer_addr, r.server_peer_net)) err = r.cts.psc.Send(MakeRouteStartPacket(r.Id, r.ServerPeerOption, r.PeerAddr, r.PeerName, r.ServerPeerAddr, r.ServerPeerNet))
if err != nil { if err != nil {
r.cts.cli.log.Write(r.cts.sid, LOG_DEBUG, r.cts.cli.log.Write(r.cts.sid, LOG_DEBUG,
"Failed to send route_start for route(%d,%s,%v,%v) to %s", "Failed to send route_start for route(%d,%s,%v,%v) to %s",
r.id, r.peer_addr, r.server_peer_option, r.server_peer_net, r.cts.remote_addr) r.Id, r.PeerAddr, r.ServerPeerOption, r.ServerPeerNet, r.cts.remote_addr)
goto done goto done
} else { } else {
r.cts.cli.log.Write(r.cts.sid, LOG_DEBUG, r.cts.cli.log.Write(r.cts.sid, LOG_DEBUG,
"Sent route_start for route(%d,%s,%v,%v) to %s", "Sent route_start for route(%d,%s,%v,%v) to %s",
r.id, r.peer_addr, r.server_peer_option, r.server_peer_net, r.cts.remote_addr) r.Id, r.PeerAddr, r.ServerPeerOption, r.ServerPeerNet, r.cts.remote_addr)
} }
r.lifetime_mtx.Lock() r.lifetime_mtx.Lock()
if r.lifetime > 0 { if r.Lifetime > 0 {
r.lifetime_start = time.Now() r.LifetimeStart = time.Now()
r.lifetime_timer = time.NewTimer(r.lifetime) r.lifetime_timer = time.NewTimer(r.Lifetime)
} }
r.lifetime_mtx.Unlock() r.lifetime_mtx.Unlock()
@ -335,7 +329,7 @@ main_loop:
case <-r.lifetime_timer.C: case <-r.lifetime_timer.C:
r.cts.cli.log.Write(r.cts.sid, LOG_INFO, "route(%d,%s,%v,%v) reached end of lifetime(%v)", r.cts.cli.log.Write(r.cts.sid, LOG_INFO, "route(%d,%s,%v,%v) reached end of lifetime(%v)",
r.id, r.peer_addr, r.server_peer_option, r.server_peer_net, r.lifetime) r.Id, r.PeerAddr, r.ServerPeerOption, r.ServerPeerNet, r.Lifetime)
break main_loop break main_loop
} }
} else { } else {
@ -357,15 +351,15 @@ done:
r.ReqStop() r.ReqStop()
r.ptc_wg.Wait() // wait for all peer tasks are finished r.ptc_wg.Wait() // wait for all peer tasks are finished
err = r.cts.psc.Send(MakeRouteStopPacket(r.id, r.server_peer_option, r.peer_addr, r.peer_name, r.server_peer_addr, r.server_peer_net)) err = r.cts.psc.Send(MakeRouteStopPacket(r.Id, r.ServerPeerOption, r.PeerAddr, r.PeerName, r.ServerPeerAddr, r.ServerPeerNet))
if err != nil { if err != nil {
r.cts.cli.log.Write(r.cts.sid, LOG_DEBUG, r.cts.cli.log.Write(r.cts.sid, LOG_DEBUG,
"Failed to route_stop for route(%d,%s,%v,%v) to %s - %s", "Failed to route_stop for route(%d,%s,%v,%v) to %s - %s",
r.id, r.peer_addr, r.server_peer_option, r.server_peer_net, r.cts.remote_addr, err.Error()) r.Id, r.PeerAddr, r.ServerPeerOption, r.ServerPeerNet, r.cts.remote_addr, err.Error())
} else { } else {
r.cts.cli.log.Write(r.cts.sid, LOG_DEBUG, r.cts.cli.log.Write(r.cts.sid, LOG_DEBUG,
"Sent route_stop for route(%d,%s,%v,%v) to %s", "Sent route_stop for route(%d,%s,%v,%v) to %s",
r.id, r.peer_addr, r.server_peer_option, r.server_peer_net, r.cts.remote_addr) r.Id, r.PeerAddr, r.ServerPeerOption, r.ServerPeerNet, r.cts.remote_addr)
} }
r.cts.RemoveClientRoute(r) r.cts.RemoveClientRoute(r)
@ -410,7 +404,7 @@ func (r *ClientRoute) ConnectToPeer(pts_id PeerId, route_option RouteOption, pts
r.ptc_mtx.Unlock() r.ptc_mtx.Unlock()
d.LocalAddr = nil // TOOD: use this if local address is specified d.LocalAddr = nil // TOOD: use this if local address is specified
conn, err = d.DialContext(waitctx, TcpAddrStrClass(r.peer_addr), r.peer_addr) conn, err = d.DialContext(waitctx, TcpAddrStrClass(r.PeerAddr), r.PeerAddr)
r.ptc_mtx.Lock() r.ptc_mtx.Lock()
cancel_wait() cancel_wait()
@ -420,7 +414,7 @@ func (r *ClientRoute) ConnectToPeer(pts_id PeerId, route_option RouteOption, pts
if err != nil { if err != nil {
r.cts.cli.log.Write(r.cts.sid, LOG_ERROR, r.cts.cli.log.Write(r.cts.sid, LOG_ERROR,
"Failed to connect to %s for route(%d,%d,%s,%s) - %s", "Failed to connect to %s for route(%d,%d,%s,%s) - %s",
r.peer_addr, r.id, pts_id, pts_raddr, pts_laddr, err.Error()) r.PeerAddr, r.Id, pts_id, pts_raddr, pts_laddr, err.Error())
goto peer_aborted goto peer_aborted
} }
@ -428,7 +422,7 @@ func (r *ClientRoute) ConnectToPeer(pts_id PeerId, route_option RouteOption, pts
if !ok { if !ok {
r.cts.cli.log.Write(r.cts.sid, LOG_ERROR, r.cts.cli.log.Write(r.cts.sid, LOG_ERROR,
"Failed to get connection information to %s for route(%d,%d,%s,%s) - %s", "Failed to get connection information to %s for route(%d,%d,%s,%s) - %s",
r.peer_addr, r.id, pts_id, pts_raddr, pts_laddr, err.Error()) r.PeerAddr, r.Id, pts_id, pts_raddr, pts_laddr, err.Error())
goto peer_aborted goto peer_aborted
} }
@ -438,18 +432,18 @@ func (r *ClientRoute) ConnectToPeer(pts_id PeerId, route_option RouteOption, pts
if err != nil { if err != nil {
r.cts.cli.log.Write(r.cts.sid, LOG_ERROR, r.cts.cli.log.Write(r.cts.sid, LOG_ERROR,
"Failed to add client peer %s for route(%d,%d,%s,%s) - %s", "Failed to add client peer %s for route(%d,%d,%s,%s) - %s",
r.peer_addr, r.id, pts_id, pts_raddr, pts_laddr, err.Error()) r.PeerAddr, r.Id, pts_id, pts_raddr, pts_laddr, err.Error())
goto peer_aborted goto peer_aborted
} }
// ptc.conn is equal to pts_id as assigned in r.AddNewClientPeerConn() // ptc.conn is equal to pts_id as assigned in r.AddNewClientPeerConn()
err = r.cts.psc.Send(MakePeerStartedPacket(r.id, ptc.conn_id, real_conn_raddr, real_conn_laddr)) err = r.cts.psc.Send(MakePeerStartedPacket(r.Id, ptc.conn_id, real_conn_raddr, real_conn_laddr))
if err != nil { if err != nil {
r.cts.cli.log.Write(r.cts.sid, LOG_ERROR, r.cts.cli.log.Write(r.cts.sid, LOG_ERROR,
"Failed to send peer_start(%d,%d,%s,%s) for route(%d,%d,%s,%s) - %s", "Failed to send peer_start(%d,%d,%s,%s) for route(%d,%d,%s,%s) - %s",
r.id, ptc.conn_id, real_conn_raddr, real_conn_laddr, r.Id, ptc.conn_id, real_conn_raddr, real_conn_laddr,
r.id, pts_id, pts_raddr, pts_laddr, err.Error()) r.Id, pts_id, pts_raddr, pts_laddr, err.Error())
goto peer_aborted goto peer_aborted
} }
@ -459,11 +453,11 @@ func (r *ClientRoute) ConnectToPeer(pts_id PeerId, route_option RouteOption, pts
peer_aborted: peer_aborted:
// real_conn_radd and real_conn_laddr may be empty depending on when the jump to here is made. // real_conn_radd and real_conn_laddr may be empty depending on when the jump to here is made.
err = r.cts.psc.Send(MakePeerAbortedPacket(r.id, pts_id, real_conn_raddr, real_conn_laddr)) err = r.cts.psc.Send(MakePeerAbortedPacket(r.Id, pts_id, real_conn_raddr, real_conn_laddr))
if err != nil { if err != nil {
r.cts.cli.log.Write(r.cts.sid, LOG_ERROR, r.cts.cli.log.Write(r.cts.sid, LOG_ERROR,
"Failed to send peer_aborted(%d,%d) for route(%d,%d,%s,%s) - %s", "Failed to send peer_aborted(%d,%d) for route(%d,%d,%s,%s) - %s",
r.id, pts_id, r.id, pts_id, pts_raddr, pts_laddr, err.Error()) r.Id, pts_id, r.Id, pts_id, pts_raddr, pts_laddr, err.Error())
} }
if conn != nil { if conn != nil {
conn.Close() conn.Close()
@ -496,17 +490,17 @@ func (r *ClientRoute) ReportEvent(pts_id PeerId, event_type PACKET_KIND, event_d
var rd *RouteDesc var rd *RouteDesc
rd, ok = event_data.(*RouteDesc) rd, ok = event_data.(*RouteDesc)
if !ok { if !ok {
r.cts.cli.log.Write(r.cts.sid, LOG_ERROR, "Protocol error - invalid data in route_started event(%d)", r.id) r.cts.cli.log.Write(r.cts.sid, LOG_ERROR, "Protocol error - invalid data in route_started event(%d)", r.Id)
r.ReqStop() r.ReqStop()
} else { } else {
var addr *net.TCPAddr var addr *net.TCPAddr
addr, err = net.ResolveTCPAddr(TcpAddrStrClass(rd.TargetAddrStr), rd.TargetAddrStr) addr, err = net.ResolveTCPAddr(TcpAddrStrClass(rd.TargetAddrStr), rd.TargetAddrStr)
if err != nil { if err != nil {
r.cts.cli.log.Write(r.cts.sid, LOG_ERROR, "Protocol error - invalid service address(%s) for server peer in route_started event(%d)", rd.TargetAddrStr, r.id) r.cts.cli.log.Write(r.cts.sid, LOG_ERROR, "Protocol error - invalid service address(%s) for server peer in route_started event(%d)", rd.TargetAddrStr, r.Id)
r.ReqStop() r.ReqStop()
} else { } else {
r.server_peer_listen_addr = addr r.server_peer_listen_addr = addr
r.server_peer_net = rd.ServiceNetStr r.ServerPeerNet = rd.ServiceNetStr
} }
} }
@ -518,7 +512,7 @@ func (r *ClientRoute) ReportEvent(pts_id PeerId, event_type PACKET_KIND, event_d
var ok bool var ok bool
_, ok = event_data.(*RouteDesc) _, ok = event_data.(*RouteDesc)
if !ok { if !ok {
r.cts.cli.log.Write(r.cts.sid, LOG_ERROR, "Protocol error - invalid data in route_started event(%d)", r.id) r.cts.cli.log.Write(r.cts.sid, LOG_ERROR, "Protocol error - invalid data in route_started event(%d)", r.Id)
r.ReqStop() r.ReqStop()
} else { } else {
r.ReqStop() r.ReqStop()
@ -531,23 +525,23 @@ func (r *ClientRoute) ReportEvent(pts_id PeerId, event_type PACKET_KIND, event_d
pd, ok = event_data.(*PeerDesc) pd, ok = event_data.(*PeerDesc)
if !ok { if !ok {
r.cts.cli.log.Write(r.cts.sid, LOG_ERROR, r.cts.cli.log.Write(r.cts.sid, LOG_ERROR,
"Protocol error - invalid data in peer_started event(%d,%d)", r.id, pts_id) "Protocol error - invalid data in peer_started event(%d,%d)", r.Id, pts_id)
r.ReqStop() r.ReqStop()
} else { } else {
if r.cts.cli.ptc_limit > 0 && int(r.cts.cli.stats.peers.Load()) >= r.cts.cli.ptc_limit { if r.cts.cli.ptc_limit > 0 && int(r.cts.cli.stats.peers.Load()) >= r.cts.cli.ptc_limit {
r.cts.cli.log.Write(r.cts.sid, LOG_ERROR, r.cts.cli.log.Write(r.cts.sid, LOG_ERROR,
"Rejecting to connect to peer(%s)for route(%d,%d) - allowed max %d", "Rejecting to connect to peer(%s)for route(%d,%d) - allowed max %d",
r.peer_addr, r.id, pts_id, r.cts.cli.ptc_limit) r.PeerAddr, r.Id, pts_id, r.cts.cli.ptc_limit)
err = r.cts.psc.Send(MakePeerAbortedPacket(r.id, pts_id, "", "")) err = r.cts.psc.Send(MakePeerAbortedPacket(r.Id, pts_id, "", ""))
if err != nil { if err != nil {
r.cts.cli.log.Write(r.cts.sid, LOG_ERROR, r.cts.cli.log.Write(r.cts.sid, LOG_ERROR,
"Failed to send peer_aborted(%d,%d) for route(%d,%d,%s,%s) - %s", "Failed to send peer_aborted(%d,%d) for route(%d,%d,%s,%s) - %s",
r.id, pts_id, r.id, pts_id, "", "", err.Error()) r.Id, pts_id, r.Id, pts_id, "", "", err.Error())
} }
} else { } else {
r.ptc_wg.Add(1) r.ptc_wg.Add(1)
go r.ConnectToPeer(pts_id, r.peer_option, pd.RemoteAddrStr, pd.LocalAddrStr, &r.ptc_wg) go r.ConnectToPeer(pts_id, r.PeerOption, pd.RemoteAddrStr, pd.LocalAddrStr, &r.ptc_wg)
} }
} }
@ -562,14 +556,14 @@ func (r *ClientRoute) ReportEvent(pts_id PeerId, event_type PACKET_KIND, event_d
pd, ok = event_data.(*PeerDesc) pd, ok = event_data.(*PeerDesc)
if !ok { if !ok {
r.cts.cli.log.Write(r.cts.sid, LOG_ERROR, r.cts.cli.log.Write(r.cts.sid, LOG_ERROR,
"Protocol error - invalid data in peer_aborted event(%d,%d)", r.id, pts_id) "Protocol error - invalid data in peer_aborted event(%d,%d)", r.Id, pts_id)
r.ReqStop() r.ReqStop()
} else { } else {
err = r.DisconnectFromPeer(ptc) err = r.DisconnectFromPeer(ptc)
if err != nil { if err != nil {
r.cts.cli.log.Write(r.cts.sid, LOG_ERROR, r.cts.cli.log.Write(r.cts.sid, LOG_ERROR,
"Failed to disconnect from peer(%d,%d,%s,%s) - %s", "Failed to disconnect from peer(%d,%d,%s,%s) - %s",
r.id, pts_id, pd.RemoteAddrStr, pd.LocalAddrStr, err.Error()) r.Id, pts_id, pd.RemoteAddrStr, pd.LocalAddrStr, err.Error())
ptc.ReqStop() ptc.ReqStop()
} }
} }
@ -587,14 +581,14 @@ func (r *ClientRoute) ReportEvent(pts_id PeerId, event_type PACKET_KIND, event_d
if !ok { if !ok {
r.cts.cli.log.Write(r.cts.sid, LOG_ERROR, r.cts.cli.log.Write(r.cts.sid, LOG_ERROR,
"Protocol error - invalid data in peer_stopped event(%d,%d)", "Protocol error - invalid data in peer_stopped event(%d,%d)",
r.id, pts_id) r.Id, pts_id)
ptc.ReqStop() ptc.ReqStop()
} else { } else {
err = r.DisconnectFromPeer(ptc) err = r.DisconnectFromPeer(ptc)
if err != nil { if err != nil {
r.cts.cli.log.Write(r.cts.sid, LOG_WARN, r.cts.cli.log.Write(r.cts.sid, LOG_WARN,
"Failed to disconnect from peer(%d,%d,%s,%s) - %s", "Failed to disconnect from peer(%d,%d,%s,%s) - %s",
r.id, pts_id, pd.RemoteAddrStr, pd.LocalAddrStr, err.Error()) r.Id, pts_id, pd.RemoteAddrStr, pd.LocalAddrStr, err.Error())
ptc.ReqStop() ptc.ReqStop()
} }
} }
@ -611,7 +605,7 @@ func (r *ClientRoute) ReportEvent(pts_id PeerId, event_type PACKET_KIND, event_d
if !ok { if !ok {
r.cts.cli.log.Write(r.cts.sid, LOG_ERROR, r.cts.cli.log.Write(r.cts.sid, LOG_ERROR,
"Protocol error - invalid data in peer_eof event(%d,%d)", "Protocol error - invalid data in peer_eof event(%d,%d)",
r.id, pts_id) r.Id, pts_id)
ptc.ReqStop() ptc.ReqStop()
} else { } else {
ptc.CloseWrite() ptc.CloseWrite()
@ -630,14 +624,14 @@ func (r *ClientRoute) ReportEvent(pts_id PeerId, event_type PACKET_KIND, event_d
if !ok { if !ok {
r.cts.cli.log.Write(r.cts.sid, LOG_ERROR, r.cts.cli.log.Write(r.cts.sid, LOG_ERROR,
"Protocol error - invalid data in peer_data event(%d,%d)", "Protocol error - invalid data in peer_data event(%d,%d)",
r.id, pts_id) r.Id, pts_id)
ptc.ReqStop() ptc.ReqStop()
} else { } else {
_, err = ptc.conn.Write(data) _, err = ptc.conn.Write(data)
if err != nil { if err != nil {
r.cts.cli.log.Write(r.cts.sid, LOG_ERROR, r.cts.cli.log.Write(r.cts.sid, LOG_ERROR,
"Failed to write to peer(%d,%d,%s,%s) - %s", "Failed to write to peer(%d,%d,%s,%s) - %s",
r.id, pts_id, ptc.conn.RemoteAddr().String(), ptc.conn.LocalAddr().String(), err.Error()) r.Id, pts_id, ptc.conn.RemoteAddr().String(), ptc.conn.LocalAddr().String(), err.Error())
ptc.ReqStop() ptc.ReqStop()
} }
} }
@ -705,11 +699,12 @@ func (cts *ClientConn) AddNewClientRoute(rc *ClientRouteConfig) (*ClientRoute, e
} }
r = NewClientRoute(cts, assigned_id, rc.PeerAddr, rc.PeerName, rc.ServiceAddr, rc.ServiceNet, rc.Option, rc.Lifetime) r = NewClientRoute(cts, assigned_id, rc.PeerAddr, rc.PeerName, rc.ServiceAddr, rc.ServiceNet, rc.Option, rc.Lifetime)
cts.route_map[r.id] = r cts.route_map[r.Id] = r
cts.cli.stats.routes.Add(1) cts.cli.stats.routes.Add(1)
if cts.cli.route_persister != nil { cts.cli.route_persister.Save(cts, r) }
cts.route_mtx.Unlock() cts.route_mtx.Unlock()
cts.cli.log.Write(cts.sid, LOG_INFO, "Added route(%d,%d) %s", cts.id, r.id, r.peer_addr) cts.cli.log.Write(cts.sid, LOG_INFO, "Added route(%d,%d) %s", cts.id, r.Id, r.PeerAddr)
cts.route_wg.Add(1) cts.route_wg.Add(1)
go r.RunTask(&cts.route_wg) go r.RunTask(&cts.route_wg)
@ -731,7 +726,7 @@ func (cts *ClientConn) RemoveAllClientRoutes() {
defer cts.route_mtx.Unlock() defer cts.route_mtx.Unlock()
for _, r = range cts.route_map { for _, r = range cts.route_map {
delete(cts.route_map, r.id) delete(cts.route_map, r.Id)
cts.cli.stats.routes.Add(-1) cts.cli.stats.routes.Add(-1)
r.ReqStop() r.ReqStop()
} }
@ -742,20 +737,21 @@ func (cts *ClientConn) RemoveClientRoute(route *ClientRoute) error {
var ok bool var ok bool
cts.route_mtx.Lock() cts.route_mtx.Lock()
r, ok = cts.route_map[route.id] r, ok = cts.route_map[route.Id]
if !ok { if !ok {
cts.route_mtx.Unlock() cts.route_mtx.Unlock()
return fmt.Errorf("non-existent route id - %d", route.id) return fmt.Errorf("non-existent route id - %d", route.Id)
} }
if r != route { if r != route {
cts.route_mtx.Unlock() cts.route_mtx.Unlock()
return fmt.Errorf("conflicting route id - %d", route.id) return fmt.Errorf("conflicting route id - %d", route.Id)
} }
delete(cts.route_map, route.id) delete(cts.route_map, route.Id)
cts.cli.stats.routes.Add(-1) cts.cli.stats.routes.Add(-1)
if cts.cli.route_persister != nil { cts.cli.route_persister.Delete(cts, r) }
cts.route_mtx.Unlock() cts.route_mtx.Unlock()
cts.cli.log.Write(cts.sid, LOG_INFO, "Removed route(%d,%s)", route.id, route.peer_addr) cts.cli.log.Write(cts.sid, LOG_INFO, "Removed route(%d,%s)", route.Id, route.PeerAddr)
r.ReqStop() r.ReqStop()
return nil return nil
@ -773,9 +769,10 @@ func (cts *ClientConn) RemoveClientRouteById(route_id RouteId) error {
} }
delete(cts.route_map, route_id) delete(cts.route_map, route_id)
cts.cli.stats.routes.Add(-1) cts.cli.stats.routes.Add(-1)
if cts.cli.route_persister != nil { cts.cli.route_persister.Delete(cts, r) }
cts.route_mtx.Unlock() cts.route_mtx.Unlock()
cts.cli.log.Write(cts.sid, LOG_INFO, "Removed route(%d,%s)", r.id, r.peer_addr) cts.cli.log.Write(cts.sid, LOG_INFO, "Removed route(%d,%s)", r.Id, r.PeerAddr)
r.ReqStop() r.ReqStop()
return nil return nil
@ -790,10 +787,11 @@ func (cts *ClientConn) RemoveClientRouteByServerPeerSvcPortId(port_id PortId) er
cts.route_mtx.Lock() cts.route_mtx.Lock()
for _, r = range cts.route_map { for _, r = range cts.route_map {
if r.server_peer_listen_addr.Port == int(port_id) { if r.server_peer_listen_addr.Port == int(port_id) {
delete(cts.route_map, r.id) delete(cts.route_map, r.Id)
cts.cli.stats.routes.Add(-1) cts.cli.stats.routes.Add(-1)
if cts.cli.route_persister != nil { cts.cli.route_persister.Delete(cts, r) }
cts.route_mtx.Unlock() cts.route_mtx.Unlock()
cts.cli.log.Write(cts.sid, LOG_INFO, "Removed route(%d,%s)", r.id, r.peer_addr) cts.cli.log.Write(cts.sid, LOG_INFO, "Removed route(%d,%s)", r.Id, r.PeerAddr)
r.ReqStop() r.ReqStop()
return nil return nil
} }
@ -978,6 +976,8 @@ start_over:
} }
} }
if cts.cli.route_persister != nil { cts.cli.route_persister.LoadAll(cts) }
for { for {
var pkt *Packet var pkt *Packet
@ -1640,3 +1640,7 @@ func (c *Client) WaitForTermination() {
func (c *Client) WriteLog(id string, level LogLevel, fmtstr string, args ...interface{}) { func (c *Client) WriteLog(id string, level LogLevel, fmtstr string, args ...interface{}) {
c.log.Write(id, level, fmtstr, args...) c.log.Write(id, level, fmtstr, args...)
} }
func (c *Client) SetRoutePersister(persister ClientRoutePersister) {
c.route_persister = persister
}