added /_ctl/server-peers
This commit is contained in:
parent
9865914436
commit
8cde9f08d4
@ -1,5 +1,6 @@
|
|||||||
package hodu
|
package hodu
|
||||||
|
|
||||||
|
import "container/list"
|
||||||
import "encoding/json"
|
import "encoding/json"
|
||||||
import "fmt"
|
import "fmt"
|
||||||
import "net/http"
|
import "net/http"
|
||||||
@ -101,6 +102,10 @@ type server_ctl_server_conns_id_routes_id_peers_id struct {
|
|||||||
server_ctl
|
server_ctl
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type server_ctl_server_peers struct {
|
||||||
|
server_ctl
|
||||||
|
}
|
||||||
|
|
||||||
type server_ctl_notices struct {
|
type server_ctl_notices struct {
|
||||||
server_ctl
|
server_ctl
|
||||||
}
|
}
|
||||||
@ -196,7 +201,6 @@ func (ctl *server_ctl_server_conns) ServeHTTP(w http.ResponseWriter, req *http.R
|
|||||||
s = ctl.s
|
s = ctl.s
|
||||||
je = json.NewEncoder(w)
|
je = json.NewEncoder(w)
|
||||||
|
|
||||||
q = req.URL.Query()
|
|
||||||
q = req.URL.Query()
|
q = req.URL.Query()
|
||||||
routes, err = strconv.ParseBool(strings.ToLower(q.Get("routes")))
|
routes, err = strconv.ParseBool(strings.ToLower(q.Get("routes")))
|
||||||
if err != nil { routes = false }
|
if err != nil { routes = false }
|
||||||
@ -613,6 +617,53 @@ oops:
|
|||||||
return status_code, err
|
return status_code, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ------------------------------------
|
||||||
|
func (ctl *server_ctl_server_peers) ServeHTTP(w http.ResponseWriter, req *http.Request) (int, error) {
|
||||||
|
var s *Server
|
||||||
|
var status_code int
|
||||||
|
var je *json.Encoder
|
||||||
|
var err error
|
||||||
|
|
||||||
|
s = ctl.s
|
||||||
|
je = json.NewEncoder(w)
|
||||||
|
|
||||||
|
switch req.Method {
|
||||||
|
case http.MethodGet:
|
||||||
|
var js []ServerEventPeerAdded
|
||||||
|
var e *list.Element
|
||||||
|
|
||||||
|
js = make([]ServerEventPeerAdded, 0)
|
||||||
|
s.pts_mtx.Lock()
|
||||||
|
for e = s.pts_list.Front(); e != nil; e = e.Next() {
|
||||||
|
var pts *ServerPeerConn
|
||||||
|
pts = e.Value.(*ServerPeerConn)
|
||||||
|
js = append(js, ServerEventPeerAdded{ // TODO: rename or create an alias type?
|
||||||
|
Conn: pts.route.Cts.Id,
|
||||||
|
Route: pts.route.Id,
|
||||||
|
Peer: pts.conn_id,
|
||||||
|
ServerPeerAddr: pts.conn.RemoteAddr().String(),
|
||||||
|
ServerLocalAddr: pts.conn.LocalAddr().String(),
|
||||||
|
ClientPeerAddr: pts.client_peer_raddr.Get(),
|
||||||
|
ClientLocalAddr: pts.client_peer_laddr.Get(),
|
||||||
|
CreatedMilli: pts.Created.UnixMilli(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
s.pts_mtx.Unlock()
|
||||||
|
|
||||||
|
status_code = WriteJsonRespHeader(w, http.StatusOK)
|
||||||
|
if err = je.Encode(js); err != nil { goto oops }
|
||||||
|
|
||||||
|
default:
|
||||||
|
status_code = WriteEmptyRespHeader(w, http.StatusMethodNotAllowed)
|
||||||
|
}
|
||||||
|
|
||||||
|
//done:
|
||||||
|
return status_code, nil
|
||||||
|
|
||||||
|
oops:
|
||||||
|
return status_code, err
|
||||||
|
}
|
||||||
|
|
||||||
// ------------------------------------
|
// ------------------------------------
|
||||||
|
|
||||||
func (ctl *server_ctl_notices) ServeHTTP(w http.ResponseWriter, req *http.Request) (int, error) {
|
func (ctl *server_ctl_notices) ServeHTTP(w http.ResponseWriter, req *http.Request) (int, error) {
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
package hodu
|
package hodu
|
||||||
|
|
||||||
|
import "container/list"
|
||||||
import "context"
|
import "context"
|
||||||
import "errors"
|
import "errors"
|
||||||
import "io"
|
import "io"
|
||||||
@ -15,6 +16,8 @@ type ServerPeerConn struct {
|
|||||||
conn *net.TCPConn
|
conn *net.TCPConn
|
||||||
Created time.Time
|
Created time.Time
|
||||||
|
|
||||||
|
node_in_server *list.Element
|
||||||
|
|
||||||
stop_chan chan bool
|
stop_chan chan bool
|
||||||
stop_req atomic.Bool
|
stop_req atomic.Bool
|
||||||
|
|
||||||
@ -146,6 +149,11 @@ done_without_stop:
|
|||||||
spc.ReqStop()
|
spc.ReqStop()
|
||||||
spc.route.RemoveServerPeerConn(spc)
|
spc.route.RemoveServerPeerConn(spc)
|
||||||
|
|
||||||
|
spc.route.Cts.S.pts_mtx.Lock()
|
||||||
|
spc.route.Cts.S.pts_list.Remove(spc.node_in_server)
|
||||||
|
spc.node_in_server = nil
|
||||||
|
spc.route.Cts.S.pts_mtx.Unlock()
|
||||||
|
|
||||||
spc.route.Cts.S.bulletin.Enqueue(
|
spc.route.Cts.S.bulletin.Enqueue(
|
||||||
&ServerEvent{
|
&ServerEvent{
|
||||||
Kind: SERVER_EVENT_PEER_DELETED,
|
Kind: SERVER_EVENT_PEER_DELETED,
|
||||||
|
13
server.go
13
server.go
@ -1,5 +1,6 @@
|
|||||||
package hodu
|
package hodu
|
||||||
|
|
||||||
|
import "container/list"
|
||||||
import "context"
|
import "context"
|
||||||
import "crypto/tls"
|
import "crypto/tls"
|
||||||
import "errors"
|
import "errors"
|
||||||
@ -162,7 +163,6 @@ type Server struct {
|
|||||||
rpc_wg sync.WaitGroup
|
rpc_wg sync.WaitGroup
|
||||||
rpc_svr *grpc.Server
|
rpc_svr *grpc.Server
|
||||||
|
|
||||||
pts_limit int // global pts limit
|
|
||||||
cts_limit int
|
cts_limit int
|
||||||
cts_next_id ConnId
|
cts_next_id ConnId
|
||||||
cts_mtx sync.Mutex
|
cts_mtx sync.Mutex
|
||||||
@ -171,6 +171,10 @@ type Server struct {
|
|||||||
cts_map_by_token ServerConnMapByClientToken
|
cts_map_by_token ServerConnMapByClientToken
|
||||||
cts_wg sync.WaitGroup
|
cts_wg sync.WaitGroup
|
||||||
|
|
||||||
|
pts_limit int // global pts limit
|
||||||
|
pts_mtx sync.Mutex
|
||||||
|
pts_list *list.List
|
||||||
|
|
||||||
log Logger
|
log Logger
|
||||||
conn_notice ServerConnNoticeHandler
|
conn_notice ServerConnNoticeHandler
|
||||||
|
|
||||||
@ -345,6 +349,10 @@ func (r *ServerRoute) AddNewServerPeerConn(c *net.TCPConn) (*ServerPeerConn, err
|
|||||||
r.pts_map[pts.conn_id] = pts
|
r.pts_map[pts.conn_id] = pts
|
||||||
r.Cts.S.stats.peers.Add(1)
|
r.Cts.S.stats.peers.Add(1)
|
||||||
|
|
||||||
|
r.Cts.S.pts_mtx.Lock()
|
||||||
|
pts.node_in_server = r.Cts.S.pts_list.PushBack(pts)
|
||||||
|
r.Cts.S.pts_mtx.Unlock()
|
||||||
|
|
||||||
r.Cts.S.bulletin.Enqueue(
|
r.Cts.S.bulletin.Enqueue(
|
||||||
&ServerEvent{
|
&ServerEvent{
|
||||||
Kind: SERVER_EVENT_PEER_ADDED,
|
Kind: SERVER_EVENT_PEER_ADDED,
|
||||||
@ -1351,6 +1359,7 @@ func NewServer(ctx context.Context, name string, logger Logger, cfg *ServerConfi
|
|||||||
s.Cfg = cfg
|
s.Cfg = cfg
|
||||||
s.ext_svcs = make([]Service, 0, 1)
|
s.ext_svcs = make([]Service, 0, 1)
|
||||||
s.pts_limit = cfg.MaxPeers
|
s.pts_limit = cfg.MaxPeers
|
||||||
|
s.pts_list = list.New()
|
||||||
s.cts_limit = cfg.RpcMaxConns
|
s.cts_limit = cfg.RpcMaxConns
|
||||||
s.cts_next_id = 1
|
s.cts_next_id = 1
|
||||||
s.cts_map = make(ServerConnMap)
|
s.cts_map = make(ServerConnMap)
|
||||||
@ -1388,6 +1397,8 @@ func NewServer(ctx context.Context, name string, logger Logger, cfg *ServerConfi
|
|||||||
s.WrapHttpHandler(&server_ctl_server_conns_id_routes_id_peers{server_ctl{s: &s, id: HS_ID_CTL}}))
|
s.WrapHttpHandler(&server_ctl_server_conns_id_routes_id_peers{server_ctl{s: &s, id: HS_ID_CTL}}))
|
||||||
s.ctl_mux.Handle(s.Cfg.CtlPrefix + "/_ctl/server-conns/{conn_id}/routes/{route_id}/peers/{peer_id}",
|
s.ctl_mux.Handle(s.Cfg.CtlPrefix + "/_ctl/server-conns/{conn_id}/routes/{route_id}/peers/{peer_id}",
|
||||||
s.WrapHttpHandler(&server_ctl_server_conns_id_routes_id_peers_id{server_ctl{s: &s, id: HS_ID_CTL}}))
|
s.WrapHttpHandler(&server_ctl_server_conns_id_routes_id_peers_id{server_ctl{s: &s, id: HS_ID_CTL}}))
|
||||||
|
s.ctl_mux.Handle(s.Cfg.CtlPrefix + "/_ctl/server-peers",
|
||||||
|
s.WrapHttpHandler(&server_ctl_server_peers{server_ctl{s: &s, id: HS_ID_CTL}}))
|
||||||
s.ctl_mux.Handle(s.Cfg.CtlPrefix + "/_ctl/notices",
|
s.ctl_mux.Handle(s.Cfg.CtlPrefix + "/_ctl/notices",
|
||||||
s.WrapHttpHandler(&server_ctl_notices{server_ctl{s: &s, id: HS_ID_CTL}}))
|
s.WrapHttpHandler(&server_ctl_notices{server_ctl{s: &s, id: HS_ID_CTL}}))
|
||||||
s.ctl_mux.Handle(s.Cfg.CtlPrefix + "/_ctl/notices/{conn_id}",
|
s.ctl_mux.Handle(s.Cfg.CtlPrefix + "/_ctl/notices/{conn_id}",
|
||||||
|
Loading…
x
Reference in New Issue
Block a user