diff --git a/client.go b/client.go index fdd2766..148678e 100644 --- a/client.go +++ b/client.go @@ -325,8 +325,8 @@ func (r *ClientRoute) ConnectToPeer(pts_id PeerId, route_option RouteOption, pts var real_conn_laddr string var ptc *ClientPeerConn var d net.Dialer - var ctx context.Context - var cancel context.CancelFunc + var waitctx context.Context + var cancel_wait context.CancelFunc var tmout time.Duration var ok bool @@ -337,16 +337,17 @@ func (r *ClientRoute) ConnectToPeer(pts_id PeerId, route_option RouteOption, pts defer wg.Done() tmout = time.Duration(r.cts.cli.ptc_tmout) - if tmout <= 0 { tmout = 10 * time.Second} - ctx, cancel = context.WithTimeout(r.cts.cli.ctx, tmout) + if tmout <= 0 { tmout = 5 * time.Second} // TODO: make this configurable... + waitctx, cancel_wait = context.WithTimeout(r.cts.cli.ctx, tmout) r.ptc_mtx.Lock() - r.ptc_cancel_map[pts_id] = cancel + r.ptc_cancel_map[pts_id] = cancel_wait r.ptc_mtx.Unlock() d.LocalAddr = nil // TOOD: use this if local address is specified - conn, err = d.DialContext(ctx, "tcp", r.peer_addr) + conn, err = d.DialContext(waitctx, "tcp", r.peer_addr) r.ptc_mtx.Lock() + cancel_wait() delete(r.ptc_cancel_map, pts_id) r.ptc_mtx.Unlock() @@ -412,9 +413,7 @@ func (r *ClientRoute) DisconnectFromPeer(ptc *ClientPeerConn) error { p, ok = r.ptc_map[ptc.conn_id] if ok && p == ptc { cancel, ok = r.ptc_cancel_map[ptc.conn_id] - if ok { - cancel() - } + if ok { cancel() } } r.ptc_mtx.Unlock() @@ -784,6 +783,7 @@ func timed_interceptor(tmout time.Duration) grpc.UnaryClientInterceptor { func (cts *ClientConn) RunTask(wg *sync.WaitGroup) { var psc PacketStreamClient var slpctx context.Context + var cancel_sleep context.CancelFunc var c_seed Seed var s_seed *Seed var p *peer.Peer @@ -1015,6 +1015,8 @@ start_over: done: cts.cli.log.Write(cts.sid, LOG_INFO, "Disconnected from server[%d] %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index]) + +req_stop_and_wait_for_termination: //cts.RemoveClientRoutes() // this isn't needed as each task removes itself from cts upon its termination cts.ReqStop() @@ -1024,21 +1026,27 @@ wait_for_termination: return reconnect_to_server: + if cts.conn != nil { + cts.cli.log.Write(cts.sid, LOG_INFO, "Disconnecting from server[%d] %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index]) + } cts.disconnect_from_server() // wait for 2 seconds - slpctx, _ = context.WithTimeout(cts.cli.ctx, 2 * time.Second) + slpctx, cancel_sleep = context.WithTimeout(cts.cli.ctx, 2 * time.Second) select { case <-cts.cli.ctx.Done(): // need to log cts.cli.ctx.Err().Error()? - goto done + cancel_sleep() + goto req_stop_and_wait_for_termination case <-cts.stop_chan: // this signal indicates that ReqStop() has been called // so jumt to the waiting label + cancel_sleep() goto wait_for_termination case <-slpctx.Done(): // do nothing } + cancel_sleep() goto start_over // and reconnect } diff --git a/server-peer.go b/server-peer.go index 64a9ab4..5ef37b4 100644 --- a/server-peer.go +++ b/server-peer.go @@ -1,5 +1,6 @@ package hodu +import "context" import "errors" import "io" import "net" @@ -44,7 +45,8 @@ func (spc *ServerPeerConn) RunTask(wg *sync.WaitGroup) { var pss *GuardedPacketStreamServer var n int var buf [4096]byte - var tmr *time.Timer + var waitctx context.Context + var cancel_wait context.CancelFunc var status bool var err error var conn_raddr string @@ -64,29 +66,30 @@ func (spc *ServerPeerConn) RunTask(wg *sync.WaitGroup) { goto done_without_stop } - tmr = time.NewTimer(4 * time.Second) // TODO: make this configurable... + // set up a timer to set waiting duration until the connection is + // actually established on the client side and it's informed... + waitctx, cancel_wait = context.WithTimeout(spc.route.cts.svr.ctx, 4 * time.Second) wait_for_started: for { select { case status = <- spc.client_peer_status_chan: - if status { - break wait_for_started - } else { + if !status { // the socket must have been closed too. + cancel_wait() goto done } + break wait_for_started - case <- tmr.C: - // connection failure, not in time - tmr.Stop() + case <- waitctx.Done(): + cancel_wait() goto done case <-spc.stop_chan: - tmr.Stop() + cancel_wait() goto done } } - tmr.Stop() + cancel_wait() for { n, err = spc.conn.Read(buf[:]) diff --git a/server-proxy.go b/server-proxy.go index cf0c546..92e9588 100644 --- a/server-proxy.go +++ b/server-proxy.go @@ -349,14 +349,20 @@ func (pxy *server_proxy_http_main) serve_upgraded(w http.ResponseWriter, req *ht } func (pxy *server_proxy_http_main) addr_to_transport (ctx context.Context, addr *net.TCPAddr) (*http.Transport, error) { - var err error var dialer *net.Dialer + var waitctx context.Context + var cancel_wait context.CancelFunc var conn net.Conn + var err error + // establish the connection. dialer = &net.Dialer{} - conn, err = dialer.DialContext(ctx, "tcp", addr.String()) + waitctx, cancel_wait = context.WithTimeout(ctx, 3 * time.Second) // TODO: make timeout configurable + conn, err = dialer.DialContext(waitctx, "tcp", addr.String()) + cancel_wait() if err != nil { return nil, err } + // create a transport that uses the connection return &http.Transport{ DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) { return conn, nil @@ -442,17 +448,15 @@ func (pxy *server_proxy_http_main) ServeHTTP(w http.ResponseWriter, req *http.Re status_code = http.StatusInternalServerError; w.WriteHeader(status_code) goto oops } - upgrade_required = mutate_proxy_req_headers(req, proxy_req, path_prefix) -fmt.Printf ("AAAAAAAAAAAAAAAAAAAAa\n") + //fmt.Printf ("proxy NEW req [%+v]\n", proxy_req.Header) client = &http.Client{ Transport: transport, CheckRedirect: prevent_follow_redirect, - Timeout: 5 * time.Second, + Timeout: 4 * time.Second, // TODO: make this configurable.... } resp, err = client.Do(proxy_req) -fmt.Printf ("BBBBBBBBBBBBBBBBBBBBBBBB\n") //resp, err = transport.RoundTrip(proxy_req) if err != nil { status_code = http.StatusInternalServerError; w.WriteHeader(status_code)