writing clent-side api code for /servers/id/routes/id/peers
This commit is contained in:
parent
2b29d05a4a
commit
b792997184
137
client-ctl.go
137
client-ctl.go
@ -51,6 +51,11 @@ type json_out_client_route struct {
|
||||
ServerPeerListenAddr string `json:"server-peer-listen-addr"`
|
||||
}
|
||||
|
||||
type json_out_client_peer struct {
|
||||
Id uint32 `json:"id"`
|
||||
ClientPeerAddr string `json:"client-peer-addr"`
|
||||
ServerPeerAddr string `json:"server-peer-addr"`
|
||||
}
|
||||
// ------------------------------------
|
||||
|
||||
type client_ctl_client_conns struct {
|
||||
@ -69,6 +74,13 @@ type client_ctl_client_conns_id_routes_id struct {
|
||||
c *Client
|
||||
}
|
||||
|
||||
type client_ctl_client_conns_id_routes_id_peers struct {
|
||||
c *Client
|
||||
}
|
||||
|
||||
type client_ctl_client_conns_id_routes_id_peers_id struct {
|
||||
c *Client
|
||||
}
|
||||
|
||||
type client_ctl_clients struct {
|
||||
c *Client
|
||||
@ -411,6 +423,131 @@ oops:
|
||||
|
||||
// ------------------------------------
|
||||
|
||||
func (ctl *client_ctl_client_conns_id_routes_id_peers) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
var c *Client
|
||||
var status_code int
|
||||
var err error
|
||||
var conn_id string
|
||||
var route_id string
|
||||
var conn_nid uint64
|
||||
var route_nid uint64
|
||||
var je *json.Encoder
|
||||
var r *ClientRoute
|
||||
|
||||
c = ctl.c
|
||||
je = json.NewEncoder(w)
|
||||
|
||||
conn_id = req.PathValue("conn_id")
|
||||
route_id = req.PathValue("route_id")
|
||||
|
||||
conn_nid, err = strconv.ParseUint(conn_id, 10, 32)
|
||||
if err != nil {
|
||||
status_code = http.StatusBadRequest; w.WriteHeader(status_code)
|
||||
if err = je.Encode(json_errmsg{Text: "wrong connection id - " + conn_id}); err != nil { goto oops }
|
||||
goto done
|
||||
}
|
||||
route_nid, err = strconv.ParseUint(route_id, 10, 32)
|
||||
if err != nil {
|
||||
status_code = http.StatusBadRequest; w.WriteHeader(status_code)
|
||||
if err = je.Encode(json_errmsg{Text: "wrong route id - " + route_id}); err != nil { goto oops }
|
||||
goto done
|
||||
}
|
||||
|
||||
r = c.FindClientRouteById(uint32(conn_nid), uint32(route_nid))
|
||||
if r == nil {
|
||||
status_code = http.StatusNotFound; w.WriteHeader(status_code)
|
||||
if err = je.Encode(json_errmsg{Text: "non-existent connection/route id - " + conn_id + "/" + route_id}); err != nil { goto oops }
|
||||
goto done
|
||||
}
|
||||
|
||||
switch req.Method {
|
||||
case http.MethodGet:
|
||||
var p *ClientPeerConn
|
||||
var jcp []json_out_client_peer
|
||||
|
||||
jcp = make([]json_out_client_peer, 0)
|
||||
r.ptc_mtx.Lock()
|
||||
for _, p = range r.ptc_map {
|
||||
jcp = append(jcp, json_out_client_peer{
|
||||
Id: p.conn_id,
|
||||
ClientPeerAddr: p.conn.RemoteAddr().String(),
|
||||
//ServerPeerAddr: r.server_peer,
|
||||
})
|
||||
}
|
||||
r.ptc_mtx.Unlock()
|
||||
|
||||
status_code = http.StatusOK; w.WriteHeader(status_code)
|
||||
if err = je.Encode(jcp); err != nil { goto oops }
|
||||
|
||||
// TODO: implemente MethodDelete to support forced disconnect from the peer.
|
||||
|
||||
default:
|
||||
status_code = http.StatusBadRequest; w.WriteHeader(status_code)
|
||||
}
|
||||
done:
|
||||
c.log.Write("", LOG_DEBUG, "[%s] %s %s %d", req.RemoteAddr, req.Method, req.URL.String(), status_code) // TODO: time taken
|
||||
return
|
||||
|
||||
oops:
|
||||
c.log.Write("", LOG_ERROR, "[%s] %s %s - %s", req.RemoteAddr, req.Method, req.URL.String(), err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
// ------------------------------------
|
||||
|
||||
func (ctl *client_ctl_client_conns_id_routes_id_peers_id) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
var c *Client
|
||||
var status_code int
|
||||
var err error
|
||||
var conn_id string
|
||||
var route_id string
|
||||
var conn_nid uint64
|
||||
var route_nid uint64
|
||||
var je *json.Encoder
|
||||
var r *ClientRoute
|
||||
|
||||
c = ctl.c
|
||||
je = json.NewEncoder(w)
|
||||
|
||||
conn_id = req.PathValue("conn_id")
|
||||
route_id = req.PathValue("route_id")
|
||||
|
||||
conn_nid, err = strconv.ParseUint(conn_id, 10, 32)
|
||||
if err != nil {
|
||||
status_code = http.StatusBadRequest; w.WriteHeader(status_code)
|
||||
if err = je.Encode(json_errmsg{Text: "wrong connection id - " + conn_id}); err != nil { goto oops }
|
||||
goto done
|
||||
}
|
||||
route_nid, err = strconv.ParseUint(route_id, 10, 32)
|
||||
if err != nil {
|
||||
status_code = http.StatusBadRequest; w.WriteHeader(status_code)
|
||||
if err = je.Encode(json_errmsg{Text: "wrong route id - " + route_id}); err != nil { goto oops }
|
||||
goto done
|
||||
}
|
||||
|
||||
r = c.FindClientRouteById(uint32(conn_nid), uint32(route_nid))
|
||||
if r == nil {
|
||||
status_code = http.StatusNotFound; w.WriteHeader(status_code)
|
||||
if err = je.Encode(json_errmsg{Text: "non-existent connection/route id - " + conn_id + "/" + route_id}); err != nil { goto oops }
|
||||
goto done
|
||||
}
|
||||
|
||||
switch req.Method {
|
||||
case http.MethodGet:
|
||||
default:
|
||||
status_code = http.StatusBadRequest; w.WriteHeader(status_code)
|
||||
}
|
||||
done:
|
||||
c.log.Write("", LOG_DEBUG, "[%s] %s %s %d", req.RemoteAddr, req.Method, req.URL.String(), status_code) // TODO: time taken
|
||||
return
|
||||
|
||||
oops:
|
||||
c.log.Write("", LOG_ERROR, "[%s] %s %s - %s", req.RemoteAddr, req.Method, req.URL.String(), err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
// ------------------------------------
|
||||
|
||||
func (ctl *client_ctl_clients) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
}
|
||||
|
||||
|
@ -17,26 +17,25 @@ func NewClientPeerConn(r *ClientRoute, c *net.TCPConn, id uint32) *ClientPeerCon
|
||||
}
|
||||
|
||||
func (cpc *ClientPeerConn) RunTask(wg *sync.WaitGroup) error {
|
||||
//var conn *net.TCPConn
|
||||
//var addr *net.TCPAddr
|
||||
var err error
|
||||
var buf [4096]byte
|
||||
var n int
|
||||
|
||||
defer wg.Done()
|
||||
|
||||
fmt.Printf("CONNECTION ESTABLISHED TO PEER... ABOUT TO READ DATA...\n")
|
||||
fmt.Printf("CONNECTION ESTABLISHED TO PEER... ABOUT TO READ DATA...\n")
|
||||
for {
|
||||
n, err = cpc.conn.Read(buf[:])
|
||||
if err != nil {
|
||||
fmt.Printf("unable to read from the client-side peer %s - %s\n", cpc.addr, err.Error())
|
||||
// TODO: add proper log header
|
||||
cpc.route.cts.cli.log.Write("", LOG_ERROR, "Unable to read from the client-side peer %s - %s", cpc.conn.RemoteAddr().String(), err.Error())
|
||||
break
|
||||
}
|
||||
|
||||
// TODO: guarded call..
|
||||
err = cpc.route.cts.psc.Send(MakePeerDataPacket(cpc.route.id, cpc.conn_id, buf[0:n]))
|
||||
if err != nil {
|
||||
fmt.Printf("unable to write data to server - %s\n", err.Error())
|
||||
// TODO: add proper log header
|
||||
cpc.route.cts.cli.log.Write("", LOG_ERROR, "Unable to write to server - %s", err.Error())
|
||||
break
|
||||
}
|
||||
}
|
||||
|
43
client.go
43
client.go
@ -56,24 +56,10 @@ type Client struct {
|
||||
mux *http.ServeMux
|
||||
}
|
||||
|
||||
type ClientPeerConn struct {
|
||||
route *ClientRoute
|
||||
conn_id uint32
|
||||
conn *net.TCPConn
|
||||
remot_conn_id uint32
|
||||
|
||||
addr string // peer address
|
||||
|
||||
stop_chan chan bool
|
||||
stop_req atomic.Bool
|
||||
server_peer_eof atomic.Bool
|
||||
}
|
||||
|
||||
// client connection to server
|
||||
type ClientConn struct {
|
||||
cli *Client
|
||||
cfg ClientConfigActive
|
||||
//saddr *net.TCPAddr // server address that is connected to
|
||||
id uint32
|
||||
lid string
|
||||
|
||||
@ -108,6 +94,16 @@ type ClientRoute struct {
|
||||
stop_chan chan bool
|
||||
}
|
||||
|
||||
type ClientPeerConn struct {
|
||||
route *ClientRoute
|
||||
conn_id uint32
|
||||
conn *net.TCPConn
|
||||
|
||||
stop_chan chan bool
|
||||
stop_req atomic.Bool
|
||||
server_peer_eof atomic.Bool
|
||||
}
|
||||
|
||||
type GuardedPacketStreamClient struct {
|
||||
mtx sync.Mutex
|
||||
//psc Hodu_PacketStreamClient
|
||||
@ -176,7 +172,7 @@ done:
|
||||
r.cts.cli.log.Write("", LOG_DEBUG, "Sending route-stop for id=%d peer=%s to %s", r.id, r.peer_addr, r.cts.cfg.ServerAddr)
|
||||
r.cts.psc.Send(MakeRouteStopPacket(r.id, r.proto, r.peer_addr))
|
||||
r.cts.RemoveClientRoute(r)
|
||||
fmt.Printf("*** End fo Client Roue Task\n")
|
||||
fmt.Printf("*** End fo Client Route Task\n")
|
||||
}
|
||||
|
||||
func (r *ClientRoute) ReqStop() {
|
||||
@ -790,6 +786,8 @@ func NewClient(ctx context.Context, listen_on string, logger Logger, tlscfg *tls
|
||||
c.mux.Handle(c.api_prefix + "/client-conns/{conn_id}", &client_ctl_client_conns_id{c: &c})
|
||||
c.mux.Handle(c.api_prefix + "/client-conns/{conn_id}/routes", &client_ctl_client_conns_id_routes{c: &c})
|
||||
c.mux.Handle(c.api_prefix + "/client-conns/{conn_id}/routes/{route_id}", &client_ctl_client_conns_id_routes_id{c: &c})
|
||||
c.mux.Handle(c.api_prefix + "/client-conns/{conn_id}/routes/{route_id}/peers", &client_ctl_client_conns_id_routes_id_peers{c: &c})
|
||||
c.mux.Handle(c.api_prefix + "/client-conns/{conn_id}/routes/{route_id}/peers/{peer_id}", &client_ctl_client_conns_id_routes_id_peers_id{c: &c})
|
||||
c.mux.Handle(c.api_prefix + "/server-conns", &client_ctl_clients{c: &c})
|
||||
c.mux.Handle(c.api_prefix + "/server-conns/{id}", &client_ctl_clients_id{c: &c})
|
||||
|
||||
@ -920,6 +918,21 @@ func (c *Client) FindClientConnById(id uint32) *ClientConn {
|
||||
return cts
|
||||
}
|
||||
|
||||
func (c *Client) FindClientRouteById(conn_id uint32, route_id uint32) *ClientRoute {
|
||||
var cts *ClientConn
|
||||
var ok bool
|
||||
|
||||
c.cts_mtx.Lock()
|
||||
defer c.cts_mtx.Unlock()
|
||||
|
||||
cts, ok = c.cts_map[conn_id]
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
|
||||
return cts.FindClientRouteById(route_id)
|
||||
}
|
||||
|
||||
func (c *Client) ReqStop() {
|
||||
if c.stop_req.CompareAndSwap(false, true) {
|
||||
var cts *ClientConn
|
||||
|
Loading…
x
Reference in New Issue
Block a user