diff --git a/c-peer.go b/c-peer.go index be62e23..dc0740a 100644 --- a/c-peer.go +++ b/c-peer.go @@ -3,7 +3,7 @@ package main import "fmt" import "net" -func NewClientPeerConn(r *ClientRoute, c *net.TCPConn, id uint32) (*ClientPeerConn) { +func NewClientPeerConn(r *ClientRoute, c net.Conn, id uint32) (*ClientPeerConn) { var cpc ClientPeerConn cpc.route = r diff --git a/client.go b/client.go index cb4dbe6..b9933e2 100644 --- a/client.go +++ b/client.go @@ -15,7 +15,7 @@ import "os/signal" import "sync" import "sync/atomic" import "syscall" -//import "time" +import "time" //import "github.com/google/uuid" import "google.golang.org/grpc" @@ -25,7 +25,7 @@ const PTC_LIMIT = 8192 type PacketStreamClient grpc.BidiStreamingClient[Packet, Packet] -type ServerConnMap = map[*net.TCPAddr]*ServerConn +type ServerConnMap = map[net.Addr]*ServerConn type ClientPeerConnMap = map[uint32]*ClientPeerConn type ClientRouteMap = map[uint32]*ClientRoute @@ -36,47 +36,48 @@ type ClientConfig struct { } type Client struct { - cfg *ClientConfig - tlscfg *tls.Config - saddr *net.TCPAddr + ctx context.Context + ctx_cancel context.CancelFunc + tlscfg *tls.Config - sc *grpc.ClientConn // main control connection to the server - sg HoduClient - psc PacketStreamClient - psc_mtx sync.Mutex + cts_mtx sync.Mutex + cts_map ServerConnMap - cts_mtx sync.Mutex - cts_map ServerConnMap - wg sync.WaitGroup - stop_req atomic.Bool + wg sync.WaitGroup + stop_req atomic.Bool + stop_chan chan bool } type ClientPeerConn struct { route *ClientRoute conn_id uint32 - conn *net.TCPConn + conn net.Conn remot_conn_id uint32 addr string // peer address stop_req atomic.Bool + stop_chan chan bool } // client connection to server type ServerConn struct { cli *Client + cfg *ClientConfig saddr *net.TCPAddr // server address that is connected to - psc Hodu_PacketStreamClient + + conn *grpc.ClientConn // grpc connection to the server + hdc HoduClient + psc Hodu_PacketStreamClient // grpc stream + psc_mtx sync.Mutex route_mtx sync.Mutex - routes ClientRouteMap + route_map ClientRouteMap //route_wg sync.WaitGroup - //cw_mtx sync.Mutex - - wg sync.WaitGroup + //wg sync.WaitGroup stop_req atomic.Bool - greeted bool + stop_chan chan bool } type ClientRoute struct { @@ -90,6 +91,9 @@ type ClientRoute struct { ptc_limit int ptc_last_id uint32 ptc_wg sync.WaitGroup + + stop_req atomic.Bool + stop_chan chan bool } @@ -104,27 +108,55 @@ func NewClientRoute(cts *ServerConn, id uint32, addr *net.TCPAddr, proto ROUTE_P r.ptc_last_id = 0 r.proto = proto r.peer_addr = addr + r.stop_req.Store(false) + r.stop_chan = make(chan bool, 1) return &r; } func (r *ClientRoute) RunTask() { // this task on the route object isn't actually necessary. + // most useful works are triggered by ReportEvent() and done by ConnectToPeer() + +main_loop: + for { + select { + case <- r.stop_chan: + break main_loop + } + } +fmt.Printf ("*** End fo Client Roue Task\n") } -func (r *ClientRoute) StopTask() { - // TODO: - fmt.Printf ("ClientRoute StopTask not implemented yet\n") - // TOOD: stop all peer connection jobs +func (r *ClientRoute) ReqStop() { + if r.stop_req.CompareAndSwap(false, true) { + var ptc *ClientPeerConn + for _, ptc = range r.ptc_map { + ptc.ReqStop() + } + + r.stop_chan <- true + } +fmt.Printf ("*** Sent stop request to Route..\n"); } func (r* ClientRoute) ConnectToPeer(pts_id uint32) { var err error - var conn *net.TCPConn + var conn net.Conn var ptc *ClientPeerConn + var d net.Dialer + var ctx context.Context + //var cancel context.CancelFunc -// MAKE thesse into a separte go rountine... so it doesn't block - conn, err = net.DialTCP("tcp", nil, r.peer_addr); +// TODO: how to abort blocking DialTCP()? call cancellation funtion? +// TODO: make timeuot value configurable +// TODO: fire the cancellation function upon stop request??? + ctx, _ = context.WithTimeout(r.cts.cli.ctx, 10 * time.Second) + //defer cancel(): + + d.LocalAddr = nil // TOOD: use this if local address is specified + conn, err = d.DialContext(ctx, "tcp", r.peer_addr.String()); + //conn, err = net.DialTCP("tcp", nil, r.peer_addr); if err != nil { fmt.Printf ("failed to connect to %s - %s\n", r.peer_addr.String(), err.Error()) return @@ -139,9 +171,10 @@ func (r* ClientRoute) ConnectToPeer(pts_id uint32) { } fmt.Printf("STARTED NEW SERVER PEER STAK\n") - r.ptc_wg.Add(1) - go ptc.RunTask() - r.ptc_wg.Wait() + //r.ptc_wg.Add(1) + //go ptc.RunTask() + //r.ptc_wg.Wait() + ptc.RunTask() conn.Close() // don't care about double close. it could have been closed in StopTask } @@ -162,15 +195,15 @@ func (cts *ServerConn) AddNewClientRoute(route_id uint32, addr *net.TCPAddr, pro var r *ClientRoute cts.route_mtx.Lock() - if cts.routes[route_id] != nil { + if cts.route_map[route_id] != nil { cts.route_mtx.Unlock() return nil, fmt.Errorf ("existent route id - %d", route_id) } r = NewClientRoute(cts, route_id, addr, proto) - cts.routes[route_id] = r + cts.route_map[route_id] = r cts.route_mtx.Unlock() -fmt.Printf ("added client route.... %d -> %d\n", route_id, len(cts.routes)) +fmt.Printf ("added client route.... %d -> %d\n", route_id, len(cts.route_map)) go r.RunTask() return r, nil } @@ -180,15 +213,15 @@ func (cts *ServerConn) RemoveClientRoute (route_id uint32) error { var ok bool cts.route_mtx.Lock() - r, ok = cts.routes[route_id] + r, ok = cts.route_map[route_id] if (!ok) { cts.route_mtx.Unlock() return fmt.Errorf ("non-existent route id - %d", route_id) } - delete(cts.routes, route_id) + delete(cts.route_map, route_id) cts.route_mtx.Unlock() - r.StopTask() // TODO: make this unblocking or blocking? + r.ReqStop() // TODO: make this unblocking or blocking? return nil; } @@ -218,8 +251,8 @@ func (cts *ServerConn) AddClientRoutes (peer_addrs []string) error { } } - for _, r = range cts.routes { - err = cts.cli.psc.Send(MakeRouteStartPacket(r.id, r.proto, addr.String())) + for _, r = range cts.route_map { + err = cts.psc.Send(MakeRouteStartPacket(r.id, r.proto, addr.String())) if err != nil { return fmt.Errorf("unable to send route-start packet - %s", err.Error()) } @@ -228,23 +261,191 @@ func (cts *ServerConn) AddClientRoutes (peer_addrs []string) error { return nil; } +func (cts *ServerConn) ReqStop() { + if cts.stop_req.CompareAndSwap(false, true) { + var r *ClientRoute + for _, r = range cts.route_map { + r.ReqStop() + } + + // TODO: notify the server.. send term command??? + cts.stop_chan <- true + } +fmt.Printf ("*** Sent stop request to ServerConn..\n"); +} + +func (cts *ServerConn) RunTask(wg *sync.WaitGroup) { + var conn *grpc.ClientConn = nil + var hdc HoduClient + var psc PacketStreamClient + var err error + + defer wg.Done(); // arrange to call at the end of this function + +// TODO: HANDLE connection timeout.. + // ctx, _/*cancel*/ := context.WithTimeout(context.Background(), time.Second) + conn, err = grpc.NewClient(cts.saddr.String(), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + // TODO: logging + fmt.Printf("ERROR - unable to connect to %s - %s", cts.cfg.server_addr, err.Error()) + goto done + } + + hdc = NewHoduClient(conn) + psc, err = hdc.PacketStream(cts.cli.ctx) // TODO: accept external context and use it.L + if err != nil { + fmt.Printf ("failed to get the packet stream - %s", err.Error()) + goto done + } + + cts.conn = conn + cts.hdc = hdc + cts.psc = psc + + // the connection structure to a server is ready. + // let's add routes to the client-side peers. + err = cts.AddClientRoutes(cts.cfg.peer_addrs) + if err != nil { + fmt.Printf ("unable to add routes to client-side peers - %s", err.Error()) + goto done + } + +main_loop: + for { + var pkt *Packet + + select { + case <-cts.cli.ctx.Done(): + fmt.Printf("context doine... error - %s\n", cts.cli.ctx.Err().Error()) + break main_loop + + case <-cts.stop_chan: + break main_loop + + default: + // no other case is ready. + // without the default case, the select construct would block + } + + pkt, err = psc.Recv() + if err == io.EOF { + fmt.Printf("server disconnected\n") + break + } + if err != nil { + fmt.Printf("server receive error - %s\n", err.Error()) + break + } + + switch pkt.Kind { + case PACKET_KIND_ROUTE_STARTED: + // the server side managed to set up the route the client requested + var x *Packet_Route + var ok bool + x, ok = pkt.U.(*Packet_Route) + if ok { + fmt.Printf ("SERVER LISTENING ON %s\n", x.Route.AddrStr); + err = cts.ReportEvent(x.Route.RouteId, 0, pkt.Kind, nil) + if err != nil { + // TODO: + } else { + // TODO: + } + } else { + // TODO: send invalid request... or simply keep quiet? + } + + case PACKET_KIND_ROUTE_STOPPED: + var x *Packet_Route + var ok bool + x, ok = pkt.U.(*Packet_Route) + if ok { + err = cts.ReportEvent(x.Route.RouteId, 0, pkt.Kind, nil) + if err != nil { + // TODO: + } else { + // TODO: + } + } else { + // TODO: send invalid request... or simply keep quiet? + } + + case PACKET_KIND_PEER_STARTED: + // the connection from the client to a peer has been established + var x *Packet_Peer + var ok bool + x, ok = pkt.U.(*Packet_Peer) + if ok { + err = cts.ReportEvent(x.Peer.RouteId, x.Peer.PeerId, PACKET_KIND_PEER_STARTED, nil) + if err != nil { + // TODO: + } else { + // TODO: + } + } else { + // TODO + } + + case PACKET_KIND_PEER_STOPPED: + // the connection from the client to a peer has been established + var x *Packet_Peer + var ok bool + x, ok = pkt.U.(*Packet_Peer) + if ok { + err = cts.ReportEvent(x.Peer.RouteId, x.Peer.PeerId, PACKET_KIND_PEER_STOPPED, nil) + if err != nil { + // TODO: + } else { + // TODO: + } + } else { + // TODO + } + + case PACKET_KIND_PEER_DATA: + // the connection from the client to a peer has been established + var x *Packet_Data + var ok bool + x, ok = pkt.U.(*Packet_Data) + if ok { + err = cts.ReportEvent(x.Data.RouteId, x.Data.PeerId, PACKET_KIND_PEER_DATA, x.Data.Data) + if err != nil { + // TODO: + } else { + // TODO: + } + } else { + // TODO + } + } + } + +done: +fmt.Printf ("^^^^^^^^^^^^^^^^^^^^ Server Coon RunTask ending...\n") + if conn != nil { + conn.Close() + // TODO: need to reset c.sc, c.sg, c.psc to nil? + // for this we need to ensure that everyone is ending + } +} + func (cts *ServerConn) ReportEvent (route_id uint32, pts_id uint32, event_type PACKET_KIND, event_data []byte) error { var r *ClientRoute var ok bool cts.route_mtx.Lock() - r, ok = cts.routes[route_id] + r, ok = cts.route_map[route_id] if (!ok) { cts.route_mtx.Unlock() return fmt.Errorf ("non-existent route id - %d", route_id) } cts.route_mtx.Unlock() - + return r.ReportEvent(pts_id, event_type, event_data) } // -------------------------------------------------------------------- -func (r *ClientRoute) AddNewClientPeerConn (c* net.TCPConn) (*ClientPeerConn, error) { +func (r *ClientRoute) AddNewClientPeerConn (c net.Conn) (*ClientPeerConn, error) { var ptc *ClientPeerConn var ok bool var start_id uint32 @@ -277,17 +478,31 @@ func (r *ClientRoute) AddNewClientPeerConn (c* net.TCPConn) (*ClientPeerConn, er } // -------------------------------------------------------------------- -func (c *Client) AddNewServerConn(addr *net.TCPAddr, psc Hodu_PacketStreamClient) (*ServerConn, error) { +func NewClient(ctx context.Context, tlscfg *tls.Config) *Client { + var c Client + + c.ctx, c.ctx_cancel = context.WithCancel(ctx) + c.tlscfg = tlscfg + c.cts_map = make(ServerConnMap) // TODO: make it configurable... + c.stop_req.Store(false) + c.stop_chan = make(chan bool, 1) + + return &c +} + +func (c *Client) AddNewServerConn(addr *net.TCPAddr, cfg *ClientConfig) (*ServerConn, error) { var cts ServerConn var ok bool cts.cli = c - cts.routes = make(ClientRouteMap) - cts.saddr = addr - cts.psc = psc - + cts.route_map = make(ClientRouteMap) + cts.saddr = addr + cts.cfg = cfg + //cts.conn = conn + //cts.hdc = hdc + //cts.psc = psc cts.stop_req.Store(false) - cts.greeted = false + cts.stop_chan = make(chan bool, 1) c.cts_mtx.Lock() defer c.cts_mtx.Unlock() @@ -309,189 +524,62 @@ fmt.Printf ("REMOVE total servers %d\n", len(c.cts_map)); c.cts_mtx.Unlock() } -// -------------------------------------------------------------------- -func NewClient(cfg *ClientConfig, tlscfg *tls.Config) (*Client, error) { - var c Client +func (c *Client) ReqStop() { + if c.stop_req.CompareAndSwap(false, true) { + var cts *ServerConn + for _, cts = range c.cts_map { + cts.ReqStop() + } + + // TODO: notify the server.. send term command??? + c.stop_chan <- true + c.ctx_cancel() + } +fmt.Printf ("*** Sent stop request to client..\n"); +} + +// naming convention: +// RunService - returns after having executed another go routine +// RunTask - supposed to be detached as a go routine +func (c *Client) RunService(cfg *ClientConfig) { var saddr *net.TCPAddr + var cts *ServerConn var err error if len(cfg.peer_addrs) < 0 || len(cfg.peer_addrs) > int(^uint16(0)) { // TODO: change this check... not really right... - return nil, fmt.Errorf("no peer addresses or too many peer addresses") + fmt.Printf("no peer addresses or too many peer addresses") + return } saddr, err = net.ResolveTCPAddr(NET_TYPE_TCP, cfg.server_addr) if err != nil { - return nil, fmt.Errorf("unable to resolve %s - %s", cfg.server_addr, err.Error()) + fmt.Printf("unable to resolve %s - %s", cfg.server_addr, err.Error()) + return } - c.cfg = cfg - c.tlscfg = tlscfg - c.saddr = saddr - c.cts_map = make(ServerConnMap) // TODO: make it configurable... - c.stop_req.Store(false) + cts, err = c.AddNewServerConn(saddr, cfg) + if err != nil { + fmt.Printf("unable to add server connection structure to %s - %s", cfg.server_addr, err.Error()) + return + } - return &c, nil + c.wg.Add(1) + go cts.RunTask(&c.wg) } -func (c *Client) RunTask(ctx context.Context) { - var conn *grpc.ClientConn - var cts *ServerConn - var err error +func (c *Client) WaitForTermination() { - defer c.wg.Done(); - -// TODO: HANDLE connection timeout.. - // ctx, _/*cancel*/ := context.WithTimeout(context.Background(), time.Second) - conn, err = grpc.NewClient(c.saddr.String(), grpc.WithTransportCredentials(insecure.NewCredentials())) - if err != nil { - // TODO: logging - fmt.Printf("ERROR - unable to connect to %s - %s", c.cfg.server_addr, err.Error()) - return - } - - c.sc = conn - c.sg = NewHoduClient(conn) - - c.psc, err = c.sg.PacketStream(ctx) // TODO: accept external context and use it.L - if err != nil { - conn.Close() - fmt.Printf ("failed to get the packet stream - %s", err.Error()) - return - } - - cts, err = c.AddNewServerConn(c.saddr, c.psc) - if err != nil { - conn.Close() - fmt.Printf ("failed to register connection to server - %s", err.Error()) - return - } - - err = cts.AddClientRoutes(c.cfg.peer_addrs) - if err != nil { - conn.Close() - fmt.Printf("unable to make client routes - %s", err.Error()) - return - } - - for { - var pkt *Packet - - select { - case <-ctx.Done(): - fmt.Printf("context doine... error - %s\n", ctx.Err().Error()) - default: - // no other case is ready. - // without the default case, the select construct would block - } - - pkt, err = c.psc.Recv() - if err == io.EOF { - // return will close stream from server side - fmt.Printf("server disconnected\n") - break - } - if err != nil { - fmt.Printf("server receive error - %s\n", err.Error()) - break - } - - switch pkt.Kind { - case PACKET_KIND_ROUTE_STARTED: - var x *Packet_Route - var ok bool - x, ok = pkt.U.(*Packet_Route) - if ok { - fmt.Printf ("SERVER LISTENING ON %s\n", x.Route.AddrStr); - err = cts.ReportEvent(x.Route.RouteId, 0, pkt.Kind, nil) - if err != nil { - // TODO: - } else { - // TODO: - } - } else { - // TODO: send invalid request... or simply keep quiet? - } - - case PACKET_KIND_ROUTE_STOPPED: - var x *Packet_Route - var ok bool - x, ok = pkt.U.(*Packet_Route) - if ok { - err = cts.ReportEvent(x.Route.RouteId, 0, pkt.Kind, nil) - if err != nil { - // TODO: - } else { - // TODO: - } - } else { - // TODO: send invalid request... or simply keep quiet? - } - - case PACKET_KIND_PEER_STARTED: - // the connection from the client to a peer has been established - var x *Packet_Peer - var ok bool - x, ok = pkt.U.(*Packet_Peer) - if ok { - err = cts.ReportEvent(x.Peer.RouteId, x.Peer.PeerId, PACKET_KIND_PEER_STARTED, nil) - if err != nil { - // TODO: - } else { - // TODO: - } - } else { - // TODO - } - - case PACKET_KIND_PEER_STOPPED: - // the connection from the client to a peer has been established - var x *Packet_Peer - var ok bool - x, ok = pkt.U.(*Packet_Peer) - if ok { - err = cts.ReportEvent(x.Peer.RouteId, x.Peer.PeerId, PACKET_KIND_PEER_STOPPED, nil) - if err != nil { - // TODO: - } else { - // TODO: - } - } else { - // TODO - } - - case PACKET_KIND_PEER_DATA: - // the connection from the client to a peer has been established - var x *Packet_Data - var ok bool - x, ok = pkt.U.(*Packet_Data) - if ok { - err = cts.ReportEvent(x.Data.RouteId, x.Data.PeerId, PACKET_KIND_PEER_DATA, x.Data.Data) - if err != nil { - // TODO: - } else { - // TODO: - } - } else { - // TODO - } - } - } - -//done: - c.ReqStop() // just in case... - c.sc.Close() +fmt.Printf ("Waiting for task top stop\n") +// waiting for tasks to stop + c.wg.Wait() +fmt.Printf ("XXXXXXXXXXXX Waiting for task top stop\n") + // TOOD: find a better way to stop the signal handling loop. + // above all the signal handler must not be with a single client, + // but with the whole app. syscall.Kill(syscall.Getpid(), syscall.SIGTERM) // TODO: find a better to terminate the signal handler... } -func (c *Client) ReqStop() { - if c.stop_req.CompareAndSwap(false, true) { - // TODO: notify the server.. send term command??? - c.sc.Close() - } -} - - // -------------------------------------------------------------------- func (c *Client) handle_os_signals() { @@ -522,8 +610,7 @@ chan_loop: break chan_loop } } - -fmt.Printf ("end of signal handler...\n"); +fmt.Printf("end of signal handler\n") } // -------------------------------------------------------------------- @@ -545,7 +632,6 @@ BAMCA0gAMEUCIEKzVMF3JqjQjuM2rX7Rx8hancI5KJhwfeKu1xbyR7XaAiEA2UT7 func client_main(server_addr string, peer_addrs []string) error { var c *Client - var err error var cert_pool *x509.CertPool var tlscfg *tls.Config var cc ClientConfig @@ -555,23 +641,26 @@ func client_main(server_addr string, peer_addrs []string) error { if !ok { log.Fatal("failed to parse root certificate") } - tlscfg = &tls.Config{RootCAs: cert_pool, ServerName: "localhost", InsecureSkipVerify: true} + tlscfg = &tls.Config{ + RootCAs: cert_pool, + ServerName: "localhost", + InsecureSkipVerify: true, + } + + c = NewClient(context.Background(), tlscfg) + + c.wg.Add(1) + go c.handle_os_signals() cc.server_addr = server_addr cc.peer_addrs = peer_addrs - c, err = NewClient(&cc, tlscfg) - if err != nil { - fmt.Printf("failed create client - %s\n", err.Error()) - return err - } + c.RunService(&cc) -fmt.Printf ("XXXXXXXXXXXXXXXXXXXXXXXXXXXX\n"); - c.wg.Add(1) - go c.handle_os_signals() - c.wg.Add(1) - go c.RunTask(context.Background()); - c.wg.Wait(); -fmt.Printf ("YYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYYY\n"); + //cc.server_addr = "some other address..." + //cc.peer_addrs = peer_addrs + //c.RunService(&cc) + + c.WaitForTermination() return nil } diff --git a/s-peer.go b/s-peer.go index 5da0778..c6f47ef 100644 --- a/s-peer.go +++ b/s-peer.go @@ -58,12 +58,12 @@ wait_for_started: // the socket must have been closed too. goto done } - + case <- tmr.C: // connection failure, not in time tmr.Stop() goto done - + /*case <- spc->ctx->Done(): tmr.Stop() goto done*/ @@ -115,13 +115,13 @@ func (spc *ServerPeerConn) ReqStop() { } func (spc *ServerPeerConn) ReportEvent (event_type PACKET_KIND, event_data []byte) error { - + switch event_type { case PACKET_KIND_PEER_STARTED: if spc.client_peer_opened_received.CompareAndSwap(false, true) { spc.client_peer_status_chan <- true } - + case PACKET_KIND_PEER_STOPPED: if spc.client_peer_closed_received.CompareAndSwap(false, true) { spc.client_peer_status_chan <- false diff --git a/server.go b/server.go index 86e4b96..9ef4346 100644 --- a/server.go +++ b/server.go @@ -136,10 +136,10 @@ func (r *ServerRoute) RunTask() { go pts.RunTask() } } - + r.l.Close() // don't care about double close. it could have been closed in StopTask r.pts_wg.Wait() - + // cts.l_wg.Done() // TODO:inform that the job is done? } @@ -269,7 +269,7 @@ func (cts *ClientConn) RemoveServerRoute (route_id uint32) error { func (cts *ClientConn) ReportEvent (route_id uint32, pts_id uint32, event_type PACKET_KIND, event_data []byte) error { var r *ServerRoute var ok bool - + cts.route_mtx.Lock() r, ok = cts.routes[route_id] if (!ok) { @@ -277,7 +277,7 @@ func (cts *ClientConn) ReportEvent (route_id uint32, pts_id uint32, event_type P return fmt.Errorf ("non-existent route id - %d", route_id) } cts.route_mtx.Unlock() - + return r.ReportEvent(pts_id, event_type, event_data) } @@ -334,7 +334,7 @@ func (s *Server) PacketStream(strm Hodu_PacketStreamServer) error { var pkt *Packet var err error var cts *ClientConn - + ctx = strm.Context() p, ok = peer.FromContext(ctx) if (!ok) { @@ -346,7 +346,6 @@ func (s *Server) PacketStream(strm Hodu_PacketStreamServer) error { return fmt.Errorf("unable to add client %s - %s", p.Addr.String(), err.Error()) } - for { // exit if context is done // or continue @@ -407,7 +406,7 @@ func (s *Server) PacketStream(strm Hodu_PacketStreamServer) error { } else { // TODO: send invalid request... or simply keep quiet? } - + case PACKET_KIND_PEER_STARTED: // the connection from the client to a peer has been established var x *Packet_Peer @@ -418,12 +417,12 @@ func (s *Server) PacketStream(strm Hodu_PacketStreamServer) error { if err != nil { // TODO: } else { - // TODO: + // TODO: } } else { // TODO } - + case PACKET_KIND_PEER_STOPPED: // the connection from the client to a peer has been established var x *Packet_Peer @@ -434,7 +433,7 @@ func (s *Server) PacketStream(strm Hodu_PacketStreamServer) error { if err != nil { // TODO: } else { - // TODO: + // TODO: } } else { // TODO @@ -450,7 +449,7 @@ func (s *Server) PacketStream(strm Hodu_PacketStreamServer) error { if err != nil { // TODO: } else { - // TODO: + // TODO: } } else { // TODO @@ -678,7 +677,7 @@ func (s *Server) AddNewClientConn(addr net.Addr, pss Hodu_PacketStreamServer) (* cts.svr = s cts.routes = make(ServerRouteMap) - cts.caddr = addr + cts.caddr = addr cts.pss = pss cts.stop_req.Store(false) @@ -727,7 +726,7 @@ func (s *Server) FindClientConnByAddr (addr net.Addr) *ClientConn { cts, ok = s.cts_map[addr] if !ok { - return nil + return nil } return cts