diff --git a/server-ctl.go b/server-ctl.go index ce093db..fcdd788 100644 --- a/server-ctl.go +++ b/server-ctl.go @@ -105,18 +105,18 @@ func (ctl *server_ctl_server_conns) ServeHTTP(w http.ResponseWriter, req *http.R cts.route_mtx.Lock() for _, r = range cts.route_map { jsp = append(jsp, json_out_server_route{ - Id: r.id, - ClientPeerAddr: r.ptc_addr, - ClientPeerName: r.ptc_name, - ServerPeerServiceAddr: r.svc_addr.String(), - ServerPeerServiceNet: r.svc_permitted_net.String(), - ServerPeerOption: r.svc_option.string(), + Id: r.Id, + ClientPeerAddr: r.PtcAddr, + ClientPeerName: r.PtcName, + ServerPeerServiceAddr: r.SvcAddr.String(), + ServerPeerServiceNet: r.SvcPermNet.String(), + ServerPeerOption: r.SvcOption.string(), }) } js = append(js, json_out_server_conn{ - Id: cts.id, - ClientAddr: cts.remote_addr.String(), - ServerAddr: cts.local_addr.String(), + Id: cts.Id, + ClientAddr: cts.RemoteAddr.String(), + ServerAddr: cts.LocalAddr.String(), Routes: jsp, }) cts.route_mtx.Unlock() @@ -177,18 +177,18 @@ func (ctl *server_ctl_server_conns_id) ServeHTTP(w http.ResponseWriter, req *htt cts.route_mtx.Lock() for _, r = range cts.route_map { jsp = append(jsp, json_out_server_route{ - Id: r.id, - ClientPeerAddr: r.ptc_addr, - ClientPeerName: r.ptc_name, - ServerPeerServiceAddr: r.svc_addr.String(), - ServerPeerServiceNet: r.svc_permitted_net.String(), - ServerPeerOption: r.svc_option.string(), + Id: r.Id, + ClientPeerAddr: r.PtcAddr, + ClientPeerName: r.PtcName, + ServerPeerServiceAddr: r.SvcAddr.String(), + ServerPeerServiceNet: r.SvcPermNet.String(), + ServerPeerOption: r.SvcOption.string(), }) } js = &json_out_server_conn{ - Id: cts.id, - ClientAddr: cts.remote_addr.String(), - ServerAddr: cts.local_addr.String(), + Id: cts.Id, + ClientAddr: cts.RemoteAddr.String(), + ServerAddr: cts.LocalAddr.String(), Routes: jsp, } cts.route_mtx.Unlock() @@ -247,12 +247,12 @@ func (ctl *server_ctl_server_conns_id_routes) ServeHTTP(w http.ResponseWriter, r cts.route_mtx.Lock() for _, r = range cts.route_map { jsp = append(jsp, json_out_server_route{ - Id: r.id, - ClientPeerAddr: r.ptc_addr, - ClientPeerName: r.ptc_name, - ServerPeerServiceAddr: r.svc_addr.String(), - ServerPeerServiceNet: r.svc_permitted_net.String(), - ServerPeerOption: r.svc_option.string(), + Id: r.Id, + ClientPeerAddr: r.PtcAddr, + ClientPeerName: r.PtcName, + ServerPeerServiceAddr: r.SvcAddr.String(), + ServerPeerServiceNet: r.SvcPermNet.String(), + ServerPeerOption: r.SvcOption.string(), }) } cts.route_mtx.Unlock() @@ -307,12 +307,12 @@ func (ctl *server_ctl_server_conns_id_routes_id) ServeHTTP(w http.ResponseWriter case http.MethodGet: status_code = http.StatusOK; w.WriteHeader(status_code) err = je.Encode(json_out_server_route{ - Id: r.id, - ClientPeerAddr: r.ptc_addr, - ClientPeerName: r.ptc_name, - ServerPeerServiceAddr: r.svc_addr.String(), - ServerPeerServiceNet: r.svc_permitted_net.String(), - ServerPeerOption: r.svc_option.string(), + Id: r.Id, + ClientPeerAddr: r.PtcAddr, + ClientPeerName: r.PtcName, + ServerPeerServiceAddr: r.SvcAddr.String(), + ServerPeerServiceNet: r.SvcPermNet.String(), + ServerPeerOption: r.SvcOption.string(), }) if err != nil { goto oops } diff --git a/server-peer.go b/server-peer.go index 4939be6..4d28842 100644 --- a/server-peer.go +++ b/server-peer.go @@ -57,18 +57,18 @@ func (spc *ServerPeerConn) RunTask(wg *sync.WaitGroup) { conn_raddr = spc.conn.RemoteAddr().String() conn_laddr = spc.conn.LocalAddr().String() - pss = spc.route.cts.pss - err = pss.Send(MakePeerStartedPacket(spc.route.id, spc.conn_id, conn_raddr, conn_laddr)) + pss = spc.route.Cts.pss + err = pss.Send(MakePeerStartedPacket(spc.route.Id, spc.conn_id, conn_raddr, conn_laddr)) if err != nil { - spc.route.cts.svr.log.Write(spc.route.cts.sid, LOG_ERROR, + spc.route.Cts.svr.log.Write(spc.route.Cts.sid, LOG_ERROR, "Failed to send peer_started event(%d,%d,%s,%s) to client - %s", - spc.route.id, spc.conn_id, conn_raddr, conn_laddr, err.Error()) + spc.route.Id, spc.conn_id, conn_raddr, conn_laddr, err.Error()) goto done_without_stop } // set up a timer to set waiting duration until the connection is // actually established on the client side and it's informed... - waitctx, cancel_wait = context.WithTimeout(spc.route.cts.svr.ctx, 5 * time.Second) // TODO: make this configurable + waitctx, cancel_wait = context.WithTimeout(spc.route.Cts.svr.ctx, 5 * time.Second) // TODO: make this configurable wait_for_started: for { select { @@ -95,27 +95,27 @@ wait_for_started: n, err = spc.conn.Read(buf[:]) if err != nil { if errors.Is(err, io.EOF) || strings.Contains(err.Error(), "use of closed network connection") { // i don't like this way to check this error. - err = pss.Send(MakePeerEofPacket(spc.route.id, spc.conn_id)) + err = pss.Send(MakePeerEofPacket(spc.route.Id, spc.conn_id)) if err != nil { - spc.route.cts.svr.log.Write(spc.route.cts.sid, LOG_ERROR, + spc.route.Cts.svr.log.Write(spc.route.Cts.sid, LOG_ERROR, "Failed to send peer_eof event(%d,%d,%s,%s) to client - %s", - spc.route.id, spc.conn_id, conn_raddr, conn_laddr, err.Error()) + spc.route.Id, spc.conn_id, conn_raddr, conn_laddr, err.Error()) goto done } goto wait_for_stopped } else { - spc.route.cts.svr.log.Write(spc.route.cts.sid, LOG_ERROR, + spc.route.Cts.svr.log.Write(spc.route.Cts.sid, LOG_ERROR, "Failed to read data from peer(%d,%d,%s,%s) - %s", - spc.route.id, spc.conn_id, conn_raddr, conn_laddr, err.Error()) + spc.route.Id, spc.conn_id, conn_raddr, conn_laddr, err.Error()) goto done } } - err = pss.Send(MakePeerDataPacket(spc.route.id, spc.conn_id, buf[:n])) + err = pss.Send(MakePeerDataPacket(spc.route.Id, spc.conn_id, buf[:n])) if err != nil { - spc.route.cts.svr.log.Write(spc.route.cts.sid, LOG_ERROR, + spc.route.Cts.svr.log.Write(spc.route.Cts.sid, LOG_ERROR, "Failed to send data from peer(%d,%d,%s,%s) to client - %s", - spc.route.id, spc.conn_id, conn_raddr, conn_laddr, err.Error()) + spc.route.Id, spc.conn_id, conn_raddr, conn_laddr, err.Error()) goto done } } @@ -131,11 +131,11 @@ wait_for_stopped: } done: - err = pss.Send(MakePeerStoppedPacket(spc.route.id, spc.conn_id, spc.conn.RemoteAddr().String(), spc.conn.LocalAddr().String())) + err = pss.Send(MakePeerStoppedPacket(spc.route.Id, spc.conn_id, spc.conn.RemoteAddr().String(), spc.conn.LocalAddr().String())) if err != nil { - spc.route.cts.svr.log.Write(spc.route.cts.sid, LOG_ERROR, + spc.route.Cts.svr.log.Write(spc.route.Cts.sid, LOG_ERROR, "Failed to send peer_stopped(%d,%d,%s,%s) to client - %s", - spc.route.id, spc.conn_id, conn_raddr, conn_laddr, err.Error()) + spc.route.Id, spc.conn_id, conn_raddr, conn_laddr, err.Error()) // nothing much to do about the failure of sending this } @@ -192,23 +192,23 @@ func (spc *ServerPeerConn) ReportEvent(event_type PACKET_KIND, event_data interf var err error _, err = spc.conn.Write(data) if err != nil { - spc.route.cts.svr.log.Write(spc.route.cts.sid, LOG_ERROR, + spc.route.Cts.svr.log.Write(spc.route.Cts.sid, LOG_ERROR, "Failed to write data from %s to peer(%d,%d,%s) - %s", - spc.route.cts.remote_addr, spc.route.id, spc.conn_id, spc.conn.RemoteAddr().String(), err.Error()) + spc.route.Cts.RemoteAddr, spc.route.Id, spc.conn_id, spc.conn.RemoteAddr().String(), err.Error()) spc.ReqStop() } } else { // this must not happen. - spc.route.cts.svr.log.Write(spc.route.cts.sid, LOG_ERROR, + spc.route.Cts.svr.log.Write(spc.route.Cts.sid, LOG_ERROR, "Protocol error - invalid data in peer_data event from %s to peer(%d,%d,%s)", - spc.route.cts.remote_addr, spc.route.id, spc.conn_id, spc.conn.RemoteAddr().String()) + spc.route.Cts.RemoteAddr, spc.route.Id, spc.conn_id, spc.conn.RemoteAddr().String()) spc.ReqStop() } } else { // protocol error. the client must not relay more data from the client-side peer after EOF. - spc.route.cts.svr.log.Write(spc.route.cts.sid, LOG_ERROR, + spc.route.Cts.svr.log.Write(spc.route.Cts.sid, LOG_ERROR, "Protocol error - redundant data from %s to (%d,%d,%s)", - spc.route.cts.remote_addr, spc.route.id, spc.conn_id, spc.conn.RemoteAddr().String()) + spc.route.Cts.RemoteAddr, spc.route.Id, spc.conn_id, spc.conn.RemoteAddr().String()) spc.ReqStop() } diff --git a/server-proxy.go b/server-proxy.go index 5ba170c..9f7ca8c 100644 --- a/server-proxy.go +++ b/server-proxy.go @@ -352,7 +352,7 @@ func (pxy *server_proxy_http_main) req_to_proxy_url (req *http.Request, r *Serve // HTTP or HTTPS is actually a hint to the client-side peer // Use the hint to compose the URL to the client via the server-side // listening socket as if it connects to the client-side peer - if r.svc_option & RouteOption(ROUTE_OPTION_HTTPS) != 0 { + if r.SvcOption & RouteOption(ROUTE_OPTION_HTTPS) != 0 { proxy_proto = "https" } else { proxy_proto = "http" @@ -363,7 +363,7 @@ func (pxy *server_proxy_http_main) req_to_proxy_url (req *http.Request, r *Serve return &url.URL{ Scheme: proxy_proto, - Host: r.ptc_addr, + Host: r.PtcAddr, Path: proxy_url_path, RawQuery: req.URL.RawQuery, Fragment: req.URL.Fragment, @@ -402,13 +402,13 @@ func (pxy *server_proxy_http_main) ServeHTTP(w http.ResponseWriter, req *http.Re } /* - if r.svc_option & (RouteOption(ROUTE_OPTION_HTTP) | RouteOption(ROUTE_OPTION_HTTPS)) == 0 { + if r.SvcOption & (RouteOption(ROUTE_OPTION_HTTP) | RouteOption(ROUTE_OPTION_HTTPS)) == 0 { status_code = http.StatusForbidden; w.WriteHeader(status_code) err = fmt.Errorf("target not http/https") goto oops } */ - addr = svc_addr_to_dst_addr(r.svc_addr) + addr = svc_addr_to_dst_addr(r.SvcAddr) transport, err = pxy.addr_to_transport(s.ctx, addr) if err != nil { status_code = http.StatusBadGateway; w.WriteHeader(status_code) @@ -456,7 +456,7 @@ func (pxy *server_proxy_http_main) ServeHTTP(w http.ResponseWriter, req *http.Re resp_body = resp.Body if in_wpx_mode && s.wpx_resp_tf != nil { - resp_body = s.wpx_resp_tf(path_prefix, resp) + resp_body = s.wpx_resp_tf(r, path_prefix, resp) } outhdr = w.Header() @@ -620,13 +620,13 @@ func (pxy *server_proxy_ssh_ws) connect_ssh (ctx context.Context, username strin } /* Is this protection needed? - if r.svc_option & RouteOption(ROUTE_OPTION_SSH) == 0 { + if r.SvcOption & RouteOption(ROUTE_OPTION_SSH) == 0 { err = fmt.Errorf("target not ssh") goto oops } */ - addr = svc_addr_to_dst_addr(r.svc_addr); + addr = svc_addr_to_dst_addr(r.SvcAddr); dialer = &net.Dialer{} conn, err = dialer.DialContext(ctx, "tcp", addr.String()) diff --git a/server.go b/server.go index 09655c0..5a82fbd 100644 --- a/server.go +++ b/server.go @@ -34,7 +34,7 @@ type ServerRouteMap = map[RouteId]*ServerRoute type ServerPeerConnMap = map[PeerId]*ServerPeerConn type ServerSvcPortMap = map[PortId]ConnRouteId -type ServerWpxResponseTransformer func(path_prefix string, resp *http.Response) io.Reader +type ServerWpxResponseTransformer func(r *ServerRoute, path_prefix string, resp *http.Response) io.Reader type Server struct { ctx context.Context @@ -97,11 +97,11 @@ type Server struct { // client connect to the server, the server accept it, and makes a tunnel request type ServerConn struct { svr *Server - id ConnId + Id ConnId sid string // for logging - remote_addr net.Addr // client address that created this structure - local_addr net.Addr // local address that the client is connected to + RemoteAddr net.Addr // client address that created this structure + LocalAddr net.Addr // local address that the client is connected to pss *GuardedPacketStreamServer route_mtx sync.Mutex @@ -114,16 +114,17 @@ type ServerConn struct { } type ServerRoute struct { - cts *ServerConn - svc_l *net.TCPListener - svc_addr *net.TCPAddr // actual listening address - svc_requested_addr string - svc_permitted_net netip.Prefix - svc_option RouteOption + Cts *ServerConn + Id RouteId - ptc_addr string - ptc_name string - id RouteId + svc_l *net.TCPListener + SvcAddr *net.TCPAddr // actual listening address + SvcReqAddr string + SvcPermNet netip.Prefix // network from which access is allowed + SvcOption RouteOption + + PtcAddr string + PtcName string pts_mtx sync.Mutex pts_map ServerPeerConnMap @@ -197,16 +198,16 @@ func NewServerRoute(cts *ServerConn, id RouteId, option RouteOption, ptc_addr st } } - r.cts = cts - r.id = id + r.Cts = cts + r.Id = id r.svc_l = l - r.svc_addr = svcaddr - r.svc_requested_addr = svc_requested_addr - r.svc_permitted_net = svcnet - r.svc_option = option + r.SvcAddr = svcaddr + r.SvcReqAddr = svc_requested_addr + r.SvcPermNet = svcnet + r.SvcOption = option - r.ptc_addr = ptc_addr - r.ptc_name = ptc_name + r.PtcAddr = ptc_addr + r.PtcName = ptc_name r.pts_limit = PTS_LIMIT r.pts_map = make(ServerPeerConnMap) r.pts_next_id = 0 @@ -243,7 +244,7 @@ func (r *ServerRoute) AddNewServerPeerConn(c *net.TCPConn) (*ServerPeerConn, err pts = NewServerPeerConn(r, c, r.pts_next_id) r.pts_map[pts.conn_id] = pts r.pts_next_id++ - r.cts.svr.stats.peers.Add(1) + r.Cts.svr.stats.peers.Add(1) return pts, nil } @@ -251,9 +252,9 @@ func (r *ServerRoute) AddNewServerPeerConn(c *net.TCPConn) (*ServerPeerConn, err func (r *ServerRoute) RemoveServerPeerConn(pts *ServerPeerConn) { r.pts_mtx.Lock() delete(r.pts_map, pts.conn_id) - r.cts.svr.stats.peers.Add(-1) + r.Cts.svr.stats.peers.Add(-1) r.pts_mtx.Unlock() - r.cts.svr.log.Write(r.cts.sid, LOG_DEBUG, "Removed server-side peer connection %s from route(%d)", pts.conn.RemoteAddr().String(), r.id) + r.Cts.svr.log.Write(r.Cts.sid, LOG_DEBUG, "Removed server-side peer connection %s from route(%d)", pts.conn.RemoteAddr().String(), r.Id) } func (r *ServerRoute) RunTask(wg *sync.WaitGroup) { @@ -269,9 +270,9 @@ func (r *ServerRoute) RunTask(wg *sync.WaitGroup) { conn, err = r.svc_l.AcceptTCP() // this call is blocking... if err != nil { if errors.Is(err, net.ErrClosed) { - r.cts.svr.log.Write(r.cts.sid, LOG_INFO, "Server-side peer listener closed on route(%d)", r.id) + r.Cts.svr.log.Write(r.Cts.sid, LOG_INFO, "Server-side peer listener closed on route(%d)", r.Id) } else { - r.cts.svr.log.Write(r.cts.sid, LOG_INFO, "Server-side peer listener error on route(%d) - %s", r.id, err.Error()) + r.Cts.svr.log.Write(r.Cts.sid, LOG_INFO, "Server-side peer listener error on route(%d) - %s", r.Id, err.Error()) } break } @@ -279,22 +280,22 @@ func (r *ServerRoute) RunTask(wg *sync.WaitGroup) { raddr = conn.RemoteAddr().(*net.TCPAddr) iaddr, _ = netip.AddrFromSlice(raddr.IP) - if !r.svc_permitted_net.Contains(iaddr) { - r.cts.svr.log.Write(r.cts.sid, LOG_DEBUG, "Rejected server-side peer %s to route(%d) - allowed range %v", raddr.String(), r.id, r.svc_permitted_net) + if !r.SvcPermNet.Contains(iaddr) { + r.Cts.svr.log.Write(r.Cts.sid, LOG_DEBUG, "Rejected server-side peer %s to route(%d) - allowed range %v", raddr.String(), r.Id, r.SvcPermNet) conn.Close() } - if r.cts.svr.pts_limit > 0 && int(r.cts.svr.stats.peers.Load()) >= r.cts.svr.pts_limit { - r.cts.svr.log.Write(r.cts.sid, LOG_DEBUG, "Rejected server-side peer %s to route(%d) - allowed max %d", raddr.String(), r.id, r.cts.svr.pts_limit) + if r.Cts.svr.pts_limit > 0 && int(r.Cts.svr.stats.peers.Load()) >= r.Cts.svr.pts_limit { + r.Cts.svr.log.Write(r.Cts.sid, LOG_DEBUG, "Rejected server-side peer %s to route(%d) - allowed max %d", raddr.String(), r.Id, r.Cts.svr.pts_limit) conn.Close() } pts, err = r.AddNewServerPeerConn(conn) if err != nil { - r.cts.svr.log.Write(r.cts.sid, LOG_ERROR, "Failed to add server-side peer %s to route(%d) - %s", r.id, raddr.String(), r.id, err.Error()) + r.Cts.svr.log.Write(r.Cts.sid, LOG_ERROR, "Failed to add server-side peer %s to route(%d) - %s", r.Id, raddr.String(), r.Id, err.Error()) conn.Close() } else { - r.cts.svr.log.Write(r.cts.sid, LOG_DEBUG, "Added server-side peer %s to route(%d)", raddr.String(), r.id) + r.Cts.svr.log.Write(r.Cts.sid, LOG_DEBUG, "Added server-side peer %s to route(%d)", raddr.String(), r.Id) r.pts_wg.Add(1) go pts.RunTask(&r.pts_wg) } @@ -303,9 +304,9 @@ func (r *ServerRoute) RunTask(wg *sync.WaitGroup) { r.ReqStop() r.pts_wg.Wait() - r.cts.svr.log.Write(r.cts.sid, LOG_DEBUG, "All service-side peer handlers ended on route(%d)", r.id) + r.Cts.svr.log.Write(r.Cts.sid, LOG_DEBUG, "All service-side peer handlers ended on route(%d)", r.Id) - r.cts.RemoveServerRoute(r) // final phase... + r.Cts.RemoveServerRoute(r) // final phase... } func (r *ServerRoute) ReqStop() { @@ -386,14 +387,14 @@ func (cts *ServerConn) make_route_listener(id RouteId, option RouteOption, svc_r if ok { cts.svr.log.Write(cts.sid, LOG_ERROR, "Route(%d,%d) on %s not unique by port number - existing route(%d,%d)", - cts.id, id, prev_cri.conn_id, prev_cri.route_id, svcaddr.String()) + cts.Id, id, prev_cri.conn_id, prev_cri.route_id, svcaddr.String()) l.Close() return nil, nil, err } - cts.svr.svc_port_map[PortId(svcaddr.Port)] = ConnRouteId{conn_id: cts.id, route_id: id} + cts.svr.svc_port_map[PortId(svcaddr.Port)] = ConnRouteId{conn_id: cts.Id, route_id: id} cts.svr.svc_port_mtx.Unlock() - cts.svr.log.Write(cts.sid, LOG_DEBUG, "Route(%d,%d) listening on %s", cts.id, id, svcaddr.String()) + cts.svr.log.Write(cts.sid, LOG_DEBUG, "Route(%d,%d) listening on %s", cts.Id, id, svcaddr.String()) return l, svcaddr, nil } @@ -429,21 +430,21 @@ func (cts *ServerConn) RemoveServerRoute(route *ServerRoute) error { var ok bool cts.route_mtx.Lock() - r, ok = cts.route_map[route.id] + r, ok = cts.route_map[route.Id] if !ok { cts.route_mtx.Unlock() - return fmt.Errorf("non-existent route id - %d", route.id) + return fmt.Errorf("non-existent route id - %d", route.Id) } if r != route { cts.route_mtx.Unlock() - return fmt.Errorf("non-existent route - %d", route.id) + return fmt.Errorf("non-existent route - %d", route.Id) } - delete(cts.route_map, route.id) + delete(cts.route_map, route.Id) cts.svr.stats.routes.Add(-1) cts.route_mtx.Unlock() cts.svr.svc_port_mtx.Lock() - delete(cts.svr.svc_port_map, PortId(route.svc_addr.Port)) + delete(cts.svr.svc_port_map, PortId(route.SvcAddr.Port)) cts.svr.svc_port_mtx.Unlock() r.ReqStop() @@ -465,7 +466,7 @@ func (cts *ServerConn) RemoveServerRouteById(route_id RouteId) (*ServerRoute, er cts.route_mtx.Unlock() cts.svr.svc_port_mtx.Lock() - delete(cts.svr.svc_port_map, PortId(r.svc_addr.Port)) + delete(cts.svr.svc_port_map, PortId(r.SvcAddr.Port)) cts.svr.svc_port_mtx.Unlock() r.ReqStop() @@ -519,11 +520,11 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) { for { pkt, err = cts.pss.Recv() if errors.Is(err, io.EOF) { - cts.svr.log.Write(cts.sid, LOG_INFO, "RPC stream closed for client %s", cts.remote_addr) + cts.svr.log.Write(cts.sid, LOG_INFO, "RPC stream closed for client %s", cts.RemoteAddr) goto done } if err != nil { - cts.svr.log.Write(cts.sid, LOG_ERROR, "RPC stream error for client %s - %s", cts.remote_addr, err.Error()) + cts.svr.log.Write(cts.sid, LOG_ERROR, "RPC stream error for client %s - %s", cts.RemoteAddr, err.Error()) goto done } @@ -539,35 +540,35 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) { if err != nil { cts.svr.log.Write(cts.sid, LOG_ERROR, "Failed to add route(%d,%s) for %s - %s", - x.Route.RouteId, x.Route.TargetAddrStr, cts.remote_addr, err.Error()) + x.Route.RouteId, x.Route.TargetAddrStr, cts.RemoteAddr, err.Error()) err = cts.pss.Send(MakeRouteStoppedPacket(RouteId(x.Route.RouteId), RouteOption(x.Route.ServiceOption), x.Route.TargetAddrStr, x.Route.TargetName, x.Route.ServiceAddrStr, x.Route.ServiceNetStr)) if err != nil { cts.svr.log.Write(cts.sid, LOG_ERROR, "Failed to send route_stopped event(%d,%s,%v,%s) to client %s - %s", - x.Route.RouteId, x.Route.TargetAddrStr, x.Route.ServiceOption, x.Route.ServiceNetStr, cts.remote_addr, err.Error()) + x.Route.RouteId, x.Route.TargetAddrStr, x.Route.ServiceOption, x.Route.ServiceNetStr, cts.RemoteAddr, err.Error()) goto done } else { cts.svr.log.Write(cts.sid, LOG_DEBUG, "Sent route_stopped event(%d,%s,%v,%s) to client %s", - x.Route.RouteId, x.Route.TargetAddrStr, x.Route.ServiceOption, x.Route.ServiceNetStr, cts.remote_addr) + x.Route.RouteId, x.Route.TargetAddrStr, x.Route.ServiceOption, x.Route.ServiceNetStr, cts.RemoteAddr) } } else { cts.svr.log.Write(cts.sid, LOG_INFO, "Added route(%d,%s,%s,%v,%v) for client %s to cts(%d)", - r.id, r.ptc_addr, r.svc_addr.String(), r.svc_option, r.svc_permitted_net, cts.remote_addr, cts.id) - err = cts.pss.Send(MakeRouteStartedPacket(r.id, r.svc_option, r.svc_addr.String(), r.ptc_name, r.svc_requested_addr, r.svc_permitted_net.String())) + r.Id, r.PtcAddr, r.SvcAddr.String(), r.SvcOption, r.SvcPermNet, cts.RemoteAddr, cts.Id) + err = cts.pss.Send(MakeRouteStartedPacket(r.Id, r.SvcOption, r.SvcAddr.String(), r.PtcName, r.SvcReqAddr, r.SvcPermNet.String())) if err != nil { r.ReqStop() cts.svr.log.Write(cts.sid, LOG_ERROR, "Failed to send route_started event(%d,%s,%s,%s%v,%v) to client %s - %s", - r.id, r.ptc_addr, r.svc_addr.String(), r.svc_option, r.svc_permitted_net, cts.remote_addr, err.Error()) + r.Id, r.PtcAddr, r.SvcAddr.String(), r.SvcOption, r.SvcPermNet, cts.RemoteAddr, err.Error()) goto done } } } else { - cts.svr.log.Write(cts.sid, LOG_INFO, "Received invalid packet from %s", cts.remote_addr) + cts.svr.log.Write(cts.sid, LOG_INFO, "Received invalid packet from %s", cts.RemoteAddr) // TODO: need to abort this client? } @@ -582,22 +583,22 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) { if err != nil { cts.svr.log.Write(cts.sid, LOG_ERROR, "Failed to delete route(%d,%s) for client %s - %s", - x.Route.RouteId, x.Route.TargetAddrStr, cts.remote_addr, err.Error()) + x.Route.RouteId, x.Route.TargetAddrStr, cts.RemoteAddr, err.Error()) } else { cts.svr.log.Write(cts.sid, LOG_ERROR, "Deleted route(%d,%s,%s,%v,%v) for client %s", - r.id, r.ptc_addr, r.svc_addr.String(), r.svc_option, r.svc_permitted_net.String(), cts.remote_addr) - err = cts.pss.Send(MakeRouteStoppedPacket(r.id, r.svc_option, r.ptc_addr, r.ptc_name, r.svc_requested_addr, r.svc_permitted_net.String())) + r.Id, r.PtcAddr, r.SvcAddr.String(), r.SvcOption, r.SvcPermNet.String(), cts.RemoteAddr) + err = cts.pss.Send(MakeRouteStoppedPacket(r.Id, r.SvcOption, r.PtcAddr, r.PtcName, r.SvcReqAddr, r.SvcPermNet.String())) if err != nil { r.ReqStop() cts.svr.log.Write(cts.sid, LOG_ERROR, "Failed to send route_stopped event(%d,%s,%s,%v.%v) to client %s - %s", - r.id, r.ptc_addr, r.svc_addr.String(), r.svc_option, r.svc_permitted_net.String(), cts.remote_addr, err.Error()) + r.Id, r.PtcAddr, r.SvcAddr.String(), r.SvcOption, r.SvcPermNet.String(), cts.RemoteAddr, err.Error()) goto done } } } else { - cts.svr.log.Write(cts.sid, LOG_ERROR, "Invalid route_stop event from %s", cts.remote_addr) + cts.svr.log.Write(cts.sid, LOG_ERROR, "Invalid route_stop event from %s", cts.RemoteAddr) } case PACKET_KIND_PEER_STARTED: @@ -610,15 +611,15 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) { if err != nil { cts.svr.log.Write(cts.sid, LOG_ERROR, "Failed to handle peer_started event from %s for peer(%d,%d,%s,%s) - %s", - cts.remote_addr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr, err.Error()) + cts.RemoteAddr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr, err.Error()) } else { cts.svr.log.Write(cts.sid, LOG_DEBUG, "Handled peer_started event from %s for peer(%d,%d,%s,%s)", - cts.remote_addr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr) + cts.RemoteAddr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr) } } else { // invalid event data - cts.svr.log.Write(cts.sid, LOG_ERROR, "Invalid peer_started event from %s", cts.remote_addr) + cts.svr.log.Write(cts.sid, LOG_ERROR, "Invalid peer_started event from %s", cts.RemoteAddr) } case PACKET_KIND_PEER_ABORTED: @@ -630,15 +631,15 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) { if err != nil { cts.svr.log.Write(cts.sid, LOG_ERROR, "Failed to handle peer_aborted event from %s for peer(%d,%d,%s,%s) - %s", - cts.remote_addr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr, err.Error()) + cts.RemoteAddr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr, err.Error()) } else { cts.svr.log.Write(cts.sid, LOG_DEBUG, "Handled peer_aborted event from %s for peer(%d,%d,%s,%s)", - cts.remote_addr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr) + cts.RemoteAddr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr) } } else { // invalid event data - cts.svr.log.Write(cts.sid, LOG_ERROR, "Invalid peer_aborted event from %s", cts.remote_addr) + cts.svr.log.Write(cts.sid, LOG_ERROR, "Invalid peer_aborted event from %s", cts.RemoteAddr) } case PACKET_KIND_PEER_STOPPED: @@ -651,15 +652,15 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) { if err != nil { cts.svr.log.Write(cts.sid, LOG_ERROR, "Failed to handle peer_stopped event from %s for peer(%d,%d,%s,%s) - %s", - cts.remote_addr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr, err.Error()) + cts.RemoteAddr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr, err.Error()) } else { cts.svr.log.Write(cts.sid, LOG_DEBUG, "Handled peer_stopped event from %s for peer(%d,%d,%s,%s)", - cts.remote_addr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr) + cts.RemoteAddr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr) } } else { // invalid event data - cts.svr.log.Write(cts.sid, LOG_ERROR, "Invalid peer_stopped event from %s", cts.remote_addr) + cts.svr.log.Write(cts.sid, LOG_ERROR, "Invalid peer_stopped event from %s", cts.RemoteAddr) } case PACKET_KIND_PEER_DATA: @@ -672,15 +673,15 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) { if err != nil { cts.svr.log.Write(cts.sid, LOG_ERROR, "Failed to handle peer_data event from %s for peer(%d,%d) - %s", - cts.remote_addr, x.Data.RouteId, x.Data.PeerId, err.Error()) + cts.RemoteAddr, x.Data.RouteId, x.Data.PeerId, err.Error()) } else { cts.svr.log.Write(cts.sid, LOG_DEBUG, "Handled peer_data event from %s for peer(%d,%d)", - cts.remote_addr, x.Data.RouteId, x.Data.PeerId) + cts.RemoteAddr, x.Data.RouteId, x.Data.PeerId) } } else { // invalid event data - cts.svr.log.Write(cts.sid, LOG_ERROR, "Invalid peer_data event from %s", cts.remote_addr) + cts.svr.log.Write(cts.sid, LOG_ERROR, "Invalid peer_data event from %s", cts.RemoteAddr) } } } @@ -1118,6 +1119,10 @@ func (s *Server) SetWpxResponseTransformer(tf ServerWpxResponseTransformer) { s.wpx_resp_tf = tf } +func (s *Server) GetWpxResponseTransformer() ServerWpxResponseTransformer { + return s.wpx_resp_tf +} + func (s *Server) run_grpc_server(idx int, wg *sync.WaitGroup) error { var l *net.TCPListener var err error @@ -1350,8 +1355,8 @@ func (s *Server) AddNewServerConn(remote_addr *net.Addr, local_addr *net.Addr, p cts.svr = s cts.route_map = make(ServerRouteMap) - cts.remote_addr = *remote_addr - cts.local_addr = *local_addr + cts.RemoteAddr = *remote_addr + cts.LocalAddr = *local_addr cts.pss = &GuardedPacketStreamServer{Hodu_PacketStreamServer: pss} cts.stop_req.Store(false) @@ -1376,21 +1381,21 @@ func (s *Server) AddNewServerConn(remote_addr *net.Addr, local_addr *net.Addr, p return nil, fmt.Errorf("unable to assign id") } } - cts.id = s.cts_next_id - cts.sid = fmt.Sprintf("%d", cts.id) // id in string used for logging + cts.Id = s.cts_next_id + cts.sid = fmt.Sprintf("%d", cts.Id) // id in string used for logging - _, ok = s.cts_map_by_addr[cts.remote_addr] + _, ok = s.cts_map_by_addr[cts.RemoteAddr] if ok { s.cts_mtx.Unlock() - return nil, fmt.Errorf("existing client - %s", cts.remote_addr.String()) + return nil, fmt.Errorf("existing client - %s", cts.RemoteAddr.String()) } - s.cts_map_by_addr[cts.remote_addr] = &cts - s.cts_map[cts.id] = &cts; + s.cts_map_by_addr[cts.RemoteAddr] = &cts + s.cts_map[cts.Id] = &cts; s.cts_next_id++; s.stats.conns.Store(int64(len(s.cts_map))) s.cts_mtx.Unlock() - s.log.Write("", LOG_DEBUG, "Added client connection from %s", cts.remote_addr.String()) + s.log.Write("", LOG_DEBUG, "Added client connection from %s", cts.RemoteAddr.String()) return &cts, nil } @@ -1407,18 +1412,18 @@ func (s *Server) RemoveServerConn(cts *ServerConn) error { s.cts_mtx.Lock() - conn, ok = s.cts_map[cts.id] + conn, ok = s.cts_map[cts.Id] if !ok { s.cts_mtx.Unlock() - return fmt.Errorf("non-existent connection id - %d", cts.id) + return fmt.Errorf("non-existent connection id - %d", cts.Id) } if conn != cts { s.cts_mtx.Unlock() - return fmt.Errorf("non-existent connection id - %d", cts.id) + return fmt.Errorf("non-existent connection id - %d", cts.Id) } - delete(s.cts_map, cts.id) - delete(s.cts_map_by_addr, cts.remote_addr) + delete(s.cts_map, cts.Id) + delete(s.cts_map_by_addr, cts.RemoteAddr) s.stats.conns.Store(int64(len(s.cts_map))) s.cts_mtx.Unlock() @@ -1437,8 +1442,8 @@ func (s *Server) RemoveServerConnByAddr(addr net.Addr) error { s.cts_mtx.Unlock() return fmt.Errorf("non-existent connection address - %s", addr.String()) } - delete(s.cts_map, cts.id) - delete(s.cts_map_by_addr, cts.remote_addr) + delete(s.cts_map, cts.Id) + delete(s.cts_map_by_addr, cts.RemoteAddr) s.stats.conns.Store(int64(len(s.cts_map))) s.cts_mtx.Unlock()