From 8cde9f08d497e5a7b6186fee9936d5a7bd312bb7 Mon Sep 17 00:00:00 2001 From: hyung-hwan Date: Wed, 19 Mar 2025 00:24:42 +0900 Subject: [PATCH] added /_ctl/server-peers --- server-ctl.go | 53 +++++++++++++++++++++++++++++++++++++++++++++++++- server-peer.go | 16 +++++++++++---- server.go | 13 ++++++++++++- 3 files changed, 76 insertions(+), 6 deletions(-) diff --git a/server-ctl.go b/server-ctl.go index 9c0ac9f..e191146 100644 --- a/server-ctl.go +++ b/server-ctl.go @@ -1,5 +1,6 @@ package hodu +import "container/list" import "encoding/json" import "fmt" import "net/http" @@ -101,6 +102,10 @@ type server_ctl_server_conns_id_routes_id_peers_id struct { server_ctl } +type server_ctl_server_peers struct { + server_ctl +} + type server_ctl_notices struct { server_ctl } @@ -196,7 +201,6 @@ func (ctl *server_ctl_server_conns) ServeHTTP(w http.ResponseWriter, req *http.R s = ctl.s je = json.NewEncoder(w) - q = req.URL.Query() q = req.URL.Query() routes, err = strconv.ParseBool(strings.ToLower(q.Get("routes"))) if err != nil { routes = false } @@ -613,6 +617,53 @@ oops: return status_code, err } +// ------------------------------------ +func (ctl *server_ctl_server_peers) ServeHTTP(w http.ResponseWriter, req *http.Request) (int, error) { + var s *Server + var status_code int + var je *json.Encoder + var err error + + s = ctl.s + je = json.NewEncoder(w) + + switch req.Method { + case http.MethodGet: + var js []ServerEventPeerAdded + var e *list.Element + + js = make([]ServerEventPeerAdded, 0) + s.pts_mtx.Lock() + for e = s.pts_list.Front(); e != nil; e = e.Next() { + var pts *ServerPeerConn + pts = e.Value.(*ServerPeerConn) + js = append(js, ServerEventPeerAdded{ // TODO: rename or create an alias type? + Conn: pts.route.Cts.Id, + Route: pts.route.Id, + Peer: pts.conn_id, + ServerPeerAddr: pts.conn.RemoteAddr().String(), + ServerLocalAddr: pts.conn.LocalAddr().String(), + ClientPeerAddr: pts.client_peer_raddr.Get(), + ClientLocalAddr: pts.client_peer_laddr.Get(), + CreatedMilli: pts.Created.UnixMilli(), + }) + } + s.pts_mtx.Unlock() + + status_code = WriteJsonRespHeader(w, http.StatusOK) + if err = je.Encode(js); err != nil { goto oops } + + default: + status_code = WriteEmptyRespHeader(w, http.StatusMethodNotAllowed) + } + +//done: + return status_code, nil + +oops: + return status_code, err +} + // ------------------------------------ func (ctl *server_ctl_notices) ServeHTTP(w http.ResponseWriter, req *http.Request) (int, error) { diff --git a/server-peer.go b/server-peer.go index 5b16913..365cf6e 100644 --- a/server-peer.go +++ b/server-peer.go @@ -1,5 +1,6 @@ package hodu +import "container/list" import "context" import "errors" import "io" @@ -10,10 +11,12 @@ import "sync/atomic" import "time" type ServerPeerConn struct { - route *ServerRoute - conn_id PeerId - conn *net.TCPConn - Created time.Time + route *ServerRoute + conn_id PeerId + conn *net.TCPConn + Created time.Time + + node_in_server *list.Element stop_chan chan bool stop_req atomic.Bool @@ -146,6 +149,11 @@ done_without_stop: spc.ReqStop() spc.route.RemoveServerPeerConn(spc) + spc.route.Cts.S.pts_mtx.Lock() + spc.route.Cts.S.pts_list.Remove(spc.node_in_server) + spc.node_in_server = nil + spc.route.Cts.S.pts_mtx.Unlock() + spc.route.Cts.S.bulletin.Enqueue( &ServerEvent{ Kind: SERVER_EVENT_PEER_DELETED, diff --git a/server.go b/server.go index 901602c..d5b288d 100644 --- a/server.go +++ b/server.go @@ -1,5 +1,6 @@ package hodu +import "container/list" import "context" import "crypto/tls" import "errors" @@ -162,7 +163,6 @@ type Server struct { rpc_wg sync.WaitGroup rpc_svr *grpc.Server - pts_limit int // global pts limit cts_limit int cts_next_id ConnId cts_mtx sync.Mutex @@ -171,6 +171,10 @@ type Server struct { cts_map_by_token ServerConnMapByClientToken cts_wg sync.WaitGroup + pts_limit int // global pts limit + pts_mtx sync.Mutex + pts_list *list.List + log Logger conn_notice ServerConnNoticeHandler @@ -345,6 +349,10 @@ func (r *ServerRoute) AddNewServerPeerConn(c *net.TCPConn) (*ServerPeerConn, err r.pts_map[pts.conn_id] = pts r.Cts.S.stats.peers.Add(1) + r.Cts.S.pts_mtx.Lock() + pts.node_in_server = r.Cts.S.pts_list.PushBack(pts) + r.Cts.S.pts_mtx.Unlock() + r.Cts.S.bulletin.Enqueue( &ServerEvent{ Kind: SERVER_EVENT_PEER_ADDED, @@ -1351,6 +1359,7 @@ func NewServer(ctx context.Context, name string, logger Logger, cfg *ServerConfi s.Cfg = cfg s.ext_svcs = make([]Service, 0, 1) s.pts_limit = cfg.MaxPeers + s.pts_list = list.New() s.cts_limit = cfg.RpcMaxConns s.cts_next_id = 1 s.cts_map = make(ServerConnMap) @@ -1388,6 +1397,8 @@ func NewServer(ctx context.Context, name string, logger Logger, cfg *ServerConfi s.WrapHttpHandler(&server_ctl_server_conns_id_routes_id_peers{server_ctl{s: &s, id: HS_ID_CTL}})) s.ctl_mux.Handle(s.Cfg.CtlPrefix + "/_ctl/server-conns/{conn_id}/routes/{route_id}/peers/{peer_id}", s.WrapHttpHandler(&server_ctl_server_conns_id_routes_id_peers_id{server_ctl{s: &s, id: HS_ID_CTL}})) + s.ctl_mux.Handle(s.Cfg.CtlPrefix + "/_ctl/server-peers", + s.WrapHttpHandler(&server_ctl_server_peers{server_ctl{s: &s, id: HS_ID_CTL}})) s.ctl_mux.Handle(s.Cfg.CtlPrefix + "/_ctl/notices", s.WrapHttpHandler(&server_ctl_notices{server_ctl{s: &s, id: HS_ID_CTL}})) s.ctl_mux.Handle(s.Cfg.CtlPrefix + "/_ctl/notices/{conn_id}",