diff --git a/client.go b/client.go index 3a1b94b..0f89384 100644 --- a/client.go +++ b/client.go @@ -30,6 +30,7 @@ type PacketStreamClient grpc.BidiStreamingClient[Packet, Packet] type ServerConnMap = map[net.Addr]*ServerConn type ClientPeerConnMap = map[uint32]*ClientPeerConn type ClientRouteMap = map[uint32]*ClientRoute +type ClientPeerCancelFuncMap = map[uint32]context.CancelFunc // -------------------------------------------------------------------- type ClientConfig struct { @@ -92,10 +93,11 @@ type ClientRoute struct { peer_addr *net.TCPAddr proto ROUTE_PROTO - ptc_mtx sync.Mutex - ptc_map ClientPeerConnMap - ptc_limit int - ptc_last_id uint32 + ptc_mtx sync.Mutex + ptc_map ClientPeerConnMap + ptc_cancel_map ClientPeerCancelFuncMap + //ptc_limit int + //ptc_last_id uint32 ptc_wg sync.WaitGroup stop_req atomic.Bool @@ -136,9 +138,10 @@ func NewClientRoute(cts *ServerConn, id uint32, addr *net.TCPAddr, proto ROUTE_P r.cts = cts r.id = id - r.ptc_limit = PTC_LIMIT + //r.ptc_limit = PTC_LIMIT r.ptc_map = make(ClientPeerConnMap) - r.ptc_last_id = 0 + r.ptc_cancel_map = make(ClientPeerCancelFuncMap) + //r.ptc_last_id = 0 r.proto = proto r.peer_addr = addr r.stop_req.Store(false) @@ -176,24 +179,32 @@ func (r *ClientRoute) ReqStop() { fmt.Printf ("*** Sent stop request to Route..\n"); } -func (r* ClientRoute) ConnectToPeer(pts_id uint32) { +func (r* ClientRoute) ConnectToPeer(pts_id uint32, wg *sync.WaitGroup) { var err error var conn net.Conn var real_conn *net.TCPConn var ptc *ClientPeerConn var d net.Dialer var ctx context.Context - //var cancel context.CancelFunc + var cancel context.CancelFunc var ok bool -// TODO: how to abort blocking DialTCP()? call cancellation funtion? + defer wg.Done() + // TODO: make timeuot value configurable // TODO: fire the cancellation function upon stop request??? - ctx, _ = context.WithTimeout(r.cts.cli.ctx, 10 * time.Second) - //defer cancel(): + ctx, cancel = context.WithTimeout(r.cts.cli.ctx, 10 * time.Second) + r.ptc_mtx.Lock() + r.ptc_cancel_map[pts_id] = cancel + r.ptc_mtx.Unlock() d.LocalAddr = nil // TOOD: use this if local address is specified conn, err = d.DialContext(ctx, "tcp", r.peer_addr.String()); + + r.ptc_mtx.Lock() + delete(r.ptc_cancel_map, pts_id) + r.ptc_mtx.Unlock() + if err != nil { // TODO: make send peer started failure mesage? fmt.Printf ("failed to connect to %s - %s\n", r.peer_addr.String(), err.Error()) @@ -220,8 +231,8 @@ func (r* ClientRoute) ConnectToPeer(pts_id uint32) { goto peer_aborted } - r.ptc_wg.Add(1) - go ptc.RunTask(&r.ptc_wg) + wg.Add(1) + go ptc.RunTask(wg) return peer_aborted: @@ -236,9 +247,16 @@ peer_aborted: func (r* ClientRoute) DisconnectFromPeer(pts_id uint32) error { var ptc *ClientPeerConn + var cancel context.CancelFunc var ok bool r.ptc_mtx.Lock() + cancel, ok = r.ptc_cancel_map[pts_id] + if ok { +fmt.Printf ("~~~~~~~~~~~~~~~~ cancelling.....\n") + cancel() + } + ptc, ok = r.ptc_map[pts_id] if !ok { r.ptc_mtx.Unlock() @@ -273,7 +291,8 @@ func (r* ClientRoute) ReportEvent (pts_id uint32, event_type PACKET_KIND, event_ switch event_type { case PACKET_KIND_PEER_STARTED: fmt.Printf ("GOT PEER STARTD . CONENCT TO CLIENT_SIDE PEER\n") - r.ConnectToPeer(pts_id) + r.ptc_wg.Add(1) + go r.ConnectToPeer(pts_id, &r.ptc_wg) case PACKET_KIND_PEER_ABORTED: fallthrough