minor touch up
This commit is contained in:
parent
049937a53b
commit
2b29d05a4a
@ -96,7 +96,7 @@ func (ctl *client_ctl_client_conns) ServeHTTP(w http.ResponseWriter, req *http.R
|
|||||||
|
|
||||||
js = make([]json_out_client_conn, 0)
|
js = make([]json_out_client_conn, 0)
|
||||||
c.cts_mtx.Lock()
|
c.cts_mtx.Lock()
|
||||||
for _, cts = range c.cts_map_by_id {
|
for _, cts = range c.cts_map {
|
||||||
var r *ClientRoute
|
var r *ClientRoute
|
||||||
var jsp []json_out_client_route
|
var jsp []json_out_client_route
|
||||||
|
|
||||||
@ -145,11 +145,11 @@ func (ctl *client_ctl_client_conns) ServeHTTP(w http.ResponseWriter, req *http.R
|
|||||||
}
|
}
|
||||||
|
|
||||||
case http.MethodDelete:
|
case http.MethodDelete:
|
||||||
// delete all server conneections
|
// delete all client connections to servers. if we request to stop all
|
||||||
var cts *ClientConn
|
// client connections, they will remove themselves from the client.
|
||||||
c.cts_mtx.Lock()
|
// we do passive deletion rather than doing active deletion by calling
|
||||||
for _, cts = range c.cts_map { cts.ReqStop() }
|
// c.RemoveAllClientConns()
|
||||||
c.cts_mtx.Unlock()
|
c.ReqStopAllClientConns()
|
||||||
status_code = http.StatusNoContent; w.WriteHeader(status_code)
|
status_code = http.StatusNoContent; w.WriteHeader(status_code)
|
||||||
|
|
||||||
default:
|
default:
|
||||||
@ -177,7 +177,6 @@ func (ctl *client_ctl_client_conns_id) ServeHTTP(w http.ResponseWriter, req *htt
|
|||||||
var conn_nid uint64
|
var conn_nid uint64
|
||||||
var je *json.Encoder
|
var je *json.Encoder
|
||||||
|
|
||||||
|
|
||||||
c = ctl.c
|
c = ctl.c
|
||||||
je = json.NewEncoder(w)
|
je = json.NewEncoder(w)
|
||||||
|
|
||||||
@ -190,7 +189,6 @@ func (ctl *client_ctl_client_conns_id) ServeHTTP(w http.ResponseWriter, req *htt
|
|||||||
goto done
|
goto done
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
switch req.Method {
|
switch req.Method {
|
||||||
case http.MethodGet:
|
case http.MethodGet:
|
||||||
var r *ClientRoute
|
var r *ClientRoute
|
||||||
@ -234,7 +232,6 @@ func (ctl *client_ctl_client_conns_id) ServeHTTP(w http.ResponseWriter, req *htt
|
|||||||
}
|
}
|
||||||
return
|
return
|
||||||
|
|
||||||
|
|
||||||
done:
|
done:
|
||||||
// TODO: need to handle x-forwarded-for and other stuff? this is not a real web service, though
|
// TODO: need to handle x-forwarded-for and other stuff? this is not a real web service, though
|
||||||
c.log.Write("", LOG_DEBUG, "[%s] %s %s %d", req.RemoteAddr, req.Method, req.URL.String(), status_code) // TODO: time taken
|
c.log.Write("", LOG_DEBUG, "[%s] %s %s %d", req.RemoteAddr, req.Method, req.URL.String(), status_code) // TODO: time taken
|
||||||
@ -243,9 +240,10 @@ done:
|
|||||||
oops:
|
oops:
|
||||||
c.log.Write("", LOG_ERROR, "[%s] %s %s - %s", req.RemoteAddr, req.Method, req.URL.String(), err.Error())
|
c.log.Write("", LOG_ERROR, "[%s] %s %s - %s", req.RemoteAddr, req.Method, req.URL.String(), err.Error())
|
||||||
return
|
return
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ------------------------------------
|
||||||
|
|
||||||
func (ctl *client_ctl_client_conns_id_routes) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
func (ctl *client_ctl_client_conns_id_routes) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||||
var c *Client
|
var c *Client
|
||||||
var status_code int
|
var status_code int
|
||||||
@ -313,7 +311,8 @@ func (ctl *client_ctl_client_conns_id_routes) ServeHTTP(w http.ResponseWriter, r
|
|||||||
}
|
}
|
||||||
|
|
||||||
case http.MethodDelete:
|
case http.MethodDelete:
|
||||||
cts.RemoveClientRoutes()
|
//cts.RemoveAllClientRoutes()
|
||||||
|
cts.ReqStopAllClientRoutes()
|
||||||
status_code = http.StatusNoContent; w.WriteHeader(status_code)
|
status_code = http.StatusNoContent; w.WriteHeader(status_code)
|
||||||
|
|
||||||
default:
|
default:
|
||||||
|
76
client.go
76
client.go
@ -18,8 +18,8 @@ import "google.golang.org/grpc/status"
|
|||||||
|
|
||||||
type PacketStreamClient grpc.BidiStreamingClient[Packet, Packet]
|
type PacketStreamClient grpc.BidiStreamingClient[Packet, Packet]
|
||||||
|
|
||||||
type ClientConnMap = map[string]*ClientConn
|
type ClientConnMapByAddr = map[string]*ClientConn
|
||||||
type ClientConnMapById = map[uint32]*ClientConn
|
type ClientConnMap = map[uint32]*ClientConn
|
||||||
type ClientPeerConnMap = map[uint32]*ClientPeerConn
|
type ClientPeerConnMap = map[uint32]*ClientPeerConn
|
||||||
type ClientRouteMap = map[uint32]*ClientRoute
|
type ClientRouteMap = map[uint32]*ClientRoute
|
||||||
type ClientPeerCancelFuncMap = map[uint32]context.CancelFunc
|
type ClientPeerCancelFuncMap = map[uint32]context.CancelFunc
|
||||||
@ -45,8 +45,8 @@ type Client struct {
|
|||||||
ctl *http.Server // control server
|
ctl *http.Server // control server
|
||||||
|
|
||||||
cts_mtx sync.Mutex
|
cts_mtx sync.Mutex
|
||||||
|
cts_map_by_addr ClientConnMapByAddr
|
||||||
cts_map ClientConnMap
|
cts_map ClientConnMap
|
||||||
cts_map_by_id ClientConnMapById
|
|
||||||
|
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
stop_req atomic.Bool
|
stop_req atomic.Bool
|
||||||
@ -408,15 +408,27 @@ fmt.Printf("added client route.... %d -> %d\n", id, len(cts.route_map))
|
|||||||
return r, nil
|
return r, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cts *ClientConn) RemoveClientRoutes() {
|
func (cts *ClientConn) ReqStopAllClientRoutes() {
|
||||||
var r *ClientRoute
|
var r *ClientRoute
|
||||||
|
|
||||||
cts.route_mtx.Lock()
|
cts.route_mtx.Lock()
|
||||||
|
defer cts.route_mtx.Unlock()
|
||||||
|
|
||||||
|
for _, r = range cts.route_map {
|
||||||
|
r.ReqStop()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cts *ClientConn) RemoveAllClientRoutes() {
|
||||||
|
var r *ClientRoute
|
||||||
|
|
||||||
|
cts.route_mtx.Lock()
|
||||||
|
defer cts.route_mtx.Unlock()
|
||||||
|
|
||||||
for _, r = range cts.route_map {
|
for _, r = range cts.route_map {
|
||||||
delete(cts.route_map, r.id)
|
delete(cts.route_map, r.id)
|
||||||
r.ReqStop()
|
r.ReqStop()
|
||||||
}
|
}
|
||||||
cts.route_mtx.Unlock()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cts *ClientConn) RemoveClientRoute(route *ClientRoute) error {
|
func (cts *ClientConn) RemoveClientRoute(route *ClientRoute) error {
|
||||||
@ -766,8 +778,8 @@ func NewClient(ctx context.Context, listen_on string, logger Logger, tlscfg *tls
|
|||||||
c.ctx, c.ctx_cancel = context.WithCancel(ctx)
|
c.ctx, c.ctx_cancel = context.WithCancel(ctx)
|
||||||
c.tlscfg = tlscfg
|
c.tlscfg = tlscfg
|
||||||
c.ext_svcs = make([]Service, 0, 1)
|
c.ext_svcs = make([]Service, 0, 1)
|
||||||
|
c.cts_map_by_addr = make(ClientConnMapByAddr)
|
||||||
c.cts_map = make(ClientConnMap)
|
c.cts_map = make(ClientConnMap)
|
||||||
c.cts_map_by_id = make(ClientConnMapById)
|
|
||||||
c.stop_req.Store(false)
|
c.stop_req.Store(false)
|
||||||
c.stop_chan = make(chan bool, 8)
|
c.stop_chan = make(chan bool, 8)
|
||||||
c.log = logger
|
c.log = logger
|
||||||
@ -800,14 +812,14 @@ func (c *Client) AddNewClientConn(cfg *ClientConfig) (*ClientConn, error) {
|
|||||||
c.cts_mtx.Lock()
|
c.cts_mtx.Lock()
|
||||||
defer c.cts_mtx.Unlock()
|
defer c.cts_mtx.Unlock()
|
||||||
|
|
||||||
_, ok = c.cts_map[cfg.ServerAddr]
|
_, ok = c.cts_map_by_addr[cfg.ServerAddr]
|
||||||
if ok {
|
if ok {
|
||||||
return nil, fmt.Errorf("existing server - %s", cfg.ServerAddr)
|
return nil, fmt.Errorf("existing server - %s", cfg.ServerAddr)
|
||||||
}
|
}
|
||||||
|
|
||||||
id = rand.Uint32()
|
id = rand.Uint32()
|
||||||
for {
|
for {
|
||||||
_, ok = c.cts_map_by_id[id]
|
_, ok = c.cts_map[id]
|
||||||
if !ok { break }
|
if !ok { break }
|
||||||
id++
|
id++
|
||||||
}
|
}
|
||||||
@ -815,19 +827,43 @@ func (c *Client) AddNewClientConn(cfg *ClientConfig) (*ClientConn, error) {
|
|||||||
cts.cfg.Id = id // store it again in the active configuration for easy access via control channel
|
cts.cfg.Id = id // store it again in the active configuration for easy access via control channel
|
||||||
cts.lid = fmt.Sprintf("%d", id) // id in string used for logging
|
cts.lid = fmt.Sprintf("%d", id) // id in string used for logging
|
||||||
|
|
||||||
c.cts_map[cfg.ServerAddr] = cts
|
c.cts_map_by_addr[cfg.ServerAddr] = cts
|
||||||
c.cts_map_by_id[id] = cts
|
c.cts_map[id] = cts
|
||||||
fmt.Printf("ADD total servers %d\n", len(c.cts_map))
|
fmt.Printf("ADD total servers %d\n", len(c.cts_map_by_addr))
|
||||||
return cts, nil
|
return cts, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c* Client) ReqStopAllClientConns() {
|
||||||
|
var cts *ClientConn
|
||||||
|
|
||||||
|
c.cts_mtx.Lock()
|
||||||
|
defer c.cts_mtx.Unlock()
|
||||||
|
|
||||||
|
for _, cts = range c.cts_map {
|
||||||
|
cts.ReqStop()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *Client) RemoveAllClientConns() {
|
||||||
|
var cts *ClientConn
|
||||||
|
|
||||||
|
c.cts_mtx.Lock()
|
||||||
|
defer c.cts_mtx.Unlock()
|
||||||
|
|
||||||
|
for _, cts = range c.cts_map {
|
||||||
|
delete(c.cts_map_by_addr, cts.cfg.ServerAddr)
|
||||||
|
delete(c.cts_map, cts.id)
|
||||||
|
cts.ReqStop()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (c *Client) RemoveClientConn(cts *ClientConn) error {
|
func (c *Client) RemoveClientConn(cts *ClientConn) error {
|
||||||
var conn *ClientConn
|
var conn *ClientConn
|
||||||
var ok bool
|
var ok bool
|
||||||
|
|
||||||
c.cts_mtx.Lock()
|
c.cts_mtx.Lock()
|
||||||
|
|
||||||
conn, ok = c.cts_map[cts.cfg.ServerAddr]
|
conn, ok = c.cts_map[cts.id]
|
||||||
if !ok {
|
if !ok {
|
||||||
c.cts_mtx.Unlock()
|
c.cts_mtx.Unlock()
|
||||||
return fmt.Errorf("non-existent connection id - %d", cts.id)
|
return fmt.Errorf("non-existent connection id - %d", cts.id)
|
||||||
@ -837,9 +873,9 @@ func (c *Client) RemoveClientConn(cts *ClientConn) error {
|
|||||||
return fmt.Errorf("non-existent connection id - %d", cts.id)
|
return fmt.Errorf("non-existent connection id - %d", cts.id)
|
||||||
}
|
}
|
||||||
|
|
||||||
delete(c.cts_map, cts.cfg.ServerAddr)
|
delete(c.cts_map_by_addr, cts.cfg.ServerAddr)
|
||||||
delete(c.cts_map_by_id, cts.id)
|
delete(c.cts_map, cts.id)
|
||||||
fmt.Printf("REMOVEDDDDDD CONNECTION FROM %s total servers %d\n", cts.cfg.ServerAddr, len(c.cts_map))
|
fmt.Printf("REMOVEDDDDDD CONNECTION FROM %s total servers %d\n", cts.cfg.ServerAddr, len(c.cts_map_by_addr))
|
||||||
c.cts_mtx.Unlock()
|
c.cts_mtx.Unlock()
|
||||||
|
|
||||||
c.ReqStop()
|
c.ReqStop()
|
||||||
@ -852,7 +888,7 @@ func (c *Client) RemoveClientConnById(conn_id uint32) error {
|
|||||||
|
|
||||||
c.cts_mtx.Lock()
|
c.cts_mtx.Lock()
|
||||||
|
|
||||||
cts, ok = c.cts_map_by_id[conn_id]
|
cts, ok = c.cts_map[conn_id]
|
||||||
if !ok {
|
if !ok {
|
||||||
c.cts_mtx.Unlock()
|
c.cts_mtx.Unlock()
|
||||||
return fmt.Errorf("non-existent connection id - %d", conn_id)
|
return fmt.Errorf("non-existent connection id - %d", conn_id)
|
||||||
@ -860,9 +896,9 @@ func (c *Client) RemoveClientConnById(conn_id uint32) error {
|
|||||||
|
|
||||||
// NOTE: removal by id doesn't perform identity check
|
// NOTE: removal by id doesn't perform identity check
|
||||||
|
|
||||||
delete(c.cts_map, cts.cfg.ServerAddr)
|
delete(c.cts_map_by_addr, cts.cfg.ServerAddr)
|
||||||
delete(c.cts_map_by_id, cts.id)
|
delete(c.cts_map, cts.id)
|
||||||
fmt.Printf("REMOVEDDDDDD CONNECTION FROM %s total servers %d\n", cts.cfg.ServerAddr, len(c.cts_map))
|
fmt.Printf("REMOVEDDDDDD CONNECTION FROM %s total servers %d\n", cts.cfg.ServerAddr, len(c.cts_map_by_addr))
|
||||||
c.cts_mtx.Unlock()
|
c.cts_mtx.Unlock()
|
||||||
|
|
||||||
cts.ReqStop()
|
cts.ReqStop()
|
||||||
@ -876,7 +912,7 @@ func (c *Client) FindClientConnById(id uint32) *ClientConn {
|
|||||||
c.cts_mtx.Lock()
|
c.cts_mtx.Lock()
|
||||||
defer c.cts_mtx.Unlock()
|
defer c.cts_mtx.Unlock()
|
||||||
|
|
||||||
cts, ok = c.cts_map_by_id[id]
|
cts, ok = c.cts_map[id]
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user