diff --git a/client-peer.go b/client-peer.go index 4165684..909cfaa 100644 --- a/client-peer.go +++ b/client-peer.go @@ -1,6 +1,5 @@ package hodu -import "fmt" import "net" import "sync" @@ -25,32 +24,31 @@ func (cpc *ClientPeerConn) RunTask(wg *sync.WaitGroup) error { defer wg.Done() -fmt.Printf("CONNECTION ESTABLISHED TO PEER... ABOUT TO READ DATA...\n") for { n, err = cpc.conn.Read(buf[:]) if err != nil { -// TODO: add proper log header - cpc.route.cts.cli.log.Write("", LOG_ERROR, "Unable to read from the client-side peer %s - %s", cpc.conn.RemoteAddr().String(), err.Error()) + cpc.route.cts.cli.log.Write(cpc.route.cts.sid, LOG_ERROR, + "Failed to read from the client-side peer(%d,%d,%s,%s) - %s", + cpc.route.id, cpc.conn_id, cpc.conn.RemoteAddr().String(), cpc.conn.LocalAddr().String(), err.Error()) break } err = cpc.route.cts.psc.Send(MakePeerDataPacket(cpc.route.id, cpc.conn_id, buf[0:n])) if err != nil { -// TODO: add proper log header - cpc.route.cts.cli.log.Write("", LOG_ERROR, "Unable to write to server - %s", err.Error()) + cpc.route.cts.cli.log.Write(cpc.route.cts.sid, LOG_ERROR, + "Failed to write peer(%d,%d,%s,%s) data to server - %s", + cpc.route.id, cpc.conn_id, cpc.conn.RemoteAddr().String(), cpc.conn.LocalAddr().String(), err.Error()) break } } - cpc.route.cts.psc.Send(MakePeerStoppedPacket(cpc.route.id, cpc.conn_id)) // nothing much to do upon failure. no error check here - + cpc.route.cts.psc.Send(MakePeerStoppedPacket(cpc.route.id, cpc.conn_id, cpc.conn.RemoteAddr().String(), cpc.conn.LocalAddr().String())) // nothing much to do upon failure. no error check here cpc.ReqStop() - cpc.route.RemoveClientPeerConn(cpc) + cpc.route.RemoveClientPeerConn(cpc) return nil } func (cpc *ClientPeerConn) ReqStop() { - // TODO: because of connect delay in Start, cpc.p may not be yet ready. handle this case... if cpc.stop_req.CompareAndSwap(false, true) { if cpc.conn != nil { cpc.conn.Close() diff --git a/client.go b/client.go index fa2ada2..cac7f0c 100644 --- a/client.go +++ b/client.go @@ -231,10 +231,10 @@ func (r *ClientRoute) RunTask(wg *sync.WaitGroup) { // most useful works are triggered by ReportEvent() and done by ConnectToPeer() defer wg.Done() - r.cts.cli.log.Write("", LOG_DEBUG, "Sending route-start for id=%d peer=%s to %s", r.id, r.peer_addr, r.cts.cfg.ServerAddr) + r.cts.cli.log.Write(r.cts.sid, LOG_DEBUG, "Sending route_start for route(%d,%s) to %s", r.id, r.peer_addr, r.cts.remote_addr) err = r.cts.psc.Send(MakeRouteStartPacket(r.id, r.proto, r.peer_addr)) if err != nil { - r.cts.cli.log.Write("", LOG_DEBUG, "Failed to Send route-start for id=%d peer=%s to %s", r.id, r.peer_addr, r.cts.cfg.ServerAddr) + r.cts.cli.log.Write(r.cts.sid, LOG_DEBUG, "Failed to send route_start for route(%d,%s) to %s", r.id, r.peer_addr, r.cts.remote_addr) goto done } @@ -250,10 +250,9 @@ done: r.ReqStop() r.ptc_wg.Wait() // wait for all peer tasks are finished - r.cts.cli.log.Write("", LOG_DEBUG, "Sending route-stop for id=%d peer=%s to %s", r.id, r.peer_addr, r.cts.cfg.ServerAddr) + r.cts.cli.log.Write(r.cts.sid, LOG_DEBUG, "Sending route_stop for route(%d,%s) to %s", r.id, r.peer_addr, r.cts.remote_addr) r.cts.psc.Send(MakeRouteStopPacket(r.id, r.proto, r.peer_addr)) r.cts.RemoveClientRoute(r) -fmt.Printf("*** End fo Client Route Task\n") } func (r *ClientRoute) ReqStop() { @@ -264,7 +263,6 @@ func (r *ClientRoute) ReqStop() { } r.stop_chan <- true } -fmt.Printf("*** Sent stop request to Route..\n") } func (r *ClientRoute) ConnectToPeer(pts_id uint32, pts_raddr string, pts_laddr string, wg *sync.WaitGroup) { @@ -312,7 +310,7 @@ func (r *ClientRoute) ConnectToPeer(pts_id uint32, pts_raddr string, pts_laddr s fmt.Printf("YYYYYYYY - %s\n", err.Error()) goto peer_aborted } - fmt.Printf("STARTED NEW SERVER PEER STAK\n") + err = r.cts.psc.Send(MakePeerStartedPacket(r.id, ptc.conn_id, real_conn.RemoteAddr().String(), real_conn.LocalAddr().String())) if err != nil { fmt.Printf("CLOSING NEW SERVER PEER STAK - %s\n", err.Error()) diff --git a/packet.go b/packet.go index 8238432..89e5f77 100644 --- a/packet.go +++ b/packet.go @@ -31,9 +31,10 @@ func MakePeerStartedPacket(route_id uint32, peer_id uint32, remote_addr string, } } -func MakePeerStoppedPacket(route_id uint32, peer_id uint32) *Packet { +func MakePeerStoppedPacket(route_id uint32, peer_id uint32, remote_addr string, local_addr string) *Packet { return &Packet{Kind: PACKET_KIND_PEER_STOPPED, - U: &Packet_Peer{Peer: &PeerDesc{RouteId: route_id, PeerId: peer_id}}} + U: &Packet_Peer{Peer: &PeerDesc{RouteId: route_id, PeerId: peer_id, RemoteAddrStr: remote_addr, LocalAddrStr: local_addr}}, + } } func MakePeerAbortedPacket(route_id uint32, peer_id uint32) *Packet { diff --git a/server-ctl.go b/server-ctl.go index a513e06..1e357e1 100644 --- a/server-ctl.go +++ b/server-ctl.go @@ -56,13 +56,13 @@ func (ctl *server_ctl_server_conns) ServeHTTP(w http.ResponseWriter, req *http.R jsp = append(jsp, json_out_server_route{ Id: r.id, ClientPeerAddr: r.ptc_addr, - ServerPeerListenAddr: r.svcaddr.String(), + ServerPeerListenAddr: r.svc_addr.String(), }) } js = append(js, json_out_server_conn{ Id: cts.id, - ClientAddr: cts.raddr.String(), - ServerAddr: cts.laddr.String(), + ClientAddr: cts.remote_addr.String(), + ServerAddr: cts.local_addr.String(), Routes: jsp, }) cts.route_mtx.Unlock() @@ -131,13 +131,13 @@ func (ctl *server_ctl_server_conns_id) ServeHTTP(w http.ResponseWriter, req *htt jsp = append(jsp, json_out_server_route{ Id: r.id, ClientPeerAddr: r.ptc_addr, - ServerPeerListenAddr: r.svcaddr.String(), + ServerPeerListenAddr: r.svc_addr.String(), }) } js = &json_out_server_conn{ Id: cts.id, - ClientAddr: cts.raddr.String(), - ServerAddr: cts.laddr.String(), + ClientAddr: cts.remote_addr.String(), + ServerAddr: cts.local_addr.String(), Routes: jsp, } cts.route_mtx.Unlock() diff --git a/server-peer.go b/server-peer.go index f4153e9..6005af3 100644 --- a/server-peer.go +++ b/server-peer.go @@ -1,7 +1,6 @@ package hodu import "errors" -import "fmt" import "io" import "net" import "sync" @@ -37,7 +36,6 @@ func NewServerPeerConn(r *ServerRoute, c *net.TCPConn, id uint32) *ServerPeerCon spc.client_peer_started.Store(false) spc.client_peer_stopped.Store(false) spc.client_peer_eof.Store(false) -fmt.Printf("~~~~~~~~~~~~~~~ NEW SERVER PEER CONNECTION ADDED %p\n", &spc) return &spc } @@ -54,8 +52,9 @@ func (spc *ServerPeerConn) RunTask(wg *sync.WaitGroup) { pss = spc.route.cts.pss err = pss.Send(MakePeerStartedPacket(spc.route.id, spc.conn_id, spc.conn.RemoteAddr().String(), spc.conn.LocalAddr().String())) if err != nil { - // TODO: include route id and conn id in the error message - spc.route.cts.svr.log.Write("", LOG_ERROR, "Unable to notify peer started - %s", err.Error()) + spc.route.cts.svr.log.Write(spc.route.cts.sid, LOG_ERROR, + "Failed to send peer_started event(%d,%d,%s,%s) to client - %s", + spc.route.id, spc.conn_id, spc.conn.RemoteAddr().String(), spc.conn.LocalAddr().String(), err.Error()) goto done_without_stop } @@ -88,27 +87,31 @@ wait_for_started: if err != nil { if errors.Is(err, io.EOF) { if pss.Send(MakePeerEofPacket(spc.route.id, spc.conn_id)) != nil { - spc.route.cts.svr.log.Write("", LOG_ERROR, "Unable to report eof - %s", err.Error()) + spc.route.cts.svr.log.Write(spc.route.cts.sid, LOG_ERROR, + "Failed to send peer_eof event(%d,%d,%s,%s) to client - %s", + spc.route.id, spc.conn_id, spc.conn.RemoteAddr().String(), spc.conn.LocalAddr().String(), err.Error()) goto done } goto wait_for_stopped } else { - spc.route.cts.svr.log.Write("", LOG_ERROR, "Unable to read data - %s", err.Error()) + spc.route.cts.svr.log.Write(spc.route.cts.sid, LOG_ERROR, + "Failed to read data from peer(%d,%d,%s,%s) - %s", + spc.route.id, spc.conn_id, spc.conn.RemoteAddr().String(), spc.conn.LocalAddr().String(), err.Error()) goto done } } err = pss.Send(MakePeerDataPacket(spc.route.id, spc.conn_id, buf[:n])) if err != nil { - // TODO: include route id and conn id in the error message - spc.route.cts.svr.log.Write("", LOG_ERROR, "Unable to send data - %s", err.Error()) + spc.route.cts.svr.log.Write(spc.route.cts.sid, LOG_ERROR, + "Failed to send data from peer(%d,%d,%s,%s) to client - %s", + spc.route.id, spc.conn_id, spc.conn.RemoteAddr().String(), spc.conn.LocalAddr().String(), err.Error()) goto done } } wait_for_stopped: for { -fmt.Printf ("******************* Waiting for peer Stop\n") select { case status = <-spc.client_peer_status_chan: // something not right... may use a different channel for closing... goto done @@ -116,20 +119,16 @@ fmt.Printf ("******************* Waiting for peer Stop\n") goto done } } -fmt.Printf ("******************* Sending peer stopped\n") - if pss.Send(MakePeerStoppedPacket(spc.route.id, spc.conn_id)) != nil { - fmt.Printf("unable to report data - %s\n", err.Error()) - goto done - } done: - if pss.Send(MakePeerStoppedPacket(spc.route.id, spc.conn_id)) != nil { - fmt.Printf("unable to report data - %s\n", err.Error()) + if pss.Send(MakePeerStoppedPacket(spc.route.id, spc.conn_id, spc.conn.RemoteAddr().String(), spc.conn.LocalAddr().String())) != nil { + spc.route.cts.svr.log.Write(spc.route.cts.sid, LOG_ERROR, + "Failed to send peer_stopped(%d,%d,%s,%s) to client - %s", + spc.route.id, spc.conn_id, spc.conn.RemoteAddr().String(), spc.conn.LocalAddr().String(), err.Error()) // nothing much to do about the failure of sending this } done_without_stop: - fmt.Printf("SPC really ending..................\n") spc.ReqStop() spc.route.RemoveServerPeerConn(spc) } @@ -149,42 +148,54 @@ func (spc *ServerPeerConn) ReqStop() { } } -func (spc *ServerPeerConn) ReportEvent(event_type PACKET_KIND, event_data []byte) error { +func (spc *ServerPeerConn) ReportEvent(event_type PACKET_KIND, event_data interface{}) error { switch event_type { case PACKET_KIND_PEER_STARTED: -fmt.Printf("******************* AAAAAAAAAAAAAAAAAAAaaa\n") if spc.client_peer_started.CompareAndSwap(false, true) { spc.client_peer_status_chan <- true } case PACKET_KIND_PEER_STOPPED: -fmt.Printf("******************* BBBBBBBBBBBBBBBBBBBBBBBB\n") // this event needs to close on the server-side peer connection. // sending false to the client_peer_status_chan isn't good enough to break // the Recv loop in RunTask(). spc.ReqStop() case PACKET_KIND_PEER_EOF: -fmt.Printf("******************* BBBBBBBBBBBBBBBBBBBBBBBB CLIENT PEER EOF\n") // the client-side peer is not supposed to send data any more if spc.client_peer_eof.CompareAndSwap(false, true) { spc.conn.CloseWrite() } case PACKET_KIND_PEER_DATA: -fmt.Printf("******************* CCCCCCCCCCCCCCCCCCCCCCCccc\n") if spc.client_peer_eof.Load() == false { - var err error + var ok bool + var data []byte - _, err = spc.conn.Write(event_data) - if err != nil { - // TODO: logging - fmt.Printf ("WARNING - failed to write data from %s to %s\n", spc.route.cts.raddr, spc.conn.RemoteAddr().String()) + data, ok = event_data.([]byte) + if ok { + var err error + _, err = spc.conn.Write(data) + if err != nil { + spc.route.cts.svr.log.Write(spc.route.cts.sid, LOG_ERROR, + "Failed to write data from %s to peer(%d,%d,%s) - %s", + spc.route.cts.remote_addr, spc.route.id, spc.conn_id, spc.conn.RemoteAddr().String(), err.Error()) + spc.ReqStop() + } + } else { + // this must not happen. + spc.route.cts.svr.log.Write(spc.route.cts.sid, LOG_ERROR, + "internal server error - invalid data in peer_data event from %s to peer(%d,%d,%s)", + spc.route.cts.remote_addr, spc.route.id, spc.conn_id, spc.conn.RemoteAddr().String()) + spc.ReqStop() } } else { // protocol error. the client must not relay more data from the client-side peer after EOF. - fmt.Printf("WARNING - broken client - redundant data from %s to %s\n", spc.route.cts.raddr, spc.conn.RemoteAddr().String()) + spc.route.cts.svr.log.Write(spc.route.cts.sid, LOG_ERROR, + "internal client error - redundant data from %s to (%d,%d,%s)", + spc.route.cts.remote_addr, spc.route.id, spc.conn_id, spc.conn.RemoteAddr().String()) + spc.ReqStop() } default: diff --git a/server.go b/server.go index 949d443..6772b1d 100644 --- a/server.go +++ b/server.go @@ -43,7 +43,7 @@ type Server struct { rpc []*net.TCPListener // main listener for grpc rpc_wg sync.WaitGroup - gs *grpc.Server + rpc_svr *grpc.Server cts_mtx sync.Mutex cts_map ServerConnMap @@ -58,27 +58,27 @@ type Server struct { // connection from client. // client connect to the server, the server accept it, and makes a tunnel request type ServerConn struct { - svr *Server - id uint32 - sid string // for logging + svr *Server + id uint32 + sid string // for logging - raddr net.Addr // client address that created this structure - laddr net.Addr // local address that the client is connected to - pss *GuardedPacketStreamServer + remote_addr net.Addr // client address that created this structure + local_addr net.Addr // local address that the client is connected to + pss *GuardedPacketStreamServer - route_mtx sync.Mutex - route_map ServerRouteMap - route_wg sync.WaitGroup + route_mtx sync.Mutex + route_map ServerRouteMap + route_wg sync.WaitGroup - wg sync.WaitGroup - stop_req atomic.Bool - stop_chan chan bool + wg sync.WaitGroup + stop_req atomic.Bool + stop_chan chan bool } type ServerRoute struct { cts *ServerConn l *net.TCPListener - svcaddr *net.TCPAddr + svc_addr *net.TCPAddr // listening address ptc_addr string id uint32 @@ -125,7 +125,7 @@ func NewServerRoute(cts *ServerConn, id uint32, proto ROUTE_PROTO, ptc_addr stri var svcaddr *net.TCPAddr var err error - l, svcaddr, err = cts.make_route_listener(proto) + l, svcaddr, err = cts.make_route_listener(id, proto) if err != nil { return nil, err } @@ -133,7 +133,7 @@ func NewServerRoute(cts *ServerConn, id uint32, proto ROUTE_PROTO, ptc_addr stri r.cts = cts r.id = id r.l = l - r.svcaddr = svcaddr + r.svc_addr = svcaddr r.ptc_addr = ptc_addr r.pts_limit = PTS_LIMIT r.pts_map = make(ServerPeerConnMap) @@ -179,35 +179,33 @@ func (r *ServerRoute) RemoveServerPeerConn(pts *ServerPeerConn) { r.pts_mtx.Lock() delete(r.pts_map, pts.conn_id) r.pts_mtx.Unlock() - r.cts.svr.log.Write("", LOG_DEBUG, "Removed server-side peer connection %s", pts.conn.RemoteAddr().String()) + r.cts.svr.log.Write(r.cts.sid, LOG_DEBUG, "Removed server-side peer connection %s from route(%d)", pts.conn.RemoteAddr().String(), r.id) } func (r *ServerRoute) RunTask(wg *sync.WaitGroup) { var err error var conn *net.TCPConn var pts *ServerPeerConn - var log_id string defer wg.Done() - log_id = fmt.Sprintf("%s,%d", r.cts.raddr.String(), r.id) for { conn, err = r.l.AcceptTCP() if err != nil { if errors.Is(err, net.ErrClosed) { - r.cts.svr.log.Write(log_id, LOG_INFO, "Server-side peer listener closed") + r.cts.svr.log.Write(r.cts.sid, LOG_INFO, "Server-side peer listener closed on route(%d)", r.id) } else { - r.cts.svr.log.Write(log_id, LOG_INFO, "Server-side peer listener error - %s", err.Error()) + r.cts.svr.log.Write(r.cts.sid, LOG_INFO, "Server-side peer listener error on route(%d) - %s", r.id, err.Error()) } break } pts, err = r.AddNewServerPeerConn(conn) if err != nil { - r.cts.svr.log.Write(log_id, LOG_ERROR, "Failed to add new server-side peer %s - %s", conn.RemoteAddr().String(), err.Error()) + r.cts.svr.log.Write(r.cts.sid, LOG_ERROR, "Failed to add new server-side peer %s to route(%d) - %s", r.id, conn.RemoteAddr().String(), r.id, err.Error()) conn.Close() } else { - r.cts.svr.log.Write(log_id, LOG_DEBUG, "Added new server-side peer %s", conn.RemoteAddr().String()) + r.cts.svr.log.Write(r.cts.sid, LOG_DEBUG, "Added new server-side peer %s to route(%d)", conn.RemoteAddr().String(), r.id) r.pts_wg.Add(1) go pts.RunTask(&r.pts_wg) } @@ -216,14 +214,12 @@ func (r *ServerRoute) RunTask(wg *sync.WaitGroup) { r.ReqStop() r.pts_wg.Wait() - r.cts.svr.log.Write(log_id, LOG_DEBUG, "All service-side peer handlers completed") + r.cts.svr.log.Write(r.cts.sid, LOG_DEBUG, "All service-side peer handlers completed on route(%d)", r.id) r.cts.RemoveServerRoute(r) // final phase... } func (r *ServerRoute) ReqStop() { - fmt.Printf("requesting to stop route taak..\n") - if r.stop_req.CompareAndSwap(false, true) { var pts *ServerPeerConn @@ -233,10 +229,9 @@ func (r *ServerRoute) ReqStop() { r.l.Close() } - fmt.Printf("requiested to stopp route taak..\n") } -func (r *ServerRoute) ReportEvent(pts_id uint32, event_type PACKET_KIND, event_data []byte) error { +func (r *ServerRoute) ReportEvent(pts_id uint32, event_type PACKET_KIND, event_data interface{}) error { var spc *ServerPeerConn var ok bool @@ -252,7 +247,7 @@ func (r *ServerRoute) ReportEvent(pts_id uint32, event_type PACKET_KIND, event_d } // ------------------------------------ -func (cts *ServerConn) make_route_listener(proto ROUTE_PROTO) (*net.TCPListener, *net.TCPAddr, error) { +func (cts *ServerConn) make_route_listener(id uint32, proto ROUTE_PROTO) (*net.TCPListener, *net.TCPAddr, error) { var l *net.TCPListener var err error var svcaddr *net.TCPAddr @@ -276,12 +271,11 @@ func (cts *ServerConn) make_route_listener(proto ROUTE_PROTO) (*net.TCPListener, if err == nil { l, err = net.ListenTCP(nw, svcaddr) // make the binding address configurable. support multiple binding addresses??? if err == nil { - fmt.Printf("listening .... on ... %d\n", port) + cts.svr.log.Write(cts.sid, LOG_DEBUG, "Route(%d) listening on %d", id, port) return l, svcaddr, nil } } - // TODO: implement max retries.. tries++ if tries >= 1000 { err = fmt.Errorf("unable to allocate port") @@ -352,7 +346,7 @@ func (cts *ServerConn) RemoveServerRouteById(route_id uint32) (*ServerRoute, err return r, nil } -func (cts *ServerConn) ReportEvent(route_id uint32, pts_id uint32, event_type PACKET_KIND, event_data []byte) error { +func (cts *ServerConn) ReportEvent(route_id uint32, pts_id uint32, event_type PACKET_KIND, event_data interface{}) error { var r *ServerRoute var ok bool @@ -376,11 +370,11 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) { for { pkt, err = cts.pss.Recv() if errors.Is(err, io.EOF) { - cts.svr.log.Write("", LOG_INFO, "GRPC stream closed for client %s", cts.raddr) + cts.svr.log.Write(cts.sid, LOG_INFO, "RPC stream closed for client %s", cts.remote_addr) goto done } if err != nil { - cts.svr.log.Write("", LOG_ERROR, "GRPC stream error for client %s - %s", cts.raddr, err.Error()) + cts.svr.log.Write(cts.sid, LOG_ERROR, "RPC stream error for client %s - %s", cts.remote_addr, err.Error()) goto done } @@ -394,18 +388,18 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) { r, err = cts.AddNewServerRoute(x.Route.RouteId, x.Route.Proto, x.Route.AddrStr) if err != nil { - cts.svr.log.Write("", LOG_ERROR, "Failed to add server route for client %s peer %s", cts.raddr, x.Route.AddrStr) + cts.svr.log.Write(cts.sid, LOG_ERROR, "Failed to add route for client %s peer %s", cts.remote_addr, x.Route.AddrStr) } else { - cts.svr.log.Write("", LOG_INFO, "Added server route(id=%d) for client %s peer %s to cts(id=%d)", r.id, cts.raddr, x.Route.AddrStr, cts.id) - err = cts.pss.Send(MakeRouteStartedPacket(r.id, x.Route.Proto, r.svcaddr.String())) + cts.svr.log.Write(cts.sid, LOG_INFO, "Added route(%d) for client %s peer %s to cts(%d)", r.id, cts.remote_addr, x.Route.AddrStr, cts.id) + err = cts.pss.Send(MakeRouteStartedPacket(r.id, x.Route.Proto, r.svc_addr.String())) if err != nil { r.ReqStop() - cts.svr.log.Write("", LOG_ERROR, "Failed to inform client %s of server route started for peer %s", cts.raddr, x.Route.AddrStr) + cts.svr.log.Write(cts.sid, LOG_ERROR, "Failed to inform client %s of route started for peer %s", cts.remote_addr, x.Route.AddrStr) goto done } } } else { - cts.svr.log.Write("", LOG_INFO, "Received invalid packet from %s", cts.raddr) + cts.svr.log.Write(cts.sid, LOG_INFO, "Received invalid packet from %s", cts.remote_addr) // TODO: need to abort this client? } @@ -418,19 +412,18 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) { r, err = cts.RemoveServerRouteById(x.Route.RouteId) if err != nil { - cts.svr.log.Write("", LOG_ERROR, "Failed to delete server route(id=%d) for client %s peer %s", x.Route.RouteId, cts.raddr, x.Route.AddrStr) + cts.svr.log.Write(cts.sid, LOG_ERROR, "Failed to delete route(%d) for client %s peer %s", x.Route.RouteId, cts.remote_addr, x.Route.AddrStr) } else { - cts.svr.log.Write("", LOG_ERROR, "Deleted server route(id=%d) for client %s peer %s", x.Route.RouteId, cts.raddr, x.Route.AddrStr) + cts.svr.log.Write(cts.sid, LOG_ERROR, "Deleted route(%d) for client %s peer %s", x.Route.RouteId, cts.remote_addr, x.Route.AddrStr) err = cts.pss.Send(MakeRouteStoppedPacket(x.Route.RouteId, x.Route.Proto)) if err != nil { r.ReqStop() - cts.svr.log.Write("", LOG_ERROR, "Failed to inform client %s of server route(id=%d) stopped for peer %s", cts.raddr, x.Route.RouteId, x.Route.AddrStr) + cts.svr.log.Write(cts.sid, LOG_ERROR, "Failed to inform client %s of route(%d) stopped for peer %s", cts.remote_addr, x.Route.RouteId, x.Route.AddrStr) goto done } } } else { - cts.svr.log.Write("", LOG_INFO, "Received invalid packet from %s", cts.raddr) - // TODO: need to abort this client? + cts.svr.log.Write(cts.sid, LOG_ERROR, "Invalid route_stop event from %s", cts.remote_addr) } case PACKET_KIND_PEER_STARTED: @@ -439,15 +432,19 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) { var ok bool x, ok = pkt.U.(*Packet_Peer) if ok { - err = cts.ReportEvent(x.Peer.RouteId, x.Peer.PeerId, PACKET_KIND_PEER_STARTED, nil) + err = cts.ReportEvent(x.Peer.RouteId, x.Peer.PeerId, PACKET_KIND_PEER_STARTED, x.Peer) if err != nil { - // TODO: - fmt.Printf("Failed to report PEER_STARTED Event") + cts.svr.log.Write(cts.sid, LOG_ERROR, + "Failed to handle peer_started event from %s for peer(%d,%d,%s,%s) - %s", + cts.remote_addr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr, err.Error()) } else { - // TODO: + cts.svr.log.Write(cts.sid, LOG_DEBUG, + "Handled peer_started event from %s for peer(%d,%d,%s,%s)", + cts.remote_addr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr) } } else { - // TODO + // invalid event data + cts.svr.log.Write(cts.sid, LOG_ERROR, "Invalid peer_started event from %s", cts.remote_addr) } case PACKET_KIND_PEER_ABORTED: @@ -460,13 +457,17 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) { if ok { err = cts.ReportEvent(x.Peer.RouteId, x.Peer.PeerId, PACKET_KIND_PEER_STOPPED, nil) if err != nil { - // TODO: - fmt.Printf("Failed to report PEER_STOPPED Event") + cts.svr.log.Write(cts.sid, LOG_ERROR, + "Failed to handle peer_started event from %s for peer(%d,%d,%s,%s) - %s", + cts.remote_addr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr, err.Error()) } else { - // TODO: + cts.svr.log.Write(cts.sid, LOG_DEBUG, + "Handled peer_stopped event from %s for peer(%d,%d,%s,%s)", + cts.remote_addr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr) } } else { - // TODO + // invalid event data + cts.svr.log.Write(cts.sid, LOG_ERROR, "Invalid peer_stopped event from %s", cts.remote_addr) } case PACKET_KIND_PEER_DATA: @@ -477,19 +478,23 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) { if ok { err = cts.ReportEvent(x.Data.RouteId, x.Data.PeerId, PACKET_KIND_PEER_DATA, x.Data.Data) if err != nil { - // TODO: - fmt.Printf("Failed to report PEER_DATA Event") + cts.svr.log.Write(cts.sid, LOG_ERROR, + "Failed to handle peer_data event from %s for peer(%d,%d) - %s", + cts.remote_addr, x.Data.RouteId, x.Data.PeerId, err.Error()) } else { - // TODO: + cts.svr.log.Write(cts.sid, LOG_DEBUG, + "Handled peer_data event from %s for peer(%d,%d)", + cts.remote_addr, x.Data.RouteId, x.Data.PeerId) } } else { - // TODO + // invalid event data + cts.svr.log.Write(cts.sid, LOG_ERROR, "Invalid peer_data event from %s", cts.remote_addr) } } } done: - fmt.Printf("************ stream receiver finished....\n") + cts.svr.log.Write(cts.sid, LOG_INFO, "RPC stream receiver ended") } func (cts *ServerConn) RunTask(wg *sync.WaitGroup) { @@ -517,7 +522,7 @@ func (cts *ServerConn) RunTask(wg *sync.WaitGroup) { // or continue select { case <-ctx.Done(): // the stream context is done -fmt.Printf("grpc server done - %s\n", ctx.Err().Error()) + cts.svr.log.Write(cts.sid, LOG_INFO, "RPC stream done - %s", ctx.Err().Error()) goto done case <- cts.stop_chan: @@ -533,10 +538,8 @@ fmt.Printf("grpc server done - %s\n", ctx.Err().Error()) } done: -fmt.Printf("^^^^^^^^^^^^^^^^^ waiting for reoute_wg...\n") cts.ReqStop() // just in case cts.route_wg.Wait() -fmt.Printf("^^^^^^^^^^^^^^^^^ waited for reoute_wg...\n") } func (cts *ServerConn) ReqStop() { @@ -618,7 +621,6 @@ func (cc *ConnCatcher) TagConn(ctx context.Context, info *stats.ConnTagInfo) con } func (cc *ConnCatcher) HandleConn(ctx context.Context, cs stats.ConnStats) { -// fmt.Println(ctx.Value("user_id")) // Returns nil, can't access the value var p *peer.Peer var ok bool var addr string @@ -629,44 +631,46 @@ func (cc *ConnCatcher) HandleConn(ctx context.Context, cs stats.ConnStats) { } else { addr = p.Addr.String() } -/* -md,ok:=metadata.FromIncomingContext(ctx) -fmt.Printf("%+v%+v\n",md,ok) -if ok { -}*/ + + /* + md,ok:=metadata.FromIncomingContext(ctx) + if ok { + }*/ + switch cs.(type) { case *stats.ConnBegin: - fmt.Printf("**** client connected - [%s]\n", addr) + cc.server.log.Write("", LOG_INFO, "Client connected - %s", addr) + case *stats.ConnEnd: - fmt.Printf("**** client disconnected - [%s]\n", addr) + cc.server.log.Write("", LOG_INFO, "Client disconnected - %s", addr) cc.server.RemoveServerConnByAddr(p.Addr) } } -// wrappedStream wraps around the embedded grpc.ServerStream, and intercepts the RecvMsg and -// SendMsg method call. +// ------------------------------------ + type wrappedStream struct { grpc.ServerStream } -func (w *wrappedStream) RecvMsg(m any) error { - //fmt.Printf("Receive a message (Type: %T) at %s\n", m, time.Now().Format(time.RFC3339)) - return w.ServerStream.RecvMsg(m) +func (w *wrappedStream) RecvMsg(msg interface{}) error { + return w.ServerStream.RecvMsg(msg) } -func (w *wrappedStream) SendMsg(m any) error { - //fmt.Printf("Send a message (Type: %T) at %v\n", m, time.Now().Format(time.RFC3339)) - return w.ServerStream.SendMsg(m) +func (w *wrappedStream) SendMsg(msg interface{}) error { + return w.ServerStream.SendMsg(msg) } func newWrappedStream(s grpc.ServerStream) grpc.ServerStream { return &wrappedStream{s} } -func streamInterceptor(srv any, ss grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error { +func streamInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + var err error + // authentication (token verification) /* - md, ok := metadata.FromIncomingContext(ss.Context()) + md, ok = metadata.FromIncomingContext(ss.Context()) if !ok { return errMissingMetadata } @@ -675,17 +679,20 @@ func streamInterceptor(srv any, ss grpc.ServerStream, _ *grpc.StreamServerInfo, } */ - err := handler(srv, newWrappedStream(ss)) + err = handler(srv, newWrappedStream(ss)) if err != nil { - fmt.Printf("RPC failed with error: %v\n", err) + // TODO: LOGGING } return err } -func unaryInterceptor(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { +func unaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + var v interface{} + var err error + // authentication (token verification) /* - md, ok := metadata.FromIncomingContext(ctx) + md, ok = metadata.FromIncomingContext(ctx) if !ok { return nil, errMissingMetadata } @@ -693,12 +700,14 @@ func unaryInterceptor(ctx context.Context, req any, _ *grpc.UnaryServerInfo, han // return nil, errInvalidToken } */ - m, err := handler(ctx, req) + + v, err = handler(ctx, req) if err != nil { - fmt.Printf("RPC failed with error: %v\n", err) + //fmt.Printf("RPC failed with error: %v\n", err) + // TODO: Logging? } - return m, err + return v, err } func NewServer(ctx context.Context, ctl_addrs []string, rpc_addrs []string, logger Logger, tlscfg *tls.Config) (*Server, error) { @@ -746,12 +755,12 @@ func NewServer(ctx context.Context, ctl_addrs []string, rpc_addrs []string, logg } gs = grpc.NewServer(grpc.Creds(creds)) */ - s.gs = grpc.NewServer( - grpc.UnaryInterceptor(unaryInterceptor), - grpc.StreamInterceptor(streamInterceptor), + s.rpc_svr = grpc.NewServer( + //grpc.UnaryInterceptor(unaryInterceptor), + //grpc.StreamInterceptor(streamInterceptor), grpc.StatsHandler(&ConnCatcher{server: &s}), - ) // TODO: have this outside the server struct? - RegisterHoduServer(s.gs, &s) + ) + RegisterHoduServer(s.rpc_svr, &s) s.ctl_prefix = "" // TODO: @@ -776,7 +785,7 @@ func NewServer(ctx context.Context, ctl_addrs []string, rpc_addrs []string, logg return &s, nil oops: -/* TODO: check if gs needs to be closed... */ + // TODO: check if rpc_svr needs to be closed. probably not. closing the listen may be good enough if gl != nil { gl.Close() } @@ -796,13 +805,13 @@ func (s *Server) run_grpc_server(idx int, wg *sync.WaitGroup) error { l = s.rpc[idx] // it seems to be safe to call a single grpc server on differnt listening sockets multiple times - s.log.Write("", LOG_ERROR, "Starting GRPC server on %s", l.Addr().String()) - err = s.gs.Serve(l) + s.log.Write("", LOG_ERROR, "Starting RPC server on %s", l.Addr().String()) + err = s.rpc_svr.Serve(l) if err != nil { if errors.Is(err, net.ErrClosed) { - s.log.Write("", LOG_ERROR, "GRPC server on %s closed", l.Addr().String()) + s.log.Write("", LOG_ERROR, "RPC server on %s closed", l.Addr().String()) } else { - s.log.Write("", LOG_ERROR, "Error from GRPC server on %s - %s", l.Addr().String(), err.Error()) + s.log.Write("", LOG_ERROR, "Error from RPC server on %s - %s", l.Addr().String(), err.Error()) } return err } @@ -834,13 +843,13 @@ task_loop: s.ReqStop() s.rpc_wg.Wait() - s.log.Write("", LOG_DEBUG, "All GRPC listeners completed") + s.log.Write("", LOG_DEBUG, "All RPC listeners completed") s.cts_wg.Wait() s.log.Write("", LOG_DEBUG, "All CTS handlers completed") // stop the main grpc server after all the other tasks are finished. - s.gs.Stop() + s.rpc_svr.Stop() } func (s *Server) RunCtlTask(wg *sync.WaitGroup) { @@ -883,7 +892,7 @@ func (s *Server) ReqStop() { l.Close() } - s.cts_mtx.Lock() // TODO: this mya create dead-lock. check possibility of dead lock??? + s.cts_mtx.Lock() for _, cts = range s.cts_map { cts.ReqStop() // request to stop connections from/to peer held in the cts structure } @@ -901,8 +910,8 @@ func (s *Server) AddNewServerConn(remote_addr *net.Addr, local_addr *net.Addr, p cts.svr = s cts.route_map = make(ServerRouteMap) - cts.raddr = *remote_addr - cts.laddr = *local_addr + cts.remote_addr = *remote_addr + cts.local_addr = *local_addr cts.pss = &GuardedPacketStreamServer{Hodu_PacketStreamServer: pss} cts.stop_req.Store(false) @@ -920,13 +929,13 @@ func (s *Server) AddNewServerConn(remote_addr *net.Addr, local_addr *net.Addr, p cts.id = id cts.sid = fmt.Sprintf("%d", id) // id in string used for logging - _, ok = s.cts_map_by_addr[cts.raddr] + _, ok = s.cts_map_by_addr[cts.remote_addr] if ok { - return nil, fmt.Errorf("existing client - %s", cts.raddr.String()) + return nil, fmt.Errorf("existing client - %s", cts.remote_addr.String()) } - s.cts_map_by_addr[cts.raddr] = &cts + s.cts_map_by_addr[cts.remote_addr] = &cts s.cts_map[id] = &cts; - s.log.Write("", LOG_DEBUG, "Added client connection from %s", cts.raddr.String()) + s.log.Write("", LOG_DEBUG, "Added client connection from %s", cts.remote_addr.String()) return &cts, nil } @@ -958,7 +967,7 @@ func (s *Server) RemoveServerConn(cts *ServerConn) error { } delete(s.cts_map, cts.id) - delete(s.cts_map_by_addr, cts.raddr) + delete(s.cts_map_by_addr, cts.remote_addr) s.cts_mtx.Unlock() cts.ReqStop() @@ -977,7 +986,7 @@ func (s *Server) RemoveServerConnByAddr(addr net.Addr) error { return fmt.Errorf("non-existent connection address - %s", addr.String()) } delete(s.cts_map, cts.id) - delete(s.cts_map_by_addr, cts.raddr) + delete(s.cts_map_by_addr, cts.remote_addr) s.cts_mtx.Unlock() cts.ReqStop()