exposed some fields of ServerRoute and ServerConn
This commit is contained in:
parent
a5bb59622e
commit
4016793327
@ -105,18 +105,18 @@ func (ctl *server_ctl_server_conns) ServeHTTP(w http.ResponseWriter, req *http.R
|
|||||||
cts.route_mtx.Lock()
|
cts.route_mtx.Lock()
|
||||||
for _, r = range cts.route_map {
|
for _, r = range cts.route_map {
|
||||||
jsp = append(jsp, json_out_server_route{
|
jsp = append(jsp, json_out_server_route{
|
||||||
Id: r.id,
|
Id: r.Id,
|
||||||
ClientPeerAddr: r.ptc_addr,
|
ClientPeerAddr: r.PtcAddr,
|
||||||
ClientPeerName: r.ptc_name,
|
ClientPeerName: r.PtcName,
|
||||||
ServerPeerServiceAddr: r.svc_addr.String(),
|
ServerPeerServiceAddr: r.SvcAddr.String(),
|
||||||
ServerPeerServiceNet: r.svc_permitted_net.String(),
|
ServerPeerServiceNet: r.SvcPermNet.String(),
|
||||||
ServerPeerOption: r.svc_option.string(),
|
ServerPeerOption: r.SvcOption.string(),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
js = append(js, json_out_server_conn{
|
js = append(js, json_out_server_conn{
|
||||||
Id: cts.id,
|
Id: cts.Id,
|
||||||
ClientAddr: cts.remote_addr.String(),
|
ClientAddr: cts.RemoteAddr.String(),
|
||||||
ServerAddr: cts.local_addr.String(),
|
ServerAddr: cts.LocalAddr.String(),
|
||||||
Routes: jsp,
|
Routes: jsp,
|
||||||
})
|
})
|
||||||
cts.route_mtx.Unlock()
|
cts.route_mtx.Unlock()
|
||||||
@ -177,18 +177,18 @@ func (ctl *server_ctl_server_conns_id) ServeHTTP(w http.ResponseWriter, req *htt
|
|||||||
cts.route_mtx.Lock()
|
cts.route_mtx.Lock()
|
||||||
for _, r = range cts.route_map {
|
for _, r = range cts.route_map {
|
||||||
jsp = append(jsp, json_out_server_route{
|
jsp = append(jsp, json_out_server_route{
|
||||||
Id: r.id,
|
Id: r.Id,
|
||||||
ClientPeerAddr: r.ptc_addr,
|
ClientPeerAddr: r.PtcAddr,
|
||||||
ClientPeerName: r.ptc_name,
|
ClientPeerName: r.PtcName,
|
||||||
ServerPeerServiceAddr: r.svc_addr.String(),
|
ServerPeerServiceAddr: r.SvcAddr.String(),
|
||||||
ServerPeerServiceNet: r.svc_permitted_net.String(),
|
ServerPeerServiceNet: r.SvcPermNet.String(),
|
||||||
ServerPeerOption: r.svc_option.string(),
|
ServerPeerOption: r.SvcOption.string(),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
js = &json_out_server_conn{
|
js = &json_out_server_conn{
|
||||||
Id: cts.id,
|
Id: cts.Id,
|
||||||
ClientAddr: cts.remote_addr.String(),
|
ClientAddr: cts.RemoteAddr.String(),
|
||||||
ServerAddr: cts.local_addr.String(),
|
ServerAddr: cts.LocalAddr.String(),
|
||||||
Routes: jsp,
|
Routes: jsp,
|
||||||
}
|
}
|
||||||
cts.route_mtx.Unlock()
|
cts.route_mtx.Unlock()
|
||||||
@ -247,12 +247,12 @@ func (ctl *server_ctl_server_conns_id_routes) ServeHTTP(w http.ResponseWriter, r
|
|||||||
cts.route_mtx.Lock()
|
cts.route_mtx.Lock()
|
||||||
for _, r = range cts.route_map {
|
for _, r = range cts.route_map {
|
||||||
jsp = append(jsp, json_out_server_route{
|
jsp = append(jsp, json_out_server_route{
|
||||||
Id: r.id,
|
Id: r.Id,
|
||||||
ClientPeerAddr: r.ptc_addr,
|
ClientPeerAddr: r.PtcAddr,
|
||||||
ClientPeerName: r.ptc_name,
|
ClientPeerName: r.PtcName,
|
||||||
ServerPeerServiceAddr: r.svc_addr.String(),
|
ServerPeerServiceAddr: r.SvcAddr.String(),
|
||||||
ServerPeerServiceNet: r.svc_permitted_net.String(),
|
ServerPeerServiceNet: r.SvcPermNet.String(),
|
||||||
ServerPeerOption: r.svc_option.string(),
|
ServerPeerOption: r.SvcOption.string(),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
cts.route_mtx.Unlock()
|
cts.route_mtx.Unlock()
|
||||||
@ -307,12 +307,12 @@ func (ctl *server_ctl_server_conns_id_routes_id) ServeHTTP(w http.ResponseWriter
|
|||||||
case http.MethodGet:
|
case http.MethodGet:
|
||||||
status_code = http.StatusOK; w.WriteHeader(status_code)
|
status_code = http.StatusOK; w.WriteHeader(status_code)
|
||||||
err = je.Encode(json_out_server_route{
|
err = je.Encode(json_out_server_route{
|
||||||
Id: r.id,
|
Id: r.Id,
|
||||||
ClientPeerAddr: r.ptc_addr,
|
ClientPeerAddr: r.PtcAddr,
|
||||||
ClientPeerName: r.ptc_name,
|
ClientPeerName: r.PtcName,
|
||||||
ServerPeerServiceAddr: r.svc_addr.String(),
|
ServerPeerServiceAddr: r.SvcAddr.String(),
|
||||||
ServerPeerServiceNet: r.svc_permitted_net.String(),
|
ServerPeerServiceNet: r.SvcPermNet.String(),
|
||||||
ServerPeerOption: r.svc_option.string(),
|
ServerPeerOption: r.SvcOption.string(),
|
||||||
})
|
})
|
||||||
if err != nil { goto oops }
|
if err != nil { goto oops }
|
||||||
|
|
||||||
|
@ -57,18 +57,18 @@ func (spc *ServerPeerConn) RunTask(wg *sync.WaitGroup) {
|
|||||||
conn_raddr = spc.conn.RemoteAddr().String()
|
conn_raddr = spc.conn.RemoteAddr().String()
|
||||||
conn_laddr = spc.conn.LocalAddr().String()
|
conn_laddr = spc.conn.LocalAddr().String()
|
||||||
|
|
||||||
pss = spc.route.cts.pss
|
pss = spc.route.Cts.pss
|
||||||
err = pss.Send(MakePeerStartedPacket(spc.route.id, spc.conn_id, conn_raddr, conn_laddr))
|
err = pss.Send(MakePeerStartedPacket(spc.route.Id, spc.conn_id, conn_raddr, conn_laddr))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
spc.route.cts.svr.log.Write(spc.route.cts.sid, LOG_ERROR,
|
spc.route.Cts.svr.log.Write(spc.route.Cts.sid, LOG_ERROR,
|
||||||
"Failed to send peer_started event(%d,%d,%s,%s) to client - %s",
|
"Failed to send peer_started event(%d,%d,%s,%s) to client - %s",
|
||||||
spc.route.id, spc.conn_id, conn_raddr, conn_laddr, err.Error())
|
spc.route.Id, spc.conn_id, conn_raddr, conn_laddr, err.Error())
|
||||||
goto done_without_stop
|
goto done_without_stop
|
||||||
}
|
}
|
||||||
|
|
||||||
// set up a timer to set waiting duration until the connection is
|
// set up a timer to set waiting duration until the connection is
|
||||||
// actually established on the client side and it's informed...
|
// actually established on the client side and it's informed...
|
||||||
waitctx, cancel_wait = context.WithTimeout(spc.route.cts.svr.ctx, 5 * time.Second) // TODO: make this configurable
|
waitctx, cancel_wait = context.WithTimeout(spc.route.Cts.svr.ctx, 5 * time.Second) // TODO: make this configurable
|
||||||
wait_for_started:
|
wait_for_started:
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
@ -95,27 +95,27 @@ wait_for_started:
|
|||||||
n, err = spc.conn.Read(buf[:])
|
n, err = spc.conn.Read(buf[:])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, io.EOF) || strings.Contains(err.Error(), "use of closed network connection") { // i don't like this way to check this error.
|
if errors.Is(err, io.EOF) || strings.Contains(err.Error(), "use of closed network connection") { // i don't like this way to check this error.
|
||||||
err = pss.Send(MakePeerEofPacket(spc.route.id, spc.conn_id))
|
err = pss.Send(MakePeerEofPacket(spc.route.Id, spc.conn_id))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
spc.route.cts.svr.log.Write(spc.route.cts.sid, LOG_ERROR,
|
spc.route.Cts.svr.log.Write(spc.route.Cts.sid, LOG_ERROR,
|
||||||
"Failed to send peer_eof event(%d,%d,%s,%s) to client - %s",
|
"Failed to send peer_eof event(%d,%d,%s,%s) to client - %s",
|
||||||
spc.route.id, spc.conn_id, conn_raddr, conn_laddr, err.Error())
|
spc.route.Id, spc.conn_id, conn_raddr, conn_laddr, err.Error())
|
||||||
goto done
|
goto done
|
||||||
}
|
}
|
||||||
goto wait_for_stopped
|
goto wait_for_stopped
|
||||||
} else {
|
} else {
|
||||||
spc.route.cts.svr.log.Write(spc.route.cts.sid, LOG_ERROR,
|
spc.route.Cts.svr.log.Write(spc.route.Cts.sid, LOG_ERROR,
|
||||||
"Failed to read data from peer(%d,%d,%s,%s) - %s",
|
"Failed to read data from peer(%d,%d,%s,%s) - %s",
|
||||||
spc.route.id, spc.conn_id, conn_raddr, conn_laddr, err.Error())
|
spc.route.Id, spc.conn_id, conn_raddr, conn_laddr, err.Error())
|
||||||
goto done
|
goto done
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = pss.Send(MakePeerDataPacket(spc.route.id, spc.conn_id, buf[:n]))
|
err = pss.Send(MakePeerDataPacket(spc.route.Id, spc.conn_id, buf[:n]))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
spc.route.cts.svr.log.Write(spc.route.cts.sid, LOG_ERROR,
|
spc.route.Cts.svr.log.Write(spc.route.Cts.sid, LOG_ERROR,
|
||||||
"Failed to send data from peer(%d,%d,%s,%s) to client - %s",
|
"Failed to send data from peer(%d,%d,%s,%s) to client - %s",
|
||||||
spc.route.id, spc.conn_id, conn_raddr, conn_laddr, err.Error())
|
spc.route.Id, spc.conn_id, conn_raddr, conn_laddr, err.Error())
|
||||||
goto done
|
goto done
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -131,11 +131,11 @@ wait_for_stopped:
|
|||||||
}
|
}
|
||||||
|
|
||||||
done:
|
done:
|
||||||
err = pss.Send(MakePeerStoppedPacket(spc.route.id, spc.conn_id, spc.conn.RemoteAddr().String(), spc.conn.LocalAddr().String()))
|
err = pss.Send(MakePeerStoppedPacket(spc.route.Id, spc.conn_id, spc.conn.RemoteAddr().String(), spc.conn.LocalAddr().String()))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
spc.route.cts.svr.log.Write(spc.route.cts.sid, LOG_ERROR,
|
spc.route.Cts.svr.log.Write(spc.route.Cts.sid, LOG_ERROR,
|
||||||
"Failed to send peer_stopped(%d,%d,%s,%s) to client - %s",
|
"Failed to send peer_stopped(%d,%d,%s,%s) to client - %s",
|
||||||
spc.route.id, spc.conn_id, conn_raddr, conn_laddr, err.Error())
|
spc.route.Id, spc.conn_id, conn_raddr, conn_laddr, err.Error())
|
||||||
// nothing much to do about the failure of sending this
|
// nothing much to do about the failure of sending this
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -192,23 +192,23 @@ func (spc *ServerPeerConn) ReportEvent(event_type PACKET_KIND, event_data interf
|
|||||||
var err error
|
var err error
|
||||||
_, err = spc.conn.Write(data)
|
_, err = spc.conn.Write(data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
spc.route.cts.svr.log.Write(spc.route.cts.sid, LOG_ERROR,
|
spc.route.Cts.svr.log.Write(spc.route.Cts.sid, LOG_ERROR,
|
||||||
"Failed to write data from %s to peer(%d,%d,%s) - %s",
|
"Failed to write data from %s to peer(%d,%d,%s) - %s",
|
||||||
spc.route.cts.remote_addr, spc.route.id, spc.conn_id, spc.conn.RemoteAddr().String(), err.Error())
|
spc.route.Cts.RemoteAddr, spc.route.Id, spc.conn_id, spc.conn.RemoteAddr().String(), err.Error())
|
||||||
spc.ReqStop()
|
spc.ReqStop()
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// this must not happen.
|
// this must not happen.
|
||||||
spc.route.cts.svr.log.Write(spc.route.cts.sid, LOG_ERROR,
|
spc.route.Cts.svr.log.Write(spc.route.Cts.sid, LOG_ERROR,
|
||||||
"Protocol error - invalid data in peer_data event from %s to peer(%d,%d,%s)",
|
"Protocol error - invalid data in peer_data event from %s to peer(%d,%d,%s)",
|
||||||
spc.route.cts.remote_addr, spc.route.id, spc.conn_id, spc.conn.RemoteAddr().String())
|
spc.route.Cts.RemoteAddr, spc.route.Id, spc.conn_id, spc.conn.RemoteAddr().String())
|
||||||
spc.ReqStop()
|
spc.ReqStop()
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// protocol error. the client must not relay more data from the client-side peer after EOF.
|
// protocol error. the client must not relay more data from the client-side peer after EOF.
|
||||||
spc.route.cts.svr.log.Write(spc.route.cts.sid, LOG_ERROR,
|
spc.route.Cts.svr.log.Write(spc.route.Cts.sid, LOG_ERROR,
|
||||||
"Protocol error - redundant data from %s to (%d,%d,%s)",
|
"Protocol error - redundant data from %s to (%d,%d,%s)",
|
||||||
spc.route.cts.remote_addr, spc.route.id, spc.conn_id, spc.conn.RemoteAddr().String())
|
spc.route.Cts.RemoteAddr, spc.route.Id, spc.conn_id, spc.conn.RemoteAddr().String())
|
||||||
spc.ReqStop()
|
spc.ReqStop()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -352,7 +352,7 @@ func (pxy *server_proxy_http_main) req_to_proxy_url (req *http.Request, r *Serve
|
|||||||
// HTTP or HTTPS is actually a hint to the client-side peer
|
// HTTP or HTTPS is actually a hint to the client-side peer
|
||||||
// Use the hint to compose the URL to the client via the server-side
|
// Use the hint to compose the URL to the client via the server-side
|
||||||
// listening socket as if it connects to the client-side peer
|
// listening socket as if it connects to the client-side peer
|
||||||
if r.svc_option & RouteOption(ROUTE_OPTION_HTTPS) != 0 {
|
if r.SvcOption & RouteOption(ROUTE_OPTION_HTTPS) != 0 {
|
||||||
proxy_proto = "https"
|
proxy_proto = "https"
|
||||||
} else {
|
} else {
|
||||||
proxy_proto = "http"
|
proxy_proto = "http"
|
||||||
@ -363,7 +363,7 @@ func (pxy *server_proxy_http_main) req_to_proxy_url (req *http.Request, r *Serve
|
|||||||
|
|
||||||
return &url.URL{
|
return &url.URL{
|
||||||
Scheme: proxy_proto,
|
Scheme: proxy_proto,
|
||||||
Host: r.ptc_addr,
|
Host: r.PtcAddr,
|
||||||
Path: proxy_url_path,
|
Path: proxy_url_path,
|
||||||
RawQuery: req.URL.RawQuery,
|
RawQuery: req.URL.RawQuery,
|
||||||
Fragment: req.URL.Fragment,
|
Fragment: req.URL.Fragment,
|
||||||
@ -402,13 +402,13 @@ func (pxy *server_proxy_http_main) ServeHTTP(w http.ResponseWriter, req *http.Re
|
|||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
if r.svc_option & (RouteOption(ROUTE_OPTION_HTTP) | RouteOption(ROUTE_OPTION_HTTPS)) == 0 {
|
if r.SvcOption & (RouteOption(ROUTE_OPTION_HTTP) | RouteOption(ROUTE_OPTION_HTTPS)) == 0 {
|
||||||
status_code = http.StatusForbidden; w.WriteHeader(status_code)
|
status_code = http.StatusForbidden; w.WriteHeader(status_code)
|
||||||
err = fmt.Errorf("target not http/https")
|
err = fmt.Errorf("target not http/https")
|
||||||
goto oops
|
goto oops
|
||||||
}
|
}
|
||||||
*/
|
*/
|
||||||
addr = svc_addr_to_dst_addr(r.svc_addr)
|
addr = svc_addr_to_dst_addr(r.SvcAddr)
|
||||||
transport, err = pxy.addr_to_transport(s.ctx, addr)
|
transport, err = pxy.addr_to_transport(s.ctx, addr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
status_code = http.StatusBadGateway; w.WriteHeader(status_code)
|
status_code = http.StatusBadGateway; w.WriteHeader(status_code)
|
||||||
@ -456,7 +456,7 @@ func (pxy *server_proxy_http_main) ServeHTTP(w http.ResponseWriter, req *http.Re
|
|||||||
resp_body = resp.Body
|
resp_body = resp.Body
|
||||||
|
|
||||||
if in_wpx_mode && s.wpx_resp_tf != nil {
|
if in_wpx_mode && s.wpx_resp_tf != nil {
|
||||||
resp_body = s.wpx_resp_tf(path_prefix, resp)
|
resp_body = s.wpx_resp_tf(r, path_prefix, resp)
|
||||||
}
|
}
|
||||||
|
|
||||||
outhdr = w.Header()
|
outhdr = w.Header()
|
||||||
@ -620,13 +620,13 @@ func (pxy *server_proxy_ssh_ws) connect_ssh (ctx context.Context, username strin
|
|||||||
}
|
}
|
||||||
|
|
||||||
/* Is this protection needed?
|
/* Is this protection needed?
|
||||||
if r.svc_option & RouteOption(ROUTE_OPTION_SSH) == 0 {
|
if r.SvcOption & RouteOption(ROUTE_OPTION_SSH) == 0 {
|
||||||
err = fmt.Errorf("target not ssh")
|
err = fmt.Errorf("target not ssh")
|
||||||
goto oops
|
goto oops
|
||||||
}
|
}
|
||||||
*/
|
*/
|
||||||
|
|
||||||
addr = svc_addr_to_dst_addr(r.svc_addr);
|
addr = svc_addr_to_dst_addr(r.SvcAddr);
|
||||||
|
|
||||||
dialer = &net.Dialer{}
|
dialer = &net.Dialer{}
|
||||||
conn, err = dialer.DialContext(ctx, "tcp", addr.String())
|
conn, err = dialer.DialContext(ctx, "tcp", addr.String())
|
||||||
|
175
server.go
175
server.go
@ -34,7 +34,7 @@ type ServerRouteMap = map[RouteId]*ServerRoute
|
|||||||
type ServerPeerConnMap = map[PeerId]*ServerPeerConn
|
type ServerPeerConnMap = map[PeerId]*ServerPeerConn
|
||||||
type ServerSvcPortMap = map[PortId]ConnRouteId
|
type ServerSvcPortMap = map[PortId]ConnRouteId
|
||||||
|
|
||||||
type ServerWpxResponseTransformer func(path_prefix string, resp *http.Response) io.Reader
|
type ServerWpxResponseTransformer func(r *ServerRoute, path_prefix string, resp *http.Response) io.Reader
|
||||||
|
|
||||||
type Server struct {
|
type Server struct {
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
@ -97,11 +97,11 @@ type Server struct {
|
|||||||
// client connect to the server, the server accept it, and makes a tunnel request
|
// client connect to the server, the server accept it, and makes a tunnel request
|
||||||
type ServerConn struct {
|
type ServerConn struct {
|
||||||
svr *Server
|
svr *Server
|
||||||
id ConnId
|
Id ConnId
|
||||||
sid string // for logging
|
sid string // for logging
|
||||||
|
|
||||||
remote_addr net.Addr // client address that created this structure
|
RemoteAddr net.Addr // client address that created this structure
|
||||||
local_addr net.Addr // local address that the client is connected to
|
LocalAddr net.Addr // local address that the client is connected to
|
||||||
pss *GuardedPacketStreamServer
|
pss *GuardedPacketStreamServer
|
||||||
|
|
||||||
route_mtx sync.Mutex
|
route_mtx sync.Mutex
|
||||||
@ -114,16 +114,17 @@ type ServerConn struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type ServerRoute struct {
|
type ServerRoute struct {
|
||||||
cts *ServerConn
|
Cts *ServerConn
|
||||||
svc_l *net.TCPListener
|
Id RouteId
|
||||||
svc_addr *net.TCPAddr // actual listening address
|
|
||||||
svc_requested_addr string
|
|
||||||
svc_permitted_net netip.Prefix
|
|
||||||
svc_option RouteOption
|
|
||||||
|
|
||||||
ptc_addr string
|
svc_l *net.TCPListener
|
||||||
ptc_name string
|
SvcAddr *net.TCPAddr // actual listening address
|
||||||
id RouteId
|
SvcReqAddr string
|
||||||
|
SvcPermNet netip.Prefix // network from which access is allowed
|
||||||
|
SvcOption RouteOption
|
||||||
|
|
||||||
|
PtcAddr string
|
||||||
|
PtcName string
|
||||||
|
|
||||||
pts_mtx sync.Mutex
|
pts_mtx sync.Mutex
|
||||||
pts_map ServerPeerConnMap
|
pts_map ServerPeerConnMap
|
||||||
@ -197,16 +198,16 @@ func NewServerRoute(cts *ServerConn, id RouteId, option RouteOption, ptc_addr st
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
r.cts = cts
|
r.Cts = cts
|
||||||
r.id = id
|
r.Id = id
|
||||||
r.svc_l = l
|
r.svc_l = l
|
||||||
r.svc_addr = svcaddr
|
r.SvcAddr = svcaddr
|
||||||
r.svc_requested_addr = svc_requested_addr
|
r.SvcReqAddr = svc_requested_addr
|
||||||
r.svc_permitted_net = svcnet
|
r.SvcPermNet = svcnet
|
||||||
r.svc_option = option
|
r.SvcOption = option
|
||||||
|
|
||||||
r.ptc_addr = ptc_addr
|
r.PtcAddr = ptc_addr
|
||||||
r.ptc_name = ptc_name
|
r.PtcName = ptc_name
|
||||||
r.pts_limit = PTS_LIMIT
|
r.pts_limit = PTS_LIMIT
|
||||||
r.pts_map = make(ServerPeerConnMap)
|
r.pts_map = make(ServerPeerConnMap)
|
||||||
r.pts_next_id = 0
|
r.pts_next_id = 0
|
||||||
@ -243,7 +244,7 @@ func (r *ServerRoute) AddNewServerPeerConn(c *net.TCPConn) (*ServerPeerConn, err
|
|||||||
pts = NewServerPeerConn(r, c, r.pts_next_id)
|
pts = NewServerPeerConn(r, c, r.pts_next_id)
|
||||||
r.pts_map[pts.conn_id] = pts
|
r.pts_map[pts.conn_id] = pts
|
||||||
r.pts_next_id++
|
r.pts_next_id++
|
||||||
r.cts.svr.stats.peers.Add(1)
|
r.Cts.svr.stats.peers.Add(1)
|
||||||
|
|
||||||
return pts, nil
|
return pts, nil
|
||||||
}
|
}
|
||||||
@ -251,9 +252,9 @@ func (r *ServerRoute) AddNewServerPeerConn(c *net.TCPConn) (*ServerPeerConn, err
|
|||||||
func (r *ServerRoute) RemoveServerPeerConn(pts *ServerPeerConn) {
|
func (r *ServerRoute) RemoveServerPeerConn(pts *ServerPeerConn) {
|
||||||
r.pts_mtx.Lock()
|
r.pts_mtx.Lock()
|
||||||
delete(r.pts_map, pts.conn_id)
|
delete(r.pts_map, pts.conn_id)
|
||||||
r.cts.svr.stats.peers.Add(-1)
|
r.Cts.svr.stats.peers.Add(-1)
|
||||||
r.pts_mtx.Unlock()
|
r.pts_mtx.Unlock()
|
||||||
r.cts.svr.log.Write(r.cts.sid, LOG_DEBUG, "Removed server-side peer connection %s from route(%d)", pts.conn.RemoteAddr().String(), r.id)
|
r.Cts.svr.log.Write(r.Cts.sid, LOG_DEBUG, "Removed server-side peer connection %s from route(%d)", pts.conn.RemoteAddr().String(), r.Id)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *ServerRoute) RunTask(wg *sync.WaitGroup) {
|
func (r *ServerRoute) RunTask(wg *sync.WaitGroup) {
|
||||||
@ -269,9 +270,9 @@ func (r *ServerRoute) RunTask(wg *sync.WaitGroup) {
|
|||||||
conn, err = r.svc_l.AcceptTCP() // this call is blocking...
|
conn, err = r.svc_l.AcceptTCP() // this call is blocking...
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if errors.Is(err, net.ErrClosed) {
|
if errors.Is(err, net.ErrClosed) {
|
||||||
r.cts.svr.log.Write(r.cts.sid, LOG_INFO, "Server-side peer listener closed on route(%d)", r.id)
|
r.Cts.svr.log.Write(r.Cts.sid, LOG_INFO, "Server-side peer listener closed on route(%d)", r.Id)
|
||||||
} else {
|
} else {
|
||||||
r.cts.svr.log.Write(r.cts.sid, LOG_INFO, "Server-side peer listener error on route(%d) - %s", r.id, err.Error())
|
r.Cts.svr.log.Write(r.Cts.sid, LOG_INFO, "Server-side peer listener error on route(%d) - %s", r.Id, err.Error())
|
||||||
}
|
}
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
@ -279,22 +280,22 @@ func (r *ServerRoute) RunTask(wg *sync.WaitGroup) {
|
|||||||
raddr = conn.RemoteAddr().(*net.TCPAddr)
|
raddr = conn.RemoteAddr().(*net.TCPAddr)
|
||||||
iaddr, _ = netip.AddrFromSlice(raddr.IP)
|
iaddr, _ = netip.AddrFromSlice(raddr.IP)
|
||||||
|
|
||||||
if !r.svc_permitted_net.Contains(iaddr) {
|
if !r.SvcPermNet.Contains(iaddr) {
|
||||||
r.cts.svr.log.Write(r.cts.sid, LOG_DEBUG, "Rejected server-side peer %s to route(%d) - allowed range %v", raddr.String(), r.id, r.svc_permitted_net)
|
r.Cts.svr.log.Write(r.Cts.sid, LOG_DEBUG, "Rejected server-side peer %s to route(%d) - allowed range %v", raddr.String(), r.Id, r.SvcPermNet)
|
||||||
conn.Close()
|
conn.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
if r.cts.svr.pts_limit > 0 && int(r.cts.svr.stats.peers.Load()) >= r.cts.svr.pts_limit {
|
if r.Cts.svr.pts_limit > 0 && int(r.Cts.svr.stats.peers.Load()) >= r.Cts.svr.pts_limit {
|
||||||
r.cts.svr.log.Write(r.cts.sid, LOG_DEBUG, "Rejected server-side peer %s to route(%d) - allowed max %d", raddr.String(), r.id, r.cts.svr.pts_limit)
|
r.Cts.svr.log.Write(r.Cts.sid, LOG_DEBUG, "Rejected server-side peer %s to route(%d) - allowed max %d", raddr.String(), r.Id, r.Cts.svr.pts_limit)
|
||||||
conn.Close()
|
conn.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
pts, err = r.AddNewServerPeerConn(conn)
|
pts, err = r.AddNewServerPeerConn(conn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
r.cts.svr.log.Write(r.cts.sid, LOG_ERROR, "Failed to add server-side peer %s to route(%d) - %s", r.id, raddr.String(), r.id, err.Error())
|
r.Cts.svr.log.Write(r.Cts.sid, LOG_ERROR, "Failed to add server-side peer %s to route(%d) - %s", r.Id, raddr.String(), r.Id, err.Error())
|
||||||
conn.Close()
|
conn.Close()
|
||||||
} else {
|
} else {
|
||||||
r.cts.svr.log.Write(r.cts.sid, LOG_DEBUG, "Added server-side peer %s to route(%d)", raddr.String(), r.id)
|
r.Cts.svr.log.Write(r.Cts.sid, LOG_DEBUG, "Added server-side peer %s to route(%d)", raddr.String(), r.Id)
|
||||||
r.pts_wg.Add(1)
|
r.pts_wg.Add(1)
|
||||||
go pts.RunTask(&r.pts_wg)
|
go pts.RunTask(&r.pts_wg)
|
||||||
}
|
}
|
||||||
@ -303,9 +304,9 @@ func (r *ServerRoute) RunTask(wg *sync.WaitGroup) {
|
|||||||
r.ReqStop()
|
r.ReqStop()
|
||||||
|
|
||||||
r.pts_wg.Wait()
|
r.pts_wg.Wait()
|
||||||
r.cts.svr.log.Write(r.cts.sid, LOG_DEBUG, "All service-side peer handlers ended on route(%d)", r.id)
|
r.Cts.svr.log.Write(r.Cts.sid, LOG_DEBUG, "All service-side peer handlers ended on route(%d)", r.Id)
|
||||||
|
|
||||||
r.cts.RemoveServerRoute(r) // final phase...
|
r.Cts.RemoveServerRoute(r) // final phase...
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *ServerRoute) ReqStop() {
|
func (r *ServerRoute) ReqStop() {
|
||||||
@ -386,14 +387,14 @@ func (cts *ServerConn) make_route_listener(id RouteId, option RouteOption, svc_r
|
|||||||
if ok {
|
if ok {
|
||||||
cts.svr.log.Write(cts.sid, LOG_ERROR,
|
cts.svr.log.Write(cts.sid, LOG_ERROR,
|
||||||
"Route(%d,%d) on %s not unique by port number - existing route(%d,%d)",
|
"Route(%d,%d) on %s not unique by port number - existing route(%d,%d)",
|
||||||
cts.id, id, prev_cri.conn_id, prev_cri.route_id, svcaddr.String())
|
cts.Id, id, prev_cri.conn_id, prev_cri.route_id, svcaddr.String())
|
||||||
l.Close()
|
l.Close()
|
||||||
return nil, nil, err
|
return nil, nil, err
|
||||||
}
|
}
|
||||||
cts.svr.svc_port_map[PortId(svcaddr.Port)] = ConnRouteId{conn_id: cts.id, route_id: id}
|
cts.svr.svc_port_map[PortId(svcaddr.Port)] = ConnRouteId{conn_id: cts.Id, route_id: id}
|
||||||
cts.svr.svc_port_mtx.Unlock()
|
cts.svr.svc_port_mtx.Unlock()
|
||||||
|
|
||||||
cts.svr.log.Write(cts.sid, LOG_DEBUG, "Route(%d,%d) listening on %s", cts.id, id, svcaddr.String())
|
cts.svr.log.Write(cts.sid, LOG_DEBUG, "Route(%d,%d) listening on %s", cts.Id, id, svcaddr.String())
|
||||||
return l, svcaddr, nil
|
return l, svcaddr, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -429,21 +430,21 @@ func (cts *ServerConn) RemoveServerRoute(route *ServerRoute) error {
|
|||||||
var ok bool
|
var ok bool
|
||||||
|
|
||||||
cts.route_mtx.Lock()
|
cts.route_mtx.Lock()
|
||||||
r, ok = cts.route_map[route.id]
|
r, ok = cts.route_map[route.Id]
|
||||||
if !ok {
|
if !ok {
|
||||||
cts.route_mtx.Unlock()
|
cts.route_mtx.Unlock()
|
||||||
return fmt.Errorf("non-existent route id - %d", route.id)
|
return fmt.Errorf("non-existent route id - %d", route.Id)
|
||||||
}
|
}
|
||||||
if r != route {
|
if r != route {
|
||||||
cts.route_mtx.Unlock()
|
cts.route_mtx.Unlock()
|
||||||
return fmt.Errorf("non-existent route - %d", route.id)
|
return fmt.Errorf("non-existent route - %d", route.Id)
|
||||||
}
|
}
|
||||||
delete(cts.route_map, route.id)
|
delete(cts.route_map, route.Id)
|
||||||
cts.svr.stats.routes.Add(-1)
|
cts.svr.stats.routes.Add(-1)
|
||||||
cts.route_mtx.Unlock()
|
cts.route_mtx.Unlock()
|
||||||
|
|
||||||
cts.svr.svc_port_mtx.Lock()
|
cts.svr.svc_port_mtx.Lock()
|
||||||
delete(cts.svr.svc_port_map, PortId(route.svc_addr.Port))
|
delete(cts.svr.svc_port_map, PortId(route.SvcAddr.Port))
|
||||||
cts.svr.svc_port_mtx.Unlock()
|
cts.svr.svc_port_mtx.Unlock()
|
||||||
|
|
||||||
r.ReqStop()
|
r.ReqStop()
|
||||||
@ -465,7 +466,7 @@ func (cts *ServerConn) RemoveServerRouteById(route_id RouteId) (*ServerRoute, er
|
|||||||
cts.route_mtx.Unlock()
|
cts.route_mtx.Unlock()
|
||||||
|
|
||||||
cts.svr.svc_port_mtx.Lock()
|
cts.svr.svc_port_mtx.Lock()
|
||||||
delete(cts.svr.svc_port_map, PortId(r.svc_addr.Port))
|
delete(cts.svr.svc_port_map, PortId(r.SvcAddr.Port))
|
||||||
cts.svr.svc_port_mtx.Unlock()
|
cts.svr.svc_port_mtx.Unlock()
|
||||||
|
|
||||||
r.ReqStop()
|
r.ReqStop()
|
||||||
@ -519,11 +520,11 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) {
|
|||||||
for {
|
for {
|
||||||
pkt, err = cts.pss.Recv()
|
pkt, err = cts.pss.Recv()
|
||||||
if errors.Is(err, io.EOF) {
|
if errors.Is(err, io.EOF) {
|
||||||
cts.svr.log.Write(cts.sid, LOG_INFO, "RPC stream closed for client %s", cts.remote_addr)
|
cts.svr.log.Write(cts.sid, LOG_INFO, "RPC stream closed for client %s", cts.RemoteAddr)
|
||||||
goto done
|
goto done
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cts.svr.log.Write(cts.sid, LOG_ERROR, "RPC stream error for client %s - %s", cts.remote_addr, err.Error())
|
cts.svr.log.Write(cts.sid, LOG_ERROR, "RPC stream error for client %s - %s", cts.RemoteAddr, err.Error())
|
||||||
goto done
|
goto done
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -539,35 +540,35 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
cts.svr.log.Write(cts.sid, LOG_ERROR,
|
cts.svr.log.Write(cts.sid, LOG_ERROR,
|
||||||
"Failed to add route(%d,%s) for %s - %s",
|
"Failed to add route(%d,%s) for %s - %s",
|
||||||
x.Route.RouteId, x.Route.TargetAddrStr, cts.remote_addr, err.Error())
|
x.Route.RouteId, x.Route.TargetAddrStr, cts.RemoteAddr, err.Error())
|
||||||
|
|
||||||
err = cts.pss.Send(MakeRouteStoppedPacket(RouteId(x.Route.RouteId), RouteOption(x.Route.ServiceOption), x.Route.TargetAddrStr, x.Route.TargetName, x.Route.ServiceAddrStr, x.Route.ServiceNetStr))
|
err = cts.pss.Send(MakeRouteStoppedPacket(RouteId(x.Route.RouteId), RouteOption(x.Route.ServiceOption), x.Route.TargetAddrStr, x.Route.TargetName, x.Route.ServiceAddrStr, x.Route.ServiceNetStr))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cts.svr.log.Write(cts.sid, LOG_ERROR,
|
cts.svr.log.Write(cts.sid, LOG_ERROR,
|
||||||
"Failed to send route_stopped event(%d,%s,%v,%s) to client %s - %s",
|
"Failed to send route_stopped event(%d,%s,%v,%s) to client %s - %s",
|
||||||
x.Route.RouteId, x.Route.TargetAddrStr, x.Route.ServiceOption, x.Route.ServiceNetStr, cts.remote_addr, err.Error())
|
x.Route.RouteId, x.Route.TargetAddrStr, x.Route.ServiceOption, x.Route.ServiceNetStr, cts.RemoteAddr, err.Error())
|
||||||
goto done
|
goto done
|
||||||
} else {
|
} else {
|
||||||
cts.svr.log.Write(cts.sid, LOG_DEBUG,
|
cts.svr.log.Write(cts.sid, LOG_DEBUG,
|
||||||
"Sent route_stopped event(%d,%s,%v,%s) to client %s",
|
"Sent route_stopped event(%d,%s,%v,%s) to client %s",
|
||||||
x.Route.RouteId, x.Route.TargetAddrStr, x.Route.ServiceOption, x.Route.ServiceNetStr, cts.remote_addr)
|
x.Route.RouteId, x.Route.TargetAddrStr, x.Route.ServiceOption, x.Route.ServiceNetStr, cts.RemoteAddr)
|
||||||
}
|
}
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
cts.svr.log.Write(cts.sid, LOG_INFO,
|
cts.svr.log.Write(cts.sid, LOG_INFO,
|
||||||
"Added route(%d,%s,%s,%v,%v) for client %s to cts(%d)",
|
"Added route(%d,%s,%s,%v,%v) for client %s to cts(%d)",
|
||||||
r.id, r.ptc_addr, r.svc_addr.String(), r.svc_option, r.svc_permitted_net, cts.remote_addr, cts.id)
|
r.Id, r.PtcAddr, r.SvcAddr.String(), r.SvcOption, r.SvcPermNet, cts.RemoteAddr, cts.Id)
|
||||||
err = cts.pss.Send(MakeRouteStartedPacket(r.id, r.svc_option, r.svc_addr.String(), r.ptc_name, r.svc_requested_addr, r.svc_permitted_net.String()))
|
err = cts.pss.Send(MakeRouteStartedPacket(r.Id, r.SvcOption, r.SvcAddr.String(), r.PtcName, r.SvcReqAddr, r.SvcPermNet.String()))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
r.ReqStop()
|
r.ReqStop()
|
||||||
cts.svr.log.Write(cts.sid, LOG_ERROR,
|
cts.svr.log.Write(cts.sid, LOG_ERROR,
|
||||||
"Failed to send route_started event(%d,%s,%s,%s%v,%v) to client %s - %s",
|
"Failed to send route_started event(%d,%s,%s,%s%v,%v) to client %s - %s",
|
||||||
r.id, r.ptc_addr, r.svc_addr.String(), r.svc_option, r.svc_permitted_net, cts.remote_addr, err.Error())
|
r.Id, r.PtcAddr, r.SvcAddr.String(), r.SvcOption, r.SvcPermNet, cts.RemoteAddr, err.Error())
|
||||||
goto done
|
goto done
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
cts.svr.log.Write(cts.sid, LOG_INFO, "Received invalid packet from %s", cts.remote_addr)
|
cts.svr.log.Write(cts.sid, LOG_INFO, "Received invalid packet from %s", cts.RemoteAddr)
|
||||||
// TODO: need to abort this client?
|
// TODO: need to abort this client?
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -582,22 +583,22 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
cts.svr.log.Write(cts.sid, LOG_ERROR,
|
cts.svr.log.Write(cts.sid, LOG_ERROR,
|
||||||
"Failed to delete route(%d,%s) for client %s - %s",
|
"Failed to delete route(%d,%s) for client %s - %s",
|
||||||
x.Route.RouteId, x.Route.TargetAddrStr, cts.remote_addr, err.Error())
|
x.Route.RouteId, x.Route.TargetAddrStr, cts.RemoteAddr, err.Error())
|
||||||
} else {
|
} else {
|
||||||
cts.svr.log.Write(cts.sid, LOG_ERROR,
|
cts.svr.log.Write(cts.sid, LOG_ERROR,
|
||||||
"Deleted route(%d,%s,%s,%v,%v) for client %s",
|
"Deleted route(%d,%s,%s,%v,%v) for client %s",
|
||||||
r.id, r.ptc_addr, r.svc_addr.String(), r.svc_option, r.svc_permitted_net.String(), cts.remote_addr)
|
r.Id, r.PtcAddr, r.SvcAddr.String(), r.SvcOption, r.SvcPermNet.String(), cts.RemoteAddr)
|
||||||
err = cts.pss.Send(MakeRouteStoppedPacket(r.id, r.svc_option, r.ptc_addr, r.ptc_name, r.svc_requested_addr, r.svc_permitted_net.String()))
|
err = cts.pss.Send(MakeRouteStoppedPacket(r.Id, r.SvcOption, r.PtcAddr, r.PtcName, r.SvcReqAddr, r.SvcPermNet.String()))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
r.ReqStop()
|
r.ReqStop()
|
||||||
cts.svr.log.Write(cts.sid, LOG_ERROR,
|
cts.svr.log.Write(cts.sid, LOG_ERROR,
|
||||||
"Failed to send route_stopped event(%d,%s,%s,%v.%v) to client %s - %s",
|
"Failed to send route_stopped event(%d,%s,%s,%v.%v) to client %s - %s",
|
||||||
r.id, r.ptc_addr, r.svc_addr.String(), r.svc_option, r.svc_permitted_net.String(), cts.remote_addr, err.Error())
|
r.Id, r.PtcAddr, r.SvcAddr.String(), r.SvcOption, r.SvcPermNet.String(), cts.RemoteAddr, err.Error())
|
||||||
goto done
|
goto done
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
cts.svr.log.Write(cts.sid, LOG_ERROR, "Invalid route_stop event from %s", cts.remote_addr)
|
cts.svr.log.Write(cts.sid, LOG_ERROR, "Invalid route_stop event from %s", cts.RemoteAddr)
|
||||||
}
|
}
|
||||||
|
|
||||||
case PACKET_KIND_PEER_STARTED:
|
case PACKET_KIND_PEER_STARTED:
|
||||||
@ -610,15 +611,15 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
cts.svr.log.Write(cts.sid, LOG_ERROR,
|
cts.svr.log.Write(cts.sid, LOG_ERROR,
|
||||||
"Failed to handle peer_started event from %s for peer(%d,%d,%s,%s) - %s",
|
"Failed to handle peer_started event from %s for peer(%d,%d,%s,%s) - %s",
|
||||||
cts.remote_addr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr, err.Error())
|
cts.RemoteAddr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr, err.Error())
|
||||||
} else {
|
} else {
|
||||||
cts.svr.log.Write(cts.sid, LOG_DEBUG,
|
cts.svr.log.Write(cts.sid, LOG_DEBUG,
|
||||||
"Handled peer_started event from %s for peer(%d,%d,%s,%s)",
|
"Handled peer_started event from %s for peer(%d,%d,%s,%s)",
|
||||||
cts.remote_addr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr)
|
cts.RemoteAddr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// invalid event data
|
// invalid event data
|
||||||
cts.svr.log.Write(cts.sid, LOG_ERROR, "Invalid peer_started event from %s", cts.remote_addr)
|
cts.svr.log.Write(cts.sid, LOG_ERROR, "Invalid peer_started event from %s", cts.RemoteAddr)
|
||||||
}
|
}
|
||||||
|
|
||||||
case PACKET_KIND_PEER_ABORTED:
|
case PACKET_KIND_PEER_ABORTED:
|
||||||
@ -630,15 +631,15 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
cts.svr.log.Write(cts.sid, LOG_ERROR,
|
cts.svr.log.Write(cts.sid, LOG_ERROR,
|
||||||
"Failed to handle peer_aborted event from %s for peer(%d,%d,%s,%s) - %s",
|
"Failed to handle peer_aborted event from %s for peer(%d,%d,%s,%s) - %s",
|
||||||
cts.remote_addr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr, err.Error())
|
cts.RemoteAddr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr, err.Error())
|
||||||
} else {
|
} else {
|
||||||
cts.svr.log.Write(cts.sid, LOG_DEBUG,
|
cts.svr.log.Write(cts.sid, LOG_DEBUG,
|
||||||
"Handled peer_aborted event from %s for peer(%d,%d,%s,%s)",
|
"Handled peer_aborted event from %s for peer(%d,%d,%s,%s)",
|
||||||
cts.remote_addr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr)
|
cts.RemoteAddr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// invalid event data
|
// invalid event data
|
||||||
cts.svr.log.Write(cts.sid, LOG_ERROR, "Invalid peer_aborted event from %s", cts.remote_addr)
|
cts.svr.log.Write(cts.sid, LOG_ERROR, "Invalid peer_aborted event from %s", cts.RemoteAddr)
|
||||||
}
|
}
|
||||||
|
|
||||||
case PACKET_KIND_PEER_STOPPED:
|
case PACKET_KIND_PEER_STOPPED:
|
||||||
@ -651,15 +652,15 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
cts.svr.log.Write(cts.sid, LOG_ERROR,
|
cts.svr.log.Write(cts.sid, LOG_ERROR,
|
||||||
"Failed to handle peer_stopped event from %s for peer(%d,%d,%s,%s) - %s",
|
"Failed to handle peer_stopped event from %s for peer(%d,%d,%s,%s) - %s",
|
||||||
cts.remote_addr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr, err.Error())
|
cts.RemoteAddr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr, err.Error())
|
||||||
} else {
|
} else {
|
||||||
cts.svr.log.Write(cts.sid, LOG_DEBUG,
|
cts.svr.log.Write(cts.sid, LOG_DEBUG,
|
||||||
"Handled peer_stopped event from %s for peer(%d,%d,%s,%s)",
|
"Handled peer_stopped event from %s for peer(%d,%d,%s,%s)",
|
||||||
cts.remote_addr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr)
|
cts.RemoteAddr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// invalid event data
|
// invalid event data
|
||||||
cts.svr.log.Write(cts.sid, LOG_ERROR, "Invalid peer_stopped event from %s", cts.remote_addr)
|
cts.svr.log.Write(cts.sid, LOG_ERROR, "Invalid peer_stopped event from %s", cts.RemoteAddr)
|
||||||
}
|
}
|
||||||
|
|
||||||
case PACKET_KIND_PEER_DATA:
|
case PACKET_KIND_PEER_DATA:
|
||||||
@ -672,15 +673,15 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
cts.svr.log.Write(cts.sid, LOG_ERROR,
|
cts.svr.log.Write(cts.sid, LOG_ERROR,
|
||||||
"Failed to handle peer_data event from %s for peer(%d,%d) - %s",
|
"Failed to handle peer_data event from %s for peer(%d,%d) - %s",
|
||||||
cts.remote_addr, x.Data.RouteId, x.Data.PeerId, err.Error())
|
cts.RemoteAddr, x.Data.RouteId, x.Data.PeerId, err.Error())
|
||||||
} else {
|
} else {
|
||||||
cts.svr.log.Write(cts.sid, LOG_DEBUG,
|
cts.svr.log.Write(cts.sid, LOG_DEBUG,
|
||||||
"Handled peer_data event from %s for peer(%d,%d)",
|
"Handled peer_data event from %s for peer(%d,%d)",
|
||||||
cts.remote_addr, x.Data.RouteId, x.Data.PeerId)
|
cts.RemoteAddr, x.Data.RouteId, x.Data.PeerId)
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// invalid event data
|
// invalid event data
|
||||||
cts.svr.log.Write(cts.sid, LOG_ERROR, "Invalid peer_data event from %s", cts.remote_addr)
|
cts.svr.log.Write(cts.sid, LOG_ERROR, "Invalid peer_data event from %s", cts.RemoteAddr)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1118,6 +1119,10 @@ func (s *Server) SetWpxResponseTransformer(tf ServerWpxResponseTransformer) {
|
|||||||
s.wpx_resp_tf = tf
|
s.wpx_resp_tf = tf
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Server) GetWpxResponseTransformer() ServerWpxResponseTransformer {
|
||||||
|
return s.wpx_resp_tf
|
||||||
|
}
|
||||||
|
|
||||||
func (s *Server) run_grpc_server(idx int, wg *sync.WaitGroup) error {
|
func (s *Server) run_grpc_server(idx int, wg *sync.WaitGroup) error {
|
||||||
var l *net.TCPListener
|
var l *net.TCPListener
|
||||||
var err error
|
var err error
|
||||||
@ -1350,8 +1355,8 @@ func (s *Server) AddNewServerConn(remote_addr *net.Addr, local_addr *net.Addr, p
|
|||||||
|
|
||||||
cts.svr = s
|
cts.svr = s
|
||||||
cts.route_map = make(ServerRouteMap)
|
cts.route_map = make(ServerRouteMap)
|
||||||
cts.remote_addr = *remote_addr
|
cts.RemoteAddr = *remote_addr
|
||||||
cts.local_addr = *local_addr
|
cts.LocalAddr = *local_addr
|
||||||
cts.pss = &GuardedPacketStreamServer{Hodu_PacketStreamServer: pss}
|
cts.pss = &GuardedPacketStreamServer{Hodu_PacketStreamServer: pss}
|
||||||
|
|
||||||
cts.stop_req.Store(false)
|
cts.stop_req.Store(false)
|
||||||
@ -1376,21 +1381,21 @@ func (s *Server) AddNewServerConn(remote_addr *net.Addr, local_addr *net.Addr, p
|
|||||||
return nil, fmt.Errorf("unable to assign id")
|
return nil, fmt.Errorf("unable to assign id")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
cts.id = s.cts_next_id
|
cts.Id = s.cts_next_id
|
||||||
cts.sid = fmt.Sprintf("%d", cts.id) // id in string used for logging
|
cts.sid = fmt.Sprintf("%d", cts.Id) // id in string used for logging
|
||||||
|
|
||||||
_, ok = s.cts_map_by_addr[cts.remote_addr]
|
_, ok = s.cts_map_by_addr[cts.RemoteAddr]
|
||||||
if ok {
|
if ok {
|
||||||
s.cts_mtx.Unlock()
|
s.cts_mtx.Unlock()
|
||||||
return nil, fmt.Errorf("existing client - %s", cts.remote_addr.String())
|
return nil, fmt.Errorf("existing client - %s", cts.RemoteAddr.String())
|
||||||
}
|
}
|
||||||
s.cts_map_by_addr[cts.remote_addr] = &cts
|
s.cts_map_by_addr[cts.RemoteAddr] = &cts
|
||||||
s.cts_map[cts.id] = &cts;
|
s.cts_map[cts.Id] = &cts;
|
||||||
s.cts_next_id++;
|
s.cts_next_id++;
|
||||||
s.stats.conns.Store(int64(len(s.cts_map)))
|
s.stats.conns.Store(int64(len(s.cts_map)))
|
||||||
s.cts_mtx.Unlock()
|
s.cts_mtx.Unlock()
|
||||||
|
|
||||||
s.log.Write("", LOG_DEBUG, "Added client connection from %s", cts.remote_addr.String())
|
s.log.Write("", LOG_DEBUG, "Added client connection from %s", cts.RemoteAddr.String())
|
||||||
return &cts, nil
|
return &cts, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1407,18 +1412,18 @@ func (s *Server) RemoveServerConn(cts *ServerConn) error {
|
|||||||
|
|
||||||
s.cts_mtx.Lock()
|
s.cts_mtx.Lock()
|
||||||
|
|
||||||
conn, ok = s.cts_map[cts.id]
|
conn, ok = s.cts_map[cts.Id]
|
||||||
if !ok {
|
if !ok {
|
||||||
s.cts_mtx.Unlock()
|
s.cts_mtx.Unlock()
|
||||||
return fmt.Errorf("non-existent connection id - %d", cts.id)
|
return fmt.Errorf("non-existent connection id - %d", cts.Id)
|
||||||
}
|
}
|
||||||
if conn != cts {
|
if conn != cts {
|
||||||
s.cts_mtx.Unlock()
|
s.cts_mtx.Unlock()
|
||||||
return fmt.Errorf("non-existent connection id - %d", cts.id)
|
return fmt.Errorf("non-existent connection id - %d", cts.Id)
|
||||||
}
|
}
|
||||||
|
|
||||||
delete(s.cts_map, cts.id)
|
delete(s.cts_map, cts.Id)
|
||||||
delete(s.cts_map_by_addr, cts.remote_addr)
|
delete(s.cts_map_by_addr, cts.RemoteAddr)
|
||||||
s.stats.conns.Store(int64(len(s.cts_map)))
|
s.stats.conns.Store(int64(len(s.cts_map)))
|
||||||
s.cts_mtx.Unlock()
|
s.cts_mtx.Unlock()
|
||||||
|
|
||||||
@ -1437,8 +1442,8 @@ func (s *Server) RemoveServerConnByAddr(addr net.Addr) error {
|
|||||||
s.cts_mtx.Unlock()
|
s.cts_mtx.Unlock()
|
||||||
return fmt.Errorf("non-existent connection address - %s", addr.String())
|
return fmt.Errorf("non-existent connection address - %s", addr.String())
|
||||||
}
|
}
|
||||||
delete(s.cts_map, cts.id)
|
delete(s.cts_map, cts.Id)
|
||||||
delete(s.cts_map_by_addr, cts.remote_addr)
|
delete(s.cts_map_by_addr, cts.RemoteAddr)
|
||||||
s.stats.conns.Store(int64(len(s.cts_map)))
|
s.stats.conns.Store(int64(len(s.cts_map)))
|
||||||
s.cts_mtx.Unlock()
|
s.cts_mtx.Unlock()
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user