changed id allocation back to linear allocation. time-based or random number-based allocation makes debugging troublesome

This commit is contained in:
hyung-hwan 2024-12-13 23:19:26 +09:00
parent ea601f1011
commit 55fc4bdfcb
2 changed files with 85 additions and 67 deletions

View File

@ -5,7 +5,7 @@ import "crypto/tls"
import "errors" import "errors"
import "fmt" import "fmt"
import "log" import "log"
import "math/rand" //import "math/rand"
import "net" import "net"
import "net/http" import "net/http"
import "strings" import "strings"
@ -59,6 +59,7 @@ type Client struct {
ptc_limit int // global maximum number of peers ptc_limit int // global maximum number of peers
cts_limit int cts_limit int
cts_mtx sync.Mutex cts_mtx sync.Mutex
cts_next_id ConnId
cts_map ClientConnMap cts_map ClientConnMap
wg sync.WaitGroup wg sync.WaitGroup
@ -76,26 +77,27 @@ type Client struct {
// client connection to server // client connection to server
type ClientConn struct { type ClientConn struct {
cli *Client cli *Client
cfg ClientConfigActive cfg ClientConfigActive
id ConnId id ConnId
sid string // id rendered in string sid string // id rendered in string
local_addr string local_addr string
remote_addr string remote_addr string
conn *grpc.ClientConn // grpc connection to the server conn *grpc.ClientConn // grpc connection to the server
hdc HoduClient hdc HoduClient
psc *GuardedPacketStreamClient // guarded grpc stream psc *GuardedPacketStreamClient // guarded grpc stream
s_seed Seed s_seed Seed
c_seed Seed c_seed Seed
route_mtx sync.Mutex route_mtx sync.Mutex
route_map ClientRouteMap route_next_id RouteId
route_wg sync.WaitGroup route_map ClientRouteMap
route_wg sync.WaitGroup
stop_req atomic.Bool stop_req atomic.Bool
stop_chan chan bool stop_chan chan bool
} }
type ClientRoute struct { type ClientRoute struct {
@ -579,6 +581,7 @@ func NewClientConn(c *Client, cfg *ClientConfig) *ClientConn {
cts.cli = c cts.cli = c
cts.route_map = make(ClientRouteMap) cts.route_map = make(ClientRouteMap)
cts.route_next_id = 0
cts.cfg.ClientConfig = *cfg cts.cfg.ClientConfig = *cfg
cts.stop_req.Store(false) cts.stop_req.Store(false)
cts.stop_chan = make(chan bool, 8) cts.stop_chan = make(chan bool, 8)
@ -591,32 +594,30 @@ func NewClientConn(c *Client, cfg *ClientConfig) *ClientConn {
func (cts *ClientConn) AddNewClientRoute(addr string, server_peer_svc_addr string, server_peer_svc_net string, option RouteOption) (*ClientRoute, error) { func (cts *ClientConn) AddNewClientRoute(addr string, server_peer_svc_addr string, server_peer_svc_net string, option RouteOption) (*ClientRoute, error) {
var r *ClientRoute var r *ClientRoute
var id RouteId var start_id RouteId
var nattempts RouteId
nattempts = 0 //start_id = RouteId(rand.Uint64())
id = RouteId(rand.Uint32()) start_id = cts.route_next_id
cts.route_mtx.Lock() cts.route_mtx.Lock()
for { for {
var ok bool var ok bool
_, ok = cts.route_map[cts.route_next_id]
_, ok = cts.route_map[id]
if !ok { break } if !ok { break }
id++ cts.route_next_id++
nattempts++ if cts.route_next_id == start_id {
if nattempts == ^RouteId(0) {
cts.route_mtx.Unlock() cts.route_mtx.Unlock()
return nil, fmt.Errorf("route map full") return nil, fmt.Errorf("unable to assign id")
} }
} }
r = NewClientRoute(cts, id, addr, server_peer_svc_addr, server_peer_svc_net, option) r = NewClientRoute(cts, cts.route_next_id, addr, server_peer_svc_addr, server_peer_svc_net, option)
cts.route_map[id] = r cts.route_map[r.id] = r
cts.route_next_id++
cts.cli.stats.routes.Add(1) cts.cli.stats.routes.Add(1)
cts.route_mtx.Unlock() cts.route_mtx.Unlock()
cts.cli.log.Write(cts.sid, LOG_INFO, "Added route(%d,%s)", id, addr) cts.cli.log.Write(cts.sid, LOG_INFO, "Added route(%d,%s)", r.id, addr)
cts.route_wg.Add(1) cts.route_wg.Add(1)
go r.RunTask(&cts.route_wg) go r.RunTask(&cts.route_wg)
@ -847,6 +848,7 @@ start_over:
} }
cts.s_seed = *s_seed cts.s_seed = *s_seed
cts.c_seed = c_seed cts.c_seed = c_seed
cts.route_next_id = 0 // reset this whenever a new connection is made. the number of routes must be zero.
cts.cli.log.Write(cts.sid, LOG_INFO, "Got seed from server[%d] %s - ver=%#x", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index], cts.s_seed.Version) cts.cli.log.Write(cts.sid, LOG_INFO, "Got seed from server[%d] %s - ver=%#x", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index], cts.s_seed.Version)
@ -875,6 +877,7 @@ start_over:
goto done goto done
} }
} }
// TODO: remember the previously POSTed routes and re-add them??
for { for {
var pkt *Packet var pkt *Packet
@ -1098,6 +1101,7 @@ func NewClient(ctx context.Context, logger Logger, ctl_addrs []string, ctl_prefi
c.ptc_tmout = peer_conn_tmout c.ptc_tmout = peer_conn_tmout
c.ptc_limit = peer_max c.ptc_limit = peer_max
c.cts_limit = rpc_max c.cts_limit = rpc_max
c.cts_next_id = 0
c.cts_map = make(ClientConnMap) c.cts_map = make(ClientConnMap)
c.stop_req.Store(false) c.stop_req.Store(false)
c.stop_chan = make(chan bool, 8) c.stop_chan = make(chan bool, 8)
@ -1139,7 +1143,7 @@ func NewClient(ctx context.Context, logger Logger, ctl_addrs []string, ctl_prefi
func (c *Client) AddNewClientConn(cfg *ClientConfig) (*ClientConn, error) { func (c *Client) AddNewClientConn(cfg *ClientConfig) (*ClientConn, error) {
var cts *ClientConn var cts *ClientConn
var ok bool var ok bool
var id ConnId var start_id ConnId
if len(cfg.ServerAddrs) <= 0 { if len(cfg.ServerAddrs) <= 0 {
return nil, fmt.Errorf("no server rpc address specified") return nil, fmt.Errorf("no server rpc address specified")
@ -1154,18 +1158,24 @@ func (c *Client) AddNewClientConn(cfg *ClientConfig) (*ClientConn, error) {
return nil, fmt.Errorf("too many connections - %d", c.cts_limit) return nil, fmt.Errorf("too many connections - %d", c.cts_limit)
} }
//id = rand.Uint32() //start_id = rand.Uint64()
id = ConnId(monotonic_time() / 1000) //start_id = ConnId(monotonic_time() / 1000)
start_id = c.cts_next_id
for { for {
_, ok = c.cts_map[id] _, ok = c.cts_map[c.cts_next_id]
if !ok { break } if !ok { break }
id++ c.cts_next_id++
if c.cts_next_id == start_id {
c.cts_mtx.Unlock()
return nil, fmt.Errorf("unable to assign id")
}
} }
cts.id = id cts.id = c.cts_next_id
cts.cfg.Id = id // store it again in the active configuration for easy access via control channel cts.cfg.Id = cts.id // store it again in the active configuration for easy access via control channel
cts.sid = fmt.Sprintf("%d", id) // id in string used for logging cts.sid = fmt.Sprintf("%d", cts.id) // id in string used for logging
c.cts_map[id] = cts c.cts_map[cts.id] = cts
c.cts_next_id++
c.stats.conns.Add(1) c.stats.conns.Add(1)
c.cts_mtx.Unlock() c.cts_mtx.Unlock()

View File

@ -57,6 +57,7 @@ type Server struct {
pts_limit int // global pts limit pts_limit int // global pts limit
cts_limit int cts_limit int
cts_next_id ConnId
cts_mtx sync.Mutex cts_mtx sync.Mutex
cts_map ServerConnMap cts_map ServerConnMap
cts_map_by_addr ServerConnMapByAddr cts_map_by_addr ServerConnMapByAddr
@ -76,21 +77,21 @@ type Server struct {
// connection from client. // connection from client.
// client connect to the server, the server accept it, and makes a tunnel request // client connect to the server, the server accept it, and makes a tunnel request
type ServerConn struct { type ServerConn struct {
svr *Server svr *Server
id ConnId id ConnId
sid string // for logging sid string // for logging
remote_addr net.Addr // client address that created this structure remote_addr net.Addr // client address that created this structure
local_addr net.Addr // local address that the client is connected to local_addr net.Addr // local address that the client is connected to
pss *GuardedPacketStreamServer pss *GuardedPacketStreamServer
route_mtx sync.Mutex route_mtx sync.Mutex
route_map ServerRouteMap route_map ServerRouteMap
route_wg sync.WaitGroup route_wg sync.WaitGroup
wg sync.WaitGroup wg sync.WaitGroup
stop_req atomic.Bool stop_req atomic.Bool
stop_chan chan bool stop_chan chan bool
} }
type ServerRoute struct { type ServerRoute struct {
@ -107,7 +108,7 @@ type ServerRoute struct {
pts_mtx sync.Mutex pts_mtx sync.Mutex
pts_map ServerPeerConnMap pts_map ServerPeerConnMap
pts_limit int pts_limit int
pts_last_id PeerId pts_next_id PeerId
pts_wg sync.WaitGroup pts_wg sync.WaitGroup
stop_req atomic.Bool stop_req atomic.Bool
} }
@ -181,7 +182,7 @@ func NewServerRoute(cts *ServerConn, id RouteId, option RouteOption, ptc_addr st
r.ptc_addr = ptc_addr r.ptc_addr = ptc_addr
r.pts_limit = PTS_LIMIT r.pts_limit = PTS_LIMIT
r.pts_map = make(ServerPeerConnMap) r.pts_map = make(ServerPeerConnMap)
r.pts_last_id = 0 r.pts_next_id = 0
r.stop_req.Store(false) r.stop_req.Store(false)
return &r, nil return &r, nil
@ -199,22 +200,22 @@ func (r *ServerRoute) AddNewServerPeerConn(c *net.TCPConn) (*ServerPeerConn, err
return nil, fmt.Errorf("peer-to-server connection table full") return nil, fmt.Errorf("peer-to-server connection table full")
} }
start_id = r.pts_last_id start_id = r.pts_next_id
for { for {
_, ok = r.pts_map[r.pts_last_id] _, ok = r.pts_map[r.pts_next_id]
if !ok { if !ok {
break break
} }
r.pts_last_id++ r.pts_next_id++
if r.pts_last_id == start_id { if r.pts_next_id == start_id {
// unlikely to happen but it cycled through the whole range. // unlikely to happen but it cycled through the whole range.
return nil, fmt.Errorf("failed to assign peer-to-server connection id") return nil, fmt.Errorf("failed to assign peer-to-server connection id")
} }
} }
pts = NewServerPeerConn(r, c, r.pts_last_id) pts = NewServerPeerConn(r, c, r.pts_next_id)
r.pts_map[pts.conn_id] = pts r.pts_map[pts.conn_id] = pts
r.pts_last_id++ r.pts_next_id++
r.cts.svr.stats.peers.Add(1) r.cts.svr.stats.peers.Add(1)
return pts, nil return pts, nil
@ -905,6 +906,7 @@ func NewServer(ctx context.Context, logger Logger, ctl_addrs []string, rpc_addrs
s.ext_svcs = make([]Service, 0, 1) s.ext_svcs = make([]Service, 0, 1)
s.pts_limit = peer_max s.pts_limit = peer_max
s.cts_limit = rpc_max s.cts_limit = rpc_max
s.cts_next_id = 0
s.cts_map = make(ServerConnMap) s.cts_map = make(ServerConnMap)
s.cts_map_by_addr = make(ServerConnMapByAddr) s.cts_map_by_addr = make(ServerConnMapByAddr)
s.stop_chan = make(chan bool, 8) s.stop_chan = make(chan bool, 8)
@ -1185,7 +1187,7 @@ func (s *Server) ReqStop() {
func (s *Server) AddNewServerConn(remote_addr *net.Addr, local_addr *net.Addr, pss Hodu_PacketStreamServer) (*ServerConn, error) { func (s *Server) AddNewServerConn(remote_addr *net.Addr, local_addr *net.Addr, pss Hodu_PacketStreamServer) (*ServerConn, error) {
var cts ServerConn var cts ServerConn
var id ConnId var start_id ConnId
var ok bool var ok bool
cts.svr = s cts.svr = s
@ -1204,15 +1206,20 @@ func (s *Server) AddNewServerConn(remote_addr *net.Addr, local_addr *net.Addr, p
return nil, fmt.Errorf("too many connections - %d", s.cts_limit) return nil, fmt.Errorf("too many connections - %d", s.cts_limit)
} }
//id = rand.Uint32() //start_id = rand.Uint64()
id = ConnId(monotonic_time() / 1000) //start_id = ConnId(monotonic_time() / 1000)
start_id = s.cts_next_id
for { for {
_, ok = s.cts_map[id] _, ok = s.cts_map[s.cts_next_id]
if !ok { break } if !ok { break }
id++ s.cts_next_id++
if s.cts_next_id == start_id {
s.cts_mtx.Unlock()
return nil, fmt.Errorf("unable to assign id")
}
} }
cts.id = id cts.id = s.cts_next_id
cts.sid = fmt.Sprintf("%d", id) // id in string used for logging cts.sid = fmt.Sprintf("%d", cts.id) // id in string used for logging
_, ok = s.cts_map_by_addr[cts.remote_addr] _, ok = s.cts_map_by_addr[cts.remote_addr]
if ok { if ok {
@ -1220,7 +1227,8 @@ func (s *Server) AddNewServerConn(remote_addr *net.Addr, local_addr *net.Addr, p
return nil, fmt.Errorf("existing client - %s", cts.remote_addr.String()) return nil, fmt.Errorf("existing client - %s", cts.remote_addr.String())
} }
s.cts_map_by_addr[cts.remote_addr] = &cts s.cts_map_by_addr[cts.remote_addr] = &cts
s.cts_map[id] = &cts; s.cts_map[cts.id] = &cts;
s.cts_next_id++;
s.stats.conns.Store(int64(len(s.cts_map))) s.stats.conns.Store(int64(len(s.cts_map)))
s.cts_mtx.Unlock() s.cts_mtx.Unlock()