updated to support multiple notice handlers with a fix on race condition accessing cts.Token on the client side
This commit is contained in:
parent
8cde9f08d4
commit
e01a6b347c
@ -270,7 +270,7 @@ func (ctl *client_ctl_client_conns) ServeHTTP(w http.ResponseWriter, req *http.R
|
||||
CurrentServerIndex: cts.cfg.Index,
|
||||
ServerAddr: cts.remote_addr.Get(),
|
||||
ClientAddr: cts.local_addr.Get(),
|
||||
ClientToken: cts.Token,
|
||||
ClientToken: cts.Token.Get(),
|
||||
Routes: jsp,
|
||||
})
|
||||
}
|
||||
@ -385,7 +385,7 @@ func (ctl *client_ctl_client_conns_id) ServeHTTP(w http.ResponseWriter, req *htt
|
||||
CurrentServerIndex: cts.cfg.Index,
|
||||
ServerAddr: cts.remote_addr.Get(),
|
||||
ClientAddr: cts.local_addr.Get(),
|
||||
ClientToken: cts.Token,
|
||||
ClientToken: cts.Token.Get(),
|
||||
Routes: jsp,
|
||||
}
|
||||
|
||||
|
23
client.go
23
client.go
@ -106,7 +106,7 @@ type Client struct {
|
||||
token string
|
||||
|
||||
log Logger
|
||||
conn_notice ClientConnNoticeHandler
|
||||
conn_notice_handlers []ClientConnNoticeHandler
|
||||
route_persister ClientRoutePersister
|
||||
|
||||
promreg *prometheus.Registry
|
||||
@ -132,7 +132,7 @@ type ClientConn struct {
|
||||
Id ConnId
|
||||
Sid string // id rendered in string
|
||||
State atomic.Int32 // ClientConnState
|
||||
Token string
|
||||
Token Atom[string]
|
||||
|
||||
local_addr Atom[string]
|
||||
remote_addr Atom[string]
|
||||
@ -1052,13 +1052,13 @@ start_over:
|
||||
cts.C.log.Write(cts.Sid, LOG_INFO, "Got packet stream from server[%d] %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index])
|
||||
|
||||
cts.State.Store(CLIENT_CONN_CONNECTED)
|
||||
cts.Token = cts.cfg.ClientToken
|
||||
if cts.Token == "" { cts.Token = cts.C.token }
|
||||
cts.Token.Set(cts.cfg.ClientToken)
|
||||
if cts.Token.Get() == "" { cts.Token.Set(cts.C.token) }
|
||||
|
||||
cts.psc = &GuardedPacketStreamClient{Hodu_PacketStreamClient: psc}
|
||||
|
||||
if cts.Token != "" {
|
||||
err = cts.psc.Send(MakeConnDescPacket(cts.Token))
|
||||
if cts.Token.Get() != "" {
|
||||
err = cts.psc.Send(MakeConnDescPacket(cts.Token.Get()))
|
||||
if err != nil {
|
||||
cts.C.log.Write(cts.Sid, LOG_ERROR, "Failed to send conn-desc(%s) to server[%d] %s - %s", cts.Token, cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index], err.Error())
|
||||
goto reconnect_to_server
|
||||
@ -1248,8 +1248,11 @@ start_over:
|
||||
x, ok = pkt.U.(*Packet_ConnNoti)
|
||||
if ok {
|
||||
cts.C.log.Write(cts.Sid, LOG_DEBUG, "conn_notice message '%s' received from %s", x.ConnNoti.Text, cts.remote_addr_p)
|
||||
if cts.C.conn_notice != nil {
|
||||
cts.C.conn_notice.Handle(cts, x.ConnNoti.Text)
|
||||
if cts.C.conn_notice_handlers != nil {
|
||||
var handler ClientConnNoticeHandler
|
||||
for _, handler = range cts.C.conn_notice_handlers {
|
||||
handler.Handle(cts, x.ConnNoti.Text)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
cts.C.log.Write(cts.Sid, LOG_ERROR, "Invalid conn_notice packet from %s", cts.remote_addr_p)
|
||||
@ -1931,8 +1934,8 @@ func (c *Client) WriteLog(id string, level LogLevel, fmtstr string, args ...inte
|
||||
c.log.Write(id, level, fmtstr, args...)
|
||||
}
|
||||
|
||||
func (c *Client) SetConnNoticeHandler(handler ClientConnNoticeHandler) {
|
||||
c.conn_notice = handler
|
||||
func (c *Client) SetConnNoticeHandlers(handlers []ClientConnNoticeHandler) {
|
||||
c.conn_notice_handlers = handlers
|
||||
}
|
||||
|
||||
func (c *Client) SetRoutePersister(persister ClientRoutePersister) {
|
||||
|
13
server.go
13
server.go
@ -176,7 +176,7 @@ type Server struct {
|
||||
pts_list *list.List
|
||||
|
||||
log Logger
|
||||
conn_notice ServerConnNoticeHandler
|
||||
conn_notice_handlers []ServerConnNoticeHandler
|
||||
|
||||
svc_port_mtx sync.Mutex
|
||||
svc_port_map ServerSvcPortMap
|
||||
@ -923,8 +923,11 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) {
|
||||
var ok bool
|
||||
x, ok = pkt.U.(*Packet_ConnNoti)
|
||||
if ok {
|
||||
if cts.S.conn_notice != nil {
|
||||
cts.S.conn_notice.Handle(cts, x.ConnNoti.Text)
|
||||
if cts.S.conn_notice_handlers != nil {
|
||||
var handler ServerConnNoticeHandler
|
||||
for _, handler = range cts.S.conn_notice_handlers {
|
||||
handler.Handle(cts, x.ConnNoti.Text)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
cts.S.log.Write(cts.Sid, LOG_ERROR, "Invalid conn_notice packet from %s", cts.RemoteAddr)
|
||||
@ -2152,8 +2155,8 @@ func (s *Server) WriteLog(id string, level LogLevel, fmtstr string, args ...inte
|
||||
s.log.Write(id, level, fmtstr, args...)
|
||||
}
|
||||
|
||||
func (s *Server) SetConnNoticeHandler(handler ServerConnNoticeHandler) {
|
||||
s.conn_notice = handler
|
||||
func (s *Server) SetConnNoticeHandlers(handlers []ServerConnNoticeHandler) {
|
||||
s.conn_notice_handlers = handlers
|
||||
}
|
||||
|
||||
func (s *Server) AddCtlHandler(path string, handler ServerHttpHandler) {
|
||||
|
Loading…
x
Reference in New Issue
Block a user