added close-on-conn-error-event.
added the client token in connection output over ctl
This commit is contained in:
parent
bec93289f5
commit
b6fb296608
@ -24,6 +24,7 @@ import "time"
|
|||||||
type json_in_client_conn struct {
|
type json_in_client_conn struct {
|
||||||
ServerAddrs []string `json:"server-addrs"` // multiple addresses for round-robin connection re-attempts
|
ServerAddrs []string `json:"server-addrs"` // multiple addresses for round-robin connection re-attempts
|
||||||
ClientToken string `json:"client-token"`
|
ClientToken string `json:"client-token"`
|
||||||
|
CloseOnConnErrorEvent bool `json:"close-on-conn-error-event"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type json_in_client_route struct {
|
type json_in_client_route struct {
|
||||||
@ -50,6 +51,7 @@ type json_out_client_conn struct {
|
|||||||
CurrentServerIndex int `json:"current-server-index"`
|
CurrentServerIndex int `json:"current-server-index"`
|
||||||
ServerAddr string `json:"server-addr"` // actual server address
|
ServerAddr string `json:"server-addr"` // actual server address
|
||||||
ClientAddr string `json:"client-addr"`
|
ClientAddr string `json:"client-addr"`
|
||||||
|
ClientToken string `json:"client-token"`
|
||||||
Routes []json_out_client_route `json:"routes"`
|
Routes []json_out_client_route `json:"routes"`
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -251,6 +253,7 @@ func (ctl *client_ctl_client_conns) ServeHTTP(w http.ResponseWriter, req *http.R
|
|||||||
CurrentServerIndex: cts.cfg.Index,
|
CurrentServerIndex: cts.cfg.Index,
|
||||||
ServerAddr: cts.remote_addr,
|
ServerAddr: cts.remote_addr,
|
||||||
ClientAddr: cts.local_addr,
|
ClientAddr: cts.local_addr,
|
||||||
|
ClientToken: cts.Token,
|
||||||
Routes: jsp,
|
Routes: jsp,
|
||||||
})
|
})
|
||||||
cts.route_mtx.Unlock()
|
cts.route_mtx.Unlock()
|
||||||
@ -279,6 +282,7 @@ func (ctl *client_ctl_client_conns) ServeHTTP(w http.ResponseWriter, req *http.R
|
|||||||
|
|
||||||
cc.ServerAddrs = in_cc.ServerAddrs
|
cc.ServerAddrs = in_cc.ServerAddrs
|
||||||
cc.ClientToken = in_cc.ClientToken
|
cc.ClientToken = in_cc.ClientToken
|
||||||
|
cc.CloseOnConnErrorEvent = in_cc.CloseOnConnErrorEvent
|
||||||
cts, err = c.start_service(&cc) // TODO: this can be blocking. do we have to resolve addresses before calling this? also not good because resolution succeed or fail at each attempt. however ok as ServeHTTP itself is in a goroutine?
|
cts, err = c.start_service(&cc) // TODO: this can be blocking. do we have to resolve addresses before calling this? also not good because resolution succeed or fail at each attempt. however ok as ServeHTTP itself is in a goroutine?
|
||||||
if err != nil {
|
if err != nil {
|
||||||
status_code = WriteJsonRespHeader(w, http.StatusInternalServerError)
|
status_code = WriteJsonRespHeader(w, http.StatusInternalServerError)
|
||||||
@ -358,6 +362,7 @@ func (ctl *client_ctl_client_conns_id) ServeHTTP(w http.ResponseWriter, req *htt
|
|||||||
CurrentServerIndex: cts.cfg.Index,
|
CurrentServerIndex: cts.cfg.Index,
|
||||||
ServerAddr: cts.local_addr,
|
ServerAddr: cts.local_addr,
|
||||||
ClientAddr: cts.remote_addr,
|
ClientAddr: cts.remote_addr,
|
||||||
|
ClientToken: cts.Token,
|
||||||
Routes: jsp,
|
Routes: jsp,
|
||||||
}
|
}
|
||||||
cts.route_mtx.Unlock()
|
cts.route_mtx.Unlock()
|
||||||
|
18
client.go
18
client.go
@ -47,6 +47,7 @@ type ClientConnConfig struct {
|
|||||||
ServerSeedTmout time.Duration
|
ServerSeedTmout time.Duration
|
||||||
ServerAuthority string // http2 :authority header
|
ServerAuthority string // http2 :authority header
|
||||||
ClientToken string
|
ClientToken string
|
||||||
|
CloseOnConnErrorEvent bool
|
||||||
}
|
}
|
||||||
|
|
||||||
type ClientConnConfigActive struct {
|
type ClientConnConfigActive struct {
|
||||||
@ -131,6 +132,7 @@ type ClientConn struct {
|
|||||||
Id ConnId
|
Id ConnId
|
||||||
Sid string // id rendered in string
|
Sid string // id rendered in string
|
||||||
State ClientConnState
|
State ClientConnState
|
||||||
|
Token string
|
||||||
|
|
||||||
local_addr string
|
local_addr string
|
||||||
remote_addr string
|
remote_addr string
|
||||||
@ -945,7 +947,6 @@ func (cts *ClientConn) RunTask(wg *sync.WaitGroup) {
|
|||||||
var c_seed Seed
|
var c_seed Seed
|
||||||
var s_seed *Seed
|
var s_seed *Seed
|
||||||
var p *peer.Peer
|
var p *peer.Peer
|
||||||
var client_token string
|
|
||||||
var ok bool
|
var ok bool
|
||||||
var err error
|
var err error
|
||||||
var opts []grpc.DialOption
|
var opts []grpc.DialOption
|
||||||
@ -1009,18 +1010,18 @@ start_over:
|
|||||||
cts.C.log.Write(cts.Sid, LOG_INFO, "Got packet stream from server[%d] %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index])
|
cts.C.log.Write(cts.Sid, LOG_INFO, "Got packet stream from server[%d] %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index])
|
||||||
|
|
||||||
cts.State = CLIENT_CONN_CONNECTED
|
cts.State = CLIENT_CONN_CONNECTED
|
||||||
|
cts.Token = cts.cfg.ClientToken
|
||||||
|
if cts.Token == "" { cts.Token = cts.C.token }
|
||||||
|
|
||||||
cts.psc = &GuardedPacketStreamClient{Hodu_PacketStreamClient: psc}
|
cts.psc = &GuardedPacketStreamClient{Hodu_PacketStreamClient: psc}
|
||||||
|
|
||||||
client_token = cts.cfg.ClientToken
|
if cts.Token != "" {
|
||||||
if client_token == "" { client_token = cts.C.token }
|
err = cts.psc.Send(MakeConnDescPacket(cts.Token))
|
||||||
if client_token != "" {
|
|
||||||
err = cts.psc.Send(MakeConnDescPacket(client_token))
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cts.C.log.Write(cts.Sid, LOG_ERROR, "Failed to send conn-desc(%s) to server[%d] %s - %s", client_token, cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index], err.Error())
|
cts.C.log.Write(cts.Sid, LOG_ERROR, "Failed to send conn-desc(%s) to server[%d] %s - %s", cts.Token, cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index], err.Error())
|
||||||
goto reconnect_to_server
|
goto reconnect_to_server
|
||||||
} else {
|
} else {
|
||||||
cts.C.log.Write(cts.Sid, LOG_DEBUG, "Sending conn-desc(%s) to server[%d] %s", client_token, cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index])
|
cts.C.log.Write(cts.Sid, LOG_DEBUG, "Sending conn-desc(%s) to server[%d] %s", cts.Token, cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index])
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1193,8 +1194,7 @@ start_over:
|
|||||||
x, ok = pkt.U.(*Packet_ConnErr)
|
x, ok = pkt.U.(*Packet_ConnErr)
|
||||||
if ok {
|
if ok {
|
||||||
cts.C.log.Write(cts.Sid, LOG_ERROR, "Received conn_error(%d, %s) event from %s", x.ConnErr.ErrorId, x.ConnErr.Text, cts.remote_addr)
|
cts.C.log.Write(cts.Sid, LOG_ERROR, "Received conn_error(%d, %s) event from %s", x.ConnErr.ErrorId, x.ConnErr.Text, cts.remote_addr)
|
||||||
// if no retry goto done.. othersise reconnect...
|
if cts.cfg.CloseOnConnErrorEvent { goto done }
|
||||||
goto done
|
|
||||||
} else {
|
} else {
|
||||||
cts.C.log.Write(cts.Sid, LOG_ERROR, "Invalid conn_error event from %s", cts.remote_addr)
|
cts.C.log.Write(cts.Sid, LOG_ERROR, "Invalid conn_error event from %s", cts.remote_addr)
|
||||||
}
|
}
|
||||||
|
@ -769,7 +769,7 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) {
|
|||||||
// error
|
// error
|
||||||
cts.S.cts_mtx.Unlock()
|
cts.S.cts_mtx.Unlock()
|
||||||
cts.S.log.Write(cts.Sid, LOG_ERROR, "Invalid conn_desc packet from %s - duplicate token '%s'", cts.RemoteAddr, x.Conn.Token)
|
cts.S.log.Write(cts.Sid, LOG_ERROR, "Invalid conn_desc packet from %s - duplicate token '%s'", cts.RemoteAddr, x.Conn.Token)
|
||||||
cts.pss.Send(MakeConnErrorPacket(1, "duplicate token refused"))
|
cts.pss.Send(MakeConnErrorPacket(1, fmt.Sprintf("duplicate token refused - %s", x.Conn.Token)))
|
||||||
cts.ReqStop() // TODO: is this desirable to disconnect?
|
cts.ReqStop() // TODO: is this desirable to disconnect?
|
||||||
} else {
|
} else {
|
||||||
if cts.ClientToken != "" { delete(cts.S.cts_map_by_token, cts.ClientToken) }
|
if cts.ClientToken != "" { delete(cts.S.cts_map_by_token, cts.ClientToken) }
|
||||||
@ -846,6 +846,7 @@ func (cts *ServerConn) RunTask(wg *sync.WaitGroup) {
|
|||||||
done:
|
done:
|
||||||
cts.ReqStop() // just in case
|
cts.ReqStop() // just in case
|
||||||
cts.route_wg.Wait()
|
cts.route_wg.Wait()
|
||||||
|
cts.S.log.Write(cts.Sid, LOG_INFO, "End of connection task")
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cts *ServerConn) ReqStop() {
|
func (cts *ServerConn) ReqStop() {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user