From 53777f1f606d95f543fb0a0cb254c0fd41834918 Mon Sep 17 00:00:00 2001 From: hyung-hwan Date: Thu, 5 Dec 2024 01:26:44 +0900 Subject: [PATCH] cleaed up most of the logging lines --- client-ctl.go | 28 ++--- client-peer.go | 14 ++- client.go | 330 +++++++++++++++++++++++++++++++------------------ cmd/main.go | 4 +- hodu.pb.go | 102 ++++++++------- hodu.proto | 21 ++-- packet.go | 5 +- server-peer.go | 32 +++-- server.go | 28 ++++- 9 files changed, 337 insertions(+), 227 deletions(-) diff --git a/client-ctl.go b/client-ctl.go index 72b654a..b699985 100644 --- a/client-ctl.go +++ b/client-ctl.go @@ -2,6 +2,7 @@ package hodu import "encoding/json" import "net/http" +import "net/url" import "strconv" /* @@ -86,14 +87,6 @@ type client_ctl_client_conns_id_routes_id_peers_id struct { c *Client } -type client_ctl_clients struct { - c *Client -} - -type client_ctl_clients_id struct { - c *Client -} - // ------------------------------------ func (ctl *client_ctl_client_conns) ServeHTTP(w http.ResponseWriter, req *http.Request) { @@ -109,6 +102,13 @@ func (ctl *client_ctl_client_conns) ServeHTTP(w http.ResponseWriter, req *http.R case http.MethodGet: var cts *ClientConn var js []json_out_client_conn + var q url.Values + + q = req.URL.Query() + +// TODO: brief listing vs full listing + if q.Get("brief") == "true" { + } js = make([]json_out_client_conn, 0) c.cts_mtx.Lock() @@ -321,7 +321,7 @@ func (ctl *client_ctl_client_conns_id_routes) ServeHTTP(w http.ResponseWriter, r var r *ClientRoute err = json.NewDecoder(req.Body).Decode(&jcr) - if err != nil { + if err != nil || jcr.ClientPeerAddr == "" { status_code = http.StatusBadRequest; w.WriteHeader(status_code) goto done } @@ -582,13 +582,3 @@ oops: c.log.Write("", LOG_ERROR, "[%s] %s %s - %s", req.RemoteAddr, req.Method, req.URL.String(), err.Error()) return } - -// ------------------------------------ - -func (ctl *client_ctl_clients) ServeHTTP(w http.ResponseWriter, req *http.Request) { -} - -// ------------------------------------ - -func (ctl *client_ctl_clients_id) ServeHTTP(w http.ResponseWriter, req *http.Request) { -} diff --git a/client-peer.go b/client-peer.go index 909cfaa..80b0c24 100644 --- a/client-peer.go +++ b/client-peer.go @@ -1,5 +1,7 @@ package hodu +import "errors" +import "io" import "net" import "sync" @@ -27,9 +29,15 @@ func (cpc *ClientPeerConn) RunTask(wg *sync.WaitGroup) error { for { n, err = cpc.conn.Read(buf[:]) if err != nil { - cpc.route.cts.cli.log.Write(cpc.route.cts.sid, LOG_ERROR, - "Failed to read from the client-side peer(%d,%d,%s,%s) - %s", - cpc.route.id, cpc.conn_id, cpc.conn.RemoteAddr().String(), cpc.conn.LocalAddr().String(), err.Error()) + if errors.Is(err, io.EOF) { + cpc.route.cts.cli.log.Write(cpc.route.cts.sid, LOG_INFO, + "Client-side peer(%d,%d,%s,%s) closed", + cpc.route.id, cpc.conn_id, cpc.conn.RemoteAddr().String(), cpc.conn.LocalAddr().String()) + } else { + cpc.route.cts.cli.log.Write(cpc.route.cts.sid, LOG_ERROR, + "Failed to read from client-side peer(%d,%d,%s,%s) - %s", + cpc.route.id, cpc.conn_id, cpc.conn.RemoteAddr().String(), cpc.conn.LocalAddr().String(), err.Error()) + } break } diff --git a/client.go b/client.go index cac7f0c..0a9b5e3 100644 --- a/client.go +++ b/client.go @@ -157,11 +157,11 @@ func (r *ClientRoute) AddNewClientPeerConn(c *net.TCPConn, pts_id uint32, pts_ra var ptc *ClientPeerConn r.ptc_mtx.Lock() - defer r.ptc_mtx.Unlock() - ptc = NewClientPeerConn(r, c, pts_id, pts_raddr, pts_laddr) r.ptc_map[ptc.conn_id] = ptc + r.ptc_mtx.Unlock() + r.cts.cli.log.Write(r.cts.sid, LOG_INFO, "Added client-side peer(%d,%d,%s,%s)", r.id, ptc.conn_id, ptc.conn.RemoteAddr().String(), ptc.conn.LocalAddr().String()) return ptc, nil } @@ -177,11 +177,12 @@ func (r *ClientRoute) RemoveClientPeerConn(ptc *ClientPeerConn) error { } if c != ptc { r.ptc_mtx.Unlock() - return fmt.Errorf("non-existent peer id - %d", ptc.conn_id) + return fmt.Errorf("conflicting peer id - %d", ptc.conn_id) } delete(r.ptc_map, ptc.conn_id) r.ptc_mtx.Unlock() + r.cts.cli.log.Write(r.cts.sid, LOG_INFO, "Removed client-side peer(%d,%d,%s,%s)", r.id, ptc.conn_id, ptc.conn.RemoteAddr().String(), ptc.conn.LocalAddr().String()) ptc.ReqStop() return nil } @@ -252,6 +253,7 @@ done: r.cts.cli.log.Write(r.cts.sid, LOG_DEBUG, "Sending route_stop for route(%d,%s) to %s", r.id, r.peer_addr, r.cts.remote_addr) r.cts.psc.Send(MakeRouteStopPacket(r.id, r.proto, r.peer_addr)) + r.cts.RemoveClientRoute(r) } @@ -269,6 +271,8 @@ func (r *ClientRoute) ConnectToPeer(pts_id uint32, pts_raddr string, pts_laddr s var err error var conn net.Conn var real_conn *net.TCPConn + var real_conn_raddr string + var real_conn_laddr string var ptc *ClientPeerConn var d net.Dialer var ctx context.Context @@ -292,28 +296,38 @@ func (r *ClientRoute) ConnectToPeer(pts_id uint32, pts_raddr string, pts_laddr s r.ptc_mtx.Unlock() if err != nil { -// TODO: make send peer started failure mesage? - fmt.Printf("failed to connect to %s - %s\n", r.peer_addr, err.Error()) + r.cts.cli.log.Write(r.cts.sid, LOG_ERROR, + "Failed to connect to %s for route(%d,%d,%s,%s) - %s", + r.peer_addr, r.id, pts_id, pts_raddr, pts_laddr, err.Error()) goto peer_aborted } real_conn, ok = conn.(*net.TCPConn) if !ok { - fmt.Printf("not tcp connection - %s\n", err.Error()) + r.cts.cli.log.Write(r.cts.sid, LOG_ERROR, + "Failed to get connection information to %s for route(%d,%d,%s,%s) - %s", + r.peer_addr, r.id, pts_id, pts_raddr, pts_laddr, err.Error()) goto peer_aborted } + real_conn_raddr = real_conn.RemoteAddr().String() + real_conn_laddr = real_conn.LocalAddr().String() ptc, err = r.AddNewClientPeerConn(real_conn, pts_id, pts_raddr, pts_laddr) if err != nil { - // TODO: logging -// TODO: make send peer started failure mesage? - fmt.Printf("YYYYYYYY - %s\n", err.Error()) + r.cts.cli.log.Write(r.cts.sid, LOG_ERROR, + "Failed to add client peer %s for route(%d,%d,%s,%s) - %s", + r.peer_addr, r.id, pts_id, pts_raddr, pts_laddr, err.Error()) goto peer_aborted } - err = r.cts.psc.Send(MakePeerStartedPacket(r.id, ptc.conn_id, real_conn.RemoteAddr().String(), real_conn.LocalAddr().String())) + // ptc.conn is equal to pts_id as assigned in r.AddNewClientPeerConn() + + err = r.cts.psc.Send(MakePeerStartedPacket(r.id, ptc.conn_id, real_conn_raddr, real_conn_laddr)) if err != nil { - fmt.Printf("CLOSING NEW SERVER PEER STAK - %s\n", err.Error()) + r.cts.cli.log.Write(r.cts.sid, LOG_ERROR, + "Failed to send peer_start(%d,%d,%s,%s) for route(%d,%d,%s,%s) - %s", + r.id, ptc.conn_id, real_conn_raddr, real_conn_laddr, + r.id, pts_id, pts_raddr, pts_laddr, err.Error()) goto peer_aborted } @@ -322,31 +336,30 @@ func (r *ClientRoute) ConnectToPeer(pts_id uint32, pts_raddr string, pts_laddr s return peer_aborted: + // real_conn_radd and real_conn_laddr may be empty depending on when the jump to here is made. + err = r.cts.psc.Send(MakePeerAbortedPacket(r.id, pts_id, real_conn_raddr, real_conn_laddr)) + if err != nil { + r.cts.cli.log.Write(r.cts.sid, LOG_ERROR, + "Failed to send peer_aborted(%d,%d) for route(%d,%d,%s,%s) - %s", + r.id, pts_id, r.id, pts_id, pts_raddr, pts_laddr, err.Error()) + } if conn != nil { conn.Close() - err = r.cts.psc.Send(MakePeerAbortedPacket(r.id, ptc.conn_id)) - if err != nil { - // TODO: logging - } } } -func (r *ClientRoute) DisconnectFromPeer(pts_id uint32) error { - var ptc *ClientPeerConn +func (r *ClientRoute) DisconnectFromPeer(ptc* ClientPeerConn) error { + var p *ClientPeerConn var cancel context.CancelFunc var ok bool r.ptc_mtx.Lock() - cancel, ok = r.ptc_cancel_map[pts_id] - if ok { -fmt.Printf("~~~~~~~~~~~~~~~~ cancelling.....\n") - cancel() - } - - ptc, ok = r.ptc_map[pts_id] - if !ok { - r.ptc_mtx.Unlock() - return fmt.Errorf("non-existent connection id - %u", pts_id) + p, ok = r.ptc_map[ptc.conn_id] + if ok && p == ptc { + cancel, ok = r.ptc_cancel_map[ptc.conn_id] + if ok { + cancel() + } } r.ptc_mtx.Unlock() @@ -354,96 +367,138 @@ fmt.Printf("~~~~~~~~~~~~~~~~ cancelling.....\n") return nil } -func (r *ClientRoute) CloseWriteToPeer(pts_id uint32) error { - var ptc *ClientPeerConn - var ok bool - - r.ptc_mtx.Lock() - ptc, ok = r.ptc_map[pts_id] - if !ok { - r.ptc_mtx.Unlock() - return fmt.Errorf("non-existent connection id - %u", pts_id) - } - r.ptc_mtx.Unlock() - - ptc.CloseWrite() - return nil -} - func (r *ClientRoute) ReportEvent(pts_id uint32, event_type PACKET_KIND, event_data interface{}) error { var err error switch event_type { case PACKET_KIND_ROUTE_STARTED: var ok bool - var str string - str, ok = event_data.(string) + var rd *RouteDesc + rd, ok = event_data.(*RouteDesc) if !ok { - // TODO: internal error -fmt.Printf("INTERNAL ERROR. MUST NOT HAPPEN\n") + r.cts.cli.log.Write(r.cts.sid, LOG_ERROR, "Protocol error - invalid data in route_started event(%d)", r.id) + r.ReqStop() } else { var addr *net.TCPAddr - addr, err = net.ResolveTCPAddr("tcp", str) + addr, err = net.ResolveTCPAddr("tcp", rd.AddrStr) if err != nil { - // TODO: + r.cts.cli.log.Write(r.cts.sid, LOG_ERROR, "Protocol error - invalid service address(%s) for server peer in route_started event(%d)", rd.AddrStr, r.id) + r.ReqStop() } else { r.server_peer_listen_addr = addr } } case PACKET_KIND_ROUTE_STOPPED: - // TODO: + // this is the service side notification agasint ROUTE_STOP send by client itself. + // so there is nothing to do for now case PACKET_KIND_PEER_STARTED: -fmt.Printf("GOT PEER STARTD . CONENCT TO CLIENT_SIDE PEER\n") var ok bool var pd *PeerDesc pd, ok = event_data.(*PeerDesc) if !ok { - // TODO: internal error -fmt.Printf("INTERNAL ERROR. MUST NOT HAPPEN\n") + r.cts.cli.log.Write(r.cts.sid, LOG_ERROR, + "Protocol error - invalid data in peer_started event(%d,%d)", r.id, pts_id) + r.ReqStop() } else { r.ptc_wg.Add(1) go r.ConnectToPeer(pts_id, pd.RemoteAddrStr, pd.LocalAddrStr, &r.ptc_wg) } case PACKET_KIND_PEER_ABORTED: - fallthrough + var ptc *ClientPeerConn + + ptc = r.FindClientPeerConnById(pts_id) + if ptc != nil { + var ok bool + var pd *PeerDesc + + pd, ok = event_data.(*PeerDesc) + if !ok { + r.cts.cli.log.Write(r.cts.sid, LOG_ERROR, + "Protocol error - invalid data in peer_aborted event(%d,%d)", r.id, pts_id) + r.ReqStop() + } else { + err = r.DisconnectFromPeer(ptc) + if err != nil { + r.cts.cli.log.Write(r.cts.sid, LOG_ERROR, + "Failed to disconnect from peer(%d,%d,%s,%s) - %s", + r.id, pts_id, pd.RemoteAddrStr, pd.LocalAddrStr, err.Error()) + ptc.ReqStop() + } + } + } + case PACKET_KIND_PEER_STOPPED: -fmt.Printf("GOT PEER STOPPED . DISCONNECTION FROM CLIENT_SIDE PEER\n") - err = r.DisconnectFromPeer(pts_id) - if err != nil { - // TODO: + var ptc *ClientPeerConn + + ptc = r.FindClientPeerConnById(pts_id) + if ptc != nil { + var ok bool + var pd *PeerDesc + + pd, ok = event_data.(*PeerDesc) + if !ok { + r.cts.cli.log.Write(r.cts.sid, LOG_ERROR, + "Protocol error - invalid data in peer_stopped event(%d,%d)", + r.id, pts_id) + ptc.ReqStop() + } else { + err = r.DisconnectFromPeer(ptc) + if err != nil { + r.cts.cli.log.Write(r.cts.sid, LOG_WARN, + "Failed to disconnect from peer(%d,%d,%s,%s) - %s", + r.id, pts_id, pd.RemoteAddrStr, pd.LocalAddrStr, err.Error()) + ptc.ReqStop() + } + } } case PACKET_KIND_PEER_EOF: -fmt.Printf("GOT PEER EOF. REMEMBER EOF\n") - err = r.CloseWriteToPeer(pts_id) - if err != nil { - // TODO: + var ptc *ClientPeerConn + + ptc = r.FindClientPeerConnById(pts_id) + if ptc != nil { + var ok bool + + _, ok = event_data.(*PeerDesc) + if !ok { + r.cts.cli.log.Write(r.cts.sid, LOG_ERROR, + "Protocol error - invalid data in peer_eof event(%d,%d)", + r.id, pts_id) + ptc.ReqStop() + } else { + ptc.CloseWrite() + } } case PACKET_KIND_PEER_DATA: var ptc *ClientPeerConn - var ok bool - var err error - var data []byte - ptc, ok = r.ptc_map[pts_id] - if ok { + ptc = r.FindClientPeerConnById(pts_id) + if ptc != nil { + var ok bool + var data []byte + data, ok = event_data.([]byte) - if ok { - _, err = ptc.conn.Write(data) - return err + if !ok { + r.cts.cli.log.Write(r.cts.sid, LOG_ERROR, + "Protocol error - invalid data in peer_data event(%d,%d)", + r.id, pts_id) + ptc.ReqStop() } else { - // internal error + _, err = ptc.conn.Write(data) + if err != nil { + r.cts.cli.log.Write(r.cts.sid, LOG_ERROR, "Failed to write to peer(%d,%d,%s,%s) - %s", r.id, pts_id, ptc.conn.RemoteAddr().String(), ptc.conn.LocalAddr().String(), err.Error()) + ptc.ReqStop() + } } - } else { - } - // TODO: other types + default: + // ignore all others } return nil @@ -487,7 +542,8 @@ func (cts *ClientConn) AddNewClientRoute(addr string, proto ROUTE_PROTO) (*Clien cts.route_map[id] = r cts.route_mtx.Unlock() -fmt.Printf("added client route.... %d -> %d\n", id, len(cts.route_map)) + cts.cli.log.Write(cts.sid, LOG_INFO, "Added route(%d,%s)", id, addr) + cts.route_wg.Add(1) go r.RunTask(&cts.route_wg) return r, nil @@ -528,11 +584,13 @@ func (cts *ClientConn) RemoveClientRoute(route *ClientRoute) error { } if r != route { cts.route_mtx.Unlock() - return fmt.Errorf("non-existent route id - %d", route.id) + return fmt.Errorf("conflicting route id - %d", route.id) } delete(cts.route_map, route.id) cts.route_mtx.Unlock() + cts.cli.log.Write(cts.sid, LOG_INFO, "Removed route(%d,%s)", route.id, route.peer_addr) + r.ReqStop() return nil } @@ -550,6 +608,8 @@ func (cts *ClientConn) RemoveClientRouteById(route_id uint32) error { delete(cts.route_map, route_id) cts.route_mtx.Unlock() + cts.cli.log.Write(cts.sid, LOG_INFO, "Removed route(%d,%s)", r.id, r.peer_addr) + r.ReqStop() return nil } @@ -663,22 +723,22 @@ start_over: cts.psc = &GuardedPacketStreamClient{Hodu_PacketStreamClient: psc} - // the connection structure to a server is ready. - // let's add routes to the client-side peers. - err = cts.AddClientRoutes(cts.cfg.PeerAddrs) - if err != nil { - cts.cli.log.Write(cts.sid, LOG_INFO, "Failed to add routes to server %s for %v - %s", cts.cfg.ServerAddr, cts.cfg.PeerAddrs, err.Error()) - goto done + if len(cts.cfg.PeerAddrs) > 0 { + // the connection structure to a server is ready. + // let's add routes to the client-side peers if given + err = cts.AddClientRoutes(cts.cfg.PeerAddrs) + if err != nil { + cts.cli.log.Write(cts.sid, LOG_ERROR, "Failed to add routes to server %s for %v - %s", cts.cfg.ServerAddr, cts.cfg.PeerAddrs, err.Error()) + goto done + } } -fmt.Printf("[%v]\n", cts.route_map) - for { var pkt *Packet select { case <-cts.cli.ctx.Done(): -fmt.Printf("context doine... error - %s\n", cts.cli.ctx.Err().Error()) + // need to log cts.cli.ctx.Err().Error()? goto done case <-cts.stop_chan: @@ -694,7 +754,7 @@ fmt.Printf("context doine... error - %s\n", cts.cli.ctx.Err().Error()) if status.Code(err) == codes.Canceled || errors.Is(err, net.ErrClosed) { goto reconnect_to_server } else { - cts.cli.log.Write(cts.sid, LOG_INFO, "Failed to receive packet form server %s - %s", cts.cfg.ServerAddr, err.Error()) + cts.cli.log.Write(cts.sid, LOG_INFO, "Failed to receive packet form server %s - %s", cts.remote_addr, err.Error()) goto reconnect_to_server } } @@ -706,15 +766,18 @@ fmt.Printf("context doine... error - %s\n", cts.cli.ctx.Err().Error()) var ok bool x, ok = pkt.U.(*Packet_Route) if ok { - fmt.Printf ("SERVER LISTENING ON %s\n", x.Route.AddrStr) - err = cts.ReportEvent(x.Route.RouteId, 0, pkt.Kind, x.Route.AddrStr) + err = cts.ReportEvent(x.Route.RouteId, 0, pkt.Kind, x.Route) if err != nil { - // TODO: + cts.cli.log.Write(cts.sid, LOG_ERROR, + "Failed to handle route_started event(%d,%s) from %s - %s", + x.Route.RouteId, x.Route.AddrStr, cts.remote_addr, err.Error()) } else { - // TODO: + cts.cli.log.Write(cts.sid, LOG_DEBUG, + "Handled route_started event(%d,%s) from %s", + x.Route.RouteId, x.Route.AddrStr, cts.remote_addr) } } else { - // TODO: send invalid request... or simply keep quiet? + cts.cli.log.Write(cts.sid, LOG_ERROR, "Invalid route_started event from %s", cts.remote_addr) } case PACKET_KIND_ROUTE_STOPPED: @@ -722,14 +785,18 @@ fmt.Printf("context doine... error - %s\n", cts.cli.ctx.Err().Error()) var ok bool x, ok = pkt.U.(*Packet_Route) if ok { - err = cts.ReportEvent(x.Route.RouteId, 0, pkt.Kind, nil) + err = cts.ReportEvent(x.Route.RouteId, 0, pkt.Kind, x.Route) if err != nil { - // TODO: + cts.cli.log.Write(cts.sid, LOG_ERROR, + "Failed to handle route_stopped event(%d,%s) from %s - %s", + x.Route.RouteId, x.Route.AddrStr, cts.remote_addr, err.Error()) } else { - // TODO: + cts.cli.log.Write(cts.sid, LOG_DEBUG, + "Handled route_stopped event(%d,%s) from %s", + x.Route.RouteId, x.Route.AddrStr, cts.remote_addr) } } else { - // TODO: send invalid request... or simply keep quiet? + cts.cli.log.Write(cts.sid, LOG_ERROR, "Invalid route_stopped event from %s", cts.remote_addr) } case PACKET_KIND_PEER_STARTED: @@ -740,28 +807,39 @@ fmt.Printf("context doine... error - %s\n", cts.cli.ctx.Err().Error()) if ok { err = cts.ReportEvent(x.Peer.RouteId, x.Peer.PeerId, PACKET_KIND_PEER_STARTED, x.Peer) if err != nil { - // TODO: + cts.cli.log.Write(cts.sid, LOG_ERROR, + "Failed to handle peer_started event from %s for peer(%d,%d,%s,%s) - %s", + cts.remote_addr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr, err.Error()) } else { - // TODO: + cts.cli.log.Write(cts.sid, LOG_DEBUG, + "Handled peer_started event from %s for peer(%d,%d,%s,%s)", + cts.remote_addr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr) } } else { - // TODO + cts.cli.log.Write(cts.sid, LOG_ERROR, "Invalid peer_started event from %s", cts.remote_addr) } + // PACKET_KIND_PEER_ABORTED is never sent by server to client. + // the code here doesn't handle the event. + case PACKET_KIND_PEER_STOPPED: // the connection from the client to a peer has been established var x *Packet_Peer var ok bool x, ok = pkt.U.(*Packet_Peer) if ok { - err = cts.ReportEvent(x.Peer.RouteId, x.Peer.PeerId, PACKET_KIND_PEER_STOPPED, nil) + err = cts.ReportEvent(x.Peer.RouteId, x.Peer.PeerId, PACKET_KIND_PEER_STOPPED, x.Peer) if err != nil { - // TODO: + cts.cli.log.Write(cts.sid, LOG_ERROR, + "Failed to handle peer_stopped event from %s for peer(%d,%d,%s,%s) - %s", + cts.remote_addr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr, err.Error()) } else { - // TODO: + cts.cli.log.Write(cts.sid, LOG_DEBUG, + "Handled peer_stopped event from %s for peer(%d,%d,%s,%s)", + cts.remote_addr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr) } } else { - // TODO + cts.cli.log.Write(cts.sid, LOG_ERROR, "Invalid peer_stopped event from %s", cts.remote_addr) } case PACKET_KIND_PEER_EOF: @@ -769,40 +847,50 @@ fmt.Printf("context doine... error - %s\n", cts.cli.ctx.Err().Error()) var ok bool x, ok = pkt.U.(*Packet_Peer) if ok { - err = cts.ReportEvent(x.Peer.RouteId, x.Peer.PeerId, PACKET_KIND_PEER_EOF, nil) + err = cts.ReportEvent(x.Peer.RouteId, x.Peer.PeerId, PACKET_KIND_PEER_EOF, x.Peer) if err != nil { - // TODO: + cts.cli.log.Write(cts.sid, LOG_ERROR, + "Failed to handle peer_eof event from %s for peer(%d,%d,%s,%s) - %s", + cts.remote_addr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr, err.Error()) } else { - // TODO: + cts.cli.log.Write(cts.sid, LOG_DEBUG, + "Handled peer_eof event from %s for peer(%d,%d,%s,%s)", + cts.remote_addr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr) } } else { - // TODO + cts.cli.log.Write(cts.sid, LOG_ERROR, "Invalid peer_eof event from %s", cts.remote_addr) } case PACKET_KIND_PEER_DATA: // the connection from the client to a peer has been established - //fmt.Printf ("**** GOT PEER DATA\n") var x *Packet_Data var ok bool x, ok = pkt.U.(*Packet_Data) if ok { err = cts.ReportEvent(x.Data.RouteId, x.Data.PeerId, PACKET_KIND_PEER_DATA, x.Data.Data) if err != nil { - fmt.Printf("failed to report event - %s\n", err.Error()) - // TODO: + cts.cli.log.Write(cts.sid, LOG_ERROR, + "Failed to handle peer_data event from %s for peer(%d,%d) - %s", + cts.remote_addr, x.Data.RouteId, x.Data.PeerId, err.Error()) } else { - // TODO: + cts.cli.log.Write(cts.sid, LOG_DEBUG, + "Handled peer_data event from %s for peer(%d,%d)", + cts.remote_addr, x.Data.RouteId, x.Data.PeerId) } } else { - // TODO + cts.cli.log.Write(cts.sid, LOG_ERROR, "Invalid peer_data event from %s", cts.remote_addr) } + + default: + // do nothing. ignore the rest } } done: - cts.cli.log.Write("", LOG_INFO, "Disconnected from server %s", cts.cfg.ServerAddr) + cts.cli.log.Write(cts.sid, LOG_INFO, "Disconnected from server %s", cts.cfg.ServerAddr) //cts.RemoveClientRoutes() // this isn't needed as each task removes itself from cts upon its termination cts.ReqStop() + wait_for_termination: cts.route_wg.Wait() // wait until all route tasks are finished cts.cli.RemoveClientConn(cts) @@ -815,7 +903,7 @@ reconnect_to_server: slpctx, _ = context.WithTimeout(cts.cli.ctx, 2 * time.Second) select { case <-cts.cli.ctx.Done(): - fmt.Printf("context doine... error - %s\n", cts.cli.ctx.Err().Error()) + // need to log cts.cli.ctx.Err().Error()? goto done case <-cts.stop_chan: // this signal indicates that ReqStop() has been called @@ -865,8 +953,6 @@ func NewClient(ctx context.Context, ctl_addrs []string, logger Logger, tlscfg *t c.ctl_mux.Handle(c.ctl_prefix + "/client-conns/{conn_id}/routes/{route_id}", &client_ctl_client_conns_id_routes_id{c: &c}) c.ctl_mux.Handle(c.ctl_prefix + "/client-conns/{conn_id}/routes/{route_id}/peers", &client_ctl_client_conns_id_routes_id_peers{c: &c}) c.ctl_mux.Handle(c.ctl_prefix + "/client-conns/{conn_id}/routes/{route_id}/peers/{peer_id}", &client_ctl_client_conns_id_routes_id_peers_id{c: &c}) - c.ctl_mux.Handle(c.ctl_prefix + "/server-conns", &client_ctl_clients{c: &c}) - c.ctl_mux.Handle(c.ctl_prefix + "/server-conns/{id}", &client_ctl_clients_id{c: &c}) c.ctl_addr = make([]string, len(ctl_addrs)) c.ctl = make([]*http.Server, len(ctl_addrs)) @@ -890,10 +976,9 @@ func (c *Client) AddNewClientConn(cfg *ClientConfig) (*ClientConn, error) { cts = NewClientConn(c, cfg) c.cts_mtx.Lock() - defer c.cts_mtx.Unlock() - _, ok = c.cts_map_by_addr[cfg.ServerAddr] if ok { + c.cts_mtx.Unlock() return nil, fmt.Errorf("existing server - %s", cfg.ServerAddr) } @@ -909,7 +994,9 @@ func (c *Client) AddNewClientConn(cfg *ClientConfig) (*ClientConn, error) { c.cts_map_by_addr[cfg.ServerAddr] = cts c.cts_map[id] = cts -fmt.Printf("ADD total servers %d\n", len(c.cts_map_by_addr)) + c.cts_mtx.Unlock() + + c.log.Write ("", LOG_INFO, "Added client connection(%d) to %s", cts.id, cfg.ServerAddr) return cts, nil } @@ -950,14 +1037,15 @@ func (c *Client) RemoveClientConn(cts *ClientConn) error { } if conn != cts { c.cts_mtx.Unlock() - return fmt.Errorf("non-existent connection id - %d", cts.id) + return fmt.Errorf("conflicting connection id - %d", cts.id) } delete(c.cts_map_by_addr, cts.cfg.ServerAddr) delete(c.cts_map, cts.id) -fmt.Printf("REMOVEDDDDDD CONNECTION FROM %s total servers %d\n", cts.cfg.ServerAddr, len(c.cts_map_by_addr)) c.cts_mtx.Unlock() + c.log.Write ("", LOG_INFO, "Removed client connection(%d) to %s", cts.id, cts.cfg.ServerAddr) + cts.ReqStop() return nil } @@ -978,9 +1066,9 @@ func (c *Client) RemoveClientConnById(conn_id uint32) error { delete(c.cts_map_by_addr, cts.cfg.ServerAddr) delete(c.cts_map, cts.id) -fmt.Printf("REMOVEDDDDDD CONNECTION FROM %s total servers %d\n", cts.cfg.ServerAddr, len(c.cts_map_by_addr)) c.cts_mtx.Unlock() + c.log.Write ("", LOG_INFO, "Removed client connection(%d) to %s", cts.id, cts.cfg.ServerAddr) cts.ReqStop() return nil } diff --git a/cmd/main.go b/cmd/main.go index 00d77a6..94bc8ea 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -264,7 +264,7 @@ func main() { goto wrong_usage } - if len(rpc_addrs) < 1 || flgs.NArg() < 1 { + if len(rpc_addrs) < 1 { goto wrong_usage } err = client_main(ctl_addrs, rpc_addrs[0], flgs.Args()) @@ -280,7 +280,7 @@ func main() { wrong_usage: fmt.Fprintf(os.Stderr, "USAGE: %s server --rpc-on=addr:port --ctl-on=addr:port \n", os.Args[0]) - fmt.Fprintf(os.Stderr, " %s client --rpc-server=addr:port --ctl-on=addr:port peer-addr:peer-port\n", os.Args[0]) + fmt.Fprintf(os.Stderr, " %s client --rpc-server=addr:port --ctl-on=addr:port [peer-addr:peer-port ...]\n", os.Args[0]) os.Exit(1) oops: diff --git a/hodu.pb.go b/hodu.pb.go index 75583c2..a5a8790 100644 --- a/hodu.pb.go +++ b/hodu.pb.go @@ -72,46 +72,43 @@ func (ROUTE_PROTO) EnumDescriptor() ([]byte, []int) { type PACKET_KIND int32 const ( - PACKET_KIND_ERROR PACKET_KIND = 0 // generic error response - PACKET_KIND_OK PACKET_KIND = 1 // generic success response - PACKET_KIND_ROUTE_START PACKET_KIND = 2 - PACKET_KIND_ROUTE_STOP PACKET_KIND = 3 - PACKET_KIND_ROUTE_STARTED PACKET_KIND = 4 - PACKET_KIND_ROUTE_STOPPED PACKET_KIND = 5 - PACKET_KIND_PEER_STARTED PACKET_KIND = 6 - PACKET_KIND_PEER_STOPPED PACKET_KIND = 7 - PACKET_KIND_PEER_ABORTED PACKET_KIND = 8 - PACKET_KIND_PEER_EOF PACKET_KIND = 9 - PACKET_KIND_PEER_DATA PACKET_KIND = 10 + PACKET_KIND_RESERVED PACKET_KIND = 0 // not used + PACKET_KIND_ROUTE_START PACKET_KIND = 1 + PACKET_KIND_ROUTE_STOP PACKET_KIND = 2 + PACKET_KIND_ROUTE_STARTED PACKET_KIND = 3 + PACKET_KIND_ROUTE_STOPPED PACKET_KIND = 4 + PACKET_KIND_PEER_STARTED PACKET_KIND = 5 + PACKET_KIND_PEER_STOPPED PACKET_KIND = 6 + PACKET_KIND_PEER_ABORTED PACKET_KIND = 7 + PACKET_KIND_PEER_EOF PACKET_KIND = 8 + PACKET_KIND_PEER_DATA PACKET_KIND = 9 ) // Enum value maps for PACKET_KIND. var ( PACKET_KIND_name = map[int32]string{ - 0: "ERROR", - 1: "OK", - 2: "ROUTE_START", - 3: "ROUTE_STOP", - 4: "ROUTE_STARTED", - 5: "ROUTE_STOPPED", - 6: "PEER_STARTED", - 7: "PEER_STOPPED", - 8: "PEER_ABORTED", - 9: "PEER_EOF", - 10: "PEER_DATA", + 0: "RESERVED", + 1: "ROUTE_START", + 2: "ROUTE_STOP", + 3: "ROUTE_STARTED", + 4: "ROUTE_STOPPED", + 5: "PEER_STARTED", + 6: "PEER_STOPPED", + 7: "PEER_ABORTED", + 8: "PEER_EOF", + 9: "PEER_DATA", } PACKET_KIND_value = map[string]int32{ - "ERROR": 0, - "OK": 1, - "ROUTE_START": 2, - "ROUTE_STOP": 3, - "ROUTE_STARTED": 4, - "ROUTE_STOPPED": 5, - "PEER_STARTED": 6, - "PEER_STOPPED": 7, - "PEER_ABORTED": 8, - "PEER_EOF": 9, - "PEER_DATA": 10, + "RESERVED": 0, + "ROUTE_START": 1, + "ROUTE_STOP": 2, + "ROUTE_STARTED": 3, + "ROUTE_STOPPED": 4, + "PEER_STARTED": 5, + "PEER_STOPPED": 6, + "PEER_ABORTED": 7, + "PEER_EOF": 8, + "PEER_DATA": 9, } ) @@ -434,7 +431,7 @@ func (x *Packet) GetKind() PACKET_KIND { if x != nil { return x.Kind } - return PACKET_KIND_ERROR + return PACKET_KIND_RESERVED } func (m *Packet) GetU() isPacket_U { @@ -526,25 +523,24 @@ var file_hodu_proto_rawDesc = []byte{ 0x00, 0x52, 0x04, 0x44, 0x61, 0x74, 0x61, 0x42, 0x03, 0x0a, 0x01, 0x55, 0x2a, 0x2a, 0x0a, 0x0b, 0x52, 0x4f, 0x55, 0x54, 0x45, 0x5f, 0x50, 0x52, 0x4f, 0x54, 0x4f, 0x12, 0x07, 0x0a, 0x03, 0x54, 0x43, 0x50, 0x10, 0x00, 0x12, 0x08, 0x0a, 0x04, 0x54, 0x43, 0x50, 0x34, 0x10, 0x01, 0x12, 0x08, - 0x0a, 0x04, 0x54, 0x43, 0x50, 0x36, 0x10, 0x02, 0x2a, 0xba, 0x01, 0x0a, 0x0b, 0x50, 0x41, 0x43, - 0x4b, 0x45, 0x54, 0x5f, 0x4b, 0x49, 0x4e, 0x44, 0x12, 0x09, 0x0a, 0x05, 0x45, 0x52, 0x52, 0x4f, - 0x52, 0x10, 0x00, 0x12, 0x06, 0x0a, 0x02, 0x4f, 0x4b, 0x10, 0x01, 0x12, 0x0f, 0x0a, 0x0b, 0x52, - 0x4f, 0x55, 0x54, 0x45, 0x5f, 0x53, 0x54, 0x41, 0x52, 0x54, 0x10, 0x02, 0x12, 0x0e, 0x0a, 0x0a, - 0x52, 0x4f, 0x55, 0x54, 0x45, 0x5f, 0x53, 0x54, 0x4f, 0x50, 0x10, 0x03, 0x12, 0x11, 0x0a, 0x0d, - 0x52, 0x4f, 0x55, 0x54, 0x45, 0x5f, 0x53, 0x54, 0x41, 0x52, 0x54, 0x45, 0x44, 0x10, 0x04, 0x12, - 0x11, 0x0a, 0x0d, 0x52, 0x4f, 0x55, 0x54, 0x45, 0x5f, 0x53, 0x54, 0x4f, 0x50, 0x50, 0x45, 0x44, - 0x10, 0x05, 0x12, 0x10, 0x0a, 0x0c, 0x50, 0x45, 0x45, 0x52, 0x5f, 0x53, 0x54, 0x41, 0x52, 0x54, - 0x45, 0x44, 0x10, 0x06, 0x12, 0x10, 0x0a, 0x0c, 0x50, 0x45, 0x45, 0x52, 0x5f, 0x53, 0x54, 0x4f, - 0x50, 0x50, 0x45, 0x44, 0x10, 0x07, 0x12, 0x10, 0x0a, 0x0c, 0x50, 0x45, 0x45, 0x52, 0x5f, 0x41, - 0x42, 0x4f, 0x52, 0x54, 0x45, 0x44, 0x10, 0x08, 0x12, 0x0c, 0x0a, 0x08, 0x50, 0x45, 0x45, 0x52, - 0x5f, 0x45, 0x4f, 0x46, 0x10, 0x09, 0x12, 0x0d, 0x0a, 0x09, 0x50, 0x45, 0x45, 0x52, 0x5f, 0x44, - 0x41, 0x54, 0x41, 0x10, 0x0a, 0x32, 0x49, 0x0a, 0x04, 0x48, 0x6f, 0x64, 0x75, 0x12, 0x19, 0x0a, - 0x07, 0x47, 0x65, 0x74, 0x53, 0x65, 0x65, 0x64, 0x12, 0x05, 0x2e, 0x53, 0x65, 0x65, 0x64, 0x1a, - 0x05, 0x2e, 0x53, 0x65, 0x65, 0x64, 0x22, 0x00, 0x12, 0x26, 0x0a, 0x0c, 0x50, 0x61, 0x63, 0x6b, - 0x65, 0x74, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x07, 0x2e, 0x50, 0x61, 0x63, 0x6b, 0x65, - 0x74, 0x1a, 0x07, 0x2e, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, - 0x42, 0x08, 0x5a, 0x06, 0x2e, 0x2f, 0x68, 0x6f, 0x64, 0x75, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x33, + 0x0a, 0x04, 0x54, 0x43, 0x50, 0x36, 0x10, 0x02, 0x2a, 0xb5, 0x01, 0x0a, 0x0b, 0x50, 0x41, 0x43, + 0x4b, 0x45, 0x54, 0x5f, 0x4b, 0x49, 0x4e, 0x44, 0x12, 0x0c, 0x0a, 0x08, 0x52, 0x45, 0x53, 0x45, + 0x52, 0x56, 0x45, 0x44, 0x10, 0x00, 0x12, 0x0f, 0x0a, 0x0b, 0x52, 0x4f, 0x55, 0x54, 0x45, 0x5f, + 0x53, 0x54, 0x41, 0x52, 0x54, 0x10, 0x01, 0x12, 0x0e, 0x0a, 0x0a, 0x52, 0x4f, 0x55, 0x54, 0x45, + 0x5f, 0x53, 0x54, 0x4f, 0x50, 0x10, 0x02, 0x12, 0x11, 0x0a, 0x0d, 0x52, 0x4f, 0x55, 0x54, 0x45, + 0x5f, 0x53, 0x54, 0x41, 0x52, 0x54, 0x45, 0x44, 0x10, 0x03, 0x12, 0x11, 0x0a, 0x0d, 0x52, 0x4f, + 0x55, 0x54, 0x45, 0x5f, 0x53, 0x54, 0x4f, 0x50, 0x50, 0x45, 0x44, 0x10, 0x04, 0x12, 0x10, 0x0a, + 0x0c, 0x50, 0x45, 0x45, 0x52, 0x5f, 0x53, 0x54, 0x41, 0x52, 0x54, 0x45, 0x44, 0x10, 0x05, 0x12, + 0x10, 0x0a, 0x0c, 0x50, 0x45, 0x45, 0x52, 0x5f, 0x53, 0x54, 0x4f, 0x50, 0x50, 0x45, 0x44, 0x10, + 0x06, 0x12, 0x10, 0x0a, 0x0c, 0x50, 0x45, 0x45, 0x52, 0x5f, 0x41, 0x42, 0x4f, 0x52, 0x54, 0x45, + 0x44, 0x10, 0x07, 0x12, 0x0c, 0x0a, 0x08, 0x50, 0x45, 0x45, 0x52, 0x5f, 0x45, 0x4f, 0x46, 0x10, + 0x08, 0x12, 0x0d, 0x0a, 0x09, 0x50, 0x45, 0x45, 0x52, 0x5f, 0x44, 0x41, 0x54, 0x41, 0x10, 0x09, + 0x32, 0x49, 0x0a, 0x04, 0x48, 0x6f, 0x64, 0x75, 0x12, 0x19, 0x0a, 0x07, 0x47, 0x65, 0x74, 0x53, + 0x65, 0x65, 0x64, 0x12, 0x05, 0x2e, 0x53, 0x65, 0x65, 0x64, 0x1a, 0x05, 0x2e, 0x53, 0x65, 0x65, + 0x64, 0x22, 0x00, 0x12, 0x26, 0x0a, 0x0c, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x53, 0x74, 0x72, + 0x65, 0x61, 0x6d, 0x12, 0x07, 0x2e, 0x50, 0x61, 0x63, 0x6b, 0x65, 0x74, 0x1a, 0x07, 0x2e, 0x50, + 0x61, 0x63, 0x6b, 0x65, 0x74, 0x22, 0x00, 0x28, 0x01, 0x30, 0x01, 0x42, 0x08, 0x5a, 0x06, 0x2e, + 0x2f, 0x68, 0x6f, 0x64, 0x75, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/hodu.proto b/hodu.proto index 17dd8d9..5e8a4c3 100644 --- a/hodu.proto +++ b/hodu.proto @@ -40,17 +40,16 @@ message PeerData { }; enum PACKET_KIND { - ERROR = 0; // generic error response - OK = 1; // generic success response - ROUTE_START = 2; - ROUTE_STOP = 3; - ROUTE_STARTED = 4; - ROUTE_STOPPED = 5; - PEER_STARTED = 6; - PEER_STOPPED = 7; - PEER_ABORTED = 8; - PEER_EOF = 9; - PEER_DATA = 10; + RESERVED = 0; // not used + ROUTE_START = 1; + ROUTE_STOP = 2; + ROUTE_STARTED = 3; + ROUTE_STOPPED = 4; + PEER_STARTED = 5; + PEER_STOPPED = 6; + PEER_ABORTED = 7; + PEER_EOF = 8; + PEER_DATA = 9; }; message Packet { diff --git a/packet.go b/packet.go index 89e5f77..b757393 100644 --- a/packet.go +++ b/packet.go @@ -37,9 +37,10 @@ func MakePeerStoppedPacket(route_id uint32, peer_id uint32, remote_addr string, } } -func MakePeerAbortedPacket(route_id uint32, peer_id uint32) *Packet { +func MakePeerAbortedPacket(route_id uint32, peer_id uint32, remote_addr string, local_addr string) *Packet { return &Packet{Kind: PACKET_KIND_PEER_ABORTED, - U: &Packet_Peer{Peer: &PeerDesc{RouteId: route_id, PeerId: peer_id}}} + U: &Packet_Peer{Peer: &PeerDesc{RouteId: route_id, PeerId: peer_id, RemoteAddrStr: remote_addr, LocalAddrStr: local_addr}}, + } } func MakePeerEofPacket(route_id uint32, peer_id uint32) *Packet { diff --git a/server-peer.go b/server-peer.go index 6005af3..4b95181 100644 --- a/server-peer.go +++ b/server-peer.go @@ -45,16 +45,21 @@ func (spc *ServerPeerConn) RunTask(wg *sync.WaitGroup) { var buf [4096]byte var tmr *time.Timer var status bool - var err error = nil + var err error + var conn_raddr string + var conn_laddr string defer wg.Done() + conn_raddr = spc.conn.RemoteAddr().String() + conn_laddr = spc.conn.LocalAddr().String() + pss = spc.route.cts.pss - err = pss.Send(MakePeerStartedPacket(spc.route.id, spc.conn_id, spc.conn.RemoteAddr().String(), spc.conn.LocalAddr().String())) + err = pss.Send(MakePeerStartedPacket(spc.route.id, spc.conn_id, conn_raddr, conn_laddr)) if err != nil { spc.route.cts.svr.log.Write(spc.route.cts.sid, LOG_ERROR, "Failed to send peer_started event(%d,%d,%s,%s) to client - %s", - spc.route.id, spc.conn_id, spc.conn.RemoteAddr().String(), spc.conn.LocalAddr().String(), err.Error()) + spc.route.id, spc.conn_id, conn_raddr, conn_laddr, err.Error()) goto done_without_stop } @@ -86,17 +91,18 @@ wait_for_started: n, err = spc.conn.Read(buf[:]) if err != nil { if errors.Is(err, io.EOF) { - if pss.Send(MakePeerEofPacket(spc.route.id, spc.conn_id)) != nil { + err = pss.Send(MakePeerEofPacket(spc.route.id, spc.conn_id)) + if err != nil { spc.route.cts.svr.log.Write(spc.route.cts.sid, LOG_ERROR, "Failed to send peer_eof event(%d,%d,%s,%s) to client - %s", - spc.route.id, spc.conn_id, spc.conn.RemoteAddr().String(), spc.conn.LocalAddr().String(), err.Error()) + spc.route.id, spc.conn_id, conn_raddr, conn_laddr, err.Error()) goto done } goto wait_for_stopped } else { spc.route.cts.svr.log.Write(spc.route.cts.sid, LOG_ERROR, "Failed to read data from peer(%d,%d,%s,%s) - %s", - spc.route.id, spc.conn_id, spc.conn.RemoteAddr().String(), spc.conn.LocalAddr().String(), err.Error()) + spc.route.id, spc.conn_id, conn_raddr, conn_laddr, err.Error()) goto done } } @@ -105,7 +111,7 @@ wait_for_started: if err != nil { spc.route.cts.svr.log.Write(spc.route.cts.sid, LOG_ERROR, "Failed to send data from peer(%d,%d,%s,%s) to client - %s", - spc.route.id, spc.conn_id, spc.conn.RemoteAddr().String(), spc.conn.LocalAddr().String(), err.Error()) + spc.route.id, spc.conn_id, conn_raddr, conn_laddr, err.Error()) goto done } } @@ -121,10 +127,11 @@ wait_for_stopped: } done: - if pss.Send(MakePeerStoppedPacket(spc.route.id, spc.conn_id, spc.conn.RemoteAddr().String(), spc.conn.LocalAddr().String())) != nil { + err = pss.Send(MakePeerStoppedPacket(spc.route.id, spc.conn_id, spc.conn.RemoteAddr().String(), spc.conn.LocalAddr().String())) + if err != nil { spc.route.cts.svr.log.Write(spc.route.cts.sid, LOG_ERROR, "Failed to send peer_stopped(%d,%d,%s,%s) to client - %s", - spc.route.id, spc.conn_id, spc.conn.RemoteAddr().String(), spc.conn.LocalAddr().String(), err.Error()) + spc.route.id, spc.conn_id, conn_raddr, conn_laddr, err.Error()) // nothing much to do about the failure of sending this } @@ -156,6 +163,9 @@ func (spc *ServerPeerConn) ReportEvent(event_type PACKET_KIND, event_data interf spc.client_peer_status_chan <- true } + case PACKET_KIND_PEER_ABORTED: + spc.ReqStop() + case PACKET_KIND_PEER_STOPPED: // this event needs to close on the server-side peer connection. // sending false to the client_peer_status_chan isn't good enough to break @@ -186,14 +196,14 @@ func (spc *ServerPeerConn) ReportEvent(event_type PACKET_KIND, event_data interf } else { // this must not happen. spc.route.cts.svr.log.Write(spc.route.cts.sid, LOG_ERROR, - "internal server error - invalid data in peer_data event from %s to peer(%d,%d,%s)", + "Protocol error - invalid data in peer_data event from %s to peer(%d,%d,%s)", spc.route.cts.remote_addr, spc.route.id, spc.conn_id, spc.conn.RemoteAddr().String()) spc.ReqStop() } } else { // protocol error. the client must not relay more data from the client-side peer after EOF. spc.route.cts.svr.log.Write(spc.route.cts.sid, LOG_ERROR, - "internal client error - redundant data from %s to (%d,%d,%s)", + "Protocol error - redundant data from %s to (%d,%d,%s)", spc.route.cts.remote_addr, spc.route.id, spc.conn_id, spc.conn.RemoteAddr().String()) spc.ReqStop() } diff --git a/server.go b/server.go index 6772b1d..a8bcc25 100644 --- a/server.go +++ b/server.go @@ -448,17 +448,35 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) { } case PACKET_KIND_PEER_ABORTED: - fallthrough + var x *Packet_Peer + var ok bool + x, ok = pkt.U.(*Packet_Peer) + if ok { + err = cts.ReportEvent(x.Peer.RouteId, x.Peer.PeerId, PACKET_KIND_PEER_ABORTED, x.Peer) + if err != nil { + cts.svr.log.Write(cts.sid, LOG_ERROR, + "Failed to handle peer_aborted event from %s for peer(%d,%d,%s,%s) - %s", + cts.remote_addr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr, err.Error()) + } else { + cts.svr.log.Write(cts.sid, LOG_DEBUG, + "Handled peer_aborted event from %s for peer(%d,%d,%s,%s)", + cts.remote_addr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr) + } + } else { + // invalid event data + cts.svr.log.Write(cts.sid, LOG_ERROR, "Invalid peer_aborted event from %s", cts.remote_addr) + } + case PACKET_KIND_PEER_STOPPED: // the connection from the client to a peer has been established var x *Packet_Peer var ok bool x, ok = pkt.U.(*Packet_Peer) if ok { - err = cts.ReportEvent(x.Peer.RouteId, x.Peer.PeerId, PACKET_KIND_PEER_STOPPED, nil) + err = cts.ReportEvent(x.Peer.RouteId, x.Peer.PeerId, PACKET_KIND_PEER_STOPPED, x.Peer) if err != nil { cts.svr.log.Write(cts.sid, LOG_ERROR, - "Failed to handle peer_started event from %s for peer(%d,%d,%s,%s) - %s", + "Failed to handle peer_stopped event from %s for peer(%d,%d,%s,%s) - %s", cts.remote_addr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr, err.Error()) } else { cts.svr.log.Write(cts.sid, LOG_DEBUG, @@ -886,8 +904,8 @@ func (s *Server) ReqStop() { ctl.Shutdown(s.ctx) // to break c.ctl.ListenAndServe() } - //s.gs.GracefulStop() - //s.gs.Stop() + //s.rpc_svr.GracefulStop() + //s.rpc_svr.Stop() for _, l = range s.rpc { l.Close() }