ensured to call context cancellation function
This commit is contained in:
parent
2424a63db9
commit
1681b34374
30
client.go
30
client.go
@ -325,8 +325,8 @@ func (r *ClientRoute) ConnectToPeer(pts_id PeerId, route_option RouteOption, pts
|
||||
var real_conn_laddr string
|
||||
var ptc *ClientPeerConn
|
||||
var d net.Dialer
|
||||
var ctx context.Context
|
||||
var cancel context.CancelFunc
|
||||
var waitctx context.Context
|
||||
var cancel_wait context.CancelFunc
|
||||
var tmout time.Duration
|
||||
var ok bool
|
||||
|
||||
@ -337,16 +337,17 @@ func (r *ClientRoute) ConnectToPeer(pts_id PeerId, route_option RouteOption, pts
|
||||
defer wg.Done()
|
||||
|
||||
tmout = time.Duration(r.cts.cli.ptc_tmout)
|
||||
if tmout <= 0 { tmout = 10 * time.Second}
|
||||
ctx, cancel = context.WithTimeout(r.cts.cli.ctx, tmout)
|
||||
if tmout <= 0 { tmout = 5 * time.Second} // TODO: make this configurable...
|
||||
waitctx, cancel_wait = context.WithTimeout(r.cts.cli.ctx, tmout)
|
||||
r.ptc_mtx.Lock()
|
||||
r.ptc_cancel_map[pts_id] = cancel
|
||||
r.ptc_cancel_map[pts_id] = cancel_wait
|
||||
r.ptc_mtx.Unlock()
|
||||
|
||||
d.LocalAddr = nil // TOOD: use this if local address is specified
|
||||
conn, err = d.DialContext(ctx, "tcp", r.peer_addr)
|
||||
conn, err = d.DialContext(waitctx, "tcp", r.peer_addr)
|
||||
|
||||
r.ptc_mtx.Lock()
|
||||
cancel_wait()
|
||||
delete(r.ptc_cancel_map, pts_id)
|
||||
r.ptc_mtx.Unlock()
|
||||
|
||||
@ -412,9 +413,7 @@ func (r *ClientRoute) DisconnectFromPeer(ptc *ClientPeerConn) error {
|
||||
p, ok = r.ptc_map[ptc.conn_id]
|
||||
if ok && p == ptc {
|
||||
cancel, ok = r.ptc_cancel_map[ptc.conn_id]
|
||||
if ok {
|
||||
cancel()
|
||||
}
|
||||
if ok { cancel() }
|
||||
}
|
||||
r.ptc_mtx.Unlock()
|
||||
|
||||
@ -784,6 +783,7 @@ func timed_interceptor(tmout time.Duration) grpc.UnaryClientInterceptor {
|
||||
func (cts *ClientConn) RunTask(wg *sync.WaitGroup) {
|
||||
var psc PacketStreamClient
|
||||
var slpctx context.Context
|
||||
var cancel_sleep context.CancelFunc
|
||||
var c_seed Seed
|
||||
var s_seed *Seed
|
||||
var p *peer.Peer
|
||||
@ -1015,6 +1015,8 @@ start_over:
|
||||
|
||||
done:
|
||||
cts.cli.log.Write(cts.sid, LOG_INFO, "Disconnected from server[%d] %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index])
|
||||
|
||||
req_stop_and_wait_for_termination:
|
||||
//cts.RemoveClientRoutes() // this isn't needed as each task removes itself from cts upon its termination
|
||||
cts.ReqStop()
|
||||
|
||||
@ -1024,21 +1026,27 @@ wait_for_termination:
|
||||
return
|
||||
|
||||
reconnect_to_server:
|
||||
if cts.conn != nil {
|
||||
cts.cli.log.Write(cts.sid, LOG_INFO, "Disconnecting from server[%d] %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index])
|
||||
}
|
||||
cts.disconnect_from_server()
|
||||
|
||||
// wait for 2 seconds
|
||||
slpctx, _ = context.WithTimeout(cts.cli.ctx, 2 * time.Second)
|
||||
slpctx, cancel_sleep = context.WithTimeout(cts.cli.ctx, 2 * time.Second)
|
||||
select {
|
||||
case <-cts.cli.ctx.Done():
|
||||
// need to log cts.cli.ctx.Err().Error()?
|
||||
goto done
|
||||
cancel_sleep()
|
||||
goto req_stop_and_wait_for_termination
|
||||
case <-cts.stop_chan:
|
||||
// this signal indicates that ReqStop() has been called
|
||||
// so jumt to the waiting label
|
||||
cancel_sleep()
|
||||
goto wait_for_termination
|
||||
case <-slpctx.Done():
|
||||
// do nothing
|
||||
}
|
||||
cancel_sleep()
|
||||
goto start_over // and reconnect
|
||||
}
|
||||
|
||||
|
@ -1,5 +1,6 @@
|
||||
package hodu
|
||||
|
||||
import "context"
|
||||
import "errors"
|
||||
import "io"
|
||||
import "net"
|
||||
@ -44,7 +45,8 @@ func (spc *ServerPeerConn) RunTask(wg *sync.WaitGroup) {
|
||||
var pss *GuardedPacketStreamServer
|
||||
var n int
|
||||
var buf [4096]byte
|
||||
var tmr *time.Timer
|
||||
var waitctx context.Context
|
||||
var cancel_wait context.CancelFunc
|
||||
var status bool
|
||||
var err error
|
||||
var conn_raddr string
|
||||
@ -64,29 +66,30 @@ func (spc *ServerPeerConn) RunTask(wg *sync.WaitGroup) {
|
||||
goto done_without_stop
|
||||
}
|
||||
|
||||
tmr = time.NewTimer(4 * time.Second) // TODO: make this configurable...
|
||||
// set up a timer to set waiting duration until the connection is
|
||||
// actually established on the client side and it's informed...
|
||||
waitctx, cancel_wait = context.WithTimeout(spc.route.cts.svr.ctx, 4 * time.Second)
|
||||
wait_for_started:
|
||||
for {
|
||||
select {
|
||||
case status = <- spc.client_peer_status_chan:
|
||||
if status {
|
||||
break wait_for_started
|
||||
} else {
|
||||
if !status {
|
||||
// the socket must have been closed too.
|
||||
cancel_wait()
|
||||
goto done
|
||||
}
|
||||
break wait_for_started
|
||||
|
||||
case <- tmr.C:
|
||||
// connection failure, not in time
|
||||
tmr.Stop()
|
||||
case <- waitctx.Done():
|
||||
cancel_wait()
|
||||
goto done
|
||||
|
||||
case <-spc.stop_chan:
|
||||
tmr.Stop()
|
||||
cancel_wait()
|
||||
goto done
|
||||
}
|
||||
}
|
||||
tmr.Stop()
|
||||
cancel_wait()
|
||||
|
||||
for {
|
||||
n, err = spc.conn.Read(buf[:])
|
||||
|
@ -349,14 +349,20 @@ func (pxy *server_proxy_http_main) serve_upgraded(w http.ResponseWriter, req *ht
|
||||
}
|
||||
|
||||
func (pxy *server_proxy_http_main) addr_to_transport (ctx context.Context, addr *net.TCPAddr) (*http.Transport, error) {
|
||||
var err error
|
||||
var dialer *net.Dialer
|
||||
var waitctx context.Context
|
||||
var cancel_wait context.CancelFunc
|
||||
var conn net.Conn
|
||||
var err error
|
||||
|
||||
// establish the connection.
|
||||
dialer = &net.Dialer{}
|
||||
conn, err = dialer.DialContext(ctx, "tcp", addr.String())
|
||||
waitctx, cancel_wait = context.WithTimeout(ctx, 3 * time.Second) // TODO: make timeout configurable
|
||||
conn, err = dialer.DialContext(waitctx, "tcp", addr.String())
|
||||
cancel_wait()
|
||||
if err != nil { return nil, err }
|
||||
|
||||
// create a transport that uses the connection
|
||||
return &http.Transport{
|
||||
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
|
||||
return conn, nil
|
||||
@ -442,17 +448,15 @@ func (pxy *server_proxy_http_main) ServeHTTP(w http.ResponseWriter, req *http.Re
|
||||
status_code = http.StatusInternalServerError; w.WriteHeader(status_code)
|
||||
goto oops
|
||||
}
|
||||
|
||||
upgrade_required = mutate_proxy_req_headers(req, proxy_req, path_prefix)
|
||||
fmt.Printf ("AAAAAAAAAAAAAAAAAAAAa\n")
|
||||
|
||||
//fmt.Printf ("proxy NEW req [%+v]\n", proxy_req.Header)
|
||||
client = &http.Client{
|
||||
Transport: transport,
|
||||
CheckRedirect: prevent_follow_redirect,
|
||||
Timeout: 5 * time.Second,
|
||||
Timeout: 4 * time.Second, // TODO: make this configurable....
|
||||
}
|
||||
resp, err = client.Do(proxy_req)
|
||||
fmt.Printf ("BBBBBBBBBBBBBBBBBBBBBBBB\n")
|
||||
//resp, err = transport.RoundTrip(proxy_req)
|
||||
if err != nil {
|
||||
status_code = http.StatusInternalServerError; w.WriteHeader(status_code)
|
||||
|
Loading…
x
Reference in New Issue
Block a user