added code dealing with server-side peers
This commit is contained in:
parent
cb18a44cfa
commit
f2536a0acc
@ -278,14 +278,14 @@ func (r *ClientRoute) ReqStopAllClientPeerConns() {
|
|||||||
r.ptc_mtx.Unlock()
|
r.ptc_mtx.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *ClientRoute) FindClientPeerConnById(conn_id PeerId) *ClientPeerConn {
|
func (r *ClientRoute) FindClientPeerConnById(peer_id PeerId) *ClientPeerConn {
|
||||||
var c *ClientPeerConn
|
var c *ClientPeerConn
|
||||||
var ok bool
|
var ok bool
|
||||||
|
|
||||||
r.ptc_mtx.Lock()
|
r.ptc_mtx.Lock()
|
||||||
defer r.ptc_mtx.Unlock()
|
defer r.ptc_mtx.Unlock()
|
||||||
|
|
||||||
c, ok = r.ptc_map[conn_id]
|
c, ok = r.ptc_map[peer_id]
|
||||||
if !ok { return nil }
|
if !ok { return nil }
|
||||||
|
|
||||||
return c
|
return c
|
||||||
|
@ -31,6 +31,14 @@ type json_out_server_route struct {
|
|||||||
ServerPeerServiceNet string `json:"server-peer-service-net"`
|
ServerPeerServiceNet string `json:"server-peer-service-net"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type json_out_server_peer struct {
|
||||||
|
Id PeerId `json:"id"`
|
||||||
|
ClientPeerAddr string `json:"client-peer-addr"`
|
||||||
|
ClientLocalAddr string `json:"client-local-addr"`
|
||||||
|
ServerPeerAddr string `json:"server-peer-addr"`
|
||||||
|
ServerLocalAddr string `json:"server-local-addr"`
|
||||||
|
}
|
||||||
|
|
||||||
type json_out_server_stats struct {
|
type json_out_server_stats struct {
|
||||||
json_out_go_stats
|
json_out_go_stats
|
||||||
|
|
||||||
@ -73,6 +81,10 @@ type server_ctl_server_conns_id_routes_id_peers struct {
|
|||||||
server_ctl
|
server_ctl
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type server_ctl_server_conns_id_routes_id_peers_id struct {
|
||||||
|
server_ctl
|
||||||
|
}
|
||||||
|
|
||||||
type server_ctl_stats struct {
|
type server_ctl_stats struct {
|
||||||
server_ctl
|
server_ctl
|
||||||
}
|
}
|
||||||
@ -315,6 +327,8 @@ func (ctl *server_ctl_server_conns_id_routes) ServeHTTP(w http.ResponseWriter, r
|
|||||||
if err = je.Encode(jsp); err != nil { goto oops }
|
if err = je.Encode(jsp); err != nil { goto oops }
|
||||||
|
|
||||||
case http.MethodDelete:
|
case http.MethodDelete:
|
||||||
|
// direct removal causes quite some clean-up issues.
|
||||||
|
// make stop request to all server routes and let the task stop by themselves
|
||||||
//cts.RemoveAllServerRoutes()
|
//cts.RemoveAllServerRoutes()
|
||||||
cts.ReqStopAllServerRoutes()
|
cts.ReqStopAllServerRoutes()
|
||||||
status_code = WriteEmptyRespHeader(w, http.StatusNoContent)
|
status_code = WriteEmptyRespHeader(w, http.StatusNoContent)
|
||||||
@ -441,19 +455,27 @@ func (ctl *server_ctl_server_conns_id_routes_id_peers) ServeHTTP(w http.Response
|
|||||||
|
|
||||||
switch req.Method {
|
switch req.Method {
|
||||||
case http.MethodGet:
|
case http.MethodGet:
|
||||||
|
var p *ServerPeerConn
|
||||||
|
var jcp []json_out_server_peer
|
||||||
|
|
||||||
|
jcp = make([]json_out_server_peer, 0)
|
||||||
|
r.pts_mtx.Lock()
|
||||||
|
for _, p = range r.pts_map {
|
||||||
|
jcp = append(jcp, json_out_server_peer{
|
||||||
|
Id: p.conn_id,
|
||||||
|
ServerPeerAddr: p.conn.RemoteAddr().String(),
|
||||||
|
ServerLocalAddr: p.conn.LocalAddr().String(),
|
||||||
|
ClientPeerAddr: p.cts.remote_addr,
|
||||||
|
ClientLocalAddr: p.cts.local_addr,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
r.pts_mtx.Unlock()
|
||||||
|
|
||||||
status_code = WriteJsonRespHeader(w, http.StatusOK)
|
status_code = WriteJsonRespHeader(w, http.StatusOK)
|
||||||
err = je.Encode(json_out_server_route{
|
if err = je.Encode(jcp); err != nil { goto oops }
|
||||||
Id: r.Id,
|
|
||||||
ClientPeerAddr: r.PtcAddr,
|
|
||||||
ClientPeerName: r.PtcName,
|
|
||||||
ServerPeerServiceAddr: r.SvcAddr.String(),
|
|
||||||
ServerPeerServiceNet: r.SvcPermNet.String(),
|
|
||||||
ServerPeerOption: r.SvcOption.String(),
|
|
||||||
})
|
|
||||||
if err != nil { goto oops }
|
|
||||||
|
|
||||||
case http.MethodDelete:
|
case http.MethodDelete:
|
||||||
//r.ReqStopAllServerPeerConns()
|
r.ReqStopAllServerPeerConns()
|
||||||
status_code = WriteEmptyRespHeader(w, http.StatusNoContent)
|
status_code = WriteEmptyRespHeader(w, http.StatusNoContent)
|
||||||
|
|
||||||
default:
|
default:
|
||||||
@ -469,6 +491,61 @@ oops:
|
|||||||
|
|
||||||
// ------------------------------------
|
// ------------------------------------
|
||||||
|
|
||||||
|
func (ctl *server_ctl_server_conns_id_routes_id_peers_id) ServeHTTP(w http.ResponseWriter, req *http.Request) (int, error) {
|
||||||
|
var s *Server
|
||||||
|
var status_code int
|
||||||
|
var conn_id string
|
||||||
|
var route_id string
|
||||||
|
var peer_id string
|
||||||
|
var je *json.Encoder
|
||||||
|
var p *ServerPeerConn
|
||||||
|
var err error
|
||||||
|
|
||||||
|
s = ctl.s
|
||||||
|
je = json.NewEncoder(w)
|
||||||
|
|
||||||
|
conn_id = req.PathValue("conn_id")
|
||||||
|
route_id = req.PathValue("route_id")
|
||||||
|
peer_id = req.PathValue("peer_id")
|
||||||
|
p, err = s.FindServerPeerConnByIdStr(conn_id, route_id, peer_id)
|
||||||
|
if err != nil {
|
||||||
|
status_code = WriteJsonRespHeader(w, http.StatusNotFound)
|
||||||
|
je.Encode(JsonErrmsg{Text: err.Error()})
|
||||||
|
goto oops
|
||||||
|
}
|
||||||
|
|
||||||
|
switch req.Method {
|
||||||
|
case http.MethodGet:
|
||||||
|
var jcp *json_out_server_peer
|
||||||
|
|
||||||
|
jcp = &json_out_server_peer{
|
||||||
|
Id: p.conn_id,
|
||||||
|
ServerPeerAddr: p.conn.RemoteAddr().String(),
|
||||||
|
ServerLocalAddr: p.conn.LocalAddr().String(),
|
||||||
|
ClientPeerAddr: p.cts.remote_addr,
|
||||||
|
ClientLocalAddr: p.cts.local_addr,
|
||||||
|
}
|
||||||
|
|
||||||
|
status_code = WriteJsonRespHeader(w, http.StatusOK)
|
||||||
|
if err = je.Encode(jcp); err != nil { goto oops }
|
||||||
|
|
||||||
|
case http.MethodDelete:
|
||||||
|
p.ReqStop()
|
||||||
|
status_code = WriteEmptyRespHeader(w, http.StatusNoContent)
|
||||||
|
|
||||||
|
default:
|
||||||
|
status_code = WriteEmptyRespHeader(w, http.StatusBadRequest)
|
||||||
|
}
|
||||||
|
|
||||||
|
//done:
|
||||||
|
return status_code, nil
|
||||||
|
|
||||||
|
oops:
|
||||||
|
return status_code, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// ------------------------------------
|
||||||
|
|
||||||
func (ctl *server_ctl_stats) ServeHTTP(w http.ResponseWriter, req *http.Request) (int, error) {
|
func (ctl *server_ctl_stats) ServeHTTP(w http.ResponseWriter, req *http.Request) (int, error) {
|
||||||
var s *Server
|
var s *Server
|
||||||
var status_code int
|
var status_code int
|
||||||
@ -494,10 +571,8 @@ func (ctl *server_ctl_stats) ServeHTTP(w http.ResponseWriter, req *http.Request)
|
|||||||
}
|
}
|
||||||
|
|
||||||
//done:
|
//done:
|
||||||
//s.log.Write(ctl.id, LOG_INFO, "[%s] %s %s %d", req.RemoteAddr, req.Method, req.URL.String(), status_code) // TODO: time taken
|
|
||||||
return status_code, nil
|
return status_code, nil
|
||||||
|
|
||||||
oops:
|
oops:
|
||||||
//s.log.Write(ctl.id, LOG_ERROR, "[%s] %s %s - %s", req.RemoteAddr, req.Method, req.URL.String(), err.Error())
|
|
||||||
return status_code, err
|
return status_code, err
|
||||||
}
|
}
|
||||||
|
97
server.go
97
server.go
@ -341,6 +341,27 @@ func (r *ServerRoute) ReqStop() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (r *ServerRoute) ReqStopAllServerPeerConns() {
|
||||||
|
var c *ServerPeerConn
|
||||||
|
|
||||||
|
r.pts_mtx.Lock()
|
||||||
|
for _, c = range r.pts_map { c.ReqStop() }
|
||||||
|
r.pts_mtx.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *ServerRoute) FindServerPeerConnById(peer_id PeerId) *ServerPeerConn {
|
||||||
|
var c *ServerPeerConn
|
||||||
|
var ok bool
|
||||||
|
|
||||||
|
r.pts_mtx.Lock()
|
||||||
|
defer r.pts_mtx.Unlock()
|
||||||
|
|
||||||
|
c, ok = r.pts_map[peer_id]
|
||||||
|
if !ok { return nil }
|
||||||
|
|
||||||
|
return c
|
||||||
|
}
|
||||||
|
|
||||||
func (r *ServerRoute) ReportEvent(pts_id PeerId, event_type PACKET_KIND, event_data interface{}) error {
|
func (r *ServerRoute) ReportEvent(pts_id PeerId, event_type PACKET_KIND, event_data interface{}) error {
|
||||||
var spc *ServerPeerConn
|
var spc *ServerPeerConn
|
||||||
var ok bool
|
var ok bool
|
||||||
@ -510,6 +531,18 @@ func (cts *ServerConn) FindServerRouteById(route_id RouteId) *ServerRoute {
|
|||||||
return r
|
return r
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (cts *ServerConn) FindServerPeerConnById(route_id RouteId, peer_id PeerId) *ServerPeerConn {
|
||||||
|
var r *ServerRoute
|
||||||
|
var ok bool
|
||||||
|
|
||||||
|
cts.route_mtx.Lock()
|
||||||
|
defer cts.route_mtx.Unlock()
|
||||||
|
r, ok = cts.route_map[route_id]
|
||||||
|
if !ok { return nil }
|
||||||
|
|
||||||
|
return r.FindServerPeerConnById(peer_id)
|
||||||
|
}
|
||||||
|
|
||||||
func (cts *ServerConn) ReqStopAllServerRoutes() {
|
func (cts *ServerConn) ReqStopAllServerRoutes() {
|
||||||
var r *ServerRoute
|
var r *ServerRoute
|
||||||
|
|
||||||
@ -1076,6 +1109,8 @@ func NewServer(ctx context.Context, name string, logger Logger, cfg *ServerConfi
|
|||||||
s.wrap_http_handler(&server_ctl_server_conns_id_routes_id{server_ctl{s: &s, id: HS_ID_CTL}}))
|
s.wrap_http_handler(&server_ctl_server_conns_id_routes_id{server_ctl{s: &s, id: HS_ID_CTL}}))
|
||||||
s.ctl_mux.Handle(s.cfg.CtlPrefix + "/_ctl/server-conns/{conn_id}/routes/{route_id}/peers",
|
s.ctl_mux.Handle(s.cfg.CtlPrefix + "/_ctl/server-conns/{conn_id}/routes/{route_id}/peers",
|
||||||
s.wrap_http_handler(&server_ctl_server_conns_id_routes_id_peers{server_ctl{s: &s, id: HS_ID_CTL}}))
|
s.wrap_http_handler(&server_ctl_server_conns_id_routes_id_peers{server_ctl{s: &s, id: HS_ID_CTL}}))
|
||||||
|
s.ctl_mux.Handle(s.cfg.CtlPrefix + "/_ctl/server-conns/{conn_id}/routes/{route_id}/peers/{peer_id}",
|
||||||
|
s.wrap_http_handler(&server_ctl_server_conns_id_routes_id_peers_id{server_ctl{s: &s, id: HS_ID_CTL}}))
|
||||||
s.ctl_mux.Handle(s.cfg.CtlPrefix + "/_ctl/stats",
|
s.ctl_mux.Handle(s.cfg.CtlPrefix + "/_ctl/stats",
|
||||||
s.wrap_http_handler(&server_ctl_stats{server_ctl{s: &s, id: HS_ID_CTL}}))
|
s.wrap_http_handler(&server_ctl_stats{server_ctl{s: &s, id: HS_ID_CTL}}))
|
||||||
s.ctl_mux.Handle(s.cfg.CtlPrefix + "/_ctl/token",
|
s.ctl_mux.Handle(s.cfg.CtlPrefix + "/_ctl/token",
|
||||||
@ -1594,6 +1629,19 @@ func (s *Server) FindServerRouteById(id ConnId, route_id RouteId) *ServerRoute {
|
|||||||
return cts.FindServerRouteById(route_id)
|
return cts.FindServerRouteById(route_id)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Server) FindServerPeerConnById(id ConnId, route_id RouteId, peer_id PeerId) *ServerPeerConn {
|
||||||
|
var cts *ServerConn
|
||||||
|
var ok bool
|
||||||
|
|
||||||
|
s.cts_mtx.Lock()
|
||||||
|
defer s.cts_mtx.Unlock()
|
||||||
|
|
||||||
|
cts, ok = s.cts_map[id]
|
||||||
|
if !ok { return nil }
|
||||||
|
|
||||||
|
return cts.FindServerPeerConnById(route_id, peer_id)
|
||||||
|
}
|
||||||
|
|
||||||
func (s *Server) FindServerRouteByPortId(port_id PortId) *ServerRoute {
|
func (s *Server) FindServerRouteByPortId(port_id PortId) *ServerRoute {
|
||||||
var cri ConnRouteId
|
var cri ConnRouteId
|
||||||
var ok bool
|
var ok bool
|
||||||
@ -1606,6 +1654,55 @@ func (s *Server) FindServerRouteByPortId(port_id PortId) *ServerRoute {
|
|||||||
return s.FindServerRouteById(cri.conn_id, cri.route_id)
|
return s.FindServerRouteById(cri.conn_id, cri.route_id)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Server) FindServerPeerConnByPortId(port_id PortId, peer_id PeerId) *ServerPeerConn {
|
||||||
|
var cri ConnRouteId
|
||||||
|
var ok bool
|
||||||
|
|
||||||
|
s.svc_port_mtx.Lock()
|
||||||
|
defer s.svc_port_mtx.Unlock()
|
||||||
|
|
||||||
|
cri, ok = s.svc_port_map[port_id]
|
||||||
|
if !ok { return nil }
|
||||||
|
return s.FindServerPeerConnById(cri.conn_id, cri.route_id, peer_id)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Server) FindServerPeerConnByIdStr(conn_id string, route_id string, peer_id string) (*ServerPeerConn, error) {
|
||||||
|
var p *ServerPeerConn
|
||||||
|
var err error
|
||||||
|
|
||||||
|
if route_id == PORT_ID_MARKER {
|
||||||
|
var port_nid uint64
|
||||||
|
var peer_nid uint64
|
||||||
|
|
||||||
|
port_nid, err = strconv.ParseUint(conn_id, 10, int(unsafe.Sizeof(PortId(0)) * 8))
|
||||||
|
if err != nil { return nil, fmt.Errorf("invalid port id %s - %s", conn_id, err.Error()) }
|
||||||
|
|
||||||
|
peer_nid, err = strconv.ParseUint(peer_id, 10, int(unsafe.Sizeof(PeerId(0)) * 8))
|
||||||
|
if err != nil { return nil, fmt.Errorf("invalid peer id %s - %s", peer_id, err.Error()) }
|
||||||
|
|
||||||
|
p = s.FindServerPeerConnByPortId(PortId(port_nid), PeerId(peer_nid))
|
||||||
|
if p == nil { return nil, fmt.Errorf("peer(%d,%d) not found", port_nid, peer_nid) }
|
||||||
|
} else {
|
||||||
|
var conn_nid uint64
|
||||||
|
var route_nid uint64
|
||||||
|
var peer_nid uint64
|
||||||
|
|
||||||
|
conn_nid, err = strconv.ParseUint(conn_id, 10, int(unsafe.Sizeof(ConnId(0)) * 8))
|
||||||
|
if err != nil { return nil, fmt.Errorf("invalid connection id %s - %s", conn_id, err.Error()) }
|
||||||
|
|
||||||
|
route_nid, err = strconv.ParseUint(route_id, 10, int(unsafe.Sizeof(RouteId(0)) * 8))
|
||||||
|
if err != nil { return nil, fmt.Errorf("invalid route id %s - %s", route_id, err.Error()) }
|
||||||
|
|
||||||
|
peer_nid, err = strconv.ParseUint(peer_id, 10, int(unsafe.Sizeof(PeerId(0)) * 8))
|
||||||
|
if err != nil { return nil, fmt.Errorf("invalid peer id %s - %s", peer_id, err.Error()) }
|
||||||
|
|
||||||
|
p = s.FindServerPeerConnById(ConnId(conn_nid), RouteId(route_nid), PeerId(peer_nid))
|
||||||
|
if p == nil { return nil, fmt.Errorf("peer(%d,%d,%d) not found", conn_nid, route_nid, peer_nid) }
|
||||||
|
}
|
||||||
|
|
||||||
|
return p, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (s *Server) FindServerRouteByIdStr(conn_id string, route_id string) (*ServerRoute, error) {
|
func (s *Server) FindServerRouteByIdStr(conn_id string, route_id string) (*ServerRoute, error) {
|
||||||
var r *ServerRoute
|
var r *ServerRoute
|
||||||
var err error
|
var err error
|
||||||
|
Loading…
x
Reference in New Issue
Block a user