implemented cancellation of the client-peer connection attempt
This commit is contained in:
parent
b981748b78
commit
e018bc9cab
47
client.go
47
client.go
@ -30,6 +30,7 @@ type PacketStreamClient grpc.BidiStreamingClient[Packet, Packet]
|
|||||||
type ServerConnMap = map[net.Addr]*ServerConn
|
type ServerConnMap = map[net.Addr]*ServerConn
|
||||||
type ClientPeerConnMap = map[uint32]*ClientPeerConn
|
type ClientPeerConnMap = map[uint32]*ClientPeerConn
|
||||||
type ClientRouteMap = map[uint32]*ClientRoute
|
type ClientRouteMap = map[uint32]*ClientRoute
|
||||||
|
type ClientPeerCancelFuncMap = map[uint32]context.CancelFunc
|
||||||
|
|
||||||
// --------------------------------------------------------------------
|
// --------------------------------------------------------------------
|
||||||
type ClientConfig struct {
|
type ClientConfig struct {
|
||||||
@ -92,10 +93,11 @@ type ClientRoute struct {
|
|||||||
peer_addr *net.TCPAddr
|
peer_addr *net.TCPAddr
|
||||||
proto ROUTE_PROTO
|
proto ROUTE_PROTO
|
||||||
|
|
||||||
ptc_mtx sync.Mutex
|
ptc_mtx sync.Mutex
|
||||||
ptc_map ClientPeerConnMap
|
ptc_map ClientPeerConnMap
|
||||||
ptc_limit int
|
ptc_cancel_map ClientPeerCancelFuncMap
|
||||||
ptc_last_id uint32
|
//ptc_limit int
|
||||||
|
//ptc_last_id uint32
|
||||||
ptc_wg sync.WaitGroup
|
ptc_wg sync.WaitGroup
|
||||||
|
|
||||||
stop_req atomic.Bool
|
stop_req atomic.Bool
|
||||||
@ -136,9 +138,10 @@ func NewClientRoute(cts *ServerConn, id uint32, addr *net.TCPAddr, proto ROUTE_P
|
|||||||
|
|
||||||
r.cts = cts
|
r.cts = cts
|
||||||
r.id = id
|
r.id = id
|
||||||
r.ptc_limit = PTC_LIMIT
|
//r.ptc_limit = PTC_LIMIT
|
||||||
r.ptc_map = make(ClientPeerConnMap)
|
r.ptc_map = make(ClientPeerConnMap)
|
||||||
r.ptc_last_id = 0
|
r.ptc_cancel_map = make(ClientPeerCancelFuncMap)
|
||||||
|
//r.ptc_last_id = 0
|
||||||
r.proto = proto
|
r.proto = proto
|
||||||
r.peer_addr = addr
|
r.peer_addr = addr
|
||||||
r.stop_req.Store(false)
|
r.stop_req.Store(false)
|
||||||
@ -176,24 +179,32 @@ func (r *ClientRoute) ReqStop() {
|
|||||||
fmt.Printf ("*** Sent stop request to Route..\n");
|
fmt.Printf ("*** Sent stop request to Route..\n");
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r* ClientRoute) ConnectToPeer(pts_id uint32) {
|
func (r* ClientRoute) ConnectToPeer(pts_id uint32, wg *sync.WaitGroup) {
|
||||||
var err error
|
var err error
|
||||||
var conn net.Conn
|
var conn net.Conn
|
||||||
var real_conn *net.TCPConn
|
var real_conn *net.TCPConn
|
||||||
var ptc *ClientPeerConn
|
var ptc *ClientPeerConn
|
||||||
var d net.Dialer
|
var d net.Dialer
|
||||||
var ctx context.Context
|
var ctx context.Context
|
||||||
//var cancel context.CancelFunc
|
var cancel context.CancelFunc
|
||||||
var ok bool
|
var ok bool
|
||||||
|
|
||||||
// TODO: how to abort blocking DialTCP()? call cancellation funtion?
|
defer wg.Done()
|
||||||
|
|
||||||
// TODO: make timeuot value configurable
|
// TODO: make timeuot value configurable
|
||||||
// TODO: fire the cancellation function upon stop request???
|
// TODO: fire the cancellation function upon stop request???
|
||||||
ctx, _ = context.WithTimeout(r.cts.cli.ctx, 10 * time.Second)
|
ctx, cancel = context.WithTimeout(r.cts.cli.ctx, 10 * time.Second)
|
||||||
//defer cancel():
|
r.ptc_mtx.Lock()
|
||||||
|
r.ptc_cancel_map[pts_id] = cancel
|
||||||
|
r.ptc_mtx.Unlock()
|
||||||
|
|
||||||
d.LocalAddr = nil // TOOD: use this if local address is specified
|
d.LocalAddr = nil // TOOD: use this if local address is specified
|
||||||
conn, err = d.DialContext(ctx, "tcp", r.peer_addr.String());
|
conn, err = d.DialContext(ctx, "tcp", r.peer_addr.String());
|
||||||
|
|
||||||
|
r.ptc_mtx.Lock()
|
||||||
|
delete(r.ptc_cancel_map, pts_id)
|
||||||
|
r.ptc_mtx.Unlock()
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// TODO: make send peer started failure mesage?
|
// TODO: make send peer started failure mesage?
|
||||||
fmt.Printf ("failed to connect to %s - %s\n", r.peer_addr.String(), err.Error())
|
fmt.Printf ("failed to connect to %s - %s\n", r.peer_addr.String(), err.Error())
|
||||||
@ -220,8 +231,8 @@ func (r* ClientRoute) ConnectToPeer(pts_id uint32) {
|
|||||||
goto peer_aborted
|
goto peer_aborted
|
||||||
}
|
}
|
||||||
|
|
||||||
r.ptc_wg.Add(1)
|
wg.Add(1)
|
||||||
go ptc.RunTask(&r.ptc_wg)
|
go ptc.RunTask(wg)
|
||||||
return
|
return
|
||||||
|
|
||||||
peer_aborted:
|
peer_aborted:
|
||||||
@ -236,9 +247,16 @@ peer_aborted:
|
|||||||
|
|
||||||
func (r* ClientRoute) DisconnectFromPeer(pts_id uint32) error {
|
func (r* ClientRoute) DisconnectFromPeer(pts_id uint32) error {
|
||||||
var ptc *ClientPeerConn
|
var ptc *ClientPeerConn
|
||||||
|
var cancel context.CancelFunc
|
||||||
var ok bool
|
var ok bool
|
||||||
|
|
||||||
r.ptc_mtx.Lock()
|
r.ptc_mtx.Lock()
|
||||||
|
cancel, ok = r.ptc_cancel_map[pts_id]
|
||||||
|
if ok {
|
||||||
|
fmt.Printf ("~~~~~~~~~~~~~~~~ cancelling.....\n")
|
||||||
|
cancel()
|
||||||
|
}
|
||||||
|
|
||||||
ptc, ok = r.ptc_map[pts_id]
|
ptc, ok = r.ptc_map[pts_id]
|
||||||
if !ok {
|
if !ok {
|
||||||
r.ptc_mtx.Unlock()
|
r.ptc_mtx.Unlock()
|
||||||
@ -273,7 +291,8 @@ func (r* ClientRoute) ReportEvent (pts_id uint32, event_type PACKET_KIND, event_
|
|||||||
switch event_type {
|
switch event_type {
|
||||||
case PACKET_KIND_PEER_STARTED:
|
case PACKET_KIND_PEER_STARTED:
|
||||||
fmt.Printf ("GOT PEER STARTD . CONENCT TO CLIENT_SIDE PEER\n")
|
fmt.Printf ("GOT PEER STARTD . CONENCT TO CLIENT_SIDE PEER\n")
|
||||||
r.ConnectToPeer(pts_id)
|
r.ptc_wg.Add(1)
|
||||||
|
go r.ConnectToPeer(pts_id, &r.ptc_wg)
|
||||||
|
|
||||||
case PACKET_KIND_PEER_ABORTED:
|
case PACKET_KIND_PEER_ABORTED:
|
||||||
fallthrough
|
fallthrough
|
||||||
|
Loading…
Reference in New Issue
Block a user