capatalized the first letter of some field name for exposure outside package
This commit is contained in:
parent
b5c1ae2a73
commit
1c49023c37
@ -61,7 +61,7 @@ func (spc *ServerPeerConn) RunTask(wg *sync.WaitGroup) {
|
|||||||
pss = spc.route.Cts.pss
|
pss = spc.route.Cts.pss
|
||||||
err = pss.Send(MakePeerStartedPacket(spc.route.Id, spc.conn_id, conn_raddr, conn_laddr))
|
err = pss.Send(MakePeerStartedPacket(spc.route.Id, spc.conn_id, conn_raddr, conn_laddr))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
spc.route.Cts.S.log.Write(spc.route.Cts.sid, LOG_ERROR,
|
spc.route.Cts.S.log.Write(spc.route.Cts.Sid, LOG_ERROR,
|
||||||
"Failed to send peer_started event(%d,%d,%s,%s) to client - %s",
|
"Failed to send peer_started event(%d,%d,%s,%s) to client - %s",
|
||||||
spc.route.Id, spc.conn_id, conn_raddr, conn_laddr, err.Error())
|
spc.route.Id, spc.conn_id, conn_raddr, conn_laddr, err.Error())
|
||||||
goto done_without_stop
|
goto done_without_stop
|
||||||
@ -98,14 +98,14 @@ wait_for_started:
|
|||||||
if errors.Is(err, io.EOF) || strings.Contains(err.Error(), "use of closed network connection") { // i don't like this way to check this error.
|
if errors.Is(err, io.EOF) || strings.Contains(err.Error(), "use of closed network connection") { // i don't like this way to check this error.
|
||||||
err = pss.Send(MakePeerEofPacket(spc.route.Id, spc.conn_id))
|
err = pss.Send(MakePeerEofPacket(spc.route.Id, spc.conn_id))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
spc.route.Cts.S.log.Write(spc.route.Cts.sid, LOG_ERROR,
|
spc.route.Cts.S.log.Write(spc.route.Cts.Sid, LOG_ERROR,
|
||||||
"Failed to send peer_eof event(%d,%d,%s,%s) to client - %s",
|
"Failed to send peer_eof event(%d,%d,%s,%s) to client - %s",
|
||||||
spc.route.Id, spc.conn_id, conn_raddr, conn_laddr, err.Error())
|
spc.route.Id, spc.conn_id, conn_raddr, conn_laddr, err.Error())
|
||||||
goto done
|
goto done
|
||||||
}
|
}
|
||||||
goto wait_for_stopped
|
goto wait_for_stopped
|
||||||
} else {
|
} else {
|
||||||
spc.route.Cts.S.log.Write(spc.route.Cts.sid, LOG_ERROR,
|
spc.route.Cts.S.log.Write(spc.route.Cts.Sid, LOG_ERROR,
|
||||||
"Failed to read data from peer(%d,%d,%s,%s) - %s",
|
"Failed to read data from peer(%d,%d,%s,%s) - %s",
|
||||||
spc.route.Id, spc.conn_id, conn_raddr, conn_laddr, err.Error())
|
spc.route.Id, spc.conn_id, conn_raddr, conn_laddr, err.Error())
|
||||||
goto done
|
goto done
|
||||||
@ -114,7 +114,7 @@ wait_for_started:
|
|||||||
|
|
||||||
err = pss.Send(MakePeerDataPacket(spc.route.Id, spc.conn_id, buf[:n]))
|
err = pss.Send(MakePeerDataPacket(spc.route.Id, spc.conn_id, buf[:n]))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
spc.route.Cts.S.log.Write(spc.route.Cts.sid, LOG_ERROR,
|
spc.route.Cts.S.log.Write(spc.route.Cts.Sid, LOG_ERROR,
|
||||||
"Failed to send data from peer(%d,%d,%s,%s) to client - %s",
|
"Failed to send data from peer(%d,%d,%s,%s) to client - %s",
|
||||||
spc.route.Id, spc.conn_id, conn_raddr, conn_laddr, err.Error())
|
spc.route.Id, spc.conn_id, conn_raddr, conn_laddr, err.Error())
|
||||||
goto done
|
goto done
|
||||||
@ -134,7 +134,7 @@ wait_for_stopped:
|
|||||||
done:
|
done:
|
||||||
err = pss.Send(MakePeerStoppedPacket(spc.route.Id, spc.conn_id, spc.conn.RemoteAddr().String(), spc.conn.LocalAddr().String()))
|
err = pss.Send(MakePeerStoppedPacket(spc.route.Id, spc.conn_id, spc.conn.RemoteAddr().String(), spc.conn.LocalAddr().String()))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
spc.route.Cts.S.log.Write(spc.route.Cts.sid, LOG_ERROR,
|
spc.route.Cts.S.log.Write(spc.route.Cts.Sid, LOG_ERROR,
|
||||||
"Failed to send peer_stopped(%d,%d,%s,%s) to client - %s",
|
"Failed to send peer_stopped(%d,%d,%s,%s) to client - %s",
|
||||||
spc.route.Id, spc.conn_id, conn_raddr, conn_laddr, err.Error())
|
spc.route.Id, spc.conn_id, conn_raddr, conn_laddr, err.Error())
|
||||||
// nothing much to do about the failure of sending this
|
// nothing much to do about the failure of sending this
|
||||||
@ -206,21 +206,21 @@ func (spc *ServerPeerConn) ReportEvent(event_type PACKET_KIND, event_data interf
|
|||||||
var err error
|
var err error
|
||||||
_, err = spc.conn.Write(data)
|
_, err = spc.conn.Write(data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
spc.route.Cts.S.log.Write(spc.route.Cts.sid, LOG_ERROR,
|
spc.route.Cts.S.log.Write(spc.route.Cts.Sid, LOG_ERROR,
|
||||||
"Failed to write data from %s to peer(%d,%d,%s) - %s",
|
"Failed to write data from %s to peer(%d,%d,%s) - %s",
|
||||||
spc.route.Cts.RemoteAddr, spc.route.Id, spc.conn_id, spc.conn.RemoteAddr().String(), err.Error())
|
spc.route.Cts.RemoteAddr, spc.route.Id, spc.conn_id, spc.conn.RemoteAddr().String(), err.Error())
|
||||||
spc.ReqStop()
|
spc.ReqStop()
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// this must not happen.
|
// this must not happen.
|
||||||
spc.route.Cts.S.log.Write(spc.route.Cts.sid, LOG_ERROR,
|
spc.route.Cts.S.log.Write(spc.route.Cts.Sid, LOG_ERROR,
|
||||||
"Protocol error - invalid data in peer_data event from %s to peer(%d,%d,%s)",
|
"Protocol error - invalid data in peer_data event from %s to peer(%d,%d,%s)",
|
||||||
spc.route.Cts.RemoteAddr, spc.route.Id, spc.conn_id, spc.conn.RemoteAddr().String())
|
spc.route.Cts.RemoteAddr, spc.route.Id, spc.conn_id, spc.conn.RemoteAddr().String())
|
||||||
spc.ReqStop()
|
spc.ReqStop()
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// protocol error. the client must not relay more data from the client-side peer after EOF.
|
// protocol error. the client must not relay more data from the client-side peer after EOF.
|
||||||
spc.route.Cts.S.log.Write(spc.route.Cts.sid, LOG_ERROR,
|
spc.route.Cts.S.log.Write(spc.route.Cts.Sid, LOG_ERROR,
|
||||||
"Protocol error - redundant data from %s to (%d,%d,%s)",
|
"Protocol error - redundant data from %s to (%d,%d,%s)",
|
||||||
spc.route.Cts.RemoteAddr, spc.route.Id, spc.conn_id, spc.conn.RemoteAddr().String())
|
spc.route.Cts.RemoteAddr, spc.route.Id, spc.conn_id, spc.conn.RemoteAddr().String())
|
||||||
spc.ReqStop()
|
spc.ReqStop()
|
||||||
|
82
server.go
82
server.go
@ -127,7 +127,7 @@ type Server struct {
|
|||||||
type ServerConn struct {
|
type ServerConn struct {
|
||||||
S *Server
|
S *Server
|
||||||
Id ConnId
|
Id ConnId
|
||||||
sid string // for logging
|
Sid string // for logging
|
||||||
|
|
||||||
RemoteAddr net.Addr // client address that created this structure
|
RemoteAddr net.Addr // client address that created this structure
|
||||||
LocalAddr net.Addr // local address that the client is connected to
|
LocalAddr net.Addr // local address that the client is connected to
|
||||||
@ -279,7 +279,7 @@ func (r *ServerRoute) RemoveServerPeerConn(pts *ServerPeerConn) {
|
|||||||
delete(r.pts_map, pts.conn_id)
|
delete(r.pts_map, pts.conn_id)
|
||||||
r.Cts.S.stats.peers.Add(-1)
|
r.Cts.S.stats.peers.Add(-1)
|
||||||
r.pts_mtx.Unlock()
|
r.pts_mtx.Unlock()
|
||||||
r.Cts.S.log.Write(r.Cts.sid, LOG_DEBUG, "Removed server-side peer connection %s from route(%d)", pts.conn.RemoteAddr().String(), r.Id)
|
r.Cts.S.log.Write(r.Cts.Sid, LOG_DEBUG, "Removed server-side peer connection %s from route(%d)", pts.conn.RemoteAddr().String(), r.Id)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *ServerRoute) RunTask(wg *sync.WaitGroup) {
|
func (r *ServerRoute) RunTask(wg *sync.WaitGroup) {
|
||||||
@ -295,9 +295,9 @@ func (r *ServerRoute) RunTask(wg *sync.WaitGroup) {
|
|||||||
conn, err = r.svc_l.AcceptTCP() // this call is blocking...
|
conn, err = r.svc_l.AcceptTCP() // this call is blocking...
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, net.ErrClosed) {
|
if errors.Is(err, net.ErrClosed) {
|
||||||
r.Cts.S.log.Write(r.Cts.sid, LOG_INFO, "Server-side peer listener closed on route(%d)", r.Id)
|
r.Cts.S.log.Write(r.Cts.Sid, LOG_INFO, "Server-side peer listener closed on route(%d)", r.Id)
|
||||||
} else {
|
} else {
|
||||||
r.Cts.S.log.Write(r.Cts.sid, LOG_INFO, "Server-side peer listener error on route(%d) - %s", r.Id, err.Error())
|
r.Cts.S.log.Write(r.Cts.Sid, LOG_INFO, "Server-side peer listener error on route(%d) - %s", r.Id, err.Error())
|
||||||
}
|
}
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
@ -306,21 +306,21 @@ func (r *ServerRoute) RunTask(wg *sync.WaitGroup) {
|
|||||||
iaddr, _ = netip.AddrFromSlice(raddr.IP)
|
iaddr, _ = netip.AddrFromSlice(raddr.IP)
|
||||||
|
|
||||||
if !r.SvcPermNet.Contains(iaddr) {
|
if !r.SvcPermNet.Contains(iaddr) {
|
||||||
r.Cts.S.log.Write(r.Cts.sid, LOG_DEBUG, "Rejected server-side peer %s to route(%d) - allowed range %v", raddr.String(), r.Id, r.SvcPermNet)
|
r.Cts.S.log.Write(r.Cts.Sid, LOG_DEBUG, "Rejected server-side peer %s to route(%d) - allowed range %v", raddr.String(), r.Id, r.SvcPermNet)
|
||||||
conn.Close()
|
conn.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
if r.Cts.S.pts_limit > 0 && int(r.Cts.S.stats.peers.Load()) >= r.Cts.S.pts_limit {
|
if r.Cts.S.pts_limit > 0 && int(r.Cts.S.stats.peers.Load()) >= r.Cts.S.pts_limit {
|
||||||
r.Cts.S.log.Write(r.Cts.sid, LOG_DEBUG, "Rejected server-side peer %s to route(%d) - allowed max %d", raddr.String(), r.Id, r.Cts.S.pts_limit)
|
r.Cts.S.log.Write(r.Cts.Sid, LOG_DEBUG, "Rejected server-side peer %s to route(%d) - allowed max %d", raddr.String(), r.Id, r.Cts.S.pts_limit)
|
||||||
conn.Close()
|
conn.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
pts, err = r.AddNewServerPeerConn(conn)
|
pts, err = r.AddNewServerPeerConn(conn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
r.Cts.S.log.Write(r.Cts.sid, LOG_ERROR, "Failed to add server-side peer %s to route(%d) - %s", r.Id, raddr.String(), r.Id, err.Error())
|
r.Cts.S.log.Write(r.Cts.Sid, LOG_ERROR, "Failed to add server-side peer %s to route(%d) - %s", r.Id, raddr.String(), r.Id, err.Error())
|
||||||
conn.Close()
|
conn.Close()
|
||||||
} else {
|
} else {
|
||||||
r.Cts.S.log.Write(r.Cts.sid, LOG_DEBUG, "Added server-side peer %s to route(%d)", raddr.String(), r.Id)
|
r.Cts.S.log.Write(r.Cts.Sid, LOG_DEBUG, "Added server-side peer %s to route(%d)", raddr.String(), r.Id)
|
||||||
r.pts_wg.Add(1)
|
r.pts_wg.Add(1)
|
||||||
go pts.RunTask(&r.pts_wg)
|
go pts.RunTask(&r.pts_wg)
|
||||||
}
|
}
|
||||||
@ -329,7 +329,7 @@ func (r *ServerRoute) RunTask(wg *sync.WaitGroup) {
|
|||||||
r.ReqStop()
|
r.ReqStop()
|
||||||
|
|
||||||
r.pts_wg.Wait()
|
r.pts_wg.Wait()
|
||||||
r.Cts.S.log.Write(r.Cts.sid, LOG_DEBUG, "All service-side peer handlers ended on route(%d)", r.Id)
|
r.Cts.S.log.Write(r.Cts.Sid, LOG_DEBUG, "All service-side peer handlers ended on route(%d)", r.Id)
|
||||||
|
|
||||||
r.Cts.RemoveServerRoute(r) // final phase...
|
r.Cts.RemoveServerRoute(r) // final phase...
|
||||||
}
|
}
|
||||||
@ -433,7 +433,7 @@ func (cts *ServerConn) make_route_listener(id RouteId, option RouteOption, svc_r
|
|||||||
prev_cri, ok = cts.S.svc_port_map[PortId(svcaddr.Port)]
|
prev_cri, ok = cts.S.svc_port_map[PortId(svcaddr.Port)]
|
||||||
if ok {
|
if ok {
|
||||||
cts.S.svc_port_mtx.Unlock()
|
cts.S.svc_port_mtx.Unlock()
|
||||||
cts.S.log.Write(cts.sid, LOG_ERROR,
|
cts.S.log.Write(cts.Sid, LOG_ERROR,
|
||||||
"Route(%d,%d) on %s not unique by port number - existing route(%d,%d)",
|
"Route(%d,%d) on %s not unique by port number - existing route(%d,%d)",
|
||||||
cts.Id, id, prev_cri.conn_id, prev_cri.route_id, svcaddr.String())
|
cts.Id, id, prev_cri.conn_id, prev_cri.route_id, svcaddr.String())
|
||||||
l.Close()
|
l.Close()
|
||||||
@ -442,7 +442,7 @@ func (cts *ServerConn) make_route_listener(id RouteId, option RouteOption, svc_r
|
|||||||
cts.S.svc_port_map[PortId(svcaddr.Port)] = ConnRouteId{conn_id: cts.Id, route_id: id}
|
cts.S.svc_port_map[PortId(svcaddr.Port)] = ConnRouteId{conn_id: cts.Id, route_id: id}
|
||||||
cts.S.svc_port_mtx.Unlock()
|
cts.S.svc_port_mtx.Unlock()
|
||||||
|
|
||||||
cts.S.log.Write(cts.sid, LOG_DEBUG, "Route(%d,%d) listening on %s", cts.Id, id, svcaddr.String())
|
cts.S.log.Write(cts.Sid, LOG_DEBUG, "Route(%d,%d) listening on %s", cts.Id, id, svcaddr.String())
|
||||||
return l, svcaddr, nil
|
return l, svcaddr, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -580,11 +580,11 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) {
|
|||||||
for {
|
for {
|
||||||
pkt, err = cts.pss.Recv()
|
pkt, err = cts.pss.Recv()
|
||||||
if errors.Is(err, io.EOF) {
|
if errors.Is(err, io.EOF) {
|
||||||
cts.S.log.Write(cts.sid, LOG_INFO, "RPC stream closed for client %s", cts.RemoteAddr)
|
cts.S.log.Write(cts.Sid, LOG_INFO, "RPC stream closed for client %s", cts.RemoteAddr)
|
||||||
goto done
|
goto done
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cts.S.log.Write(cts.sid, LOG_ERROR, "RPC stream error for client %s - %s", cts.RemoteAddr, err.Error())
|
cts.S.log.Write(cts.Sid, LOG_ERROR, "RPC stream error for client %s - %s", cts.RemoteAddr, err.Error())
|
||||||
goto done
|
goto done
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -598,37 +598,37 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) {
|
|||||||
|
|
||||||
r, err = cts.AddNewServerRoute(RouteId(x.Route.RouteId), RouteOption(x.Route.ServiceOption), x.Route.TargetAddrStr, x.Route.TargetName, x.Route.ServiceAddrStr, x.Route.ServiceNetStr)
|
r, err = cts.AddNewServerRoute(RouteId(x.Route.RouteId), RouteOption(x.Route.ServiceOption), x.Route.TargetAddrStr, x.Route.TargetName, x.Route.ServiceAddrStr, x.Route.ServiceNetStr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cts.S.log.Write(cts.sid, LOG_ERROR,
|
cts.S.log.Write(cts.Sid, LOG_ERROR,
|
||||||
"Failed to add route(%d,%s) for %s - %s",
|
"Failed to add route(%d,%s) for %s - %s",
|
||||||
x.Route.RouteId, x.Route.TargetAddrStr, cts.RemoteAddr, err.Error())
|
x.Route.RouteId, x.Route.TargetAddrStr, cts.RemoteAddr, err.Error())
|
||||||
|
|
||||||
err = cts.pss.Send(MakeRouteStoppedPacket(RouteId(x.Route.RouteId), RouteOption(x.Route.ServiceOption), x.Route.TargetAddrStr, x.Route.TargetName, x.Route.ServiceAddrStr, x.Route.ServiceNetStr))
|
err = cts.pss.Send(MakeRouteStoppedPacket(RouteId(x.Route.RouteId), RouteOption(x.Route.ServiceOption), x.Route.TargetAddrStr, x.Route.TargetName, x.Route.ServiceAddrStr, x.Route.ServiceNetStr))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cts.S.log.Write(cts.sid, LOG_ERROR,
|
cts.S.log.Write(cts.Sid, LOG_ERROR,
|
||||||
"Failed to send route_stopped event(%d,%s,%v,%s) to client %s - %s",
|
"Failed to send route_stopped event(%d,%s,%v,%s) to client %s - %s",
|
||||||
x.Route.RouteId, x.Route.TargetAddrStr, x.Route.ServiceOption, x.Route.ServiceNetStr, cts.RemoteAddr, err.Error())
|
x.Route.RouteId, x.Route.TargetAddrStr, x.Route.ServiceOption, x.Route.ServiceNetStr, cts.RemoteAddr, err.Error())
|
||||||
goto done
|
goto done
|
||||||
} else {
|
} else {
|
||||||
cts.S.log.Write(cts.sid, LOG_DEBUG,
|
cts.S.log.Write(cts.Sid, LOG_DEBUG,
|
||||||
"Sent route_stopped event(%d,%s,%v,%s) to client %s",
|
"Sent route_stopped event(%d,%s,%v,%s) to client %s",
|
||||||
x.Route.RouteId, x.Route.TargetAddrStr, x.Route.ServiceOption, x.Route.ServiceNetStr, cts.RemoteAddr)
|
x.Route.RouteId, x.Route.TargetAddrStr, x.Route.ServiceOption, x.Route.ServiceNetStr, cts.RemoteAddr)
|
||||||
}
|
}
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
cts.S.log.Write(cts.sid, LOG_INFO,
|
cts.S.log.Write(cts.Sid, LOG_INFO,
|
||||||
"Added route(%d,%s,%s,%v,%v) for client %s to cts(%d)",
|
"Added route(%d,%s,%s,%v,%v) for client %s to cts(%d)",
|
||||||
r.Id, r.PtcAddr, r.SvcAddr.String(), r.SvcOption, r.SvcPermNet, cts.RemoteAddr, cts.Id)
|
r.Id, r.PtcAddr, r.SvcAddr.String(), r.SvcOption, r.SvcPermNet, cts.RemoteAddr, cts.Id)
|
||||||
err = cts.pss.Send(MakeRouteStartedPacket(r.Id, r.SvcOption, r.SvcAddr.String(), r.PtcName, r.SvcReqAddr, r.SvcPermNet.String()))
|
err = cts.pss.Send(MakeRouteStartedPacket(r.Id, r.SvcOption, r.SvcAddr.String(), r.PtcName, r.SvcReqAddr, r.SvcPermNet.String()))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
r.ReqStop()
|
r.ReqStop()
|
||||||
cts.S.log.Write(cts.sid, LOG_ERROR,
|
cts.S.log.Write(cts.Sid, LOG_ERROR,
|
||||||
"Failed to send route_started event(%d,%s,%s,%s%v,%v) to client %s - %s",
|
"Failed to send route_started event(%d,%s,%s,%s%v,%v) to client %s - %s",
|
||||||
r.Id, r.PtcAddr, r.SvcAddr.String(), r.SvcOption, r.SvcPermNet, cts.RemoteAddr, err.Error())
|
r.Id, r.PtcAddr, r.SvcAddr.String(), r.SvcOption, r.SvcPermNet, cts.RemoteAddr, err.Error())
|
||||||
goto done
|
goto done
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
cts.S.log.Write(cts.sid, LOG_INFO, "Received invalid packet from %s", cts.RemoteAddr)
|
cts.S.log.Write(cts.Sid, LOG_INFO, "Received invalid packet from %s", cts.RemoteAddr)
|
||||||
// TODO: need to abort this client?
|
// TODO: need to abort this client?
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -641,24 +641,24 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) {
|
|||||||
|
|
||||||
r, err = cts.RemoveServerRouteById(RouteId(x.Route.RouteId))
|
r, err = cts.RemoveServerRouteById(RouteId(x.Route.RouteId))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cts.S.log.Write(cts.sid, LOG_ERROR,
|
cts.S.log.Write(cts.Sid, LOG_ERROR,
|
||||||
"Failed to delete route(%d,%s) for client %s - %s",
|
"Failed to delete route(%d,%s) for client %s - %s",
|
||||||
x.Route.RouteId, x.Route.TargetAddrStr, cts.RemoteAddr, err.Error())
|
x.Route.RouteId, x.Route.TargetAddrStr, cts.RemoteAddr, err.Error())
|
||||||
} else {
|
} else {
|
||||||
cts.S.log.Write(cts.sid, LOG_ERROR,
|
cts.S.log.Write(cts.Sid, LOG_ERROR,
|
||||||
"Deleted route(%d,%s,%s,%v,%v) for client %s",
|
"Deleted route(%d,%s,%s,%v,%v) for client %s",
|
||||||
r.Id, r.PtcAddr, r.SvcAddr.String(), r.SvcOption, r.SvcPermNet.String(), cts.RemoteAddr)
|
r.Id, r.PtcAddr, r.SvcAddr.String(), r.SvcOption, r.SvcPermNet.String(), cts.RemoteAddr)
|
||||||
err = cts.pss.Send(MakeRouteStoppedPacket(r.Id, r.SvcOption, r.PtcAddr, r.PtcName, r.SvcReqAddr, r.SvcPermNet.String()))
|
err = cts.pss.Send(MakeRouteStoppedPacket(r.Id, r.SvcOption, r.PtcAddr, r.PtcName, r.SvcReqAddr, r.SvcPermNet.String()))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
r.ReqStop()
|
r.ReqStop()
|
||||||
cts.S.log.Write(cts.sid, LOG_ERROR,
|
cts.S.log.Write(cts.Sid, LOG_ERROR,
|
||||||
"Failed to send route_stopped event(%d,%s,%s,%v.%v) to client %s - %s",
|
"Failed to send route_stopped event(%d,%s,%s,%v.%v) to client %s - %s",
|
||||||
r.Id, r.PtcAddr, r.SvcAddr.String(), r.SvcOption, r.SvcPermNet.String(), cts.RemoteAddr, err.Error())
|
r.Id, r.PtcAddr, r.SvcAddr.String(), r.SvcOption, r.SvcPermNet.String(), cts.RemoteAddr, err.Error())
|
||||||
goto done
|
goto done
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
cts.S.log.Write(cts.sid, LOG_ERROR, "Invalid route_stop event from %s", cts.RemoteAddr)
|
cts.S.log.Write(cts.Sid, LOG_ERROR, "Invalid route_stop event from %s", cts.RemoteAddr)
|
||||||
}
|
}
|
||||||
|
|
||||||
case PACKET_KIND_PEER_STARTED:
|
case PACKET_KIND_PEER_STARTED:
|
||||||
@ -669,17 +669,17 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) {
|
|||||||
if ok {
|
if ok {
|
||||||
err = cts.ReportEvent(RouteId(x.Peer.RouteId), PeerId(x.Peer.PeerId), PACKET_KIND_PEER_STARTED, x.Peer)
|
err = cts.ReportEvent(RouteId(x.Peer.RouteId), PeerId(x.Peer.PeerId), PACKET_KIND_PEER_STARTED, x.Peer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cts.S.log.Write(cts.sid, LOG_ERROR,
|
cts.S.log.Write(cts.Sid, LOG_ERROR,
|
||||||
"Failed to handle peer_started event from %s for peer(%d,%d,%s,%s) - %s",
|
"Failed to handle peer_started event from %s for peer(%d,%d,%s,%s) - %s",
|
||||||
cts.RemoteAddr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr, err.Error())
|
cts.RemoteAddr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr, err.Error())
|
||||||
} else {
|
} else {
|
||||||
cts.S.log.Write(cts.sid, LOG_DEBUG,
|
cts.S.log.Write(cts.Sid, LOG_DEBUG,
|
||||||
"Handled peer_started event from %s for peer(%d,%d,%s,%s)",
|
"Handled peer_started event from %s for peer(%d,%d,%s,%s)",
|
||||||
cts.RemoteAddr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr)
|
cts.RemoteAddr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// invalid event data
|
// invalid event data
|
||||||
cts.S.log.Write(cts.sid, LOG_ERROR, "Invalid peer_started event from %s", cts.RemoteAddr)
|
cts.S.log.Write(cts.Sid, LOG_ERROR, "Invalid peer_started event from %s", cts.RemoteAddr)
|
||||||
}
|
}
|
||||||
|
|
||||||
case PACKET_KIND_PEER_ABORTED:
|
case PACKET_KIND_PEER_ABORTED:
|
||||||
@ -689,17 +689,17 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) {
|
|||||||
if ok {
|
if ok {
|
||||||
err = cts.ReportEvent(RouteId(x.Peer.RouteId), PeerId(x.Peer.PeerId), PACKET_KIND_PEER_ABORTED, x.Peer)
|
err = cts.ReportEvent(RouteId(x.Peer.RouteId), PeerId(x.Peer.PeerId), PACKET_KIND_PEER_ABORTED, x.Peer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cts.S.log.Write(cts.sid, LOG_ERROR,
|
cts.S.log.Write(cts.Sid, LOG_ERROR,
|
||||||
"Failed to handle peer_aborted event from %s for peer(%d,%d,%s,%s) - %s",
|
"Failed to handle peer_aborted event from %s for peer(%d,%d,%s,%s) - %s",
|
||||||
cts.RemoteAddr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr, err.Error())
|
cts.RemoteAddr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr, err.Error())
|
||||||
} else {
|
} else {
|
||||||
cts.S.log.Write(cts.sid, LOG_DEBUG,
|
cts.S.log.Write(cts.Sid, LOG_DEBUG,
|
||||||
"Handled peer_aborted event from %s for peer(%d,%d,%s,%s)",
|
"Handled peer_aborted event from %s for peer(%d,%d,%s,%s)",
|
||||||
cts.RemoteAddr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr)
|
cts.RemoteAddr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// invalid event data
|
// invalid event data
|
||||||
cts.S.log.Write(cts.sid, LOG_ERROR, "Invalid peer_aborted event from %s", cts.RemoteAddr)
|
cts.S.log.Write(cts.Sid, LOG_ERROR, "Invalid peer_aborted event from %s", cts.RemoteAddr)
|
||||||
}
|
}
|
||||||
|
|
||||||
case PACKET_KIND_PEER_STOPPED:
|
case PACKET_KIND_PEER_STOPPED:
|
||||||
@ -710,17 +710,17 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) {
|
|||||||
if ok {
|
if ok {
|
||||||
err = cts.ReportEvent(RouteId(x.Peer.RouteId), PeerId(x.Peer.PeerId), PACKET_KIND_PEER_STOPPED, x.Peer)
|
err = cts.ReportEvent(RouteId(x.Peer.RouteId), PeerId(x.Peer.PeerId), PACKET_KIND_PEER_STOPPED, x.Peer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cts.S.log.Write(cts.sid, LOG_ERROR,
|
cts.S.log.Write(cts.Sid, LOG_ERROR,
|
||||||
"Failed to handle peer_stopped event from %s for peer(%d,%d,%s,%s) - %s",
|
"Failed to handle peer_stopped event from %s for peer(%d,%d,%s,%s) - %s",
|
||||||
cts.RemoteAddr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr, err.Error())
|
cts.RemoteAddr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr, err.Error())
|
||||||
} else {
|
} else {
|
||||||
cts.S.log.Write(cts.sid, LOG_DEBUG,
|
cts.S.log.Write(cts.Sid, LOG_DEBUG,
|
||||||
"Handled peer_stopped event from %s for peer(%d,%d,%s,%s)",
|
"Handled peer_stopped event from %s for peer(%d,%d,%s,%s)",
|
||||||
cts.RemoteAddr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr)
|
cts.RemoteAddr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// invalid event data
|
// invalid event data
|
||||||
cts.S.log.Write(cts.sid, LOG_ERROR, "Invalid peer_stopped event from %s", cts.RemoteAddr)
|
cts.S.log.Write(cts.Sid, LOG_ERROR, "Invalid peer_stopped event from %s", cts.RemoteAddr)
|
||||||
}
|
}
|
||||||
|
|
||||||
case PACKET_KIND_PEER_DATA:
|
case PACKET_KIND_PEER_DATA:
|
||||||
@ -731,17 +731,17 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) {
|
|||||||
if ok {
|
if ok {
|
||||||
err = cts.ReportEvent(RouteId(x.Data.RouteId), PeerId(x.Data.PeerId), PACKET_KIND_PEER_DATA, x.Data.Data)
|
err = cts.ReportEvent(RouteId(x.Data.RouteId), PeerId(x.Data.PeerId), PACKET_KIND_PEER_DATA, x.Data.Data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cts.S.log.Write(cts.sid, LOG_ERROR,
|
cts.S.log.Write(cts.Sid, LOG_ERROR,
|
||||||
"Failed to handle peer_data event from %s for peer(%d,%d) - %s",
|
"Failed to handle peer_data event from %s for peer(%d,%d) - %s",
|
||||||
cts.RemoteAddr, x.Data.RouteId, x.Data.PeerId, err.Error())
|
cts.RemoteAddr, x.Data.RouteId, x.Data.PeerId, err.Error())
|
||||||
} else {
|
} else {
|
||||||
cts.S.log.Write(cts.sid, LOG_DEBUG,
|
cts.S.log.Write(cts.Sid, LOG_DEBUG,
|
||||||
"Handled peer_data event from %s for peer(%d,%d)",
|
"Handled peer_data event from %s for peer(%d,%d)",
|
||||||
cts.RemoteAddr, x.Data.RouteId, x.Data.PeerId)
|
cts.RemoteAddr, x.Data.RouteId, x.Data.PeerId)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// invalid event data
|
// invalid event data
|
||||||
cts.S.log.Write(cts.sid, LOG_ERROR, "Invalid peer_data event from %s", cts.RemoteAddr)
|
cts.S.log.Write(cts.Sid, LOG_ERROR, "Invalid peer_data event from %s", cts.RemoteAddr)
|
||||||
}
|
}
|
||||||
|
|
||||||
case PACKET_KIND_CONN_NOTICE:
|
case PACKET_KIND_CONN_NOTICE:
|
||||||
@ -754,13 +754,13 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) {
|
|||||||
cts.S.conn_notice.Handle(cts, x.Notice.Text)
|
cts.S.conn_notice.Handle(cts, x.Notice.Text)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
cts.S.log.Write(cts.sid, LOG_ERROR, "Invalid conn_data event from %s", cts.RemoteAddr)
|
cts.S.log.Write(cts.Sid, LOG_ERROR, "Invalid conn_data event from %s", cts.RemoteAddr)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
done:
|
done:
|
||||||
cts.S.log.Write(cts.sid, LOG_INFO, "RPC stream receiver ended")
|
cts.S.log.Write(cts.Sid, LOG_INFO, "RPC stream receiver ended")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cts *ServerConn) RunTask(wg *sync.WaitGroup) {
|
func (cts *ServerConn) RunTask(wg *sync.WaitGroup) {
|
||||||
@ -788,7 +788,7 @@ func (cts *ServerConn) RunTask(wg *sync.WaitGroup) {
|
|||||||
// or continue
|
// or continue
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done(): // the stream context is done
|
case <-ctx.Done(): // the stream context is done
|
||||||
cts.S.log.Write(cts.sid, LOG_INFO, "RPC stream done - %s", ctx.Err().Error())
|
cts.S.log.Write(cts.Sid, LOG_INFO, "RPC stream done - %s", ctx.Err().Error())
|
||||||
goto done
|
goto done
|
||||||
|
|
||||||
case <- cts.stop_chan:
|
case <- cts.stop_chan:
|
||||||
@ -911,7 +911,7 @@ func (cc *ConnCatcher) HandleConn(ctx context.Context, cs stats.ConnStats) {
|
|||||||
var cts *ServerConn
|
var cts *ServerConn
|
||||||
var log_id string
|
var log_id string
|
||||||
cts, _ = cc.server.RemoveServerConnByAddr(p.Addr)
|
cts, _ = cc.server.RemoveServerConnByAddr(p.Addr)
|
||||||
if cts != nil { log_id = cts.sid }
|
if cts != nil { log_id = cts.Sid }
|
||||||
cc.server.log.Write(log_id, LOG_INFO, "Client disconnected - %s", addr)
|
cc.server.log.Write(log_id, LOG_INFO, "Client disconnected - %s", addr)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1541,7 +1541,7 @@ func (s *Server) AddNewServerConn(remote_addr *net.Addr, local_addr *net.Addr, p
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
cts.Id = assigned_id
|
cts.Id = assigned_id
|
||||||
cts.sid = fmt.Sprintf("%d", cts.Id) // id in string used for logging
|
cts.Sid = fmt.Sprintf("%d", cts.Id) // id in string used for logging
|
||||||
|
|
||||||
_, ok = s.cts_map_by_addr[cts.RemoteAddr]
|
_, ok = s.cts_map_by_addr[cts.RemoteAddr]
|
||||||
if ok {
|
if ok {
|
||||||
@ -1553,7 +1553,7 @@ func (s *Server) AddNewServerConn(remote_addr *net.Addr, local_addr *net.Addr, p
|
|||||||
s.stats.conns.Store(int64(len(s.cts_map)))
|
s.stats.conns.Store(int64(len(s.cts_map)))
|
||||||
s.cts_mtx.Unlock()
|
s.cts_mtx.Unlock()
|
||||||
|
|
||||||
s.log.Write(cts.sid, LOG_DEBUG, "Added client connection from %s", cts.RemoteAddr.String())
|
s.log.Write(cts.Sid, LOG_DEBUG, "Added client connection from %s", cts.RemoteAddr.String())
|
||||||
return &cts, nil
|
return &cts, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user