From 847f71d914970e28ebd897463e840a2907f82d52 Mon Sep 17 00:00:00 2001 From: hyung-hwan Date: Sun, 17 Nov 2024 14:57:56 +0900 Subject: [PATCH] more logic implemented --- c-peer.go | 3 ++- client.go | 32 +++++++++++++++++++++++++++++--- s-peer.go | 28 ++++++++++++++++++++-------- server.go | 14 +++++++++++--- 4 files changed, 62 insertions(+), 15 deletions(-) diff --git a/c-peer.go b/c-peer.go index 920f6c8..c8fbd16 100644 --- a/c-peer.go +++ b/c-peer.go @@ -43,7 +43,8 @@ func (cpc *ClientPeerConn) RunTask(wg *sync.WaitGroup) error { } } -//done: + cpc.route.cts.psc.Send(MakePeerStoppedPacket(cpc.route.id, cpc.conn_id)) // nothing much to do upon failure. no error check here + cpc.ReqStop() return nil } diff --git a/client.go b/client.go index 4c57fa3..f1e3300 100644 --- a/client.go +++ b/client.go @@ -6,6 +6,7 @@ import "context" import "crypto/tls" import "crypto/x509" import "encoding/json" +import "errors" import "fmt" import "io" import "log" @@ -164,8 +165,8 @@ func (r* ClientRoute) ConnectToPeer(pts_id uint32) { d.LocalAddr = nil // TOOD: use this if local address is specified conn, err = d.DialContext(ctx, "tcp", r.peer_addr.String()); - //conn, err = net.DialTCP("tcp", nil, r.peer_addr); if err != nil { +// TODO: make send peer started failure mesage? fmt.Printf ("failed to connect to %s - %s\n", r.peer_addr.String(), err.Error()) return } @@ -173,11 +174,18 @@ func (r* ClientRoute) ConnectToPeer(pts_id uint32) { ptc, err = r.AddNewClientPeerConn(conn) if err != nil { // TODO: logging +// TODO: make send peer started failure mesage? fmt.Printf("YYYYYYYY - %s\n", err.Error()) conn.Close() return } fmt.Printf("STARTED NEW SERVER PEER STAK\n") + err = r.cts.psc.Send(MakePeerStartedPacket(r.id, ptc.conn_id)) + if err != nil { + fmt.Printf("CLOSING NEW SERVER PEER STAK - %s\n", err.Error()) + conn.Close() + return + } r.ptc_wg.Add(1) go ptc.RunTask(&r.ptc_wg) @@ -188,6 +196,22 @@ func (r* ClientRoute) ReportEvent (pts_id uint32, event_type PACKET_KIND, event_ case PACKET_KIND_PEER_STARTED: r.ConnectToPeer(pts_id) +// TODO: +// case PACKET_KIND_PEER_STOPPED: +// r.DisconnectFromPeer(pts_id) + + case PACKET_KIND_PEER_DATA: + var ptc *ClientPeerConn + var ok bool + var err error + ptc, ok = r.ptc_map[pts_id] + if ok { + _, err = ptc.conn.Write(event_data) + return err + } else { + + } + // TODO: other types } @@ -367,12 +391,12 @@ fmt.Printf("[%v]\n", cts.route_map) goto done default: - // no other case is ready. + // no other case is ready. run the code below select. // without the default case, the select construct would block } pkt, err = psc.Recv() - if err == io.EOF { + if errors.Is(err, io.EOF) { fmt.Printf("server disconnected\n") goto reconnect_to_server } @@ -448,12 +472,14 @@ fmt.Printf("[%v]\n", cts.route_map) case PACKET_KIND_PEER_DATA: // the connection from the client to a peer has been established + fmt.Printf ("**** GOT PEER DATA\n") var x *Packet_Data var ok bool x, ok = pkt.U.(*Packet_Data) if ok { err = cts.ReportEvent(x.Data.RouteId, x.Data.PeerId, PACKET_KIND_PEER_DATA, x.Data.Data) if err != nil { + fmt.Printf ("failed to report event - %s\n", err.Error()) // TODO: } else { // TODO: diff --git a/s-peer.go b/s-peer.go index c6f47ef..8fa94f6 100644 --- a/s-peer.go +++ b/s-peer.go @@ -1,6 +1,7 @@ package main import "fmt" +import "io" import "net" import "sync/atomic" import "time" @@ -43,7 +44,7 @@ func (spc *ServerPeerConn) RunTask() error { err = pss.Send(MakePeerStartedPacket(spc.route.id, spc.conn_id)) if err != nil { // TODO: include route id and conn id in the error message - err = fmt.Errorf("unable to send start-pts - %s\n", err.Error()) + fmt.Printf("unable to send start-pts - %s\n", err.Error()) goto done } @@ -72,23 +73,27 @@ wait_for_started: tmr.Stop() for { +fmt.Printf("******************* TRYING TO READ...\n") n, err = spc.conn.Read(buf[:]) if err != nil { - fmt.Printf("read error - %s\n", err.Error()) - break + if err != io.EOF { + fmt.Printf("read error - %s\n", err.Error()) + } + goto done } // TODO: this needs to be guarded err = pss.Send(MakePeerDataPacket(spc.route.id, spc.conn_id, buf[:n])) if err != nil { // TODO: include route id and conn id in the error message - err = fmt.Errorf("unable to send data - %s\n", err.Error()) + fmt.Printf("unable to send data - %s\n", err.Error()) goto done; } } done: - fmt.Printf("spc really ending..................\n") +// TODO: inform the client to close peer connection.. + fmt.Printf("SPC really ending..................\n") spc.ReqStop() spc.route.RemoveServerPeerConn(spc) //spc.cts.wg.Done() @@ -118,16 +123,23 @@ func (spc *ServerPeerConn) ReportEvent (event_type PACKET_KIND, event_data []byt switch event_type { case PACKET_KIND_PEER_STARTED: +fmt.Printf("******************* AAAAAAAAAAAAAAAAAAAaaa\n") if spc.client_peer_opened_received.CompareAndSwap(false, true) { spc.client_peer_status_chan <- true } case PACKET_KIND_PEER_STOPPED: - if spc.client_peer_closed_received.CompareAndSwap(false, true) { - spc.client_peer_status_chan <- false - } +fmt.Printf("******************* BBBBBBBBBBBBBBBBBBBBBBBB\n") + //if spc.client_peer_closed_received.CompareAndSwap(false, true) { + // spc.client_peer_status_chan <- false + //} + // this event needs to close on the server-side peer connection. + // sending false to the client_peer_status_chan isn't good enough to break + // the Recv loop in RunTask(). + spc.ReqStop() case PACKET_KIND_PEER_DATA: +fmt.Printf("******************* CCCCCCCCCCCCCCCCCCCCCCCccc\n") var err error _, err = spc.conn.Write(event_data) diff --git a/server.go b/server.go index 9ef4346..e23d516 100644 --- a/server.go +++ b/server.go @@ -4,12 +4,14 @@ package main //import "bytes" import "context" import "crypto/tls" +import "errors" import "fmt" import "io" import "math/rand" import "net" import "os" import "os/signal" +//import "strings" import "sync" import "sync/atomic" import "syscall" @@ -121,7 +123,13 @@ func (r *ServerRoute) RunTask() { conn, err = r.l.AcceptTCP() if err != nil { // TODO: logging - fmt.Printf("[%s,%d] accept failure - %s\n", r.cts.caddr.String(), r.id, err.Error()) + //if strings.Contains(err.Error(), "use of closed network connection") { + //if err == net.ErrClosed { + if errors.Is(err, net.ErrClosed) { + fmt.Printf("[%s,%d] END OF TASK...[%#v] [%#v]\n", r.cts.caddr.String(), r.id, err, net.ErrClosed) + } else { + fmt.Printf("[%s,%d] accept failure - %s\n", r.cts.caddr.String(), r.id, err.Error()) + } break } @@ -145,7 +153,7 @@ func (r *ServerRoute) RunTask() { } func (r *ServerRoute) StopTask() { -fmt.Printf ("stoppping stak..\n") +fmt.Printf ("stoppping taak..\n") // TODO: all pts stop... r.l.Close(); // TODO: wait?? @@ -358,7 +366,7 @@ func (s *Server) PacketStream(strm Hodu_PacketStreamServer) error { } pkt, err = strm.Recv() - if err == io.EOF { + if errors.Is(err, io.EOF) { // return will close stream from server side return nil }