extended stats to include number of rpc connections, routes, peers

This commit is contained in:
hyung-hwan 2024-12-08 12:13:36 +09:00
parent 970b28ec30
commit 87597ad698
6 changed files with 86 additions and 26 deletions

View File

@ -68,8 +68,11 @@ type json_out_client_peer struct {
}
type json_out_client_stats struct {
NumCPUs int `json:"num-cpus"`
NumGoroutines int `json:"num-goroutines"`
CPUs int `json:"cpus"`
Goroutines int `json:"goroutines"`
ClientConns int64 `json:"client-conns"`
ClientRoutes int64 `json:"client-routes"`
ClientPeers int64 `json:"client-peers"`
}
// ------------------------------------
@ -651,8 +654,11 @@ func (ctl *client_ctl_stats) ServeHTTP(w http.ResponseWriter, req *http.Request)
switch req.Method {
case http.MethodGet:
var stats json_out_client_stats
stats.NumCPUs = runtime.NumCPU()
stats.NumGoroutines = runtime.NumGoroutine()
stats.CPUs = runtime.NumCPU()
stats.Goroutines = runtime.NumGoroutine()
stats.ClientConns = c.stats.conns.Load()
stats.ClientRoutes = c.stats.routes.Load()
stats.ClientPeers = c.stats.peers.Load()
status_code = http.StatusOK; w.WriteHeader(status_code)
if err = je.Encode(stats); err != nil { goto oops }

View File

@ -62,6 +62,12 @@ type Client struct {
stop_chan chan bool
log Logger
stats struct {
conns atomic.Int64
routes atomic.Int64
peers atomic.Int64
}
}
// client connection to server
@ -164,6 +170,7 @@ func (r *ClientRoute) AddNewClientPeerConn(c *net.TCPConn, pts_id uint32, pts_ra
r.ptc_mtx.Lock()
ptc = NewClientPeerConn(r, c, pts_id, pts_raddr, pts_laddr)
r.ptc_map[ptc.conn_id] = ptc
r.cts.cli.stats.peers.Add(1)
r.ptc_mtx.Unlock()
r.cts.cli.log.Write(r.cts.sid, LOG_INFO, "Added client-side peer(%d,%d,%s,%s)", r.id, ptc.conn_id, ptc.conn.RemoteAddr().String(), ptc.conn.LocalAddr().String())
@ -185,6 +192,7 @@ func (r *ClientRoute) RemoveClientPeerConn(ptc *ClientPeerConn) error {
return fmt.Errorf("conflicting peer id - %d", ptc.conn_id)
}
delete(r.ptc_map, ptc.conn_id)
r.cts.cli.stats.peers.Add(-1)
r.ptc_mtx.Unlock()
r.cts.cli.log.Write(r.cts.sid, LOG_INFO, "Removed client-side peer(%d,%d,%s,%s)", r.id, ptc.conn_id, ptc.conn.RemoteAddr().String(), ptc.conn.LocalAddr().String())
@ -192,7 +200,7 @@ func (r *ClientRoute) RemoveClientPeerConn(ptc *ClientPeerConn) error {
return nil
}
func (r *ClientRoute) RemoveAllClientPeerConns() {
/*func (r *ClientRoute) RemoveAllClientPeerConns() {
var c *ClientPeerConn
r.ptc_mtx.Lock()
@ -200,9 +208,10 @@ func (r *ClientRoute) RemoveAllClientPeerConns() {
for _, c = range r.ptc_map {
delete(r.ptc_map, c.conn_id)
r.cts.cli.stats.peers.Add(-1)
c.ReqStop()
}
}
}*/
func (r *ClientRoute) ReqStopAllClientPeerConns() {
var c *ClientPeerConn
@ -571,6 +580,7 @@ func (cts *ClientConn) AddNewClientRoute(addr string, server_peer_net string, pr
//}
r = NewClientRoute(cts, id, addr, server_peer_net, proto)
cts.route_map[id] = r
cts.cli.stats.routes.Add(1)
cts.route_mtx.Unlock()
cts.cli.log.Write(cts.sid, LOG_INFO, "Added route(%d,%s)", id, addr)
@ -591,6 +601,7 @@ func (cts *ClientConn) ReqStopAllClientRoutes() {
}
}
/*
func (cts *ClientConn) RemoveAllClientRoutes() {
var r *ClientRoute
@ -599,9 +610,10 @@ func (cts *ClientConn) RemoveAllClientRoutes() {
for _, r = range cts.route_map {
delete(cts.route_map, r.id)
cts.cli.stats.routes.Add(-1)
r.ReqStop()
}
}
}*/
func (cts *ClientConn) RemoveClientRoute(route *ClientRoute) error {
var r *ClientRoute
@ -618,6 +630,7 @@ func (cts *ClientConn) RemoveClientRoute(route *ClientRoute) error {
return fmt.Errorf("conflicting route id - %d", route.id)
}
delete(cts.route_map, route.id)
cts.cli.stats.routes.Add(-1)
cts.route_mtx.Unlock()
cts.cli.log.Write(cts.sid, LOG_INFO, "Removed route(%d,%s)", route.id, route.peer_addr)
@ -637,6 +650,7 @@ func (cts *ClientConn) RemoveClientRouteById(route_id uint32) error {
return fmt.Errorf("non-existent route id - %d", route_id)
}
delete(cts.route_map, route_id)
cts.cli.stats.routes.Add(-1)
cts.route_mtx.Unlock()
cts.cli.log.Write(cts.sid, LOG_INFO, "Removed route(%d,%s)", r.id, r.peer_addr)
@ -982,7 +996,7 @@ func (hlw *client_ctl_log_writer) Write(p []byte) (n int, err error) {
return len(p), nil
}
func NewClient(ctx context.Context, ctl_addrs []string, logger Logger, ctltlscfg *tls.Config, rpctlscfg *tls.Config) *Client {
func NewClient(ctx context.Context, ctl_addrs []string, logger Logger, ctl_prefix string, ctltlscfg *tls.Config, rpctlscfg *tls.Config) *Client {
var c Client
var i int
var hs_log *log.Logger
@ -996,7 +1010,7 @@ func NewClient(ctx context.Context, ctl_addrs []string, logger Logger, ctltlscfg
c.stop_req.Store(false)
c.stop_chan = make(chan bool, 8)
c.log = logger
c.ctl_prefix = "" // TODO:
c.ctl_prefix = ctl_prefix
c.ctl_mux = http.NewServeMux()
c.ctl_mux.Handle(c.ctl_prefix + "/client-conns", &client_ctl_client_conns{c: &c})
@ -1023,6 +1037,10 @@ func NewClient(ctx context.Context, ctl_addrs []string, logger Logger, ctltlscfg
}
}
c.stats.conns.Store(0)
c.stats.routes.Store(0)
c.stats.peers.Store(0)
return &c
}
@ -1052,6 +1070,7 @@ func (c *Client) AddNewClientConn(cfg *ClientConfig) (*ClientConn, error) {
c.cts_map_by_addr[cfg.ServerAddr] = cts
c.cts_map[id] = cts
c.stats.conns.Add(1)
c.cts_mtx.Unlock()
c.log.Write ("", LOG_INFO, "Added client connection(%d) to %s", cts.id, cfg.ServerAddr)
@ -1069,6 +1088,7 @@ func (c* Client) ReqStopAllClientConns() {
}
}
/*
func (c *Client) RemoveAllClientConns() {
var cts *ClientConn
@ -1078,9 +1098,11 @@ func (c *Client) RemoveAllClientConns() {
for _, cts = range c.cts_map {
delete(c.cts_map_by_addr, cts.cfg.ServerAddr)
delete(c.cts_map, cts.id)
c.stats.conns.Store(int64(len(c.cts_map)))
cts.ReqStop()
}
}
*/
func (c *Client) RemoveClientConn(cts *ClientConn) error {
var conn *ClientConn
@ -1100,6 +1122,7 @@ func (c *Client) RemoveClientConn(cts *ClientConn) error {
delete(c.cts_map_by_addr, cts.cfg.ServerAddr)
delete(c.cts_map, cts.id)
c.stats.conns.Store(int64(len(c.cts_map)))
c.cts_mtx.Unlock()
c.log.Write ("", LOG_INFO, "Removed client connection(%d) to %s", cts.id, cts.cfg.ServerAddr)
@ -1124,6 +1147,7 @@ func (c *Client) RemoveClientConnById(conn_id uint32) error {
delete(c.cts_map_by_addr, cts.cfg.ServerAddr)
delete(c.cts_map, cts.id)
c.stats.conns.Store(int64(len(c.cts_map)))
c.cts_mtx.Unlock()
c.log.Write ("", LOG_INFO, "Removed client connection(%d) to %s", cts.id, cts.cfg.ServerAddr)

View File

@ -40,29 +40,33 @@ type ClientTLSConfig struct {
ServerName string `yaml:"server-name"`
}
type ServiceConfig struct {
Prefix string `yaml:"prefix"`
Addrs []string `yaml:"addresses"`
}
type ServerConfig struct {
CTL struct {
TLS ServerTLSConfig `yaml:"tls"`
ServiceAddrs []string `yaml:"service-addrs"`
Service ServiceConfig `yaml:"service"`
TLS ServerTLSConfig `yaml:"tls"`
} `yaml:"ctl"`
RPC struct {
TLS ServerTLSConfig `yaml:"tls"`
ServiceAddrs []string `yaml:"service-addrs"`
TLS ServerTLSConfig `yaml:"tls"`
ServiceAddrs []string `yaml:"service-addrs"`
} `yaml:"rpc"`
}
type ClientConfig struct {
CTL struct {
TLS ServerTLSConfig `yaml:"tls"`
ServiceAddrs []string `yaml:"service-addrs"`
Service ServiceConfig `yaml:"endpoint"`
TLS ServerTLSConfig `yaml:"tls"`
} `yaml:"ctl"`
RPC struct {
TLS ClientTLSConfig `yaml:"tls"`
TLS ClientTLSConfig `yaml:"tls"`
} `yaml:"rpc"`
}
func load_server_config(cfgfile string) (*ServerConfig, error) {
var cfg ServerConfig
var f *os.File

View File

@ -163,6 +163,7 @@ func server_main(ctl_addrs []string, svcaddrs []string, cfg *ServerConfig) error
ctl_addrs,
svcaddrs,
&AppLogger{id: "server", out: os.Stderr},
cfg.CTL.Service.Prefix,
ctltlscfg,
rpctlscfg)
if err != nil {
@ -201,6 +202,7 @@ func client_main(ctl_addrs []string, server_addr string, peer_addrs []string, cf
context.Background(),
ctl_addrs,
&AppLogger{id: "client", out: os.Stderr},
cfg.CTL.Service.Prefix,
ctltlscfg,
rpctlscfg)
@ -260,7 +262,7 @@ func main() {
}
}
if len(ctl_addrs) <= 0 { ctl_addrs = cfg.CTL.ServiceAddrs }
if len(ctl_addrs) <= 0 { ctl_addrs = cfg.CTL.Service.Addrs }
err = server_main(ctl_addrs, rpc_addrs, cfg)
if err != nil {
@ -306,7 +308,7 @@ func main() {
}
}
if len(ctl_addrs) < 1 { ctl_addrs = cfg.CTL.ServiceAddrs }
if len(ctl_addrs) < 1 { ctl_addrs = cfg.CTL.Service.Addrs }
err = client_main(ctl_addrs, rpc_addrs[0], flgs.Args(), cfg)
if err != nil {

View File

@ -21,8 +21,11 @@ type json_out_server_route struct {
}
type json_out_server_stats struct {
NumCPUs int `json:"num-cpus"`
NumGoroutines int `json:"num-goroutines"`
CPUs int `json:"cpus"`
Goroutines int `json:"goroutines"`
ServerConns int64 `json:"server-conns"`
ServerRoutes int64 `json:"server-routes"`
ServerPeers int64 `json:"server-peers"`
}
// ------------------------------------
@ -232,8 +235,11 @@ func (ctl *server_ctl_stats) ServeHTTP(w http.ResponseWriter, req *http.Request)
switch req.Method {
case http.MethodGet:
var stats json_out_server_stats
stats.NumCPUs = runtime.NumCPU()
stats.NumGoroutines = runtime.NumGoroutine()
stats.CPUs = runtime.NumCPU()
stats.Goroutines = runtime.NumGoroutine()
stats.ServerConns = s.stats.conns.Load()
stats.ServerRoutes = s.stats.routes.Load()
stats.ServerPeers = s.stats.peers.Load()
status_code = http.StatusOK; w.WriteHeader(status_code)
if err = je.Encode(stats); err != nil { goto oops }

View File

@ -20,7 +20,7 @@ import "google.golang.org/grpc/credentials"
import "google.golang.org/grpc/peer"
import "google.golang.org/grpc/stats"
const PTS_LIMIT = 8192
const PTS_LIMIT int = 16384
type ServerConnMapByAddr = map[net.Addr]*ServerConn
type ServerConnMap = map[uint32]*ServerConn
@ -56,6 +56,12 @@ type Server struct {
log Logger
stats struct {
conns atomic.Int64
routes atomic.Int64
peers atomic.Int64
}
UnimplementedHoduServer
}
@ -197,6 +203,7 @@ func (r *ServerRoute) AddNewServerPeerConn(c *net.TCPConn) (*ServerPeerConn, err
pts = NewServerPeerConn(r, c, r.pts_last_id)
r.pts_map[pts.conn_id] = pts
r.pts_last_id++
r.cts.svr.stats.peers.Add(1)
return pts, nil
}
@ -204,6 +211,7 @@ func (r *ServerRoute) AddNewServerPeerConn(c *net.TCPConn) (*ServerPeerConn, err
func (r *ServerRoute) RemoveServerPeerConn(pts *ServerPeerConn) {
r.pts_mtx.Lock()
delete(r.pts_map, pts.conn_id)
r.cts.svr.stats.peers.Add(-1)
r.pts_mtx.Unlock()
r.cts.svr.log.Write(r.cts.sid, LOG_DEBUG, "Removed server-side peer connection %s from route(%d)", pts.conn.RemoteAddr().String(), r.id)
}
@ -343,6 +351,7 @@ func (cts *ServerConn) AddNewServerRoute(route_id uint32, proto ROUTE_PROTO, ptc
return nil, err
}
cts.route_map[route_id] = r
cts.svr.stats.routes.Add(1)
cts.route_mtx.Unlock()
cts.route_wg.Add(1)
@ -365,6 +374,7 @@ func (cts *ServerConn) RemoveServerRoute(route *ServerRoute) error {
return fmt.Errorf("non-existent route - %d", route.id)
}
delete(cts.route_map, route.id)
cts.svr.stats.routes.Add(-1)
cts.route_mtx.Unlock()
r.ReqStop()
@ -382,6 +392,7 @@ func (cts *ServerConn) RemoveServerRouteById(route_id uint32) (*ServerRoute, err
return nil, fmt.Errorf("non-existent route id - %d", route_id)
}
delete(cts.route_map, route_id)
cts.svr.stats.routes.Add(-1)
cts.route_mtx.Unlock()
r.ReqStop()
@ -807,7 +818,7 @@ func (hlw *server_ctl_log_writer) Write(p []byte) (n int, err error) {
return len(p), nil
}
func NewServer(ctx context.Context, ctl_addrs []string, rpc_addrs []string, logger Logger, ctltlscfg *tls.Config, rpctlscfg *tls.Config) (*Server, error) {
func NewServer(ctx context.Context, ctl_addrs []string, rpc_addrs []string, logger Logger, ctl_prefix string, ctltlscfg *tls.Config, rpctlscfg *tls.Config) (*Server, error) {
var s Server
var l *net.TCPListener
var rpcaddr *net.TCPAddr
@ -871,7 +882,7 @@ func NewServer(ctx context.Context, ctl_addrs []string, rpc_addrs []string, logg
}
RegisterHoduServer(s.rpc_svr, &s)
s.ctl_prefix = "" // TODO:
s.ctl_prefix = ctl_prefix
s.ctl_mux = http.NewServeMux()
cwd, _ = os.Getwd()
@ -898,6 +909,10 @@ func NewServer(ctx context.Context, ctl_addrs []string, rpc_addrs []string, logg
}
}
s.stats.conns.Store(0)
s.stats.routes.Store(0)
s.stats.peers.Store(0)
return &s, nil
oops:
@ -1065,6 +1080,7 @@ func (s *Server) AddNewServerConn(remote_addr *net.Addr, local_addr *net.Addr, p
}
s.cts_map_by_addr[cts.remote_addr] = &cts
s.cts_map[id] = &cts;
s.stats.conns.Store(int64(len(s.cts_map)))
s.log.Write("", LOG_DEBUG, "Added client connection from %s", cts.remote_addr.String())
return &cts, nil
}
@ -1098,6 +1114,7 @@ func (s *Server) RemoveServerConn(cts *ServerConn) error {
delete(s.cts_map, cts.id)
delete(s.cts_map_by_addr, cts.remote_addr)
s.stats.conns.Store(int64(len(s.cts_map)))
s.cts_mtx.Unlock()
cts.ReqStop()
@ -1117,6 +1134,7 @@ func (s *Server) RemoveServerConnByAddr(addr net.Addr) error {
}
delete(s.cts_map, cts.id)
delete(s.cts_map_by_addr, cts.remote_addr)
s.stats.conns.Store(int64(len(s.cts_map)))
s.cts_mtx.Unlock()
cts.ReqStop()