diff --git a/README.md b/README.md index 55aeb8a..bb50c11 100644 --- a/README.md +++ b/README.md @@ -3,3 +3,20 @@ hodu client [ ...] client requests server that it grants access to the list of peers reserver + + +## server.json +``` +{ + "server-addr": "127.0.0.1:9999", + "peer-addrs": [ + "127.0.0.1:22", + "127.0.0.1:80" + ] +} +``` + +## create a server +``` +curl -X POST --data-binary @server.json http://127.0.0.1:7777/servers +``` diff --git a/c-peer.go b/c-peer.go index c8fbd16..30d8097 100644 --- a/c-peer.go +++ b/c-peer.go @@ -11,9 +11,7 @@ func NewClientPeerConn(r *ClientRoute, c net.Conn, id uint32) (*ClientPeerConn) cpc.conn = c cpc.conn_id = id cpc.stop_req.Store(false) - //cpc.server_peer_status_chan = make(chan bool, 16) - //cpc.server_peer_opened_received.Store(false) - //cpc.server_peer_closed_received.Store(false) + cpc.server_peer_eof.Store(false) return &cpc } @@ -57,3 +55,16 @@ func (cpc *ClientPeerConn) ReqStop() { } } } + +func (cpc* ClientPeerConn) CloseWrite() { + if cpc.server_peer_eof.CompareAndSwap(false, true) { + if cpc.conn != nil { + var conn *net.TCPConn + var ok bool + conn, ok = cpc.conn.(*net.TCPConn) + if ok { + conn.CloseWrite() + } + } + } +} diff --git a/client.go b/client.go index d5bd391..43c27b6 100644 --- a/client.go +++ b/client.go @@ -59,26 +59,30 @@ type ClientPeerConn struct { remot_conn_id uint32 addr string // peer address - stop_req atomic.Bool + stop_chan chan bool + stop_req atomic.Bool + server_peer_eof atomic.Bool } // client connection to server type ServerConn struct { - cli *Client - cfg *ClientConfig - saddr *net.TCPAddr // server address that is connected to + cli *Client + cfg *ClientConfig + saddr *net.TCPAddr // server address that is connected to - conn *grpc.ClientConn // grpc connection to the server - hdc HoduClient - psc *GuardedPacketStreamClient // guarded grpc stream - psc_mtx sync.Mutex + conn *grpc.ClientConn // grpc connection to the server + hdc HoduClient + psc *GuardedPacketStreamClient // guarded grpc stream - route_mtx sync.Mutex + s_seed Seed + c_seed Seed + + route_mtx sync.Mutex route_map ClientRouteMap - route_wg sync.WaitGroup + route_wg sync.WaitGroup - stop_req atomic.Bool + stop_req atomic.Bool stop_chan chan bool } @@ -194,7 +198,7 @@ func (r* ClientRoute) ConnectToPeer(pts_id uint32) { return } - ptc, err = r.AddNewClientPeerConn(conn) + ptc, err = r.AddNewClientPeerConn(conn, pts_id) if err != nil { // TODO: logging // TODO: make send peer started failure mesage? @@ -214,14 +218,60 @@ func (r* ClientRoute) ConnectToPeer(pts_id uint32) { go ptc.RunTask(&r.ptc_wg) } +func (r* ClientRoute) DisconnectFromPeer(pts_id uint32) error { + var ptc *ClientPeerConn + var ok bool + + r.ptc_mtx.Lock() + ptc, ok = r.ptc_map[pts_id] + if !ok { + r.ptc_mtx.Unlock() + return fmt.Errorf("non-existent connection id - %u", pts_id) + } + r.ptc_mtx.Unlock() + + ptc.ReqStop() + return nil +} + +func (r* ClientRoute) CloseWriteToPeer(pts_id uint32) error { + var ptc *ClientPeerConn + var ok bool + + r.ptc_mtx.Lock() + ptc, ok = r.ptc_map[pts_id] + if !ok { + r.ptc_mtx.Unlock() + return fmt.Errorf("non-existent connection id - %u", pts_id) + } + r.ptc_mtx.Unlock() + + ptc.CloseWrite() + return nil +} + + func (r* ClientRoute) ReportEvent (pts_id uint32, event_type PACKET_KIND, event_data []byte) error { + var err error + switch event_type { case PACKET_KIND_PEER_STARTED: +fmt.Printf ("GOT PEER STARTD . CONENCT TO CLIENT_SIDE PEER\n") r.ConnectToPeer(pts_id) -// TODO: -// case PACKET_KIND_PEER_STOPPED: -// r.DisconnectFromPeer(pts_id) + case PACKET_KIND_PEER_STOPPED: +fmt.Printf ("GOT PEER STOPPED . DISCONNECTION FROM CLIENT_SIDE PEER\n") + err = r.DisconnectFromPeer(pts_id) + if err != nil { + // TODO: + } + + case PACKET_KIND_PEER_EOF: +fmt.Printf ("GOT PEER EOF. REMEMBER EOF\n") + err = r.CloseWriteToPeer(pts_id) + if err != nil { + // TODO: + } case PACKET_KIND_PEER_DATA: var ptc *ClientPeerConn @@ -367,6 +417,8 @@ func (cts *ServerConn) RunTask(wg *sync.WaitGroup) { var hdc HoduClient var psc PacketStreamClient var slpctx context.Context + var c_seed Seed + var s_seed *Seed var err error defer wg.Done() // arrange to call at the end of this function @@ -383,6 +435,19 @@ fmt.Printf ("Connecting GRPC to [%s]\n", cts.saddr.String()) } hdc = NewHoduClient(conn) + + // seed exchange is for furture expansion of the protocol + // there is nothing to do much about it for now. + c_seed.Version = HODU_VERSION + c_seed.Flags = 0 + s_seed, err = hdc.GetSeed(cts.cli.ctx, &c_seed) + if err != nil { + fmt.Printf("ERROR: unable to get seed from %s - %s\n", cts.cfg.server_addr, err.Error()) + goto reconnect_to_server + } + cts.s_seed = *s_seed + cts.c_seed = c_seed + psc, err = hdc.PacketStream(cts.cli.ctx) if err != nil { fmt.Printf ("ERROR: unable to get grpc packet stream - %s\n", err.Error()) @@ -494,6 +559,21 @@ fmt.Printf("[%v]\n", cts.route_map) // TODO } + case PACKET_KIND_PEER_EOF: + var x *Packet_Peer + var ok bool + x, ok = pkt.U.(*Packet_Peer) + if ok { + err = cts.ReportEvent(x.Peer.RouteId, x.Peer.PeerId, PACKET_KIND_PEER_EOF, nil) + if err != nil { + // TODO: + } else { + // TODO: + } + } else { + // TODO + } + case PACKET_KIND_PEER_DATA: // the connection from the client to a peer has been established fmt.Printf ("**** GOT PEER DATA\n") @@ -561,14 +641,15 @@ func (cts *ServerConn) ReportEvent (route_id uint32, pts_id uint32, event_type P } // -------------------------------------------------------------------- -func (r *ClientRoute) AddNewClientPeerConn (c net.Conn) (*ClientPeerConn, error) { +func (r *ClientRoute) AddNewClientPeerConn (c net.Conn, pts_id uint32) (*ClientPeerConn, error) { var ptc *ClientPeerConn - var ok bool - var start_id uint32 + //var ok bool + //var start_id uint32 r.ptc_mtx.Lock() defer r.ptc_mtx.Unlock() +/* if len(r.ptc_map) >= r.ptc_limit { return nil, fmt.Errorf("peer-to-client connection table full") } @@ -587,8 +668,10 @@ func (r *ClientRoute) AddNewClientPeerConn (c net.Conn) (*ClientPeerConn, error) } ptc = NewClientPeerConn(r, c, r.ptc_last_id) +*/ + ptc = NewClientPeerConn(r, c, pts_id) r.ptc_map[ptc.conn_id] = ptc - r.ptc_last_id++ + //r.ptc_last_id++ return ptc, nil } diff --git a/hodu.proto b/hodu.proto index ada4976..c02d368 100644 --- a/hodu.proto +++ b/hodu.proto @@ -5,9 +5,15 @@ option go_package = "./main"; //package hodu; // no idea if it's still important... service Hodu { + rpc GetSeed (Seed) returns (Seed) {} rpc PacketStream (stream Packet) returns (stream Packet) {} } +message Seed { + uint32 Version = 1; + uint64 Flags = 2; +} + enum ROUTE_PROTO { TCP = 0; TCP4 = 1; diff --git a/main.go b/main.go index 0dffc2a..7c19ce0 100644 --- a/main.go +++ b/main.go @@ -6,6 +6,8 @@ import "io" import "os" import "strings" +const HODU_VERSION uint32 = 0x010000 + func main() { var err error var flgs *flag.FlagSet diff --git a/s-peer.go b/s-peer.go index 4144875..35c888a 100644 --- a/s-peer.go +++ b/s-peer.go @@ -13,11 +13,14 @@ type ServerPeerConn struct { conn_id uint32 cts *ClientConn conn *net.TCPConn - stop_req atomic.Bool + stop_chan chan bool + stop_req atomic.Bool + client_peer_status_chan chan bool - client_peer_opened_received atomic.Bool - client_peer_closed_received atomic.Bool + client_peer_started atomic.Bool + client_peer_stopped atomic.Bool + client_peer_eof atomic.Bool } func NewServerPeerConn(r *ServerRoute, c *net.TCPConn, id uint32) (*ServerPeerConn) { @@ -26,12 +29,15 @@ func NewServerPeerConn(r *ServerRoute, c *net.TCPConn, id uint32) (*ServerPeerCo spc.route = r spc.conn = c spc.conn_id = id - spc.stop_req.Store(false) - spc.stop_chan = make(chan bool, 1) - spc.client_peer_status_chan = make(chan bool, 16) - spc.client_peer_opened_received.Store(false) - spc.client_peer_closed_received.Store(false) + spc.stop_chan = make(chan bool, 8) + spc.stop_req.Store(false) + + spc.client_peer_status_chan = make(chan bool, 8) + spc.client_peer_started.Store(false) + spc.client_peer_stopped.Store(false) + spc.client_peer_eof.Store(false) +fmt.Printf ("~~~~~~~~~~~~~~~ NEW SERVER PEER CONNECTION ADDED %p\n", &spc) return &spc } @@ -50,7 +56,7 @@ func (spc *ServerPeerConn) RunTask(wg *sync.WaitGroup) { if err != nil { // TODO: include route id and conn id in the error message fmt.Printf("unable to send start-pts - %s\n", err.Error()) - goto done + goto done_without_stop } tmr = time.NewTimer(2 * time.Second) // TODO: make this configurable... @@ -80,15 +86,16 @@ wait_for_started: for { n, err = spc.conn.Read(buf[:]) if err != nil { - if !errors.Is(err, io.EOF) { + if errors.Is(err, io.EOF) { + if pss.Send(MakePeerEofPacket(spc.route.id, spc.conn_id)) != nil { + fmt.Printf("unable to report data - %s\n", err.Error()) + goto done + } + goto wait_for_stopped + } else { fmt.Printf("read error - %s\n", err.Error()) goto done } - if pss.Send(MakePeerStoppedPacket(spc.route.id, spc.conn_id)) != nil { - fmt.Printf("unable to report data - %s\n", err.Error()) - goto done - } - goto wait_for_stopped } err = pss.Send(MakePeerDataPacket(spc.route.id, spc.conn_id, buf[:n])) @@ -100,41 +107,45 @@ wait_for_started: } wait_for_stopped: - //if spc.client_peer_open { - for { - select { - case status = <- spc.client_peer_status_chan: // something not right... may use a different channel for closing... - goto done - case <- spc.stop_chan: - goto done - } + for { +fmt.Printf ("******************* Waiting for peer Stop\n") + select { + case status = <- spc.client_peer_status_chan: // something not right... may use a different channel for closing... + goto done + case <- spc.stop_chan: + goto done } - //} + } +fmt.Printf ("******************* Sending peer stopped\n") + if pss.Send(MakePeerStoppedPacket(spc.route.id, spc.conn_id)) != nil { + fmt.Printf("unable to report data - %s\n", err.Error()) + goto done + } done: -// TODO: inform the client to close peer connection.. + if pss.Send(MakePeerStoppedPacket(spc.route.id, spc.conn_id)) != nil { + fmt.Printf("unable to report data - %s\n", err.Error()) + // nothing much to do about the failure of sending this + } + +done_without_stop: fmt.Printf("SPC really ending..................\n") spc.ReqStop() spc.route.RemoveServerPeerConn(spc) - //spc.cts.wg.Done() } func (spc *ServerPeerConn) ReqStop() { if spc.stop_req.CompareAndSwap(false, true) { - var pss *GuardedPacketStreamServer - var err error - - pss = spc.route.cts.pss spc.stop_chan <- true - if spc.client_peer_opened_received.CompareAndSwap(false, true) { + + if spc.client_peer_started.CompareAndSwap(false, true) { spc.client_peer_status_chan <- false } - spc.conn.Close() - err = pss.Send(MakePeerStoppedPacket(spc.route.id, spc.conn_id)) - if err != nil { - // TODO: print warning - fmt.Printf ("WARNING - failed to report event to %s - %s\n", spc.route.cts.caddr, err.Error()) + if spc.client_peer_stopped.CompareAndSwap(false, true) { + spc.client_peer_status_chan <- false } + + spc.conn.Close() // to abort the main Recv() loop } } @@ -143,28 +154,37 @@ func (spc *ServerPeerConn) ReportEvent (event_type PACKET_KIND, event_data []byt switch event_type { case PACKET_KIND_PEER_STARTED: fmt.Printf("******************* AAAAAAAAAAAAAAAAAAAaaa\n") - if spc.client_peer_opened_received.CompareAndSwap(false, true) { + if spc.client_peer_started.CompareAndSwap(false, true) { spc.client_peer_status_chan <- true } case PACKET_KIND_PEER_STOPPED: fmt.Printf("******************* BBBBBBBBBBBBBBBBBBBBBBBB\n") - //if spc.client_peer_closed_received.CompareAndSwap(false, true) { - // spc.client_peer_status_chan <- false - //} // this event needs to close on the server-side peer connection. // sending false to the client_peer_status_chan isn't good enough to break // the Recv loop in RunTask(). spc.ReqStop() + case PACKET_KIND_PEER_EOF: +fmt.Printf("******************* BBBBBBBBBBBBBBBBBBBBBBBB CLIENT PEER EOF\n") + // the client-side peer is not supposed to send data any more + if spc.client_peer_eof.CompareAndSwap(false, true) { + spc.conn.CloseWrite() + } + case PACKET_KIND_PEER_DATA: fmt.Printf("******************* CCCCCCCCCCCCCCCCCCCCCCCccc\n") - var err error + if spc.client_peer_eof.Load() == false { + var err error - _, err = spc.conn.Write(event_data) - if err != nil { - // TODO: logging - fmt.Printf ("WARNING - failed to write data from %s to %s\n", spc.route.cts.caddr, spc.conn.RemoteAddr().String()) + _, err = spc.conn.Write(event_data) + if err != nil { + // TODO: logging + fmt.Printf ("WARNING - failed to write data from %s to %s\n", spc.route.cts.caddr, spc.conn.RemoteAddr().String()) + } + } else { + // protocol error. the client must not relay more data from the client-side peer after EOF. + fmt.Printf ("WARNING - broken client - redundant data from %s to %s\n", spc.route.cts.caddr, spc.conn.RemoteAddr().String()) } default: @@ -173,6 +193,3 @@ fmt.Printf("******************* CCCCCCCCCCCCCCCCCCCCCCCccc\n") } return nil } - - - diff --git a/server.go b/server.go index 226c52d..a38b64b 100644 --- a/server.go +++ b/server.go @@ -12,7 +12,7 @@ import "os/signal" import "sync" import "sync/atomic" import "syscall" -import "time" +//import "time" import "google.golang.org/grpc" //import "google.golang.org/grpc/metadata" @@ -39,38 +39,38 @@ type Server struct { cts_wg sync.WaitGroup gs *grpc.Server + UnimplementedHoduServer } // client connection to server. // client connect to the server, the server accept it, and makes a tunnel request type ClientConn struct { - svr *Server - caddr net.Addr // client address that created this structure - pss *GuardedPacketStreamServer + svr *Server + caddr net.Addr // client address that created this structure + pss *GuardedPacketStreamServer - cw_mtx sync.Mutex route_mtx sync.Mutex route_map ServerRouteMap route_wg sync.WaitGroup - wg sync.WaitGroup - stop_req atomic.Bool - stop_chan chan bool + wg sync.WaitGroup + stop_req atomic.Bool + stop_chan chan bool } type ServerRoute struct { cts *ClientConn - l *net.TCPListener - laddr *net.TCPAddr - id uint32 + l *net.TCPListener + laddr *net.TCPAddr + id uint32 pts_mtx sync.Mutex pts_map ServerPeerConnMap pts_limit int pts_last_id uint32 - pts_wg sync.WaitGroup - stop_req atomic.Bool + pts_wg sync.WaitGroup + stop_req atomic.Bool } type GuardedPacketStreamServer struct { @@ -330,11 +330,13 @@ func (cts *ClientConn) ReportEvent (route_id uint32, pts_id uint32, event_type P return r.ReportEvent(pts_id, event_type, event_data) } -func (cts *ClientConn) receive_from_stream () { +func (cts *ClientConn) receive_from_stream(wg *sync.WaitGroup) { var pkt *Packet var err error - //for { + defer wg.Done() + + for { pkt, err = cts.pss.Recv() if errors.Is(err, io.EOF) { // return will close stream from server side @@ -439,7 +441,7 @@ fmt.Printf ("grpd stream ended\n") // TODO } } - //} + } done: fmt.Printf ("************ stream receiver finished....\n") @@ -451,11 +453,19 @@ func (cts *ClientConn) RunTask(wg *sync.WaitGroup) { defer wg.Done() - strm = cts.pss ctx = strm.Context() - //go cts.receive_from_stream() + // it looks like the only proper way to interrupt the blocking Recv + // call on the grpc streaming server is exit from the service handler + // which is this function invoked from PacketStream(). + // there is no cancel function or whatever that can interrupt it. + // so start the Recv() loop in a separte goroutine and let this + // function be the channel waiter only. + // increment on the wait group is for the caller to wait for + // these detached goroutines to finish. + wg.Add(1) + go cts.receive_from_stream(wg) for { // exit if context is done @@ -468,11 +478,10 @@ fmt.Printf("grpd server done - %s\n", ctx.Err().Error()) case <- cts.stop_chan: goto done - default: + //default: // no other case is ready. // without the default case, the select construct would block } - cts.receive_from_stream() } done: @@ -528,6 +537,22 @@ chan_loop: // -------------------------------------------------------------------- +func (s *Server) GetSeed (ctx context.Context, c_seed *Seed) (*Seed, error) { + var s_seed Seed + + // seed exchange is for furture expansion of the protocol + // there is nothing to do much about it for now. + + s_seed.Version = HODU_VERSION + s_seed.Flags = 0 + + // we create no ClientConn structure associated with the connection + // at this phase for the server. it doesn't track the client version and + // features. we delegate protocol selection solely to the client. + + return &s_seed, nil +} + func (s *Server) PacketStream(strm Hodu_PacketStreamServer) error { var ctx context.Context var p *peer.Peer @@ -604,12 +629,12 @@ type wrappedStream struct { } func (w *wrappedStream) RecvMsg(m any) error { - fmt.Printf("Receive a message (Type: %T) at %s\n", m, time.Now().Format(time.RFC3339)) + //fmt.Printf("Receive a message (Type: %T) at %s\n", m, time.Now().Format(time.RFC3339)) return w.ServerStream.RecvMsg(m) } func (w *wrappedStream) SendMsg(m any) error { - fmt.Printf("Send a message (Type: %T) at %v\n", m, time.Now().Format(time.RFC3339)) + //fmt.Printf("Send a message (Type: %T) at %v\n", m, time.Now().Format(time.RFC3339)) return w.ServerStream.SendMsg(m) } @@ -744,8 +769,14 @@ func (s *Server) RunTask(wg *sync.WaitGroup) { } s.l_wg.Wait() +fmt.Printf ("waiting for all client to server conn to complete\n") s.cts_wg.Wait() +fmt.Printf ("waited for all client to server conn to complete\n") s.ReqStop() + + // stop the main grpc server after all the other tasks are finished. + s.gs.Stop() + syscall.Kill(syscall.Getpid(), syscall.SIGTERM) } @@ -755,7 +786,7 @@ func (s *Server) ReqStop() { var cts *ClientConn //s.gs.GracefulStop() - s.gs.Stop() + //s.gs.Stop() for _, l = range s.l { l.Close() }