renamed cts.id/sid to cts.Id/Sid respectively
This commit is contained in:
parent
e2f1d58c5e
commit
0809f9bedc
@ -188,7 +188,7 @@ func (ctl *client_ctl_client_conns) ServeHTTP(w http.ResponseWriter, req *http.R
|
||||
})
|
||||
}
|
||||
js = append(js, json_out_client_conn{
|
||||
Id: cts.id,
|
||||
Id: cts.Id,
|
||||
ReqServerAddrs: cts.cfg.ServerAddrs,
|
||||
CurrentServerIndex: cts.cfg.Index,
|
||||
ServerAddr: cts.remote_addr,
|
||||
@ -227,7 +227,7 @@ func (ctl *client_ctl_client_conns) ServeHTTP(w http.ResponseWriter, req *http.R
|
||||
if err = je.Encode(JsonErrmsg{Text: err.Error()}); err != nil { goto oops }
|
||||
} else {
|
||||
status_code = WriteJsonRespHeader(w, http.StatusCreated)
|
||||
if err = je.Encode(json_out_client_conn_id{Id: cts.id}); err != nil { goto oops }
|
||||
if err = je.Encode(json_out_client_conn_id{Id: cts.Id}); err != nil { goto oops }
|
||||
}
|
||||
|
||||
case http.MethodDelete:
|
||||
@ -301,7 +301,7 @@ func (ctl *client_ctl_client_conns_id) ServeHTTP(w http.ResponseWriter, req *htt
|
||||
})
|
||||
}
|
||||
js = &json_out_client_conn{
|
||||
Id: cts.id,
|
||||
Id: cts.Id,
|
||||
ReqServerAddrs: cts.cfg.ServerAddrs,
|
||||
CurrentServerIndex: cts.cfg.Index,
|
||||
ServerAddr: cts.local_addr,
|
||||
@ -433,7 +433,7 @@ func (ctl *client_ctl_client_conns_id_routes) ServeHTTP(w http.ResponseWriter, r
|
||||
if err = je.Encode(JsonErrmsg{Text: err.Error()}); err != nil { goto oops }
|
||||
} else {
|
||||
status_code = WriteJsonRespHeader(w, http.StatusCreated)
|
||||
if err = je.Encode(json_out_client_route_id{Id: r.Id, CtsId: r.cts.id}); err != nil { goto oops }
|
||||
if err = je.Encode(json_out_client_route_id{Id: r.Id, CtsId: r.cts.Id}); err != nil { goto oops }
|
||||
}
|
||||
|
||||
case http.MethodDelete:
|
||||
|
@ -31,11 +31,11 @@ func (cpc *ClientPeerConn) RunTask(wg *sync.WaitGroup) error {
|
||||
n, err = cpc.conn.Read(buf[:])
|
||||
if err != nil {
|
||||
if errors.Is(err, io.EOF) || strings.Contains(err.Error(), "use of closed network connection") { // i hate checking this condition with strings.Contains()
|
||||
cpc.route.cts.cli.log.Write(cpc.route.cts.sid, LOG_INFO,
|
||||
cpc.route.cts.cli.log.Write(cpc.route.cts.Sid, LOG_INFO,
|
||||
"Client-side peer(%d,%d,%s,%s) closed",
|
||||
cpc.route.Id, cpc.conn_id, cpc.conn.RemoteAddr().String(), cpc.conn.LocalAddr().String())
|
||||
} else {
|
||||
cpc.route.cts.cli.log.Write(cpc.route.cts.sid, LOG_ERROR,
|
||||
cpc.route.cts.cli.log.Write(cpc.route.cts.Sid, LOG_ERROR,
|
||||
"Failed to read from client-side peer(%d,%d,%s,%s) - %s",
|
||||
cpc.route.Id, cpc.conn_id, cpc.conn.RemoteAddr().String(), cpc.conn.LocalAddr().String(), err.Error())
|
||||
}
|
||||
@ -44,7 +44,7 @@ func (cpc *ClientPeerConn) RunTask(wg *sync.WaitGroup) error {
|
||||
|
||||
err = cpc.route.cts.psc.Send(MakePeerDataPacket(cpc.route.Id, cpc.conn_id, buf[0:n]))
|
||||
if err != nil {
|
||||
cpc.route.cts.cli.log.Write(cpc.route.cts.sid, LOG_ERROR,
|
||||
cpc.route.cts.cli.log.Write(cpc.route.cts.Sid, LOG_ERROR,
|
||||
"Failed to write peer(%d,%d,%s,%s) data to server - %s",
|
||||
cpc.route.Id, cpc.conn_id, cpc.conn.RemoteAddr().String(), cpc.conn.LocalAddr().String(), err.Error())
|
||||
break
|
||||
|
144
client.go
144
client.go
@ -88,8 +88,8 @@ type Client struct {
|
||||
type ClientConn struct {
|
||||
cli *Client
|
||||
cfg ClientConfigActive
|
||||
id ConnId
|
||||
sid string // id rendered in string
|
||||
Id ConnId
|
||||
Sid string // id rendered in string
|
||||
|
||||
local_addr string
|
||||
remote_addr string
|
||||
@ -210,7 +210,7 @@ func (r *ClientRoute) AddNewClientPeerConn(c *net.TCPConn, pts_id PeerId, pts_ra
|
||||
r.cts.cli.stats.peers.Add(1)
|
||||
r.ptc_mtx.Unlock()
|
||||
|
||||
r.cts.cli.log.Write(r.cts.sid, LOG_INFO, "Added client-side peer(%d,%d,%s,%s)", r.Id, ptc.conn_id, ptc.conn.RemoteAddr().String(), ptc.conn.LocalAddr().String())
|
||||
r.cts.cli.log.Write(r.cts.Sid, LOG_INFO, "Added client-side peer(%d,%d,%s,%s)", r.Id, ptc.conn_id, ptc.conn.RemoteAddr().String(), ptc.conn.LocalAddr().String())
|
||||
return ptc, nil
|
||||
}
|
||||
|
||||
@ -232,7 +232,7 @@ func (r *ClientRoute) RemoveClientPeerConn(ptc *ClientPeerConn) error {
|
||||
r.cts.cli.stats.peers.Add(-1)
|
||||
r.ptc_mtx.Unlock()
|
||||
|
||||
r.cts.cli.log.Write(r.cts.sid, LOG_INFO, "Removed client-side peer(%d,%d,%s,%s)", r.Id, ptc.conn_id, ptc.conn.RemoteAddr().String(), ptc.conn.LocalAddr().String())
|
||||
r.cts.cli.log.Write(r.cts.Sid, LOG_INFO, "Removed client-side peer(%d,%d,%s,%s)", r.Id, ptc.conn_id, ptc.conn.RemoteAddr().String(), ptc.conn.LocalAddr().String())
|
||||
ptc.ReqStop()
|
||||
return nil
|
||||
}
|
||||
@ -303,12 +303,12 @@ func (r *ClientRoute) RunTask(wg *sync.WaitGroup) {
|
||||
|
||||
err = r.cts.psc.Send(MakeRouteStartPacket(r.Id, r.ServerPeerOption, r.PeerAddr, r.PeerName, r.ServerPeerAddr, r.ServerPeerNet))
|
||||
if err != nil {
|
||||
r.cts.cli.log.Write(r.cts.sid, LOG_DEBUG,
|
||||
r.cts.cli.log.Write(r.cts.Sid, LOG_DEBUG,
|
||||
"Failed to send route_start for route(%d,%s,%v,%v) to %s",
|
||||
r.Id, r.PeerAddr, r.ServerPeerOption, r.ServerPeerNet, r.cts.remote_addr)
|
||||
goto done
|
||||
} else {
|
||||
r.cts.cli.log.Write(r.cts.sid, LOG_DEBUG,
|
||||
r.cts.cli.log.Write(r.cts.Sid, LOG_DEBUG,
|
||||
"Sent route_start for route(%d,%s,%v,%v) to %s",
|
||||
r.Id, r.PeerAddr, r.ServerPeerOption, r.ServerPeerNet, r.cts.remote_addr)
|
||||
}
|
||||
@ -328,7 +328,7 @@ main_loop:
|
||||
break main_loop
|
||||
|
||||
case <-r.lifetime_timer.C:
|
||||
r.cts.cli.log.Write(r.cts.sid, LOG_INFO, "route(%d,%s,%v,%v) reached end of lifetime(%v)",
|
||||
r.cts.cli.log.Write(r.cts.Sid, LOG_INFO, "route(%d,%s,%v,%v) reached end of lifetime(%v)",
|
||||
r.Id, r.PeerAddr, r.ServerPeerOption, r.ServerPeerNet, r.Lifetime)
|
||||
break main_loop
|
||||
}
|
||||
@ -353,11 +353,11 @@ done:
|
||||
|
||||
err = r.cts.psc.Send(MakeRouteStopPacket(r.Id, r.ServerPeerOption, r.PeerAddr, r.PeerName, r.ServerPeerAddr, r.ServerPeerNet))
|
||||
if err != nil {
|
||||
r.cts.cli.log.Write(r.cts.sid, LOG_DEBUG,
|
||||
r.cts.cli.log.Write(r.cts.Sid, LOG_DEBUG,
|
||||
"Failed to route_stop for route(%d,%s,%v,%v) to %s - %s",
|
||||
r.Id, r.PeerAddr, r.ServerPeerOption, r.ServerPeerNet, r.cts.remote_addr, err.Error())
|
||||
} else {
|
||||
r.cts.cli.log.Write(r.cts.sid, LOG_DEBUG,
|
||||
r.cts.cli.log.Write(r.cts.Sid, LOG_DEBUG,
|
||||
"Sent route_stop for route(%d,%s,%v,%v) to %s",
|
||||
r.Id, r.PeerAddr, r.ServerPeerOption, r.ServerPeerNet, r.cts.remote_addr)
|
||||
}
|
||||
@ -412,7 +412,7 @@ func (r *ClientRoute) ConnectToPeer(pts_id PeerId, route_option RouteOption, pts
|
||||
r.ptc_mtx.Unlock()
|
||||
|
||||
if err != nil {
|
||||
r.cts.cli.log.Write(r.cts.sid, LOG_ERROR,
|
||||
r.cts.cli.log.Write(r.cts.Sid, LOG_ERROR,
|
||||
"Failed to connect to %s for route(%d,%d,%s,%s) - %s",
|
||||
r.PeerAddr, r.Id, pts_id, pts_raddr, pts_laddr, err.Error())
|
||||
goto peer_aborted
|
||||
@ -420,7 +420,7 @@ func (r *ClientRoute) ConnectToPeer(pts_id PeerId, route_option RouteOption, pts
|
||||
|
||||
real_conn, ok = conn.(*net.TCPConn)
|
||||
if !ok {
|
||||
r.cts.cli.log.Write(r.cts.sid, LOG_ERROR,
|
||||
r.cts.cli.log.Write(r.cts.Sid, LOG_ERROR,
|
||||
"Failed to get connection information to %s for route(%d,%d,%s,%s) - %s",
|
||||
r.PeerAddr, r.Id, pts_id, pts_raddr, pts_laddr, err.Error())
|
||||
goto peer_aborted
|
||||
@ -430,7 +430,7 @@ func (r *ClientRoute) ConnectToPeer(pts_id PeerId, route_option RouteOption, pts
|
||||
real_conn_laddr = real_conn.LocalAddr().String()
|
||||
ptc, err = r.AddNewClientPeerConn(real_conn, pts_id, pts_raddr, pts_laddr)
|
||||
if err != nil {
|
||||
r.cts.cli.log.Write(r.cts.sid, LOG_ERROR,
|
||||
r.cts.cli.log.Write(r.cts.Sid, LOG_ERROR,
|
||||
"Failed to add client peer %s for route(%d,%d,%s,%s) - %s",
|
||||
r.PeerAddr, r.Id, pts_id, pts_raddr, pts_laddr, err.Error())
|
||||
goto peer_aborted
|
||||
@ -440,7 +440,7 @@ func (r *ClientRoute) ConnectToPeer(pts_id PeerId, route_option RouteOption, pts
|
||||
|
||||
err = r.cts.psc.Send(MakePeerStartedPacket(r.Id, ptc.conn_id, real_conn_raddr, real_conn_laddr))
|
||||
if err != nil {
|
||||
r.cts.cli.log.Write(r.cts.sid, LOG_ERROR,
|
||||
r.cts.cli.log.Write(r.cts.Sid, LOG_ERROR,
|
||||
"Failed to send peer_start(%d,%d,%s,%s) for route(%d,%d,%s,%s) - %s",
|
||||
r.Id, ptc.conn_id, real_conn_raddr, real_conn_laddr,
|
||||
r.Id, pts_id, pts_raddr, pts_laddr, err.Error())
|
||||
@ -455,7 +455,7 @@ peer_aborted:
|
||||
// real_conn_radd and real_conn_laddr may be empty depending on when the jump to here is made.
|
||||
err = r.cts.psc.Send(MakePeerAbortedPacket(r.Id, pts_id, real_conn_raddr, real_conn_laddr))
|
||||
if err != nil {
|
||||
r.cts.cli.log.Write(r.cts.sid, LOG_ERROR,
|
||||
r.cts.cli.log.Write(r.cts.Sid, LOG_ERROR,
|
||||
"Failed to send peer_aborted(%d,%d) for route(%d,%d,%s,%s) - %s",
|
||||
r.Id, pts_id, r.Id, pts_id, pts_raddr, pts_laddr, err.Error())
|
||||
}
|
||||
@ -490,13 +490,13 @@ func (r *ClientRoute) ReportEvent(pts_id PeerId, event_type PACKET_KIND, event_d
|
||||
var rd *RouteDesc
|
||||
rd, ok = event_data.(*RouteDesc)
|
||||
if !ok {
|
||||
r.cts.cli.log.Write(r.cts.sid, LOG_ERROR, "Protocol error - invalid data in route_started event(%d)", r.Id)
|
||||
r.cts.cli.log.Write(r.cts.Sid, LOG_ERROR, "Protocol error - invalid data in route_started event(%d)", r.Id)
|
||||
r.ReqStop()
|
||||
} else {
|
||||
var addr *net.TCPAddr
|
||||
addr, err = net.ResolveTCPAddr(TcpAddrStrClass(rd.TargetAddrStr), rd.TargetAddrStr)
|
||||
if err != nil {
|
||||
r.cts.cli.log.Write(r.cts.sid, LOG_ERROR, "Protocol error - invalid service address(%s) for server peer in route_started event(%d)", rd.TargetAddrStr, r.Id)
|
||||
r.cts.cli.log.Write(r.cts.Sid, LOG_ERROR, "Protocol error - invalid service address(%s) for server peer in route_started event(%d)", rd.TargetAddrStr, r.Id)
|
||||
r.ReqStop()
|
||||
} else {
|
||||
r.server_peer_listen_addr = addr
|
||||
@ -512,7 +512,7 @@ func (r *ClientRoute) ReportEvent(pts_id PeerId, event_type PACKET_KIND, event_d
|
||||
var ok bool
|
||||
_, ok = event_data.(*RouteDesc)
|
||||
if !ok {
|
||||
r.cts.cli.log.Write(r.cts.sid, LOG_ERROR, "Protocol error - invalid data in route_started event(%d)", r.Id)
|
||||
r.cts.cli.log.Write(r.cts.Sid, LOG_ERROR, "Protocol error - invalid data in route_started event(%d)", r.Id)
|
||||
r.ReqStop()
|
||||
} else {
|
||||
r.ReqStop()
|
||||
@ -524,18 +524,18 @@ func (r *ClientRoute) ReportEvent(pts_id PeerId, event_type PACKET_KIND, event_d
|
||||
|
||||
pd, ok = event_data.(*PeerDesc)
|
||||
if !ok {
|
||||
r.cts.cli.log.Write(r.cts.sid, LOG_ERROR,
|
||||
r.cts.cli.log.Write(r.cts.Sid, LOG_ERROR,
|
||||
"Protocol error - invalid data in peer_started event(%d,%d)", r.Id, pts_id)
|
||||
r.ReqStop()
|
||||
} else {
|
||||
if r.cts.cli.ptc_limit > 0 && int(r.cts.cli.stats.peers.Load()) >= r.cts.cli.ptc_limit {
|
||||
r.cts.cli.log.Write(r.cts.sid, LOG_ERROR,
|
||||
r.cts.cli.log.Write(r.cts.Sid, LOG_ERROR,
|
||||
"Rejecting to connect to peer(%s)for route(%d,%d) - allowed max %d",
|
||||
r.PeerAddr, r.Id, pts_id, r.cts.cli.ptc_limit)
|
||||
|
||||
err = r.cts.psc.Send(MakePeerAbortedPacket(r.Id, pts_id, "", ""))
|
||||
if err != nil {
|
||||
r.cts.cli.log.Write(r.cts.sid, LOG_ERROR,
|
||||
r.cts.cli.log.Write(r.cts.Sid, LOG_ERROR,
|
||||
"Failed to send peer_aborted(%d,%d) for route(%d,%d,%s,%s) - %s",
|
||||
r.Id, pts_id, r.Id, pts_id, "", "", err.Error())
|
||||
}
|
||||
@ -555,13 +555,13 @@ func (r *ClientRoute) ReportEvent(pts_id PeerId, event_type PACKET_KIND, event_d
|
||||
|
||||
pd, ok = event_data.(*PeerDesc)
|
||||
if !ok {
|
||||
r.cts.cli.log.Write(r.cts.sid, LOG_ERROR,
|
||||
r.cts.cli.log.Write(r.cts.Sid, LOG_ERROR,
|
||||
"Protocol error - invalid data in peer_aborted event(%d,%d)", r.Id, pts_id)
|
||||
r.ReqStop()
|
||||
} else {
|
||||
err = r.DisconnectFromPeer(ptc)
|
||||
if err != nil {
|
||||
r.cts.cli.log.Write(r.cts.sid, LOG_ERROR,
|
||||
r.cts.cli.log.Write(r.cts.Sid, LOG_ERROR,
|
||||
"Failed to disconnect from peer(%d,%d,%s,%s) - %s",
|
||||
r.Id, pts_id, pd.RemoteAddrStr, pd.LocalAddrStr, err.Error())
|
||||
ptc.ReqStop()
|
||||
@ -579,14 +579,14 @@ func (r *ClientRoute) ReportEvent(pts_id PeerId, event_type PACKET_KIND, event_d
|
||||
|
||||
pd, ok = event_data.(*PeerDesc)
|
||||
if !ok {
|
||||
r.cts.cli.log.Write(r.cts.sid, LOG_ERROR,
|
||||
r.cts.cli.log.Write(r.cts.Sid, LOG_ERROR,
|
||||
"Protocol error - invalid data in peer_stopped event(%d,%d)",
|
||||
r.Id, pts_id)
|
||||
ptc.ReqStop()
|
||||
} else {
|
||||
err = r.DisconnectFromPeer(ptc)
|
||||
if err != nil {
|
||||
r.cts.cli.log.Write(r.cts.sid, LOG_WARN,
|
||||
r.cts.cli.log.Write(r.cts.Sid, LOG_WARN,
|
||||
"Failed to disconnect from peer(%d,%d,%s,%s) - %s",
|
||||
r.Id, pts_id, pd.RemoteAddrStr, pd.LocalAddrStr, err.Error())
|
||||
ptc.ReqStop()
|
||||
@ -603,7 +603,7 @@ func (r *ClientRoute) ReportEvent(pts_id PeerId, event_type PACKET_KIND, event_d
|
||||
|
||||
_, ok = event_data.(*PeerDesc)
|
||||
if !ok {
|
||||
r.cts.cli.log.Write(r.cts.sid, LOG_ERROR,
|
||||
r.cts.cli.log.Write(r.cts.Sid, LOG_ERROR,
|
||||
"Protocol error - invalid data in peer_eof event(%d,%d)",
|
||||
r.Id, pts_id)
|
||||
ptc.ReqStop()
|
||||
@ -622,14 +622,14 @@ func (r *ClientRoute) ReportEvent(pts_id PeerId, event_type PACKET_KIND, event_d
|
||||
|
||||
data, ok = event_data.([]byte)
|
||||
if !ok {
|
||||
r.cts.cli.log.Write(r.cts.sid, LOG_ERROR,
|
||||
r.cts.cli.log.Write(r.cts.Sid, LOG_ERROR,
|
||||
"Protocol error - invalid data in peer_data event(%d,%d)",
|
||||
r.Id, pts_id)
|
||||
ptc.ReqStop()
|
||||
} else {
|
||||
_, err = ptc.conn.Write(data)
|
||||
if err != nil {
|
||||
r.cts.cli.log.Write(r.cts.sid, LOG_ERROR,
|
||||
r.cts.cli.log.Write(r.cts.Sid, LOG_ERROR,
|
||||
"Failed to write to peer(%d,%d,%s,%s) - %s",
|
||||
r.Id, pts_id, ptc.conn.RemoteAddr().String(), ptc.conn.LocalAddr().String(), err.Error())
|
||||
ptc.ReqStop()
|
||||
@ -704,7 +704,7 @@ func (cts *ClientConn) AddNewClientRoute(rc *ClientRouteConfig) (*ClientRoute, e
|
||||
if cts.cli.route_persister != nil { cts.cli.route_persister.Save(cts, r) }
|
||||
cts.route_mtx.Unlock()
|
||||
|
||||
cts.cli.log.Write(cts.sid, LOG_INFO, "Added route(%d,%d) %s", cts.id, r.Id, r.PeerAddr)
|
||||
cts.cli.log.Write(cts.Sid, LOG_INFO, "Added route(%d,%d) %s", cts.Id, r.Id, r.PeerAddr)
|
||||
|
||||
cts.route_wg.Add(1)
|
||||
go r.RunTask(&cts.route_wg)
|
||||
@ -751,7 +751,7 @@ func (cts *ClientConn) RemoveClientRoute(route *ClientRoute) error {
|
||||
if cts.cli.route_persister != nil { cts.cli.route_persister.Delete(cts, r) }
|
||||
cts.route_mtx.Unlock()
|
||||
|
||||
cts.cli.log.Write(cts.sid, LOG_INFO, "Removed route(%d,%s)", route.Id, route.PeerAddr)
|
||||
cts.cli.log.Write(cts.Sid, LOG_INFO, "Removed route(%d,%s)", route.Id, route.PeerAddr)
|
||||
|
||||
r.ReqStop()
|
||||
return nil
|
||||
@ -772,7 +772,7 @@ func (cts *ClientConn) RemoveClientRouteById(route_id RouteId) error {
|
||||
if cts.cli.route_persister != nil { cts.cli.route_persister.Delete(cts, r) }
|
||||
cts.route_mtx.Unlock()
|
||||
|
||||
cts.cli.log.Write(cts.sid, LOG_INFO, "Removed route(%d,%s)", r.Id, r.PeerAddr)
|
||||
cts.cli.log.Write(cts.Sid, LOG_INFO, "Removed route(%d,%s)", r.Id, r.PeerAddr)
|
||||
|
||||
r.ReqStop()
|
||||
return nil
|
||||
@ -791,7 +791,7 @@ func (cts *ClientConn) RemoveClientRouteByServerPeerSvcPortId(port_id PortId) er
|
||||
cts.cli.stats.routes.Add(-1)
|
||||
if cts.cli.route_persister != nil { cts.cli.route_persister.Delete(cts, r) }
|
||||
cts.route_mtx.Unlock()
|
||||
cts.cli.log.Write(cts.sid, LOG_INFO, "Removed route(%d,%s)", r.Id, r.PeerAddr)
|
||||
cts.cli.log.Write(cts.Sid, LOG_INFO, "Removed route(%d,%s)", r.Id, r.PeerAddr)
|
||||
r.ReqStop()
|
||||
return nil
|
||||
}
|
||||
@ -911,7 +911,7 @@ func (cts *ClientConn) RunTask(wg *sync.WaitGroup) {
|
||||
|
||||
start_over:
|
||||
cts.cfg.Index = (cts.cfg.Index + 1) % len(cts.cfg.ServerAddrs)
|
||||
cts.cli.log.Write(cts.sid, LOG_INFO, "Connecting to server[%d] %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index])
|
||||
cts.cli.log.Write(cts.Sid, LOG_INFO, "Connecting to server[%d] %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index])
|
||||
if cts.cli.rpctlscfg == nil {
|
||||
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
|
||||
if cts.cfg.ServerAuthority != "" { opts = append(opts, grpc.WithAuthority(cts.cfg.ServerAuthority)) }
|
||||
@ -930,7 +930,7 @@ start_over:
|
||||
|
||||
cts.conn, err = grpc.NewClient(cts.cfg.ServerAddrs[cts.cfg.Index], opts...)
|
||||
if err != nil {
|
||||
cts.cli.log.Write(cts.sid, LOG_ERROR, "Failed to make client to server[%d] %s - %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index], err.Error())
|
||||
cts.cli.log.Write(cts.Sid, LOG_ERROR, "Failed to make client to server[%d] %s - %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index], err.Error())
|
||||
goto reconnect_to_server
|
||||
}
|
||||
cts.hdc = NewHoduClient(cts.conn)
|
||||
@ -941,18 +941,18 @@ start_over:
|
||||
c_seed.Flags = 0
|
||||
s_seed, err = cts.hdc.GetSeed(cts.cli.ctx, &c_seed)
|
||||
if err != nil {
|
||||
cts.cli.log.Write(cts.sid, LOG_ERROR, "Failed to get seed from server[%d] %s - %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index], err.Error())
|
||||
cts.cli.log.Write(cts.Sid, LOG_ERROR, "Failed to get seed from server[%d] %s - %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index], err.Error())
|
||||
goto reconnect_to_server
|
||||
}
|
||||
cts.s_seed = *s_seed
|
||||
cts.c_seed = c_seed
|
||||
cts.route_next_id = 1 // reset this whenever a new connection is made. the number of routes must be zero.
|
||||
|
||||
cts.cli.log.Write(cts.sid, LOG_INFO, "Got seed from server[%d] %s - ver=%#x", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index], cts.s_seed.Version)
|
||||
cts.cli.log.Write(cts.Sid, LOG_INFO, "Got seed from server[%d] %s - ver=%#x", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index], cts.s_seed.Version)
|
||||
|
||||
psc, err = cts.hdc.PacketStream(cts.cli.ctx)
|
||||
if err != nil {
|
||||
cts.cli.log.Write(cts.sid, LOG_ERROR, "Failed to get packet stream from server[%d] %s - %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index], err.Error())
|
||||
cts.cli.log.Write(cts.Sid, LOG_ERROR, "Failed to get packet stream from server[%d] %s - %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index], err.Error())
|
||||
goto reconnect_to_server
|
||||
}
|
||||
|
||||
@ -962,7 +962,7 @@ start_over:
|
||||
cts.local_addr = p.LocalAddr.String()
|
||||
}
|
||||
|
||||
cts.cli.log.Write(cts.sid, LOG_INFO, "Got packet stream from server[%d] %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index])
|
||||
cts.cli.log.Write(cts.Sid, LOG_INFO, "Got packet stream from server[%d] %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index])
|
||||
|
||||
cts.psc = &GuardedPacketStreamClient{Hodu_PacketStreamClient: psc}
|
||||
|
||||
@ -971,7 +971,7 @@ start_over:
|
||||
// let's add routes to the client-side peers if given
|
||||
err = cts.AddClientRoutes(cts.cfg.Routes)
|
||||
if err != nil {
|
||||
cts.cli.log.Write(cts.sid, LOG_ERROR, "Failed to add routes to server[%d] %s for %v - %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index], cts.cfg.Routes, err.Error())
|
||||
cts.cli.log.Write(cts.Sid, LOG_ERROR, "Failed to add routes to server[%d] %s for %v - %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index], cts.cfg.Routes, err.Error())
|
||||
goto done
|
||||
}
|
||||
}
|
||||
@ -999,7 +999,7 @@ start_over:
|
||||
if status.Code(err) == codes.Canceled || errors.Is(err, net.ErrClosed) {
|
||||
goto reconnect_to_server
|
||||
} else {
|
||||
cts.cli.log.Write(cts.sid, LOG_INFO, "Failed to receive packet from %s - %s", cts.remote_addr, err.Error())
|
||||
cts.cli.log.Write(cts.Sid, LOG_INFO, "Failed to receive packet from %s - %s", cts.remote_addr, err.Error())
|
||||
goto reconnect_to_server
|
||||
}
|
||||
}
|
||||
@ -1013,16 +1013,16 @@ start_over:
|
||||
if ok {
|
||||
err = cts.ReportEvent(RouteId(x.Route.RouteId), 0, pkt.Kind, x.Route)
|
||||
if err != nil {
|
||||
cts.cli.log.Write(cts.sid, LOG_ERROR,
|
||||
cts.cli.log.Write(cts.Sid, LOG_ERROR,
|
||||
"Failed to handle route_started event(%d,%s) from %s - %s",
|
||||
x.Route.RouteId, x.Route.TargetAddrStr, cts.remote_addr, err.Error())
|
||||
} else {
|
||||
cts.cli.log.Write(cts.sid, LOG_DEBUG,
|
||||
cts.cli.log.Write(cts.Sid, LOG_DEBUG,
|
||||
"Handled route_started event(%d,%s) from %s",
|
||||
x.Route.RouteId, x.Route.TargetAddrStr, cts.remote_addr)
|
||||
}
|
||||
} else {
|
||||
cts.cli.log.Write(cts.sid, LOG_ERROR, "Invalid route_started event from %s", cts.remote_addr)
|
||||
cts.cli.log.Write(cts.Sid, LOG_ERROR, "Invalid route_started event from %s", cts.remote_addr)
|
||||
}
|
||||
|
||||
case PACKET_KIND_ROUTE_STOPPED:
|
||||
@ -1032,16 +1032,16 @@ start_over:
|
||||
if ok {
|
||||
err = cts.ReportEvent(RouteId(x.Route.RouteId), 0, pkt.Kind, x.Route)
|
||||
if err != nil {
|
||||
cts.cli.log.Write(cts.sid, LOG_ERROR,
|
||||
cts.cli.log.Write(cts.Sid, LOG_ERROR,
|
||||
"Failed to handle route_stopped event(%d,%s) from %s - %s",
|
||||
x.Route.RouteId, x.Route.TargetAddrStr, cts.remote_addr, err.Error())
|
||||
} else {
|
||||
cts.cli.log.Write(cts.sid, LOG_DEBUG,
|
||||
cts.cli.log.Write(cts.Sid, LOG_DEBUG,
|
||||
"Handled route_stopped event(%d,%s) from %s",
|
||||
x.Route.RouteId, x.Route.TargetAddrStr, cts.remote_addr)
|
||||
}
|
||||
} else {
|
||||
cts.cli.log.Write(cts.sid, LOG_ERROR, "Invalid route_stopped event from %s", cts.remote_addr)
|
||||
cts.cli.log.Write(cts.Sid, LOG_ERROR, "Invalid route_stopped event from %s", cts.remote_addr)
|
||||
}
|
||||
|
||||
case PACKET_KIND_PEER_STARTED:
|
||||
@ -1052,16 +1052,16 @@ start_over:
|
||||
if ok {
|
||||
err = cts.ReportEvent(RouteId(x.Peer.RouteId), PeerId(x.Peer.PeerId), PACKET_KIND_PEER_STARTED, x.Peer)
|
||||
if err != nil {
|
||||
cts.cli.log.Write(cts.sid, LOG_ERROR,
|
||||
cts.cli.log.Write(cts.Sid, LOG_ERROR,
|
||||
"Failed to handle peer_started event from %s for peer(%d,%d,%s,%s) - %s",
|
||||
cts.remote_addr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr, err.Error())
|
||||
} else {
|
||||
cts.cli.log.Write(cts.sid, LOG_DEBUG,
|
||||
cts.cli.log.Write(cts.Sid, LOG_DEBUG,
|
||||
"Handled peer_started event from %s for peer(%d,%d,%s,%s)",
|
||||
cts.remote_addr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr)
|
||||
}
|
||||
} else {
|
||||
cts.cli.log.Write(cts.sid, LOG_ERROR, "Invalid peer_started event from %s", cts.remote_addr)
|
||||
cts.cli.log.Write(cts.Sid, LOG_ERROR, "Invalid peer_started event from %s", cts.remote_addr)
|
||||
}
|
||||
|
||||
// PACKET_KIND_PEER_ABORTED is never sent by server to client.
|
||||
@ -1075,16 +1075,16 @@ start_over:
|
||||
if ok {
|
||||
err = cts.ReportEvent(RouteId(x.Peer.RouteId), PeerId(x.Peer.PeerId), PACKET_KIND_PEER_STOPPED, x.Peer)
|
||||
if err != nil {
|
||||
cts.cli.log.Write(cts.sid, LOG_ERROR,
|
||||
cts.cli.log.Write(cts.Sid, LOG_ERROR,
|
||||
"Failed to handle peer_stopped event from %s for peer(%d,%d,%s,%s) - %s",
|
||||
cts.remote_addr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr, err.Error())
|
||||
} else {
|
||||
cts.cli.log.Write(cts.sid, LOG_DEBUG,
|
||||
cts.cli.log.Write(cts.Sid, LOG_DEBUG,
|
||||
"Handled peer_stopped event from %s for peer(%d,%d,%s,%s)",
|
||||
cts.remote_addr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr)
|
||||
}
|
||||
} else {
|
||||
cts.cli.log.Write(cts.sid, LOG_ERROR, "Invalid peer_stopped event from %s", cts.remote_addr)
|
||||
cts.cli.log.Write(cts.Sid, LOG_ERROR, "Invalid peer_stopped event from %s", cts.remote_addr)
|
||||
}
|
||||
|
||||
case PACKET_KIND_PEER_EOF:
|
||||
@ -1094,16 +1094,16 @@ start_over:
|
||||
if ok {
|
||||
err = cts.ReportEvent(RouteId(x.Peer.RouteId), PeerId(x.Peer.PeerId), PACKET_KIND_PEER_EOF, x.Peer)
|
||||
if err != nil {
|
||||
cts.cli.log.Write(cts.sid, LOG_ERROR,
|
||||
cts.cli.log.Write(cts.Sid, LOG_ERROR,
|
||||
"Failed to handle peer_eof event from %s for peer(%d,%d,%s,%s) - %s",
|
||||
cts.remote_addr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr, err.Error())
|
||||
} else {
|
||||
cts.cli.log.Write(cts.sid, LOG_DEBUG,
|
||||
cts.cli.log.Write(cts.Sid, LOG_DEBUG,
|
||||
"Handled peer_eof event from %s for peer(%d,%d,%s,%s)",
|
||||
cts.remote_addr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr)
|
||||
}
|
||||
} else {
|
||||
cts.cli.log.Write(cts.sid, LOG_ERROR, "Invalid peer_eof event from %s", cts.remote_addr)
|
||||
cts.cli.log.Write(cts.Sid, LOG_ERROR, "Invalid peer_eof event from %s", cts.remote_addr)
|
||||
}
|
||||
|
||||
case PACKET_KIND_PEER_DATA:
|
||||
@ -1114,16 +1114,16 @@ start_over:
|
||||
if ok {
|
||||
err = cts.ReportEvent(RouteId(x.Data.RouteId), PeerId(x.Data.PeerId), PACKET_KIND_PEER_DATA, x.Data.Data)
|
||||
if err != nil {
|
||||
cts.cli.log.Write(cts.sid, LOG_ERROR,
|
||||
cts.cli.log.Write(cts.Sid, LOG_ERROR,
|
||||
"Failed to handle peer_data event from %s for peer(%d,%d) - %s",
|
||||
cts.remote_addr, x.Data.RouteId, x.Data.PeerId, err.Error())
|
||||
} else {
|
||||
cts.cli.log.Write(cts.sid, LOG_DEBUG,
|
||||
cts.cli.log.Write(cts.Sid, LOG_DEBUG,
|
||||
"Handled peer_data event from %s for peer(%d,%d)",
|
||||
cts.remote_addr, x.Data.RouteId, x.Data.PeerId)
|
||||
}
|
||||
} else {
|
||||
cts.cli.log.Write(cts.sid, LOG_ERROR, "Invalid peer_data event from %s", cts.remote_addr)
|
||||
cts.cli.log.Write(cts.Sid, LOG_ERROR, "Invalid peer_data event from %s", cts.remote_addr)
|
||||
}
|
||||
|
||||
default:
|
||||
@ -1132,7 +1132,7 @@ start_over:
|
||||
}
|
||||
|
||||
done:
|
||||
cts.cli.log.Write(cts.sid, LOG_INFO, "Disconnected from server[%d] %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index])
|
||||
cts.cli.log.Write(cts.Sid, LOG_INFO, "Disconnected from server[%d] %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index])
|
||||
|
||||
req_stop_and_wait_for_termination:
|
||||
//cts.RemoveClientRoutes() // this isn't needed as each task removes itself from cts upon its termination
|
||||
@ -1145,7 +1145,7 @@ wait_for_termination:
|
||||
|
||||
reconnect_to_server:
|
||||
if cts.conn != nil {
|
||||
cts.cli.log.Write(cts.sid, LOG_INFO, "Disconnecting from server[%d] %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index])
|
||||
cts.cli.log.Write(cts.Sid, LOG_INFO, "Disconnecting from server[%d] %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index])
|
||||
}
|
||||
cts.disconnect_from_server()
|
||||
|
||||
@ -1337,14 +1337,14 @@ func (c *Client) AddNewClientConn(cfg *ClientConfig) (*ClientConn, error) {
|
||||
}
|
||||
}
|
||||
|
||||
cts.id = assigned_id
|
||||
cts.sid = fmt.Sprintf("%d", cts.id) // id in string used for logging
|
||||
cts.Id = assigned_id
|
||||
cts.Sid = fmt.Sprintf("%d", cts.Id) // id in string used for logging
|
||||
|
||||
c.cts_map[cts.id] = cts
|
||||
c.cts_map[cts.Id] = cts
|
||||
c.stats.conns.Add(1)
|
||||
c.cts_mtx.Unlock()
|
||||
|
||||
c.log.Write("", LOG_INFO, "Added client connection(%d) to %v", cts.id, cfg.ServerAddrs)
|
||||
c.log.Write("", LOG_INFO, "Added client connection(%d) to %v", cts.Id, cfg.ServerAddrs)
|
||||
return cts, nil
|
||||
}
|
||||
|
||||
@ -1364,7 +1364,7 @@ func (c *Client) RemoveAllClientConns() {
|
||||
|
||||
for _, cts = range c.cts_map {
|
||||
delete(c.cts_map_by_addr, cts.cfg.ServerAddr)
|
||||
delete(c.cts_map, cts.id)
|
||||
delete(c.cts_map, cts.Id)
|
||||
c.stats.conns.Store(int64(len(c.cts_map)))
|
||||
cts.ReqStop()
|
||||
}
|
||||
@ -1377,21 +1377,21 @@ func (c *Client) RemoveClientConn(cts *ClientConn) error {
|
||||
|
||||
c.cts_mtx.Lock()
|
||||
|
||||
conn, ok = c.cts_map[cts.id]
|
||||
conn, ok = c.cts_map[cts.Id]
|
||||
if !ok {
|
||||
c.cts_mtx.Unlock()
|
||||
return fmt.Errorf("non-existent connection id - %d", cts.id)
|
||||
return fmt.Errorf("non-existent connection id - %d", cts.Id)
|
||||
}
|
||||
if conn != cts {
|
||||
c.cts_mtx.Unlock()
|
||||
return fmt.Errorf("conflicting connection id - %d", cts.id)
|
||||
return fmt.Errorf("conflicting connection id - %d", cts.Id)
|
||||
}
|
||||
|
||||
delete(c.cts_map, cts.id)
|
||||
delete(c.cts_map, cts.Id)
|
||||
c.stats.conns.Store(int64(len(c.cts_map)))
|
||||
c.cts_mtx.Unlock()
|
||||
|
||||
c.log.Write("", LOG_INFO, "Removed client connection(%d) to %v", cts.id, cts.cfg.ServerAddrs)
|
||||
c.log.Write("", LOG_INFO, "Removed client connection(%d) to %v", cts.Id, cts.cfg.ServerAddrs)
|
||||
|
||||
cts.ReqStop()
|
||||
return nil
|
||||
@ -1411,11 +1411,11 @@ func (c *Client) RemoveClientConnById(conn_id ConnId) error {
|
||||
|
||||
// NOTE: removal by id doesn't perform identity check
|
||||
|
||||
delete(c.cts_map, cts.id)
|
||||
delete(c.cts_map, cts.Id)
|
||||
c.stats.conns.Store(int64(len(c.cts_map)))
|
||||
c.cts_mtx.Unlock()
|
||||
|
||||
c.log.Write("", LOG_INFO, "Removed client connection(%d) to %v", cts.id, cts.cfg.ServerAddrs)
|
||||
c.log.Write("", LOG_INFO, "Removed client connection(%d) to %v", cts.Id, cts.cfg.ServerAddrs)
|
||||
cts.ReqStop()
|
||||
return nil
|
||||
}
|
||||
@ -1607,7 +1607,7 @@ func (c *Client) StartService(data interface{}) {
|
||||
if err != nil {
|
||||
c.log.Write("", LOG_ERROR, "Failed to start service - %s", err.Error())
|
||||
} else {
|
||||
c.log.Write("", LOG_INFO, "Started service for %v [%d]", cts.cfg.ServerAddrs, cts.id)
|
||||
c.log.Write("", LOG_INFO, "Started service for %v [%d]", cts.cfg.ServerAddrs, cts.Id)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user