added Atom[T] to have atomic manipulation of composite values
This commit is contained in:
100
server.go
100
server.go
@ -67,10 +67,6 @@ type ServerConfig struct {
|
||||
WpxTls *tls.Config
|
||||
}
|
||||
|
||||
const SERVER_EVENT_TOPIC_CONN string = "conn"
|
||||
const SERVER_EVENT_TOPIC_ROUTE string = "route"
|
||||
const SERVER_EVENT_TOPIC_PEER string = "peer"
|
||||
|
||||
type ServerEventKind int
|
||||
const (
|
||||
SERVER_EVENT_CONN_ADDED = iota
|
||||
@ -81,15 +77,35 @@ const (
|
||||
SERVER_EVENT_PEER_DELETED
|
||||
)
|
||||
|
||||
type ServerEventDesc struct {
|
||||
Conn ConnId
|
||||
Route RouteId
|
||||
Peer PeerId
|
||||
type ServerEvent struct {
|
||||
Kind ServerEventKind `json:"type"`
|
||||
Data interface{} `json:"data"`
|
||||
}
|
||||
|
||||
type ServerEvent struct {
|
||||
Kind ServerEventKind
|
||||
Desc ServerEventDesc
|
||||
type ServerEventConnAdded struct {
|
||||
Conn ConnId `json:"conn-id"`
|
||||
ServerAddr string `json:"server-addr"`
|
||||
ClientAddr string `json:"client-addr"`
|
||||
ClientToken string `json:"client-token"`
|
||||
}
|
||||
|
||||
type ServerEventConnDeleted struct {
|
||||
Conn ConnId `json:"conn-id:"`
|
||||
}
|
||||
|
||||
type ServerEventRouteAdded struct {
|
||||
Conn ConnId `json:"conn-id"`
|
||||
Route RouteId `json:"route-id"`
|
||||
ClientPeerAddr string `json:"client-peer-addr"`
|
||||
ClientPeerName string `json:"client-peer-name"`
|
||||
ServerPeerOption string `json:"server-peer-option"`
|
||||
ServerPeerSvcAddr string `json:"server-peer-svc-addr"`
|
||||
ServerPeerSvcNet string `json:"server-peer-svc-net"`
|
||||
}
|
||||
|
||||
type ServerEventRouteDeleted struct {
|
||||
Conn ConnId `json:"conn-id"`
|
||||
Route RouteId `json:"route-id"`
|
||||
}
|
||||
|
||||
type ServerEventBulletin = Bulletin[*ServerEvent]
|
||||
@ -160,7 +176,7 @@ type ServerConn struct {
|
||||
S *Server
|
||||
Id ConnId
|
||||
Sid string // for logging
|
||||
ClientToken string // provided by client
|
||||
ClientToken Atom[string] // provided by client
|
||||
|
||||
RemoteAddr net.Addr // client address that created this structure
|
||||
LocalAddr net.Addr // local address that the client is connected to
|
||||
@ -365,6 +381,15 @@ func (r *ServerRoute) RunTask(wg *sync.WaitGroup) {
|
||||
r.Cts.S.log.Write(r.Cts.Sid, LOG_DEBUG, "All service-side peer handlers ended on route(%d)", r.Id)
|
||||
|
||||
r.Cts.RemoveServerRoute(r) // final phase...
|
||||
|
||||
r.Cts.S.bulletin.Enqueue(
|
||||
&ServerEvent{
|
||||
Kind: SERVER_EVENT_ROUTE_DELETED,
|
||||
Data: &ServerEventRouteDeleted {
|
||||
Conn: r.Cts.Id, Route: r.Id,
|
||||
},
|
||||
},
|
||||
)
|
||||
}
|
||||
|
||||
func (r *ServerRoute) ReqStop() {
|
||||
@ -501,6 +526,22 @@ func (cts *ServerConn) AddNewServerRoute(route_id RouteId, proto RouteOption, pt
|
||||
cts.S.stats.routes.Add(1)
|
||||
cts.route_mtx.Unlock()
|
||||
|
||||
cts.S.bulletin.Enqueue(
|
||||
&ServerEvent{
|
||||
Kind: SERVER_EVENT_ROUTE_ADDED,
|
||||
Data: &ServerEventRouteAdded{
|
||||
Conn: cts.Id,
|
||||
Route: r.Id,
|
||||
ClientPeerAddr: r.PtcAddr,
|
||||
ClientPeerName: r.PtcName,
|
||||
ServerPeerSvcAddr: r.SvcAddr.String(),
|
||||
ServerPeerSvcNet: r.SvcPermNet.String(),
|
||||
ServerPeerOption: r.SvcOption.String(),
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
// Don't detached the cts task as a go-routine as this function
|
||||
cts.route_wg.Add(1)
|
||||
go r.RunTask(&cts.route_wg)
|
||||
return r, nil
|
||||
@ -786,7 +827,7 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) {
|
||||
cts.S.log.Write(cts.Sid, LOG_ERROR, "Invalid conn_desc packet from %s - blank token", cts.RemoteAddr)
|
||||
cts.pss.Send(MakeConnErrorPacket(1, "blank token refused"))
|
||||
cts.ReqStop() // TODO: is this desirable to disconnect?
|
||||
} else if x.Conn.Token != cts.ClientToken {
|
||||
} else if x.Conn.Token != cts.ClientToken.Get() {
|
||||
_, err = strconv.ParseUint(x.Conn.Token, 10, int(unsafe.Sizeof(ConnId(0)) * 8))
|
||||
if err == nil { // this is not != nil. this is to check if the token is numeric
|
||||
cts.S.log.Write(cts.Sid, LOG_ERROR, "Invalid conn_desc packet from %s - numeric token '%s'", cts.RemoteAddr, x.Conn.Token)
|
||||
@ -802,8 +843,8 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) {
|
||||
cts.pss.Send(MakeConnErrorPacket(1, fmt.Sprintf("duplicate token refused - %s", x.Conn.Token)))
|
||||
cts.ReqStop() // TODO: is this desirable to disconnect?
|
||||
} else {
|
||||
if cts.ClientToken != "" { delete(cts.S.cts_map_by_token, cts.ClientToken) }
|
||||
cts.ClientToken = x.Conn.Token
|
||||
if cts.ClientToken.Get() != "" { delete(cts.S.cts_map_by_token, cts.ClientToken.Get()) }
|
||||
cts.ClientToken.Set(x.Conn.Token)
|
||||
cts.S.cts_map_by_token[x.Conn.Token] = cts
|
||||
cts.S.cts_mtx.Unlock()
|
||||
cts.S.log.Write(cts.Sid, LOG_INFO, "client(%d) %s - token set to '%s'", cts.Id, cts.RemoteAddr, x.Conn.Token)
|
||||
@ -850,8 +891,10 @@ func (cts *ServerConn) RunTask(wg *sync.WaitGroup) {
|
||||
// function be the channel waiter only.
|
||||
// increment on the wait group is for the caller to wait for
|
||||
// these detached goroutines to finish.
|
||||
wg.Add(1)
|
||||
go cts.receive_from_stream(wg)
|
||||
//wg.Add(1)
|
||||
//go cts.receive_from_stream(wg)
|
||||
cts.route_wg.Add(1)
|
||||
go cts.receive_from_stream(&cts.route_wg)
|
||||
|
||||
for {
|
||||
// exit if context is done
|
||||
@ -879,7 +922,7 @@ done:
|
||||
cts.S.bulletin.Enqueue(
|
||||
&ServerEvent{
|
||||
Kind: SERVER_EVENT_CONN_DELETED,
|
||||
Desc: ServerEventDesc{ Conn: cts.Id },
|
||||
Data: &ServerEventConnDeleted{ Conn: cts.Id },
|
||||
},
|
||||
)
|
||||
// Don't detached the cts task as a go-routine as this function
|
||||
@ -942,7 +985,12 @@ func (s *Server) PacketStream(strm Hodu_PacketStreamServer) error {
|
||||
s.bulletin.Enqueue(
|
||||
&ServerEvent{
|
||||
Kind: SERVER_EVENT_CONN_ADDED,
|
||||
Desc: ServerEventDesc{ Conn: cts.Id },
|
||||
Data: &ServerEventConnAdded{
|
||||
Conn: cts.Id,
|
||||
ServerAddr: cts.LocalAddr.String(),
|
||||
ClientAddr: cts.RemoteAddr.String(),
|
||||
ClientToken: cts.ClientToken.Get(),
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
@ -1700,15 +1748,15 @@ func (s *Server) AddNewServerConn(remote_addr *net.Addr, local_addr *net.Addr, p
|
||||
s.cts_mtx.Unlock()
|
||||
return nil, fmt.Errorf("existing client address - %s", cts.RemoteAddr.String())
|
||||
}
|
||||
if cts.ClientToken != "" {
|
||||
if cts.ClientToken.Get() != "" {
|
||||
// this check is not needed as Token is never set at this phase
|
||||
// however leave statements here for completeness
|
||||
_, ok = s.cts_map_by_token[cts.ClientToken]
|
||||
_, ok = s.cts_map_by_token[cts.ClientToken.Get()]
|
||||
if ok {
|
||||
s.cts_mtx.Unlock()
|
||||
return nil, fmt.Errorf("existing client token - %s", cts.ClientToken)
|
||||
return nil, fmt.Errorf("existing client token - %s", cts.ClientToken.Get())
|
||||
}
|
||||
s.cts_map_by_token[cts.ClientToken] = &cts
|
||||
s.cts_map_by_token[cts.ClientToken.Get()] = &cts
|
||||
}
|
||||
s.cts_map_by_addr[cts.RemoteAddr] = &cts
|
||||
s.cts_map[cts.Id] = &cts
|
||||
@ -1744,7 +1792,7 @@ func (s *Server) RemoveServerConn(cts *ServerConn) error {
|
||||
|
||||
delete(s.cts_map, cts.Id)
|
||||
delete(s.cts_map_by_addr, cts.RemoteAddr)
|
||||
if cts.ClientToken != "" { delete(s.cts_map_by_token, cts.ClientToken) }
|
||||
if cts.ClientToken.Get() != "" { delete(s.cts_map_by_token, cts.ClientToken.Get()) }
|
||||
s.stats.conns.Store(int64(len(s.cts_map)))
|
||||
s.cts_mtx.Unlock()
|
||||
|
||||
@ -1765,7 +1813,7 @@ func (s *Server) RemoveServerConnByAddr(addr net.Addr) (*ServerConn, error) {
|
||||
}
|
||||
delete(s.cts_map, cts.Id)
|
||||
delete(s.cts_map_by_addr, cts.RemoteAddr)
|
||||
if cts.ClientToken != "" { delete(s.cts_map_by_token, cts.ClientToken) }
|
||||
if cts.ClientToken.Get() != "" { delete(s.cts_map_by_token, cts.ClientToken.Get()) }
|
||||
s.stats.conns.Store(int64(len(s.cts_map)))
|
||||
s.cts_mtx.Unlock()
|
||||
|
||||
@ -1786,7 +1834,7 @@ func (s *Server) RemoveServerConnByClientToken(token string) (*ServerConn, error
|
||||
}
|
||||
delete(s.cts_map, cts.Id)
|
||||
delete(s.cts_map_by_addr, cts.RemoteAddr)
|
||||
delete(s.cts_map_by_token, cts.ClientToken) // no Empty check becuase an empty token is never found in the map
|
||||
delete(s.cts_map_by_token, cts.ClientToken.Get()) // no Empty check becuase an empty token is never found in the map
|
||||
s.stats.conns.Store(int64(len(s.cts_map)))
|
||||
s.cts_mtx.Unlock()
|
||||
|
||||
|
Reference in New Issue
Block a user