From 2b3a841299e95ad4836a2fa4315f1b8228a9bc52 Mon Sep 17 00:00:00 2001 From: hyung-hwan Date: Tue, 18 Feb 2025 14:44:45 +0900 Subject: [PATCH] added default notice handling interface --- client-ctl.go | 47 +++++++++++- client-peer.go | 6 +- client.go | 205 ++++++++++++++++++++++++++----------------------- server-ctl.go | 49 +++++++++++- server-peer.go | 18 ++--- server.go | 135 +++++++++++++++++--------------- 6 files changed, 286 insertions(+), 174 deletions(-) diff --git a/client-ctl.go b/client-ctl.go index fa3a810..954c3e3 100644 --- a/client-ctl.go +++ b/client-ctl.go @@ -125,7 +125,11 @@ type client_ctl_client_conns_id_routes_id_peers_id struct { client_ctl } -type client_ctl_client_conns_id_notices struct { +type client_ctl_notices struct { + client_ctl +} + +type client_ctl_notices_id struct { client_ctl } @@ -876,7 +880,46 @@ oops: // ------------------------------------ -func (ctl *client_ctl_client_conns_id_notices) ServeHTTP(w http.ResponseWriter, req *http.Request) (int, error) { +func (ctl *client_ctl_notices) ServeHTTP(w http.ResponseWriter, req *http.Request) (int, error) { + var c *Client + var status_code int + var cts *ClientConn + var err error + + c = ctl.c + + switch req.Method { + case http.MethodPost: + var noti json_in_notice + + err = json.NewDecoder(req.Body).Decode(¬i) + if err != nil { + status_code = WriteEmptyRespHeader(w, http.StatusBadRequest) + goto oops + } + + c.cts_mtx.Lock() + for _, cts = range c.cts_map { + cts.psc.Send(MakeConnNoticePacket(noti.Text)) + // let's not care about an error when broacasting a notice to all connections + } + c.cts_mtx.Unlock() + status_code = WriteJsonRespHeader(w, http.StatusOK) + + default: + status_code = WriteEmptyRespHeader(w, http.StatusBadRequest) + } + +//done: + return status_code, nil + +oops: + return status_code, err +} + +// ------------------------------------ + +func (ctl *client_ctl_notices_id) ServeHTTP(w http.ResponseWriter, req *http.Request) (int, error) { var c *Client var status_code int var conn_id string diff --git a/client-peer.go b/client-peer.go index e88fee6..28f5ab3 100644 --- a/client-peer.go +++ b/client-peer.go @@ -31,11 +31,11 @@ func (cpc *ClientPeerConn) RunTask(wg *sync.WaitGroup) error { n, err = cpc.conn.Read(buf[:]) if err != nil { if errors.Is(err, io.EOF) || strings.Contains(err.Error(), "use of closed network connection") { // i hate checking this condition with strings.Contains() - cpc.route.cts.cli.log.Write(cpc.route.cts.Sid, LOG_INFO, + cpc.route.cts.C.log.Write(cpc.route.cts.Sid, LOG_INFO, "Client-side peer(%d,%d,%s,%s) closed", cpc.route.Id, cpc.conn_id, cpc.conn.RemoteAddr().String(), cpc.conn.LocalAddr().String()) } else { - cpc.route.cts.cli.log.Write(cpc.route.cts.Sid, LOG_ERROR, + cpc.route.cts.C.log.Write(cpc.route.cts.Sid, LOG_ERROR, "Failed to read from client-side peer(%d,%d,%s,%s) - %s", cpc.route.Id, cpc.conn_id, cpc.conn.RemoteAddr().String(), cpc.conn.LocalAddr().String(), err.Error()) } @@ -44,7 +44,7 @@ func (cpc *ClientPeerConn) RunTask(wg *sync.WaitGroup) error { err = cpc.route.cts.psc.Send(MakePeerDataPacket(cpc.route.Id, cpc.conn_id, buf[0:n])) if err != nil { - cpc.route.cts.cli.log.Write(cpc.route.cts.Sid, LOG_ERROR, + cpc.route.cts.C.log.Write(cpc.route.cts.Sid, LOG_ERROR, "Failed to write peer(%d,%d,%s,%s) data to server - %s", cpc.route.Id, cpc.conn_id, cpc.conn.RemoteAddr().String(), cpc.conn.LocalAddr().String(), err.Error()) break diff --git a/client.go b/client.go index 8387a17..cadfbd4 100644 --- a/client.go +++ b/client.go @@ -66,6 +66,10 @@ type ClientConfig struct { PeerConnTmout time.Duration } +type ClientConnNoticeHandler interface { + Handle(cts *ClientConn, text string) +} + type Client struct { Named @@ -95,7 +99,8 @@ type Client struct { stop_req atomic.Bool stop_chan chan bool - log Logger + log Logger + conn_notice ClientConnNoticeHandler route_persister ClientRoutePersister promreg *prometheus.Registry @@ -117,7 +122,7 @@ const ( // client connection to server type ClientConn struct { - cli *Client + C *Client cfg ClientConnConfigActive Id ConnId Sid string // id rendered in string @@ -242,10 +247,10 @@ func (r *ClientRoute) AddNewClientPeerConn(c *net.TCPConn, pts_id PeerId, pts_ra r.ptc_mtx.Lock() ptc = NewClientPeerConn(r, c, pts_id, pts_raddr, pts_laddr) r.ptc_map[ptc.conn_id] = ptc - r.cts.cli.stats.peers.Add(1) + r.cts.C.stats.peers.Add(1) r.ptc_mtx.Unlock() - r.cts.cli.log.Write(r.cts.Sid, LOG_INFO, "Added client-side peer(%d,%d,%s,%s)", r.Id, ptc.conn_id, ptc.conn.RemoteAddr().String(), ptc.conn.LocalAddr().String()) + r.cts.C.log.Write(r.cts.Sid, LOG_INFO, "Added client-side peer(%d,%d,%s,%s)", r.Id, ptc.conn_id, ptc.conn.RemoteAddr().String(), ptc.conn.LocalAddr().String()) return ptc, nil } @@ -264,10 +269,10 @@ func (r *ClientRoute) RemoveClientPeerConn(ptc *ClientPeerConn) error { return fmt.Errorf("conflicting peer id - %d", ptc.conn_id) } delete(r.ptc_map, ptc.conn_id) - r.cts.cli.stats.peers.Add(-1) + r.cts.C.stats.peers.Add(-1) r.ptc_mtx.Unlock() - r.cts.cli.log.Write(r.cts.Sid, LOG_INFO, "Removed client-side peer(%d,%d,%s,%s)", r.Id, ptc.conn_id, ptc.conn.RemoteAddr().String(), ptc.conn.LocalAddr().String()) + r.cts.C.log.Write(r.cts.Sid, LOG_INFO, "Removed client-side peer(%d,%d,%s,%s)", r.Id, ptc.conn_id, ptc.conn.RemoteAddr().String(), ptc.conn.LocalAddr().String()) ptc.ReqStop() return nil } @@ -306,7 +311,7 @@ func (r *ClientRoute) ExtendLifetime(lifetime time.Duration) error { r.Lifetime = r.Lifetime + lifetime expiry = r.LifetimeStart.Add(r.Lifetime) r.lifetime_timer.Reset(expiry.Sub(time.Now())) - if r.cts.cli.route_persister != nil { r.cts.cli.route_persister.Save(r.cts, r) } + if r.cts.C.route_persister != nil { r.cts.C.route_persister.Save(r.cts, r) } return nil } } @@ -323,7 +328,7 @@ func (r *ClientRoute) ResetLifetime(lifetime time.Duration) error { r.Lifetime = lifetime r.LifetimeStart = time.Now() r.lifetime_timer.Reset(lifetime) - if r.cts.cli.route_persister != nil { r.cts.cli.route_persister.Save(r.cts, r) } + if r.cts.C.route_persister != nil { r.cts.C.route_persister.Save(r.cts, r) } return nil } } @@ -338,12 +343,12 @@ func (r *ClientRoute) RunTask(wg *sync.WaitGroup) { err = r.cts.psc.Send(MakeRouteStartPacket(r.Id, r.ServerPeerOption, r.PeerAddr, r.PeerName, r.ServerPeerAddr, r.ServerPeerNet)) if err != nil { - r.cts.cli.log.Write(r.cts.Sid, LOG_DEBUG, + r.cts.C.log.Write(r.cts.Sid, LOG_DEBUG, "Failed to send route_start for route(%d,%s,%v,%v) to %s", r.Id, r.PeerAddr, r.ServerPeerOption, r.ServerPeerNet, r.cts.remote_addr) goto done } else { - r.cts.cli.log.Write(r.cts.Sid, LOG_DEBUG, + r.cts.C.log.Write(r.cts.Sid, LOG_DEBUG, "Sent route_start for route(%d,%s,%v,%v) to %s", r.Id, r.PeerAddr, r.ServerPeerOption, r.ServerPeerNet, r.cts.remote_addr) } @@ -363,7 +368,7 @@ main_loop: break main_loop case <-r.lifetime_timer.C: - r.cts.cli.log.Write(r.cts.Sid, LOG_INFO, "route(%d,%s,%v,%v) reached end of lifetime(%v)", + r.cts.C.log.Write(r.cts.Sid, LOG_INFO, "route(%d,%s,%v,%v) reached end of lifetime(%v)", r.Id, r.PeerAddr, r.ServerPeerOption, r.ServerPeerNet, r.Lifetime) break main_loop } @@ -388,11 +393,11 @@ done: err = r.cts.psc.Send(MakeRouteStopPacket(r.Id, r.ServerPeerOption, r.PeerAddr, r.PeerName, r.ServerPeerAddr, r.ServerPeerNet)) if err != nil { - r.cts.cli.log.Write(r.cts.Sid, LOG_DEBUG, + r.cts.C.log.Write(r.cts.Sid, LOG_DEBUG, "Failed to route_stop for route(%d,%s,%v,%v) to %s - %s", r.Id, r.PeerAddr, r.ServerPeerOption, r.ServerPeerNet, r.cts.remote_addr, err.Error()) } else { - r.cts.cli.log.Write(r.cts.Sid, LOG_DEBUG, + r.cts.C.log.Write(r.cts.Sid, LOG_DEBUG, "Sent route_stop for route(%d,%s,%v,%v) to %s", r.Id, r.PeerAddr, r.ServerPeerOption, r.ServerPeerNet, r.cts.remote_addr) } @@ -431,9 +436,9 @@ func (r *ClientRoute) ConnectToPeer(pts_id PeerId, route_option RouteOption, pts defer wg.Done() - tmout = time.Duration(r.cts.cli.ptc_tmout) + tmout = time.Duration(r.cts.C.ptc_tmout) if tmout <= 0 { tmout = 5 * time.Second} // TODO: make this configurable... - waitctx, cancel_wait = context.WithTimeout(r.cts.cli.ctx, tmout) + waitctx, cancel_wait = context.WithTimeout(r.cts.C.ctx, tmout) r.ptc_mtx.Lock() r.ptc_cancel_map[pts_id] = cancel_wait r.ptc_mtx.Unlock() @@ -447,7 +452,7 @@ func (r *ClientRoute) ConnectToPeer(pts_id PeerId, route_option RouteOption, pts r.ptc_mtx.Unlock() if err != nil { - r.cts.cli.log.Write(r.cts.Sid, LOG_ERROR, + r.cts.C.log.Write(r.cts.Sid, LOG_ERROR, "Failed to connect to %s for route(%d,%d,%s,%s) - %s", r.PeerAddr, r.Id, pts_id, pts_raddr, pts_laddr, err.Error()) goto peer_aborted @@ -455,7 +460,7 @@ func (r *ClientRoute) ConnectToPeer(pts_id PeerId, route_option RouteOption, pts real_conn, ok = conn.(*net.TCPConn) if !ok { - r.cts.cli.log.Write(r.cts.Sid, LOG_ERROR, + r.cts.C.log.Write(r.cts.Sid, LOG_ERROR, "Failed to get connection information to %s for route(%d,%d,%s,%s) - %s", r.PeerAddr, r.Id, pts_id, pts_raddr, pts_laddr, err.Error()) goto peer_aborted @@ -465,7 +470,7 @@ func (r *ClientRoute) ConnectToPeer(pts_id PeerId, route_option RouteOption, pts real_conn_laddr = real_conn.LocalAddr().String() ptc, err = r.AddNewClientPeerConn(real_conn, pts_id, pts_raddr, pts_laddr) if err != nil { - r.cts.cli.log.Write(r.cts.Sid, LOG_ERROR, + r.cts.C.log.Write(r.cts.Sid, LOG_ERROR, "Failed to add client peer %s for route(%d,%d,%s,%s) - %s", r.PeerAddr, r.Id, pts_id, pts_raddr, pts_laddr, err.Error()) goto peer_aborted @@ -475,7 +480,7 @@ func (r *ClientRoute) ConnectToPeer(pts_id PeerId, route_option RouteOption, pts err = r.cts.psc.Send(MakePeerStartedPacket(r.Id, ptc.conn_id, real_conn_raddr, real_conn_laddr)) if err != nil { - r.cts.cli.log.Write(r.cts.Sid, LOG_ERROR, + r.cts.C.log.Write(r.cts.Sid, LOG_ERROR, "Failed to send peer_start(%d,%d,%s,%s) for route(%d,%d,%s,%s) - %s", r.Id, ptc.conn_id, real_conn_raddr, real_conn_laddr, r.Id, pts_id, pts_raddr, pts_laddr, err.Error()) @@ -490,7 +495,7 @@ peer_aborted: // real_conn_radd and real_conn_laddr may be empty depending on when the jump to here is made. err = r.cts.psc.Send(MakePeerAbortedPacket(r.Id, pts_id, real_conn_raddr, real_conn_laddr)) if err != nil { - r.cts.cli.log.Write(r.cts.Sid, LOG_ERROR, + r.cts.C.log.Write(r.cts.Sid, LOG_ERROR, "Failed to send peer_aborted(%d,%d) for route(%d,%d,%s,%s) - %s", r.Id, pts_id, r.Id, pts_id, pts_raddr, pts_laddr, err.Error()) } @@ -525,13 +530,13 @@ func (r *ClientRoute) ReportEvent(pts_id PeerId, event_type PACKET_KIND, event_d var rd *RouteDesc rd, ok = event_data.(*RouteDesc) if !ok { - r.cts.cli.log.Write(r.cts.Sid, LOG_ERROR, "Protocol error - invalid data in route_started event(%d)", r.Id) + r.cts.C.log.Write(r.cts.Sid, LOG_ERROR, "Protocol error - invalid data in route_started event(%d)", r.Id) r.ReqStop() } else { var addr *net.TCPAddr addr, err = net.ResolveTCPAddr(TcpAddrStrClass(rd.TargetAddrStr), rd.TargetAddrStr) if err != nil { - r.cts.cli.log.Write(r.cts.Sid, LOG_ERROR, "Protocol error - invalid service address(%s) for server peer in route_started event(%d)", rd.TargetAddrStr, r.Id) + r.cts.C.log.Write(r.cts.Sid, LOG_ERROR, "Protocol error - invalid service address(%s) for server peer in route_started event(%d)", rd.TargetAddrStr, r.Id) r.ReqStop() } else { r.server_peer_listen_addr = addr @@ -546,7 +551,7 @@ func (r *ClientRoute) ReportEvent(pts_id PeerId, event_type PACKET_KIND, event_d // but the server must be able to handle this case as invalid route. var ok bool _, ok = event_data.(*RouteDesc) - if !ok { r.cts.cli.log.Write(r.cts.Sid, LOG_ERROR, "Protocol error - invalid data in route_started event(%d)", r.Id) } + if !ok { r.cts.C.log.Write(r.cts.Sid, LOG_ERROR, "Protocol error - invalid data in route_started event(%d)", r.Id) } r.ReqStop() case PACKET_KIND_PEER_STARTED: @@ -555,18 +560,18 @@ func (r *ClientRoute) ReportEvent(pts_id PeerId, event_type PACKET_KIND, event_d pd, ok = event_data.(*PeerDesc) if !ok { - r.cts.cli.log.Write(r.cts.Sid, LOG_WARN, + r.cts.C.log.Write(r.cts.Sid, LOG_WARN, "Protocol error - invalid data in peer_started event(%d,%d)", r.Id, pts_id) // ignore it. don't want to delete the whole route } else { - if r.cts.cli.ptc_limit > 0 && int(r.cts.cli.stats.peers.Load()) >= r.cts.cli.ptc_limit { - r.cts.cli.log.Write(r.cts.Sid, LOG_ERROR, + if r.cts.C.ptc_limit > 0 && int(r.cts.C.stats.peers.Load()) >= r.cts.C.ptc_limit { + r.cts.C.log.Write(r.cts.Sid, LOG_ERROR, "Rejecting to connect to peer(%s)for route(%d,%d) - allowed max %d", - r.PeerAddr, r.Id, pts_id, r.cts.cli.ptc_limit) + r.PeerAddr, r.Id, pts_id, r.cts.C.ptc_limit) err = r.cts.psc.Send(MakePeerAbortedPacket(r.Id, pts_id, "", "")) if err != nil { - r.cts.cli.log.Write(r.cts.Sid, LOG_ERROR, + r.cts.C.log.Write(r.cts.Sid, LOG_ERROR, "Failed to send peer_aborted(%d,%d) for route(%d,%d,%s,%s) - %s", r.Id, pts_id, r.Id, pts_id, "", "", err.Error()) } @@ -586,13 +591,13 @@ func (r *ClientRoute) ReportEvent(pts_id PeerId, event_type PACKET_KIND, event_d pd, ok = event_data.(*PeerDesc) if !ok { - r.cts.cli.log.Write(r.cts.Sid, LOG_ERROR, + r.cts.C.log.Write(r.cts.Sid, LOG_ERROR, "Protocol error - invalid data in peer_aborted event(%d,%d)", r.Id, pts_id) ptc.ReqStop() } else { err = r.DisconnectFromPeer(ptc) if err != nil { - r.cts.cli.log.Write(r.cts.Sid, LOG_ERROR, + r.cts.C.log.Write(r.cts.Sid, LOG_ERROR, "Failed to disconnect from peer(%d,%d,%s,%s) - %s", r.Id, pts_id, pd.RemoteAddrStr, pd.LocalAddrStr, err.Error()) ptc.ReqStop() @@ -610,14 +615,14 @@ func (r *ClientRoute) ReportEvent(pts_id PeerId, event_type PACKET_KIND, event_d pd, ok = event_data.(*PeerDesc) if !ok { - r.cts.cli.log.Write(r.cts.Sid, LOG_ERROR, + r.cts.C.log.Write(r.cts.Sid, LOG_ERROR, "Protocol error - invalid data in peer_stopped event(%d,%d)", r.Id, pts_id) ptc.ReqStop() } else { err = r.DisconnectFromPeer(ptc) if err != nil { - r.cts.cli.log.Write(r.cts.Sid, LOG_WARN, + r.cts.C.log.Write(r.cts.Sid, LOG_WARN, "Failed to disconnect from peer(%d,%d,%s,%s) - %s", r.Id, pts_id, pd.RemoteAddrStr, pd.LocalAddrStr, err.Error()) ptc.ReqStop() @@ -634,7 +639,7 @@ func (r *ClientRoute) ReportEvent(pts_id PeerId, event_type PACKET_KIND, event_d _, ok = event_data.(*PeerDesc) if !ok { - r.cts.cli.log.Write(r.cts.Sid, LOG_ERROR, + r.cts.C.log.Write(r.cts.Sid, LOG_ERROR, "Protocol error - invalid data in peer_eof event(%d,%d)", r.Id, pts_id) ptc.ReqStop() @@ -653,14 +658,14 @@ func (r *ClientRoute) ReportEvent(pts_id PeerId, event_type PACKET_KIND, event_d data, ok = event_data.([]byte) if !ok { - r.cts.cli.log.Write(r.cts.Sid, LOG_ERROR, + r.cts.C.log.Write(r.cts.Sid, LOG_ERROR, "Protocol error - invalid data in peer_data event(%d,%d)", r.Id, pts_id) ptc.ReqStop() } else { _, err = ptc.conn.Write(data) if err != nil { - r.cts.cli.log.Write(r.cts.Sid, LOG_ERROR, + r.cts.C.log.Write(r.cts.Sid, LOG_ERROR, "Failed to write to peer(%d,%d,%s,%s) - %s", r.Id, pts_id, ptc.conn.RemoteAddr().String(), ptc.conn.LocalAddr().String(), err.Error()) ptc.ReqStop() @@ -680,7 +685,7 @@ func NewClientConn(c *Client, cfg *ClientConnConfig) *ClientConn { var cts ClientConn var i int - cts.cli = c + cts.C = c cts.route_map = make(ClientRouteMap) cts.route_next_id = 1 cts.cfg.ClientConnConfig = *cfg @@ -737,11 +742,11 @@ func (cts *ClientConn) AddNewClientRoute(rc *ClientRouteConfig) (*ClientRoute, e r = NewClientRoute(cts, assigned_id, rc.Static, rc.PeerAddr, rc.PeerName, rc.ServiceAddr, rc.ServiceNet, rc.Option, rc.Lifetime) cts.route_map[r.Id] = r - cts.cli.stats.routes.Add(1) - if cts.cli.route_persister != nil { cts.cli.route_persister.Save(cts, r) } + cts.C.stats.routes.Add(1) + if cts.C.route_persister != nil { cts.C.route_persister.Save(cts, r) } cts.route_mtx.Unlock() - cts.cli.log.Write(cts.Sid, LOG_INFO, "Added route(%d,%d) %s", cts.Id, r.Id, r.PeerAddr) + cts.C.log.Write(cts.Sid, LOG_INFO, "Added route(%d,%d) %s", cts.Id, r.Id, r.PeerAddr) cts.route_wg.Add(1) go r.RunTask(&cts.route_wg) @@ -764,7 +769,7 @@ func (cts *ClientConn) RemoveAllClientRoutes() { for _, r = range cts.route_map { delete(cts.route_map, r.Id) - cts.cli.stats.routes.Add(-1) + cts.C.stats.routes.Add(-1) r.ReqStop() } }*/ @@ -784,13 +789,13 @@ func (cts *ClientConn) RemoveClientRoute(route *ClientRoute) error { return fmt.Errorf("conflicting route id - %d", route.Id) } delete(cts.route_map, route.Id) - cts.cli.stats.routes.Add(-1) - if cts.cli.route_persister != nil { - cts.cli.route_persister.Delete(cts, r) + cts.C.stats.routes.Add(-1) + if cts.C.route_persister != nil { + cts.C.route_persister.Delete(cts, r) } cts.route_mtx.Unlock() - cts.cli.log.Write(cts.Sid, LOG_INFO, "Removed route(%d,%s)", route.Id, route.PeerAddr) + cts.C.log.Write(cts.Sid, LOG_INFO, "Removed route(%d,%s)", route.Id, route.PeerAddr) r.ReqStop() return nil @@ -807,11 +812,11 @@ func (cts *ClientConn) RemoveClientRouteById(route_id RouteId) error { return fmt.Errorf("non-existent route id - %d", route_id) } delete(cts.route_map, route_id) - cts.cli.stats.routes.Add(-1) - if cts.cli.route_persister != nil { cts.cli.route_persister.Delete(cts, r) } + cts.C.stats.routes.Add(-1) + if cts.C.route_persister != nil { cts.C.route_persister.Delete(cts, r) } cts.route_mtx.Unlock() - cts.cli.log.Write(cts.Sid, LOG_INFO, "Removed route(%d,%s)", r.Id, r.PeerAddr) + cts.C.log.Write(cts.Sid, LOG_INFO, "Removed route(%d,%s)", r.Id, r.PeerAddr) r.ReqStop() return nil @@ -827,10 +832,10 @@ func (cts *ClientConn) RemoveClientRouteByServerPeerSvcPortId(port_id PortId) er for _, r = range cts.route_map { if r.server_peer_listen_addr.Port == int(port_id) { delete(cts.route_map, r.Id) - cts.cli.stats.routes.Add(-1) - if cts.cli.route_persister != nil { cts.cli.route_persister.Delete(cts, r) } + cts.C.stats.routes.Add(-1) + if cts.C.route_persister != nil { cts.C.route_persister.Delete(cts, r) } cts.route_mtx.Unlock() - cts.cli.log.Write(cts.Sid, LOG_INFO, "Removed route(%d,%s)", r.Id, r.PeerAddr) + cts.C.log.Write(cts.Sid, LOG_INFO, "Removed route(%d,%s)", r.Id, r.PeerAddr) r.ReqStop() return nil } @@ -945,17 +950,17 @@ func (cts *ClientConn) RunTask(wg *sync.WaitGroup) { start_over: cts.State = CLIENT_CONN_CONNECTING cts.cfg.Index = (cts.cfg.Index + 1) % len(cts.cfg.ServerAddrs) - cts.cli.log.Write(cts.Sid, LOG_INFO, "Connecting to server[%d] %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index]) - if cts.cli.rpc_tls == nil { + cts.C.log.Write(cts.Sid, LOG_INFO, "Connecting to server[%d] %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index]) + if cts.C.rpc_tls == nil { opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials())) if cts.cfg.ServerAuthority != "" { opts = append(opts, grpc.WithAuthority(cts.cfg.ServerAuthority)) } } else { - opts = append(opts, grpc.WithTransportCredentials(credentials.NewTLS(cts.cli.rpc_tls))) + opts = append(opts, grpc.WithTransportCredentials(credentials.NewTLS(cts.C.rpc_tls))) // set the http2 :authority header with tls server name defined. if cts.cfg.ServerAuthority != "" { opts = append(opts, grpc.WithAuthority(cts.cfg.ServerAuthority)) - } else if cts.cli.rpc_tls.ServerName != "" { - opts = append(opts, grpc.WithAuthority(cts.cli.rpc_tls.ServerName)) + } else if cts.C.rpc_tls.ServerName != "" { + opts = append(opts, grpc.WithAuthority(cts.C.rpc_tls.ServerName)) } } if cts.cfg.ServerSeedTmout > 0 { @@ -964,7 +969,7 @@ start_over: cts.conn, err = grpc.NewClient(cts.cfg.ServerAddrs[cts.cfg.Index], opts...) if err != nil { - cts.cli.log.Write(cts.Sid, LOG_ERROR, "Failed to make client to server[%d] %s - %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index], err.Error()) + cts.C.log.Write(cts.Sid, LOG_ERROR, "Failed to make client to server[%d] %s - %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index], err.Error()) goto reconnect_to_server } cts.hdc = NewHoduClient(cts.conn) @@ -973,20 +978,20 @@ start_over: // there is nothing to do much about it for now. c_seed.Version = HODU_RPC_VERSION c_seed.Flags = 0 - s_seed, err = cts.hdc.GetSeed(cts.cli.ctx, &c_seed) + s_seed, err = cts.hdc.GetSeed(cts.C.ctx, &c_seed) if err != nil { - cts.cli.log.Write(cts.Sid, LOG_ERROR, "Failed to get seed from server[%d] %s - %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index], err.Error()) + cts.C.log.Write(cts.Sid, LOG_ERROR, "Failed to get seed from server[%d] %s - %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index], err.Error()) goto reconnect_to_server } cts.s_seed = *s_seed cts.c_seed = c_seed cts.route_next_id = 1 // reset this whenever a new connection is made. the number of routes must be zero. - cts.cli.log.Write(cts.Sid, LOG_INFO, "Got seed from server[%d] %s - ver=%#x", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index], cts.s_seed.Version) + cts.C.log.Write(cts.Sid, LOG_INFO, "Got seed from server[%d] %s - ver=%#x", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index], cts.s_seed.Version) - psc, err = cts.hdc.PacketStream(cts.cli.ctx) + psc, err = cts.hdc.PacketStream(cts.C.ctx) if err != nil { - cts.cli.log.Write(cts.Sid, LOG_ERROR, "Failed to get packet stream from server[%d] %s - %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index], err.Error()) + cts.C.log.Write(cts.Sid, LOG_ERROR, "Failed to get packet stream from server[%d] %s - %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index], err.Error()) goto reconnect_to_server } @@ -996,7 +1001,7 @@ start_over: cts.local_addr = p.LocalAddr.String() } - cts.cli.log.Write(cts.Sid, LOG_INFO, "Got packet stream from server[%d] %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index]) + cts.C.log.Write(cts.Sid, LOG_INFO, "Got packet stream from server[%d] %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index]) cts.State = CLIENT_CONN_CONNECTED @@ -1007,22 +1012,22 @@ start_over: // let's add statically configured routes to the client-side peers err = cts.add_client_routes(cts.cfg.Routes) if err != nil { - cts.cli.log.Write(cts.Sid, LOG_ERROR, "Failed to add routes to server[%d] %s for %v - %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index], cts.cfg.Routes, err.Error()) + cts.C.log.Write(cts.Sid, LOG_ERROR, "Failed to add routes to server[%d] %s for %v - %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index], cts.cfg.Routes, err.Error()) goto done } } - if cts.cli.route_persister != nil { + if cts.C.route_persister != nil { // restore the client routes added and saved via the control channel - cts.cli.route_persister.LoadAll(cts) + cts.C.route_persister.LoadAll(cts) } for { var pkt *Packet select { - case <-cts.cli.ctx.Done(): - // need to log cts.cli.ctx.Err().Error()? + case <-cts.C.ctx.Done(): + // need to log cts.C.ctx.Err().Error()? goto done case <-cts.stop_chan: @@ -1038,7 +1043,7 @@ start_over: if status.Code(err) == codes.Canceled || errors.Is(err, net.ErrClosed) { goto reconnect_to_server } else { - cts.cli.log.Write(cts.Sid, LOG_INFO, "Failed to receive packet from %s - %s", cts.remote_addr, err.Error()) + cts.C.log.Write(cts.Sid, LOG_INFO, "Failed to receive packet from %s - %s", cts.remote_addr, err.Error()) goto reconnect_to_server } } @@ -1052,16 +1057,16 @@ start_over: if ok { err = cts.ReportEvent(RouteId(x.Route.RouteId), 0, pkt.Kind, x.Route) if err != nil { - cts.cli.log.Write(cts.Sid, LOG_ERROR, + cts.C.log.Write(cts.Sid, LOG_ERROR, "Failed to handle route_started event(%d,%s) from %s - %s", x.Route.RouteId, x.Route.TargetAddrStr, cts.remote_addr, err.Error()) } else { - cts.cli.log.Write(cts.Sid, LOG_DEBUG, + cts.C.log.Write(cts.Sid, LOG_DEBUG, "Handled route_started event(%d,%s) from %s", x.Route.RouteId, x.Route.TargetAddrStr, cts.remote_addr) } } else { - cts.cli.log.Write(cts.Sid, LOG_ERROR, "Invalid route_started event from %s", cts.remote_addr) + cts.C.log.Write(cts.Sid, LOG_ERROR, "Invalid route_started event from %s", cts.remote_addr) } case PACKET_KIND_ROUTE_STOPPED: @@ -1071,16 +1076,16 @@ start_over: if ok { err = cts.ReportEvent(RouteId(x.Route.RouteId), 0, pkt.Kind, x.Route) if err != nil { - cts.cli.log.Write(cts.Sid, LOG_ERROR, + cts.C.log.Write(cts.Sid, LOG_ERROR, "Failed to handle route_stopped event(%d,%s) from %s - %s", x.Route.RouteId, x.Route.TargetAddrStr, cts.remote_addr, err.Error()) } else { - cts.cli.log.Write(cts.Sid, LOG_DEBUG, + cts.C.log.Write(cts.Sid, LOG_DEBUG, "Handled route_stopped event(%d,%s) from %s", x.Route.RouteId, x.Route.TargetAddrStr, cts.remote_addr) } } else { - cts.cli.log.Write(cts.Sid, LOG_ERROR, "Invalid route_stopped event from %s", cts.remote_addr) + cts.C.log.Write(cts.Sid, LOG_ERROR, "Invalid route_stopped event from %s", cts.remote_addr) } case PACKET_KIND_PEER_STARTED: @@ -1091,16 +1096,16 @@ start_over: if ok { err = cts.ReportEvent(RouteId(x.Peer.RouteId), PeerId(x.Peer.PeerId), PACKET_KIND_PEER_STARTED, x.Peer) if err != nil { - cts.cli.log.Write(cts.Sid, LOG_ERROR, + cts.C.log.Write(cts.Sid, LOG_ERROR, "Failed to handle peer_started event from %s for peer(%d,%d,%s,%s) - %s", cts.remote_addr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr, err.Error()) } else { - cts.cli.log.Write(cts.Sid, LOG_DEBUG, + cts.C.log.Write(cts.Sid, LOG_DEBUG, "Handled peer_started event from %s for peer(%d,%d,%s,%s)", cts.remote_addr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr) } } else { - cts.cli.log.Write(cts.Sid, LOG_ERROR, "Invalid peer_started event from %s", cts.remote_addr) + cts.C.log.Write(cts.Sid, LOG_ERROR, "Invalid peer_started event from %s", cts.remote_addr) } // PACKET_KIND_PEER_ABORTED is never sent by server to client. @@ -1114,16 +1119,16 @@ start_over: if ok { err = cts.ReportEvent(RouteId(x.Peer.RouteId), PeerId(x.Peer.PeerId), PACKET_KIND_PEER_STOPPED, x.Peer) if err != nil { - cts.cli.log.Write(cts.Sid, LOG_ERROR, + cts.C.log.Write(cts.Sid, LOG_ERROR, "Failed to handle peer_stopped event from %s for peer(%d,%d,%s,%s) - %s", cts.remote_addr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr, err.Error()) } else { - cts.cli.log.Write(cts.Sid, LOG_DEBUG, + cts.C.log.Write(cts.Sid, LOG_DEBUG, "Handled peer_stopped event from %s for peer(%d,%d,%s,%s)", cts.remote_addr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr) } } else { - cts.cli.log.Write(cts.Sid, LOG_ERROR, "Invalid peer_stopped event from %s", cts.remote_addr) + cts.C.log.Write(cts.Sid, LOG_ERROR, "Invalid peer_stopped event from %s", cts.remote_addr) } case PACKET_KIND_PEER_EOF: @@ -1133,16 +1138,16 @@ start_over: if ok { err = cts.ReportEvent(RouteId(x.Peer.RouteId), PeerId(x.Peer.PeerId), PACKET_KIND_PEER_EOF, x.Peer) if err != nil { - cts.cli.log.Write(cts.Sid, LOG_ERROR, + cts.C.log.Write(cts.Sid, LOG_ERROR, "Failed to handle peer_eof event from %s for peer(%d,%d,%s,%s) - %s", cts.remote_addr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr, err.Error()) } else { - cts.cli.log.Write(cts.Sid, LOG_DEBUG, + cts.C.log.Write(cts.Sid, LOG_DEBUG, "Handled peer_eof event from %s for peer(%d,%d,%s,%s)", cts.remote_addr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr) } } else { - cts.cli.log.Write(cts.Sid, LOG_ERROR, "Invalid peer_eof event from %s", cts.remote_addr) + cts.C.log.Write(cts.Sid, LOG_ERROR, "Invalid peer_eof event from %s", cts.remote_addr) } case PACKET_KIND_PEER_DATA: @@ -1153,16 +1158,16 @@ start_over: if ok { err = cts.ReportEvent(RouteId(x.Data.RouteId), PeerId(x.Data.PeerId), PACKET_KIND_PEER_DATA, x.Data.Data) if err != nil { - cts.cli.log.Write(cts.Sid, LOG_ERROR, + cts.C.log.Write(cts.Sid, LOG_ERROR, "Failed to handle peer_data event from %s for peer(%d,%d) - %s", cts.remote_addr, x.Data.RouteId, x.Data.PeerId, err.Error()) } else { - cts.cli.log.Write(cts.Sid, LOG_DEBUG, + cts.C.log.Write(cts.Sid, LOG_DEBUG, "Handled peer_data event from %s for peer(%d,%d)", cts.remote_addr, x.Data.RouteId, x.Data.PeerId) } } else { - cts.cli.log.Write(cts.Sid, LOG_ERROR, "Invalid peer_data event from %s", cts.remote_addr) + cts.C.log.Write(cts.Sid, LOG_ERROR, "Invalid peer_data event from %s", cts.remote_addr) } case PACKET_KIND_CONN_NOTICE: @@ -1171,9 +1176,11 @@ start_over: var ok bool x, ok = pkt.U.(*Packet_Notice) if ok { -fmt.Printf ("CONN NOTICE [%s] from %s\n", x.Notice.Text, cts.remote_addr) + if cts.C.conn_notice != nil { + cts.C.conn_notice.Handle(cts, x.Notice.Text) + } } else { - cts.cli.log.Write(cts.Sid, LOG_ERROR, "Invalid conn_data event from %s", cts.remote_addr) + cts.C.log.Write(cts.Sid, LOG_ERROR, "Invalid conn_data event from %s", cts.remote_addr) } default: @@ -1182,7 +1189,7 @@ fmt.Printf ("CONN NOTICE [%s] from %s\n", x.Notice.Text, cts.remote_addr) } done: - cts.cli.log.Write(cts.Sid, LOG_INFO, "Disconnected from server[%d] %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index]) + cts.C.log.Write(cts.Sid, LOG_INFO, "Disconnected from server[%d] %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index]) cts.State = CLIENT_CONN_DISCONNECTED req_stop_and_wait_for_termination: @@ -1191,23 +1198,23 @@ req_stop_and_wait_for_termination: wait_for_termination: cts.route_wg.Wait() // wait until all route tasks are finished - cts.cli.RemoveClientConn(cts) + cts.C.RemoveClientConn(cts) return reconnect_to_server: cts.State = CLIENT_CONN_DISCONNECTING if cts.conn != nil { - cts.cli.log.Write(cts.Sid, LOG_INFO, "Disconnecting from server[%d] %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index]) + cts.C.log.Write(cts.Sid, LOG_INFO, "Disconnecting from server[%d] %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index]) } cts.disconnect_from_server() cts.State = CLIENT_CONN_DISCONNECTED // wait for 2 seconds - slpctx, cancel_sleep = context.WithTimeout(cts.cli.ctx, 2 * time.Second) + slpctx, cancel_sleep = context.WithTimeout(cts.C.ctx, 2 * time.Second) select { - case <-cts.cli.ctx.Done(): - // need to log cts.cli.ctx.Err().Error()? + case <-cts.C.ctx.Done(): + // need to log cts.C.ctx.Err().Error()? cancel_sleep() goto req_stop_and_wait_for_termination case <-cts.stop_chan: @@ -1348,8 +1355,10 @@ func NewClient(ctx context.Context, name string, logger Logger, cfg *ClientConfi c.wrap_http_handler(&client_ctl_client_conns_id_routes_id_peers{client_ctl{c: &c, id: HS_ID_CTL}})) c.ctl_mux.Handle(c.ctl_prefix + "/_ctl/client-conns/{conn_id}/routes/{route_id}/peers/{peer_id}", c.wrap_http_handler(&client_ctl_client_conns_id_routes_id_peers_id{client_ctl{c: &c, id: HS_ID_CTL}})) - c.ctl_mux.Handle(c.ctl_prefix + "/_ctl/client-conns/{conn_id}/notices", - c.wrap_http_handler(&client_ctl_client_conns_id_notices{client_ctl{c: &c, id: HS_ID_CTL}})) + c.ctl_mux.Handle(c.ctl_prefix + "/_ctl/notices", + c.wrap_http_handler(&client_ctl_notices{client_ctl{c: &c, id: HS_ID_CTL}})) + c.ctl_mux.Handle(c.ctl_prefix + "/_ctl/notices/{conn_id}", + c.wrap_http_handler(&client_ctl_notices_id{client_ctl{c: &c, id: HS_ID_CTL}})) c.ctl_mux.Handle(c.ctl_prefix + "/_ctl/stats", c.wrap_http_handler(&client_ctl_stats{client_ctl{c: &c, id: HS_ID_CTL}})) c.ctl_mux.Handle(c.ctl_prefix + "/_ctl/token", @@ -1742,6 +1751,10 @@ func (c *Client) WriteLog(id string, level LogLevel, fmtstr string, args ...inte c.log.Write(id, level, fmtstr, args...) } +func (c *Client) SetConnNoticeHandler(handler ClientConnNoticeHandler) { + c.conn_notice = handler +} + func (c *Client) SetRoutePersister(persister ClientRoutePersister) { c.route_persister = persister } diff --git a/server-ctl.go b/server-ctl.go index 537c9a9..46b2e6a 100644 --- a/server-ctl.go +++ b/server-ctl.go @@ -85,7 +85,11 @@ type server_ctl_server_conns_id_routes_id_peers_id struct { server_ctl } -type server_ctl_server_conns_id_notices struct { +type server_ctl_notices struct { + server_ctl +} + +type server_ctl_notices_id struct { server_ctl } @@ -550,7 +554,46 @@ oops: // ------------------------------------ -func (ctl *server_ctl_server_conns_id_notices) ServeHTTP(w http.ResponseWriter, req *http.Request) (int, error) { +func (ctl *server_ctl_notices) ServeHTTP(w http.ResponseWriter, req *http.Request) (int, error) { + var s *Server + var status_code int + var cts *ServerConn + var err error + + s = ctl.s + + switch req.Method { + case http.MethodPost: + var noti json_in_notice + + err = json.NewDecoder(req.Body).Decode(¬i) + if err != nil { + status_code = WriteEmptyRespHeader(w, http.StatusBadRequest) + goto oops + } + + s.cts_mtx.Lock() + for _, cts = range s.cts_map { + cts.pss.Send(MakeConnNoticePacket(noti.Text)) + // let's not care about an error when broacasting a notice to all connections + } + s.cts_mtx.Unlock() + status_code = WriteJsonRespHeader(w, http.StatusOK) + + default: + status_code = WriteEmptyRespHeader(w, http.StatusBadRequest) + } + +//done: + return status_code, nil + +oops: + return status_code, err +} + +// ------------------------------------ + +func (ctl *server_ctl_notices_id) ServeHTTP(w http.ResponseWriter, req *http.Request) (int, error) { var s *Server var status_code int var conn_id string @@ -561,7 +604,7 @@ func (ctl *server_ctl_server_conns_id_notices) ServeHTTP(w http.ResponseWriter, s = ctl.s je = json.NewEncoder(w) - conn_id = req.PathValue("conn_id") + conn_id = req.PathValue("conn_id") // server connection cts, err = s.FindServerConnByIdStr(conn_id) if err != nil { status_code = WriteJsonRespHeader(w, http.StatusNotFound) diff --git a/server-peer.go b/server-peer.go index bf1d940..9859af3 100644 --- a/server-peer.go +++ b/server-peer.go @@ -61,7 +61,7 @@ func (spc *ServerPeerConn) RunTask(wg *sync.WaitGroup) { pss = spc.route.Cts.pss err = pss.Send(MakePeerStartedPacket(spc.route.Id, spc.conn_id, conn_raddr, conn_laddr)) if err != nil { - spc.route.Cts.svr.log.Write(spc.route.Cts.sid, LOG_ERROR, + spc.route.Cts.S.log.Write(spc.route.Cts.sid, LOG_ERROR, "Failed to send peer_started event(%d,%d,%s,%s) to client - %s", spc.route.Id, spc.conn_id, conn_raddr, conn_laddr, err.Error()) goto done_without_stop @@ -69,7 +69,7 @@ func (spc *ServerPeerConn) RunTask(wg *sync.WaitGroup) { // set up a timer to set waiting duration until the connection is // actually established on the client side and it's informed... - waitctx, cancel_wait = context.WithTimeout(spc.route.Cts.svr.ctx, 5 * time.Second) // TODO: make this configurable + waitctx, cancel_wait = context.WithTimeout(spc.route.Cts.S.ctx, 5 * time.Second) // TODO: make this configurable wait_for_started: for { select { @@ -98,14 +98,14 @@ wait_for_started: if errors.Is(err, io.EOF) || strings.Contains(err.Error(), "use of closed network connection") { // i don't like this way to check this error. err = pss.Send(MakePeerEofPacket(spc.route.Id, spc.conn_id)) if err != nil { - spc.route.Cts.svr.log.Write(spc.route.Cts.sid, LOG_ERROR, + spc.route.Cts.S.log.Write(spc.route.Cts.sid, LOG_ERROR, "Failed to send peer_eof event(%d,%d,%s,%s) to client - %s", spc.route.Id, spc.conn_id, conn_raddr, conn_laddr, err.Error()) goto done } goto wait_for_stopped } else { - spc.route.Cts.svr.log.Write(spc.route.Cts.sid, LOG_ERROR, + spc.route.Cts.S.log.Write(spc.route.Cts.sid, LOG_ERROR, "Failed to read data from peer(%d,%d,%s,%s) - %s", spc.route.Id, spc.conn_id, conn_raddr, conn_laddr, err.Error()) goto done @@ -114,7 +114,7 @@ wait_for_started: err = pss.Send(MakePeerDataPacket(spc.route.Id, spc.conn_id, buf[:n])) if err != nil { - spc.route.Cts.svr.log.Write(spc.route.Cts.sid, LOG_ERROR, + spc.route.Cts.S.log.Write(spc.route.Cts.sid, LOG_ERROR, "Failed to send data from peer(%d,%d,%s,%s) to client - %s", spc.route.Id, spc.conn_id, conn_raddr, conn_laddr, err.Error()) goto done @@ -134,7 +134,7 @@ wait_for_stopped: done: err = pss.Send(MakePeerStoppedPacket(spc.route.Id, spc.conn_id, spc.conn.RemoteAddr().String(), spc.conn.LocalAddr().String())) if err != nil { - spc.route.Cts.svr.log.Write(spc.route.Cts.sid, LOG_ERROR, + spc.route.Cts.S.log.Write(spc.route.Cts.sid, LOG_ERROR, "Failed to send peer_stopped(%d,%d,%s,%s) to client - %s", spc.route.Id, spc.conn_id, conn_raddr, conn_laddr, err.Error()) // nothing much to do about the failure of sending this @@ -206,21 +206,21 @@ func (spc *ServerPeerConn) ReportEvent(event_type PACKET_KIND, event_data interf var err error _, err = spc.conn.Write(data) if err != nil { - spc.route.Cts.svr.log.Write(spc.route.Cts.sid, LOG_ERROR, + spc.route.Cts.S.log.Write(spc.route.Cts.sid, LOG_ERROR, "Failed to write data from %s to peer(%d,%d,%s) - %s", spc.route.Cts.RemoteAddr, spc.route.Id, spc.conn_id, spc.conn.RemoteAddr().String(), err.Error()) spc.ReqStop() } } else { // this must not happen. - spc.route.Cts.svr.log.Write(spc.route.Cts.sid, LOG_ERROR, + spc.route.Cts.S.log.Write(spc.route.Cts.sid, LOG_ERROR, "Protocol error - invalid data in peer_data event from %s to peer(%d,%d,%s)", spc.route.Cts.RemoteAddr, spc.route.Id, spc.conn_id, spc.conn.RemoteAddr().String()) spc.ReqStop() } } else { // protocol error. the client must not relay more data from the client-side peer after EOF. - spc.route.Cts.svr.log.Write(spc.route.Cts.sid, LOG_ERROR, + spc.route.Cts.S.log.Write(spc.route.Cts.sid, LOG_ERROR, "Protocol error - redundant data from %s to (%d,%d,%s)", spc.route.Cts.RemoteAddr, spc.route.Id, spc.conn_id, spc.conn.RemoteAddr().String()) spc.ReqStop() diff --git a/server.go b/server.go index 9dc59f3..e8f05fd 100644 --- a/server.go +++ b/server.go @@ -42,6 +42,10 @@ type ServerSvcPortMap = map[PortId]ConnRouteId type ServerWpxResponseTransformer func(r *ServerRouteProxyInfo, resp *http.Response) io.Reader type ServerWpxForeignPortProxyMaker func(wpx_type string, port_id string) (*ServerRouteProxyInfo, error) +type ServerConnNoticeHandler interface { + Handle(cts* ServerConn, text string) +} + type ServerConfig struct { RpcAddrs []string RpcTls *tls.Config @@ -100,6 +104,7 @@ type Server struct { cts_wg sync.WaitGroup log Logger + conn_notice ServerConnNoticeHandler svc_port_mtx sync.Mutex svc_port_map ServerSvcPortMap @@ -120,12 +125,12 @@ type Server struct { // connection from client. // client connect to the server, the server accept it, and makes a tunnel request type ServerConn struct { - svr *Server + S *Server Id ConnId sid string // for logging - RemoteAddr net.Addr // client address that created this structure - LocalAddr net.Addr // local address that the client is connected to + RemoteAddr net.Addr // client address that created this structure + LocalAddr net.Addr // local address that the client is connected to pss *GuardedPacketStreamServer route_mtx sync.Mutex @@ -264,7 +269,7 @@ func (r *ServerRoute) AddNewServerPeerConn(c *net.TCPConn) (*ServerPeerConn, err pts = NewServerPeerConn(r, c, assigned_id) r.pts_map[pts.conn_id] = pts - r.Cts.svr.stats.peers.Add(1) + r.Cts.S.stats.peers.Add(1) return pts, nil } @@ -272,9 +277,9 @@ func (r *ServerRoute) AddNewServerPeerConn(c *net.TCPConn) (*ServerPeerConn, err func (r *ServerRoute) RemoveServerPeerConn(pts *ServerPeerConn) { r.pts_mtx.Lock() delete(r.pts_map, pts.conn_id) - r.Cts.svr.stats.peers.Add(-1) + r.Cts.S.stats.peers.Add(-1) r.pts_mtx.Unlock() - r.Cts.svr.log.Write(r.Cts.sid, LOG_DEBUG, "Removed server-side peer connection %s from route(%d)", pts.conn.RemoteAddr().String(), r.Id) + r.Cts.S.log.Write(r.Cts.sid, LOG_DEBUG, "Removed server-side peer connection %s from route(%d)", pts.conn.RemoteAddr().String(), r.Id) } func (r *ServerRoute) RunTask(wg *sync.WaitGroup) { @@ -290,9 +295,9 @@ func (r *ServerRoute) RunTask(wg *sync.WaitGroup) { conn, err = r.svc_l.AcceptTCP() // this call is blocking... if err != nil { if errors.Is(err, net.ErrClosed) { - r.Cts.svr.log.Write(r.Cts.sid, LOG_INFO, "Server-side peer listener closed on route(%d)", r.Id) + r.Cts.S.log.Write(r.Cts.sid, LOG_INFO, "Server-side peer listener closed on route(%d)", r.Id) } else { - r.Cts.svr.log.Write(r.Cts.sid, LOG_INFO, "Server-side peer listener error on route(%d) - %s", r.Id, err.Error()) + r.Cts.S.log.Write(r.Cts.sid, LOG_INFO, "Server-side peer listener error on route(%d) - %s", r.Id, err.Error()) } break } @@ -301,21 +306,21 @@ func (r *ServerRoute) RunTask(wg *sync.WaitGroup) { iaddr, _ = netip.AddrFromSlice(raddr.IP) if !r.SvcPermNet.Contains(iaddr) { - r.Cts.svr.log.Write(r.Cts.sid, LOG_DEBUG, "Rejected server-side peer %s to route(%d) - allowed range %v", raddr.String(), r.Id, r.SvcPermNet) + r.Cts.S.log.Write(r.Cts.sid, LOG_DEBUG, "Rejected server-side peer %s to route(%d) - allowed range %v", raddr.String(), r.Id, r.SvcPermNet) conn.Close() } - if r.Cts.svr.pts_limit > 0 && int(r.Cts.svr.stats.peers.Load()) >= r.Cts.svr.pts_limit { - r.Cts.svr.log.Write(r.Cts.sid, LOG_DEBUG, "Rejected server-side peer %s to route(%d) - allowed max %d", raddr.String(), r.Id, r.Cts.svr.pts_limit) + if r.Cts.S.pts_limit > 0 && int(r.Cts.S.stats.peers.Load()) >= r.Cts.S.pts_limit { + r.Cts.S.log.Write(r.Cts.sid, LOG_DEBUG, "Rejected server-side peer %s to route(%d) - allowed max %d", raddr.String(), r.Id, r.Cts.S.pts_limit) conn.Close() } pts, err = r.AddNewServerPeerConn(conn) if err != nil { - r.Cts.svr.log.Write(r.Cts.sid, LOG_ERROR, "Failed to add server-side peer %s to route(%d) - %s", r.Id, raddr.String(), r.Id, err.Error()) + r.Cts.S.log.Write(r.Cts.sid, LOG_ERROR, "Failed to add server-side peer %s to route(%d) - %s", r.Id, raddr.String(), r.Id, err.Error()) conn.Close() } else { - r.Cts.svr.log.Write(r.Cts.sid, LOG_DEBUG, "Added server-side peer %s to route(%d)", raddr.String(), r.Id) + r.Cts.S.log.Write(r.Cts.sid, LOG_DEBUG, "Added server-side peer %s to route(%d)", raddr.String(), r.Id) r.pts_wg.Add(1) go pts.RunTask(&r.pts_wg) } @@ -324,7 +329,7 @@ func (r *ServerRoute) RunTask(wg *sync.WaitGroup) { r.ReqStop() r.pts_wg.Wait() - r.Cts.svr.log.Write(r.Cts.sid, LOG_DEBUG, "All service-side peer handlers ended on route(%d)", r.Id) + r.Cts.S.log.Write(r.Cts.sid, LOG_DEBUG, "All service-side peer handlers ended on route(%d)", r.Id) r.Cts.RemoveServerRoute(r) // final phase... } @@ -424,20 +429,20 @@ func (cts *ServerConn) make_route_listener(id RouteId, option RouteOption, svc_r // uniqueness by port id can be checked after listener creation, // especially when automatic assignment is requested. - cts.svr.svc_port_mtx.Lock() - prev_cri, ok = cts.svr.svc_port_map[PortId(svcaddr.Port)] + cts.S.svc_port_mtx.Lock() + prev_cri, ok = cts.S.svc_port_map[PortId(svcaddr.Port)] if ok { - cts.svr.svc_port_mtx.Unlock() - cts.svr.log.Write(cts.sid, LOG_ERROR, + cts.S.svc_port_mtx.Unlock() + cts.S.log.Write(cts.sid, LOG_ERROR, "Route(%d,%d) on %s not unique by port number - existing route(%d,%d)", cts.Id, id, prev_cri.conn_id, prev_cri.route_id, svcaddr.String()) l.Close() return nil, nil, err } - cts.svr.svc_port_map[PortId(svcaddr.Port)] = ConnRouteId{conn_id: cts.Id, route_id: id} - cts.svr.svc_port_mtx.Unlock() + cts.S.svc_port_map[PortId(svcaddr.Port)] = ConnRouteId{conn_id: cts.Id, route_id: id} + cts.S.svc_port_mtx.Unlock() - cts.svr.log.Write(cts.sid, LOG_DEBUG, "Route(%d,%d) listening on %s", cts.Id, id, svcaddr.String()) + cts.S.log.Write(cts.sid, LOG_DEBUG, "Route(%d,%d) listening on %s", cts.Id, id, svcaddr.String()) return l, svcaddr, nil } @@ -460,7 +465,7 @@ func (cts *ServerConn) AddNewServerRoute(route_id RouteId, proto RouteOption, pt return nil, err } cts.route_map[route_id] = r - cts.svr.stats.routes.Add(1) + cts.S.stats.routes.Add(1) cts.route_mtx.Unlock() cts.route_wg.Add(1) @@ -483,12 +488,12 @@ func (cts *ServerConn) RemoveServerRoute(route *ServerRoute) error { return fmt.Errorf("non-existent route - %d", route.Id) } delete(cts.route_map, route.Id) - cts.svr.stats.routes.Add(-1) + cts.S.stats.routes.Add(-1) cts.route_mtx.Unlock() - cts.svr.svc_port_mtx.Lock() - delete(cts.svr.svc_port_map, PortId(route.SvcAddr.Port)) - cts.svr.svc_port_mtx.Unlock() + cts.S.svc_port_mtx.Lock() + delete(cts.S.svc_port_map, PortId(route.SvcAddr.Port)) + cts.S.svc_port_mtx.Unlock() r.ReqStop() return nil @@ -505,12 +510,12 @@ func (cts *ServerConn) RemoveServerRouteById(route_id RouteId) (*ServerRoute, er return nil, fmt.Errorf("non-existent route id - %d", route_id) } delete(cts.route_map, route_id) - cts.svr.stats.routes.Add(-1) + cts.S.stats.routes.Add(-1) cts.route_mtx.Unlock() - cts.svr.svc_port_mtx.Lock() - delete(cts.svr.svc_port_map, PortId(r.SvcAddr.Port)) - cts.svr.svc_port_mtx.Unlock() + cts.S.svc_port_mtx.Lock() + delete(cts.S.svc_port_map, PortId(r.SvcAddr.Port)) + cts.S.svc_port_mtx.Unlock() r.ReqStop() return r, nil @@ -575,11 +580,11 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) { for { pkt, err = cts.pss.Recv() if errors.Is(err, io.EOF) { - cts.svr.log.Write(cts.sid, LOG_INFO, "RPC stream closed for client %s", cts.RemoteAddr) + cts.S.log.Write(cts.sid, LOG_INFO, "RPC stream closed for client %s", cts.RemoteAddr) goto done } if err != nil { - cts.svr.log.Write(cts.sid, LOG_ERROR, "RPC stream error for client %s - %s", cts.RemoteAddr, err.Error()) + cts.S.log.Write(cts.sid, LOG_ERROR, "RPC stream error for client %s - %s", cts.RemoteAddr, err.Error()) goto done } @@ -593,37 +598,37 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) { r, err = cts.AddNewServerRoute(RouteId(x.Route.RouteId), RouteOption(x.Route.ServiceOption), x.Route.TargetAddrStr, x.Route.TargetName, x.Route.ServiceAddrStr, x.Route.ServiceNetStr) if err != nil { - cts.svr.log.Write(cts.sid, LOG_ERROR, + cts.S.log.Write(cts.sid, LOG_ERROR, "Failed to add route(%d,%s) for %s - %s", x.Route.RouteId, x.Route.TargetAddrStr, cts.RemoteAddr, err.Error()) err = cts.pss.Send(MakeRouteStoppedPacket(RouteId(x.Route.RouteId), RouteOption(x.Route.ServiceOption), x.Route.TargetAddrStr, x.Route.TargetName, x.Route.ServiceAddrStr, x.Route.ServiceNetStr)) if err != nil { - cts.svr.log.Write(cts.sid, LOG_ERROR, + cts.S.log.Write(cts.sid, LOG_ERROR, "Failed to send route_stopped event(%d,%s,%v,%s) to client %s - %s", x.Route.RouteId, x.Route.TargetAddrStr, x.Route.ServiceOption, x.Route.ServiceNetStr, cts.RemoteAddr, err.Error()) goto done } else { - cts.svr.log.Write(cts.sid, LOG_DEBUG, + cts.S.log.Write(cts.sid, LOG_DEBUG, "Sent route_stopped event(%d,%s,%v,%s) to client %s", x.Route.RouteId, x.Route.TargetAddrStr, x.Route.ServiceOption, x.Route.ServiceNetStr, cts.RemoteAddr) } } else { - cts.svr.log.Write(cts.sid, LOG_INFO, + cts.S.log.Write(cts.sid, LOG_INFO, "Added route(%d,%s,%s,%v,%v) for client %s to cts(%d)", r.Id, r.PtcAddr, r.SvcAddr.String(), r.SvcOption, r.SvcPermNet, cts.RemoteAddr, cts.Id) err = cts.pss.Send(MakeRouteStartedPacket(r.Id, r.SvcOption, r.SvcAddr.String(), r.PtcName, r.SvcReqAddr, r.SvcPermNet.String())) if err != nil { r.ReqStop() - cts.svr.log.Write(cts.sid, LOG_ERROR, + cts.S.log.Write(cts.sid, LOG_ERROR, "Failed to send route_started event(%d,%s,%s,%s%v,%v) to client %s - %s", r.Id, r.PtcAddr, r.SvcAddr.String(), r.SvcOption, r.SvcPermNet, cts.RemoteAddr, err.Error()) goto done } } } else { - cts.svr.log.Write(cts.sid, LOG_INFO, "Received invalid packet from %s", cts.RemoteAddr) + cts.S.log.Write(cts.sid, LOG_INFO, "Received invalid packet from %s", cts.RemoteAddr) // TODO: need to abort this client? } @@ -636,24 +641,24 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) { r, err = cts.RemoveServerRouteById(RouteId(x.Route.RouteId)) if err != nil { - cts.svr.log.Write(cts.sid, LOG_ERROR, + cts.S.log.Write(cts.sid, LOG_ERROR, "Failed to delete route(%d,%s) for client %s - %s", x.Route.RouteId, x.Route.TargetAddrStr, cts.RemoteAddr, err.Error()) } else { - cts.svr.log.Write(cts.sid, LOG_ERROR, + cts.S.log.Write(cts.sid, LOG_ERROR, "Deleted route(%d,%s,%s,%v,%v) for client %s", r.Id, r.PtcAddr, r.SvcAddr.String(), r.SvcOption, r.SvcPermNet.String(), cts.RemoteAddr) err = cts.pss.Send(MakeRouteStoppedPacket(r.Id, r.SvcOption, r.PtcAddr, r.PtcName, r.SvcReqAddr, r.SvcPermNet.String())) if err != nil { r.ReqStop() - cts.svr.log.Write(cts.sid, LOG_ERROR, + cts.S.log.Write(cts.sid, LOG_ERROR, "Failed to send route_stopped event(%d,%s,%s,%v.%v) to client %s - %s", r.Id, r.PtcAddr, r.SvcAddr.String(), r.SvcOption, r.SvcPermNet.String(), cts.RemoteAddr, err.Error()) goto done } } } else { - cts.svr.log.Write(cts.sid, LOG_ERROR, "Invalid route_stop event from %s", cts.RemoteAddr) + cts.S.log.Write(cts.sid, LOG_ERROR, "Invalid route_stop event from %s", cts.RemoteAddr) } case PACKET_KIND_PEER_STARTED: @@ -664,17 +669,17 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) { if ok { err = cts.ReportEvent(RouteId(x.Peer.RouteId), PeerId(x.Peer.PeerId), PACKET_KIND_PEER_STARTED, x.Peer) if err != nil { - cts.svr.log.Write(cts.sid, LOG_ERROR, + cts.S.log.Write(cts.sid, LOG_ERROR, "Failed to handle peer_started event from %s for peer(%d,%d,%s,%s) - %s", cts.RemoteAddr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr, err.Error()) } else { - cts.svr.log.Write(cts.sid, LOG_DEBUG, + cts.S.log.Write(cts.sid, LOG_DEBUG, "Handled peer_started event from %s for peer(%d,%d,%s,%s)", cts.RemoteAddr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr) } } else { // invalid event data - cts.svr.log.Write(cts.sid, LOG_ERROR, "Invalid peer_started event from %s", cts.RemoteAddr) + cts.S.log.Write(cts.sid, LOG_ERROR, "Invalid peer_started event from %s", cts.RemoteAddr) } case PACKET_KIND_PEER_ABORTED: @@ -684,17 +689,17 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) { if ok { err = cts.ReportEvent(RouteId(x.Peer.RouteId), PeerId(x.Peer.PeerId), PACKET_KIND_PEER_ABORTED, x.Peer) if err != nil { - cts.svr.log.Write(cts.sid, LOG_ERROR, + cts.S.log.Write(cts.sid, LOG_ERROR, "Failed to handle peer_aborted event from %s for peer(%d,%d,%s,%s) - %s", cts.RemoteAddr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr, err.Error()) } else { - cts.svr.log.Write(cts.sid, LOG_DEBUG, + cts.S.log.Write(cts.sid, LOG_DEBUG, "Handled peer_aborted event from %s for peer(%d,%d,%s,%s)", cts.RemoteAddr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr) } } else { // invalid event data - cts.svr.log.Write(cts.sid, LOG_ERROR, "Invalid peer_aborted event from %s", cts.RemoteAddr) + cts.S.log.Write(cts.sid, LOG_ERROR, "Invalid peer_aborted event from %s", cts.RemoteAddr) } case PACKET_KIND_PEER_STOPPED: @@ -705,17 +710,17 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) { if ok { err = cts.ReportEvent(RouteId(x.Peer.RouteId), PeerId(x.Peer.PeerId), PACKET_KIND_PEER_STOPPED, x.Peer) if err != nil { - cts.svr.log.Write(cts.sid, LOG_ERROR, + cts.S.log.Write(cts.sid, LOG_ERROR, "Failed to handle peer_stopped event from %s for peer(%d,%d,%s,%s) - %s", cts.RemoteAddr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr, err.Error()) } else { - cts.svr.log.Write(cts.sid, LOG_DEBUG, + cts.S.log.Write(cts.sid, LOG_DEBUG, "Handled peer_stopped event from %s for peer(%d,%d,%s,%s)", cts.RemoteAddr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr) } } else { // invalid event data - cts.svr.log.Write(cts.sid, LOG_ERROR, "Invalid peer_stopped event from %s", cts.RemoteAddr) + cts.S.log.Write(cts.sid, LOG_ERROR, "Invalid peer_stopped event from %s", cts.RemoteAddr) } case PACKET_KIND_PEER_DATA: @@ -726,17 +731,17 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) { if ok { err = cts.ReportEvent(RouteId(x.Data.RouteId), PeerId(x.Data.PeerId), PACKET_KIND_PEER_DATA, x.Data.Data) if err != nil { - cts.svr.log.Write(cts.sid, LOG_ERROR, + cts.S.log.Write(cts.sid, LOG_ERROR, "Failed to handle peer_data event from %s for peer(%d,%d) - %s", cts.RemoteAddr, x.Data.RouteId, x.Data.PeerId, err.Error()) } else { - cts.svr.log.Write(cts.sid, LOG_DEBUG, + cts.S.log.Write(cts.sid, LOG_DEBUG, "Handled peer_data event from %s for peer(%d,%d)", cts.RemoteAddr, x.Data.RouteId, x.Data.PeerId) } } else { // invalid event data - cts.svr.log.Write(cts.sid, LOG_ERROR, "Invalid peer_data event from %s", cts.RemoteAddr) + cts.S.log.Write(cts.sid, LOG_ERROR, "Invalid peer_data event from %s", cts.RemoteAddr) } case PACKET_KIND_CONN_NOTICE: @@ -745,15 +750,17 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) { var ok bool x, ok = pkt.U.(*Packet_Notice) if ok { -fmt.Printf ("CONN NOTICE [%s] from %s\n", x.Notice.Text, cts.RemoteAddr) + if cts.S.conn_notice != nil { + cts.S.conn_notice.Handle(cts, x.Notice.Text) + } } else { - cts.svr.log.Write(cts.sid, LOG_ERROR, "Invalid conn_data event from %s", cts.RemoteAddr) + cts.S.log.Write(cts.sid, LOG_ERROR, "Invalid conn_data event from %s", cts.RemoteAddr) } } } done: - cts.svr.log.Write(cts.sid, LOG_INFO, "RPC stream receiver ended") + cts.S.log.Write(cts.sid, LOG_INFO, "RPC stream receiver ended") } func (cts *ServerConn) RunTask(wg *sync.WaitGroup) { @@ -781,7 +788,7 @@ func (cts *ServerConn) RunTask(wg *sync.WaitGroup) { // or continue select { case <-ctx.Done(): // the stream context is done - cts.svr.log.Write(cts.sid, LOG_INFO, "RPC stream done - %s", ctx.Err().Error()) + cts.S.log.Write(cts.sid, LOG_INFO, "RPC stream done - %s", ctx.Err().Error()) goto done case <- cts.stop_chan: @@ -1123,8 +1130,10 @@ func NewServer(ctx context.Context, name string, logger Logger, cfg *ServerConfi s.wrap_http_handler(&server_ctl_server_conns_id_routes_id_peers{server_ctl{s: &s, id: HS_ID_CTL}})) s.ctl_mux.Handle(s.cfg.CtlPrefix + "/_ctl/server-conns/{conn_id}/routes/{route_id}/peers/{peer_id}", s.wrap_http_handler(&server_ctl_server_conns_id_routes_id_peers_id{server_ctl{s: &s, id: HS_ID_CTL}})) - s.ctl_mux.Handle(s.cfg.CtlPrefix + "/_ctl/server-conns/{conn_id}/notices", - s.wrap_http_handler(&server_ctl_server_conns_id_notices{server_ctl{s: &s, id: HS_ID_CTL}})) + s.ctl_mux.Handle(s.cfg.CtlPrefix + "/_ctl/notices", + s.wrap_http_handler(&server_ctl_notices{server_ctl{s: &s, id: HS_ID_CTL}})) + s.ctl_mux.Handle(s.cfg.CtlPrefix + "/_ctl/notices/{conn_id}", + s.wrap_http_handler(&server_ctl_notices_id{server_ctl{s: &s, id: HS_ID_CTL}})) s.ctl_mux.Handle(s.cfg.CtlPrefix + "/_ctl/stats", s.wrap_http_handler(&server_ctl_stats{server_ctl{s: &s, id: HS_ID_CTL}})) s.ctl_mux.Handle(s.cfg.CtlPrefix + "/_ctl/token", @@ -1497,7 +1506,7 @@ func (s *Server) AddNewServerConn(remote_addr *net.Addr, local_addr *net.Addr, p var assigned_id ConnId var ok bool - cts.svr = s + cts.S = s cts.route_map = make(ServerRouteMap) cts.RemoteAddr = *remote_addr cts.LocalAddr = *local_addr @@ -1808,6 +1817,10 @@ func (s *Server) WriteLog(id string, level LogLevel, fmtstr string, args ...inte s.log.Write(id, level, fmtstr, args...) } +func (s *Server) SetConnNoticeHandler(handler ServerConnNoticeHandler) { + s.conn_notice = handler +} + func (s *Server) AddCtlHandler(path string, handler ServerHttpHandler) { s.ctl_mux.Handle(s.cfg.CtlPrefix + "/_ctl" + path, s.wrap_http_handler(handler)) }