From 903e4cf6d3a9929c79bf4499a3a3b350fe73a77f Mon Sep 17 00:00:00 2001 From: hyung-hwan Date: Sun, 24 Nov 2024 22:33:19 +0900 Subject: [PATCH] updating for robustness --- client.go | 138 +++++++++++++++++++++--------------------------------- server.go | 1 - 2 files changed, 53 insertions(+), 86 deletions(-) diff --git a/client.go b/client.go index 0d81479..26e3140 100644 --- a/client.go +++ b/client.go @@ -7,7 +7,6 @@ import "crypto/tls" import "encoding/json" import "errors" import "fmt" -import "io" import "net" import "net/http" import "sync" @@ -16,7 +15,9 @@ import "time" //import "github.com/google/uuid" import "google.golang.org/grpc" +import "google.golang.org/grpc/codes" import "google.golang.org/grpc/credentials/insecure" +import "google.golang.org/grpc/status" const PTC_LIMIT = 8192 @@ -94,8 +95,6 @@ type ClientRoute struct { ptc_mtx sync.Mutex ptc_map ClientPeerConnMap ptc_cancel_map ClientPeerCancelFuncMap - //ptc_limit int - //ptc_last_id uint32 ptc_wg sync.WaitGroup stop_req atomic.Bool @@ -136,10 +135,8 @@ func NewClientRoute(cts *ServerConn, id uint32, addr *net.TCPAddr, proto ROUTE_P r.cts = cts r.id = id - //r.ptc_limit = PTC_LIMIT r.ptc_map = make(ClientPeerConnMap) r.ptc_cancel_map = make(ClientPeerCancelFuncMap) - //r.ptc_last_id = 0 r.proto = proto r.peer_addr = addr r.stop_req.Store(false) @@ -149,10 +146,18 @@ func NewClientRoute(cts *ServerConn, id uint32, addr *net.TCPAddr, proto ROUTE_P } func (r *ClientRoute) RunTask(wg *sync.WaitGroup) { + var err error + // this task on the route object isn't actually necessary. 
// most useful works are triggered by ReportEvent() and done by ConnectToPeer() defer wg.Done() + err = r.cts.psc.Send(MakeRouteStartPacket(r.id, r.proto, r.peer_addr.String())) + if err != nil { + //return fmt.Errorf("unable to send route-start packet - %s", err.Error()) + goto done; + } + main_loop: for { select { @@ -161,7 +166,12 @@ main_loop: } } +done: + r.ReqStop() r.ptc_wg.Wait() // wait for all peer tasks are finished + + r.cts.psc.Send(MakeRouteStopPacket(r.id, r.proto, r.peer_addr.String())) + r.cts.RemoveClientRoute(r) fmt.Printf ("*** End fo Client Roue Task\n") } @@ -171,7 +181,6 @@ func (r *ClientRoute) ReqStop() { for _, ptc = range r.ptc_map { ptc.ReqStop() } - r.stop_chan <- true } fmt.Printf ("*** Sent stop request to Route..\n") @@ -404,7 +413,6 @@ func (cts *ServerConn) AddClientRoutes (peer_addrs []string) error { var v string var addr *net.TCPAddr var proto ROUTE_PROTO - var r *ClientRoute var err error for i, v = range peer_addrs { @@ -425,58 +433,37 @@ func (cts *ServerConn) AddClientRoutes (peer_addrs []string) error { } } -// TODO: mutex protection - for _, r = range cts.route_map { - err = cts.psc.Send(MakeRouteStartPacket(r.id, r.proto, addr.String())) - if err != nil { -// TODO: remove all routes??? - return fmt.Errorf("unable to send route-start packet - %s", err.Error()) - } - } - return nil } -func (cts *ServerConn) RemoveClientRoutes () { - var r *ClientRoute - var id uint32 +func (cts *ServerConn) disconnect_from_server() { + if cts.conn != nil { + var r* ClientRoute - cts.route_mtx.Lock() - for _, r = range cts.route_map { - r.ReqStop() - } + cts.route_mtx.Lock() + for _, r = range cts.route_map { + r.ReqStop() + } + cts.route_mtx.Unlock() - for id, r = range cts.route_map { - delete(cts.route_map, id) - } - - cts.route_map = make(ClientRouteMap) - cts.route_mtx.Unlock() - -// TODO: mutex protection? 
- for _, r = range cts.route_map { - cts.psc.Send(MakeRouteStopPacket(r.id, r.proto, r.peer_addr.String())) + cts.conn.Close() + // don't reset cts.conn to nil here + // if this function is called from RunTask() + // for reconnection, it will be set to a new value + // immediately after the start_over label in it. + // if it's called from ReqStop(), we don't really + // need to care about it. } } func (cts *ServerConn) ReqStop() { if cts.stop_req.CompareAndSwap(false, true) { - var r *ClientRoute - - cts.route_mtx.Lock() - for _, r = range cts.route_map { - cts.psc.Send(MakeRouteStopPacket(r.id, r.proto, r.peer_addr.String())) // don't care about failure - r.ReqStop() - } - cts.route_mtx.Unlock() - + cts.disconnect_from_server() cts.stop_chan <- true } } func (cts *ServerConn) RunTask(wg *sync.WaitGroup) { - var conn *grpc.ClientConn = nil - var hdc HoduClient var psc PacketStreamClient var slpctx context.Context var c_seed Seed @@ -485,23 +472,25 @@ func (cts *ServerConn) RunTask(wg *sync.WaitGroup) { defer wg.Done() // arrange to call at the end of this function -// TODO: HANDLE connection timeout.. - // ctx, _/*cancel*/ := context.WithTimeout(context.Background(), time.Second) start_over: + cts.cli.log.Write ("", LOG_DEBUG, "Total number of server connections = %d", len(cts.cli.cts_map)) + cts.cli.log.Write("", LOG_INFO, "Connecting to server %s", cts.saddr.String()) - conn, err = grpc.NewClient(cts.saddr.String(), grpc.WithTransportCredentials(insecure.NewCredentials())) + cts.conn, err = grpc.NewClient(cts.saddr.String(), grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { - cts.cli.log.Write("", LOG_ERROR, "Failed to connect to server %s - %s", cts.saddr.String(), err.Error()) + cts.cli.log.Write("", LOG_ERROR, "Failed to make client to server %s - %s", cts.saddr.String(), err.Error()) goto reconnect_to_server } + cts.hdc = NewHoduClient(cts.conn) - hdc = NewHoduClient(conn) +// TODO: HANDLE connection timeout.. 
may have to run GetSeed or PacketStream in another goroutine +// ctx, _/*cancel*/ := context.WithTimeout(context.Background(), time.Second) // seed exchange is for furture expansion of the protocol // there is nothing to do much about it for now. c_seed.Version = HODU_VERSION c_seed.Flags = 0 - s_seed, err = hdc.GetSeed(cts.cli.ctx, &c_seed) + s_seed, err = cts.hdc.GetSeed(cts.cli.ctx, &c_seed) if err != nil { cts.cli.log.Write("", LOG_ERROR, "Failed to get seed from server %s - %s", cts.saddr.String(), err.Error()) goto reconnect_to_server @@ -509,7 +498,9 @@ start_over: cts.s_seed = *s_seed cts.c_seed = c_seed - psc, err = hdc.PacketStream(cts.cli.ctx) + cts.cli.log.Write("", LOG_INFO, "Got seed from server %s - ver=%#x", cts.saddr.String(), cts.s_seed.Version) + + psc, err = cts.hdc.PacketStream(cts.cli.ctx) if err != nil { cts.cli.log.Write("", LOG_ERROR, "Failed to get packet stream from server %s - %s", cts.saddr.String(), err.Error()) goto reconnect_to_server @@ -517,9 +508,6 @@ start_over: cts.cli.log.Write("", LOG_INFO, "Got packet stream from server %s", cts.saddr.String()) - cts.conn = conn - cts.hdc = hdc - //cts.psc = &GuardedPacketStreamClient{psc: psc} cts.psc = &GuardedPacketStreamClient{Hodu_PacketStreamClient: psc} // the connection structure to a server is ready. @@ -550,7 +538,7 @@ fmt.Printf("context doine... error - %s\n", cts.cli.ctx.Err().Error()) pkt, err = psc.Recv() if err != nil { - if errors.Is(err, io.EOF) { + if status.Code(err) == codes.Canceled || errors.Is(err, net.ErrClosed) { goto reconnect_to_server } else { cts.cli.log.Write("", LOG_INFO, "Failed to receive packet form server %s - %s", cts.saddr.String(), err.Error()) @@ -660,14 +648,16 @@ fmt.Printf("context doine... 
error - %s\n", cts.cli.ctx.Err().Error()) done: cts.cli.log.Write("", LOG_INFO, "Disconnected from server %s", cts.saddr.String()) - cts.RemoveClientRoutes() - if conn != nil { conn.Close() } + //cts.RemoveClientRoutes() + cts.ReqStop() +wait_for_termination: cts.route_wg.Wait() // wait until all route tasks are finished + cts.cli.RemoveServerConn(cts) return reconnect_to_server: - cts.RemoveClientRoutes() - if conn != nil { conn.Close() } + cts.disconnect_from_server() + // wait for 2 seconds slpctx, _ = context.WithTimeout(cts.cli.ctx, 2 * time.Second) select { @@ -675,7 +665,9 @@ reconnect_to_server: fmt.Printf("context doine... error - %s\n", cts.cli.ctx.Err().Error()) goto done case <-cts.stop_chan: - goto done + // this signal indicates that ReqStop() has been called + // so jump to the waiting label + goto wait_for_termination case <- slpctx.Done(): // do nothing } @@ -700,35 +692,12 @@ func (cts *ServerConn) ReportEvent (route_id uint32, pts_id uint32, event_type P func (r *ClientRoute) AddNewClientPeerConn (c *net.TCPConn, pts_id uint32) (*ClientPeerConn, error) { var ptc *ClientPeerConn - //var ok bool - //var start_id uint32 r.ptc_mtx.Lock() defer r.ptc_mtx.Unlock() -/* - if len(r.ptc_map) >= r.ptc_limit { - return nil, fmt.Errorf("peer-to-client connection table full") - } - - start_id = r.ptc_last_id - for { - _, ok = r.ptc_map[r.ptc_last_id] - if !ok { - break - } - r.ptc_last_id++ - if r.ptc_last_id == start_id { - // unlikely to happen but it cycled through the whole range. 
- return nil, fmt.Errorf("failed to assign peer-to-table connection id") - } - } - - ptc = NewClientPeerConn(r, c, r.ptc_last_id) -*/ ptc = NewClientPeerConn(r, c, pts_id) r.ptc_map[ptc.conn_id] = ptc - //r.ptc_last_id++ return ptc, nil } @@ -777,7 +746,7 @@ fmt.Printf ("ADD total servers %d\n", len(c.cts_map)) func (c *Client) RemoveServerConn(cts *ServerConn) { c.cts_mtx.Lock() delete(c.cts_map, cts.saddr) -fmt.Printf ("REMOVE total servers %d\n", len(c.cts_map)) +fmt.Printf ("REMOVEDDDDDD CONNECTION FROM %s total servers %d\n", cts.saddr, len(c.cts_map)) c.cts_mtx.Unlock() } @@ -793,7 +762,6 @@ func (c *Client) ReqStop() { cts.ReqStop() } - // TODO: notify the server.. send term command??? c.stop_chan <- true c.ctx_cancel() } diff --git a/server.go b/server.go index 28e429d..326aefd 100644 --- a/server.go +++ b/server.go @@ -18,7 +18,6 @@ import "google.golang.org/grpc/peer" import "google.golang.org/grpc/stats" const PTS_LIMIT = 8192 -//const CTS_LIMIT = 2048 type ClientConnMap = map[net.Addr]*ClientConn type ServerPeerConnMap = map[uint32]*ServerPeerConn