diff --git a/client-ctl.go b/client-ctl.go index 8182944..10ea733 100644 --- a/client-ctl.go +++ b/client-ctl.go @@ -68,8 +68,11 @@ type json_out_client_peer struct { } type json_out_client_stats struct { - NumCPUs int `json:"num-cpus"` - NumGoroutines int `json:"num-goroutines"` + CPUs int `json:"cpus"` + Goroutines int `json:"goroutines"` + ClientConns int64 `json:"client-conns"` + ClientRoutes int64 `json:"client-routes"` + ClientPeers int64 `json:"client-peers"` } // ------------------------------------ @@ -651,8 +654,11 @@ func (ctl *client_ctl_stats) ServeHTTP(w http.ResponseWriter, req *http.Request) switch req.Method { case http.MethodGet: var stats json_out_client_stats - stats.NumCPUs = runtime.NumCPU() - stats.NumGoroutines = runtime.NumGoroutine() + stats.CPUs = runtime.NumCPU() + stats.Goroutines = runtime.NumGoroutine() + stats.ClientConns = c.stats.conns.Load() + stats.ClientRoutes = c.stats.routes.Load() + stats.ClientPeers = c.stats.peers.Load() status_code = http.StatusOK; w.WriteHeader(status_code) if err = je.Encode(stats); err != nil { goto oops } diff --git a/client.go b/client.go index 1990943..80d7649 100644 --- a/client.go +++ b/client.go @@ -62,6 +62,12 @@ type Client struct { stop_chan chan bool log Logger + + stats struct { + conns atomic.Int64 + routes atomic.Int64 + peers atomic.Int64 + } } // client connection to server @@ -164,6 +170,7 @@ func (r *ClientRoute) AddNewClientPeerConn(c *net.TCPConn, pts_id uint32, pts_ra r.ptc_mtx.Lock() ptc = NewClientPeerConn(r, c, pts_id, pts_raddr, pts_laddr) r.ptc_map[ptc.conn_id] = ptc + r.cts.cli.stats.peers.Add(1) r.ptc_mtx.Unlock() r.cts.cli.log.Write(r.cts.sid, LOG_INFO, "Added client-side peer(%d,%d,%s,%s)", r.id, ptc.conn_id, ptc.conn.RemoteAddr().String(), ptc.conn.LocalAddr().String()) @@ -185,6 +192,7 @@ func (r *ClientRoute) RemoveClientPeerConn(ptc *ClientPeerConn) error { return fmt.Errorf("conflicting peer id - %d", ptc.conn_id) } delete(r.ptc_map, ptc.conn_id) + r.cts.cli.stats.peers.Add(-1) r.ptc_mtx.Unlock() r.cts.cli.log.Write(r.cts.sid, LOG_INFO, "Removed client-side peer(%d,%d,%s,%s)", r.id, ptc.conn_id, ptc.conn.RemoteAddr().String(), ptc.conn.LocalAddr().String()) @@ -192,7 +200,7 @@ func (r *ClientRoute) RemoveClientPeerConn(ptc *ClientPeerConn) error { return nil } -func (r *ClientRoute) RemoveAllClientPeerConns() { +/*func (r *ClientRoute) RemoveAllClientPeerConns() { var c *ClientPeerConn r.ptc_mtx.Lock() @@ -200,9 +208,10 @@ func (r *ClientRoute) RemoveAllClientPeerConns() { for _, c = range r.ptc_map { delete(r.ptc_map, c.conn_id) + r.cts.cli.stats.peers.Add(-1) c.ReqStop() } -} +}*/ func (r *ClientRoute) ReqStopAllClientPeerConns() { var c *ClientPeerConn @@ -571,6 +580,7 @@ func (cts *ClientConn) AddNewClientRoute(addr string, server_peer_net string, pr //} r = NewClientRoute(cts, id, addr, server_peer_net, proto) cts.route_map[id] = r + cts.cli.stats.routes.Add(1) cts.route_mtx.Unlock() cts.cli.log.Write(cts.sid, LOG_INFO, "Added route(%d,%s)", id, addr) @@ -591,6 +601,7 @@ func (cts *ClientConn) ReqStopAllClientRoutes() { } } +/* func (cts *ClientConn) RemoveAllClientRoutes() { var r *ClientRoute @@ -599,9 +610,10 @@ func (cts *ClientConn) RemoveAllClientRoutes() { for _, r = range cts.route_map { delete(cts.route_map, r.id) + cts.cli.stats.routes.Add(-1) r.ReqStop() } -} +}*/ func (cts *ClientConn) RemoveClientRoute(route *ClientRoute) error { var r *ClientRoute @@ -618,6 +630,7 @@ func (cts *ClientConn) RemoveClientRoute(route *ClientRoute) error { return fmt.Errorf("conflicting route id - %d", route.id) } delete(cts.route_map, route.id) + cts.cli.stats.routes.Add(-1) cts.route_mtx.Unlock() cts.cli.log.Write(cts.sid, LOG_INFO, "Removed route(%d,%s)", route.id, route.peer_addr) @@ -637,6 +650,7 @@ func (cts *ClientConn) RemoveClientRouteById(route_id uint32) error { return fmt.Errorf("non-existent route id - %d", route_id) } delete(cts.route_map, route_id) + cts.cli.stats.routes.Add(-1) cts.route_mtx.Unlock() cts.cli.log.Write(cts.sid, LOG_INFO, "Removed route(%d,%s)", r.id, r.peer_addr) @@ -982,7 +996,7 @@ func (hlw *client_ctl_log_writer) Write(p []byte) (n int, err error) { return len(p), nil } -func NewClient(ctx context.Context, ctl_addrs []string, logger Logger, ctltlscfg *tls.Config, rpctlscfg *tls.Config) *Client { +func NewClient(ctx context.Context, ctl_addrs []string, logger Logger, ctl_prefix string, ctltlscfg *tls.Config, rpctlscfg *tls.Config) *Client { var c Client var i int var hs_log *log.Logger @@ -996,7 +1010,7 @@ func NewClient(ctx context.Context, ctl_addrs []string, logger Logger, ctltlscfg c.stop_req.Store(false) c.stop_chan = make(chan bool, 8) c.log = logger - c.ctl_prefix = "" // TODO: + c.ctl_prefix = ctl_prefix c.ctl_mux = http.NewServeMux() c.ctl_mux.Handle(c.ctl_prefix + "/client-conns", &client_ctl_client_conns{c: &c}) @@ -1023,6 +1037,10 @@ func NewClient(ctx context.Context, ctl_addrs []string, logger Logger, ctltlscfg } } + c.stats.conns.Store(0) + c.stats.routes.Store(0) + c.stats.peers.Store(0) + return &c } @@ -1052,6 +1070,7 @@ func (c *Client) AddNewClientConn(cfg *ClientConfig) (*ClientConn, error) { c.cts_map_by_addr[cfg.ServerAddr] = cts c.cts_map[id] = cts + c.stats.conns.Add(1) c.cts_mtx.Unlock() c.log.Write ("", LOG_INFO, "Added client connection(%d) to %s", cts.id, cfg.ServerAddr) @@ -1069,6 +1088,7 @@ func (c* Client) ReqStopAllClientConns() { } } +/* func (c *Client) RemoveAllClientConns() { var cts *ClientConn @@ -1078,9 +1098,11 @@ func (c *Client) RemoveAllClientConns() { for _, cts = range c.cts_map { delete(c.cts_map_by_addr, cts.cfg.ServerAddr) delete(c.cts_map, cts.id) + c.stats.conns.Store(int64(len(c.cts_map))) cts.ReqStop() } } +*/ func (c *Client) RemoveClientConn(cts *ClientConn) error { var conn *ClientConn @@ -1100,6 +1122,7 @@ func (c *Client) RemoveClientConn(cts *ClientConn) error { delete(c.cts_map_by_addr, cts.cfg.ServerAddr) delete(c.cts_map, cts.id) + c.stats.conns.Store(int64(len(c.cts_map))) c.cts_mtx.Unlock() c.log.Write ("", LOG_INFO, "Removed client connection(%d) to %s", cts.id, cts.cfg.ServerAddr) @@ -1124,6 +1147,7 @@ func (c *Client) RemoveClientConnById(conn_id uint32) error { delete(c.cts_map_by_addr, cts.cfg.ServerAddr) delete(c.cts_map, cts.id) + c.stats.conns.Store(int64(len(c.cts_map))) c.cts_mtx.Unlock() c.log.Write ("", LOG_INFO, "Removed client connection(%d) to %s", cts.id, cts.cfg.ServerAddr) diff --git a/cmd/config.go b/cmd/config.go index a2e727a..6ceb61e 100644 --- a/cmd/config.go +++ b/cmd/config.go @@ -40,29 +40,33 @@ type ClientTLSConfig struct { ServerName string `yaml:"server-name"` } +type ServiceConfig struct { + Prefix string `yaml:"prefix"` + Addrs []string `yaml:"addresses"` +} + type ServerConfig struct { CTL struct { - TLS ServerTLSConfig `yaml:"tls"` - ServiceAddrs []string `yaml:"service-addrs"` + Service ServiceConfig `yaml:"service"` + TLS ServerTLSConfig `yaml:"tls"` } `yaml:"ctl"` RPC struct { - TLS ServerTLSConfig `yaml:"tls"` - ServiceAddrs []string `yaml:"service-addrs"` + TLS ServerTLSConfig `yaml:"tls"` + ServiceAddrs []string `yaml:"service-addrs"` } `yaml:"rpc"` } type ClientConfig struct { CTL struct { - TLS ServerTLSConfig `yaml:"tls"` - ServiceAddrs []string `yaml:"service-addrs"` + Service ServiceConfig `yaml:"endpoint"` + TLS ServerTLSConfig `yaml:"tls"` } `yaml:"ctl"` RPC struct { - TLS ClientTLSConfig `yaml:"tls"` + TLS ClientTLSConfig `yaml:"tls"` } `yaml:"rpc"` } - func load_server_config(cfgfile string) (*ServerConfig, error) { var cfg ServerConfig var f *os.File diff --git a/cmd/main.go b/cmd/main.go index 3395c80..f2f10fa 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -163,6 +163,7 @@ func server_main(ctl_addrs []string, svcaddrs []string, cfg *ServerConfig) error ctl_addrs, svcaddrs, &AppLogger{id: "server", out: os.Stderr}, + cfg.CTL.Service.Prefix, ctltlscfg, rpctlscfg) if err != nil { @@ -201,6 +202,7 @@ func client_main(ctl_addrs []string, server_addr string, peer_addrs []string, cf context.Background(), ctl_addrs, &AppLogger{id: "client", out: os.Stderr}, + cfg.CTL.Service.Prefix, ctltlscfg, rpctlscfg) @@ -260,7 +262,7 @@ func main() { } } - if len(ctl_addrs) <= 0 { ctl_addrs = cfg.CTL.ServiceAddrs } + if len(ctl_addrs) <= 0 { ctl_addrs = cfg.CTL.Service.Addrs } err = server_main(ctl_addrs, rpc_addrs, cfg) if err != nil { @@ -306,7 +308,7 @@ func main() { } } - if len(ctl_addrs) < 1 { ctl_addrs = cfg.CTL.ServiceAddrs } + if len(ctl_addrs) < 1 { ctl_addrs = cfg.CTL.Service.Addrs } err = client_main(ctl_addrs, rpc_addrs[0], flgs.Args(), cfg) if err != nil { diff --git a/server-ctl.go b/server-ctl.go index e67f09c..b1a9030 100644 --- a/server-ctl.go +++ b/server-ctl.go @@ -21,8 +21,11 @@ type json_out_server_route struct { } type json_out_server_stats struct { - NumCPUs int `json:"num-cpus"` - NumGoroutines int `json:"num-goroutines"` + CPUs int `json:"cpus"` + Goroutines int `json:"goroutines"` + ServerConns int64 `json:"server-conns"` + ServerRoutes int64 `json:"server-routes"` + ServerPeers int64 `json:"server-peers"` } // ------------------------------------ @@ -232,8 +235,11 @@ func (ctl *server_ctl_stats) ServeHTTP(w http.ResponseWriter, req *http.Request) switch req.Method { case http.MethodGet: var stats json_out_server_stats - stats.NumCPUs = runtime.NumCPU() - stats.NumGoroutines = runtime.NumGoroutine() + stats.CPUs = runtime.NumCPU() + stats.Goroutines = runtime.NumGoroutine() + stats.ServerConns = s.stats.conns.Load() + stats.ServerRoutes = s.stats.routes.Load() + stats.ServerPeers = s.stats.peers.Load() status_code = http.StatusOK; w.WriteHeader(status_code) if err = je.Encode(stats); err != nil { goto oops } diff --git a/server.go b/server.go index d426843..f117bd3 100644 --- a/server.go +++ b/server.go @@ -20,7 +20,7 @@ import "google.golang.org/grpc/credentials" import "google.golang.org/grpc/peer" import "google.golang.org/grpc/stats" -const PTS_LIMIT = 8192 +const PTS_LIMIT int = 16384 type ServerConnMapByAddr = map[net.Addr]*ServerConn type ServerConnMap = map[uint32]*ServerConn @@ -56,6 +56,12 @@ type Server struct { log Logger + stats struct { + conns atomic.Int64 + routes atomic.Int64 + peers atomic.Int64 + } + UnimplementedHoduServer } @@ -197,6 +203,7 @@ func (r *ServerRoute) AddNewServerPeerConn(c *net.TCPConn) (*ServerPeerConn, err pts = NewServerPeerConn(r, c, r.pts_last_id) r.pts_map[pts.conn_id] = pts r.pts_last_id++ + r.cts.svr.stats.peers.Add(1) return pts, nil } @@ -204,6 +211,7 @@ func (r *ServerRoute) AddNewServerPeerConn(c *net.TCPConn) (*ServerPeerConn, err func (r *ServerRoute) RemoveServerPeerConn(pts *ServerPeerConn) { r.pts_mtx.Lock() delete(r.pts_map, pts.conn_id) + r.cts.svr.stats.peers.Add(-1) r.pts_mtx.Unlock() r.cts.svr.log.Write(r.cts.sid, LOG_DEBUG, "Removed server-side peer connection %s from route(%d)", pts.conn.RemoteAddr().String(), r.id) } @@ -343,6 +351,7 @@ func (cts *ServerConn) AddNewServerRoute(route_id uint32, proto ROUTE_PROTO, ptc return nil, err } cts.route_map[route_id] = r + cts.svr.stats.routes.Add(1) cts.route_mtx.Unlock() cts.route_wg.Add(1) @@ -365,6 +374,7 @@ func (cts *ServerConn) RemoveServerRoute(route *ServerRoute) error { return fmt.Errorf("non-existent route - %d", route.id) } delete(cts.route_map, route.id) + cts.svr.stats.routes.Add(-1) cts.route_mtx.Unlock() r.ReqStop() @@ -382,6 +392,7 @@ func (cts *ServerConn) RemoveServerRouteById(route_id uint32) (*ServerRoute, err return nil, fmt.Errorf("non-existent route id - %d", route_id) } delete(cts.route_map, route_id) + cts.svr.stats.routes.Add(-1) cts.route_mtx.Unlock() r.ReqStop() @@ -807,7 +818,7 @@ func (hlw *server_ctl_log_writer) Write(p []byte) (n int, err error) { return len(p), nil } -func NewServer(ctx context.Context, ctl_addrs []string, rpc_addrs []string, logger Logger, ctltlscfg *tls.Config, rpctlscfg *tls.Config) (*Server, error) { +func NewServer(ctx context.Context, ctl_addrs []string, rpc_addrs []string, logger Logger, ctl_prefix string, ctltlscfg *tls.Config, rpctlscfg *tls.Config) (*Server, error) { var s Server var l *net.TCPListener var rpcaddr *net.TCPAddr @@ -871,7 +882,7 @@ func NewServer(ctx context.Context, ctl_addrs []string, rpc_addrs []string, logg } RegisterHoduServer(s.rpc_svr, &s) - s.ctl_prefix = "" // TODO: + s.ctl_prefix = ctl_prefix s.ctl_mux = http.NewServeMux() cwd, _ = os.Getwd() @@ -898,6 +909,10 @@ func NewServer(ctx context.Context, ctl_addrs []string, rpc_addrs []string, logg } } + s.stats.conns.Store(0) + s.stats.routes.Store(0) + s.stats.peers.Store(0) + return &s, nil oops: @@ -1065,6 +1080,7 @@ func (s *Server) AddNewServerConn(remote_addr *net.Addr, local_addr *net.Addr, p } s.cts_map_by_addr[cts.remote_addr] = &cts s.cts_map[id] = &cts; + s.stats.conns.Store(int64(len(s.cts_map))) s.log.Write("", LOG_DEBUG, "Added client connection from %s", cts.remote_addr.String()) return &cts, nil } @@ -1098,6 +1114,7 @@ func (s *Server) RemoveServerConn(cts *ServerConn) error { delete(s.cts_map, cts.id) delete(s.cts_map_by_addr, cts.remote_addr) + s.stats.conns.Store(int64(len(s.cts_map))) s.cts_mtx.Unlock() cts.ReqStop() @@ -1117,6 +1134,7 @@ func (s *Server) RemoveServerConnByAddr(addr net.Addr) error { } delete(s.cts_map, cts.id) delete(s.cts_map_by_addr, cts.remote_addr) + s.stats.conns.Store(int64(len(s.cts_map))) s.cts_mtx.Unlock() cts.ReqStop()