updated to assign connection id and route id from 1, using 0 as an automatic assigned marker
updated the client-side api to accept a route id for route creation
This commit is contained in:
39
server.go
39
server.go
@ -212,7 +212,7 @@ func NewServerRoute(cts *ServerConn, id RouteId, option RouteOption, ptc_addr st
|
||||
r.PtcName = ptc_name
|
||||
r.pts_limit = PTS_LIMIT
|
||||
r.pts_map = make(ServerPeerConnMap)
|
||||
r.pts_next_id = 0
|
||||
r.pts_next_id = 1
|
||||
r.stop_req.Store(false)
|
||||
|
||||
return &r, nil
|
||||
@ -222,6 +222,7 @@ func (r *ServerRoute) AddNewServerPeerConn(c *net.TCPConn) (*ServerPeerConn, err
|
||||
var pts *ServerPeerConn
|
||||
var ok bool
|
||||
var start_id PeerId
|
||||
var assigned_id PeerId
|
||||
|
||||
r.pts_mtx.Lock()
|
||||
defer r.pts_mtx.Unlock()
|
||||
@ -234,18 +235,21 @@ func (r *ServerRoute) AddNewServerPeerConn(c *net.TCPConn) (*ServerPeerConn, err
|
||||
for {
|
||||
_, ok = r.pts_map[r.pts_next_id]
|
||||
if !ok {
|
||||
assigned_id = r.pts_next_id
|
||||
r.pts_next_id++
|
||||
if r.pts_next_id == 0 { r.pts_next_id++ }
|
||||
break
|
||||
}
|
||||
r.pts_next_id++
|
||||
if r.pts_next_id == 0 { r.pts_next_id++ }
|
||||
if r.pts_next_id == start_id {
|
||||
// unlikely to happen but it cycled through the whole range.
|
||||
return nil, fmt.Errorf("failed to assign peer-to-server connection id")
|
||||
}
|
||||
}
|
||||
|
||||
pts = NewServerPeerConn(r, c, r.pts_next_id)
|
||||
pts = NewServerPeerConn(r, c, assigned_id)
|
||||
r.pts_map[pts.conn_id] = pts
|
||||
r.pts_next_id++
|
||||
r.Cts.svr.stats.peers.Add(1)
|
||||
|
||||
return pts, nil
|
||||
@ -837,8 +841,11 @@ func (cc *ConnCatcher) HandleConn(ctx context.Context, cs stats.ConnStats) {
|
||||
cc.server.log.Write("", LOG_INFO, "Client connected - %s", addr)
|
||||
|
||||
case *stats.ConnEnd:
|
||||
cc.server.log.Write("", LOG_INFO, "Client disconnected - %s", addr)
|
||||
cc.server.RemoveServerConnByAddr(p.Addr)
|
||||
var cts *ServerConn
|
||||
var log_id string
|
||||
cts, _ = cc.server.RemoveServerConnByAddr(p.Addr)
|
||||
if cts != nil { log_id = cts.sid }
|
||||
cc.server.log.Write(log_id, LOG_INFO, "Client disconnected - %s", addr)
|
||||
}
|
||||
}
|
||||
|
||||
@ -984,7 +991,7 @@ func NewServer(ctx context.Context, logger Logger, ctl_addrs []string, rpc_addrs
|
||||
s.ext_svcs = make([]Service, 0, 1)
|
||||
s.pts_limit = peer_max
|
||||
s.cts_limit = rpc_max
|
||||
s.cts_next_id = 0
|
||||
s.cts_next_id = 1
|
||||
s.cts_map = make(ServerConnMap)
|
||||
s.cts_map_by_addr = make(ServerConnMapByAddr)
|
||||
s.svc_port_map = make(ServerSvcPortMap)
|
||||
@ -1359,6 +1366,7 @@ func (s *Server) ReqStop() {
|
||||
func (s *Server) AddNewServerConn(remote_addr *net.Addr, local_addr *net.Addr, pss Hodu_PacketStreamServer) (*ServerConn, error) {
|
||||
var cts ServerConn
|
||||
var start_id ConnId
|
||||
var assigned_id ConnId
|
||||
var ok bool
|
||||
|
||||
cts.svr = s
|
||||
@ -1382,14 +1390,20 @@ func (s *Server) AddNewServerConn(remote_addr *net.Addr, local_addr *net.Addr, p
|
||||
start_id = s.cts_next_id
|
||||
for {
|
||||
_, ok = s.cts_map[s.cts_next_id]
|
||||
if !ok { break }
|
||||
if !ok {
|
||||
assigned_id = s.cts_next_id
|
||||
s.cts_next_id++;
|
||||
if s.cts_next_id == 0 { s.cts_next_id++ }
|
||||
break
|
||||
}
|
||||
s.cts_next_id++
|
||||
if s.cts_next_id == 0 { s.cts_next_id++ }
|
||||
if s.cts_next_id == start_id {
|
||||
s.cts_mtx.Unlock()
|
||||
return nil, fmt.Errorf("unable to assign id")
|
||||
}
|
||||
}
|
||||
cts.Id = s.cts_next_id
|
||||
cts.Id = assigned_id
|
||||
cts.sid = fmt.Sprintf("%d", cts.Id) // id in string used for logging
|
||||
|
||||
_, ok = s.cts_map_by_addr[cts.RemoteAddr]
|
||||
@ -1399,11 +1413,10 @@ func (s *Server) AddNewServerConn(remote_addr *net.Addr, local_addr *net.Addr, p
|
||||
}
|
||||
s.cts_map_by_addr[cts.RemoteAddr] = &cts
|
||||
s.cts_map[cts.Id] = &cts;
|
||||
s.cts_next_id++;
|
||||
s.stats.conns.Store(int64(len(s.cts_map)))
|
||||
s.cts_mtx.Unlock()
|
||||
|
||||
s.log.Write("", LOG_DEBUG, "Added client connection from %s", cts.RemoteAddr.String())
|
||||
s.log.Write(cts.sid, LOG_DEBUG, "Added client connection from %s", cts.RemoteAddr.String())
|
||||
return &cts, nil
|
||||
}
|
||||
|
||||
@ -1439,7 +1452,7 @@ func (s *Server) RemoveServerConn(cts *ServerConn) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Server) RemoveServerConnByAddr(addr net.Addr) error {
|
||||
func (s *Server) RemoveServerConnByAddr(addr net.Addr) (*ServerConn, error) {
|
||||
var cts *ServerConn
|
||||
var ok bool
|
||||
|
||||
@ -1448,7 +1461,7 @@ func (s *Server) RemoveServerConnByAddr(addr net.Addr) error {
|
||||
cts, ok = s.cts_map_by_addr[addr]
|
||||
if !ok {
|
||||
s.cts_mtx.Unlock()
|
||||
return fmt.Errorf("non-existent connection address - %s", addr.String())
|
||||
return nil, fmt.Errorf("non-existent connection address - %s", addr.String())
|
||||
}
|
||||
delete(s.cts_map, cts.Id)
|
||||
delete(s.cts_map_by_addr, cts.RemoteAddr)
|
||||
@ -1456,7 +1469,7 @@ func (s *Server) RemoveServerConnByAddr(addr net.Addr) error {
|
||||
s.cts_mtx.Unlock()
|
||||
|
||||
cts.ReqStop()
|
||||
return nil
|
||||
return cts, nil
|
||||
}
|
||||
|
||||
func (s *Server) FindServerConnById(id ConnId) *ServerConn {
|
||||
|
Reference in New Issue
Block a user