added more client-side endpoints
This commit is contained in:
128
client.go
128
client.go
@ -1,5 +1,6 @@
|
||||
package hodu
|
||||
|
||||
import "container/list"
|
||||
import "context"
|
||||
import "crypto/tls"
|
||||
import "errors"
|
||||
@ -14,6 +15,7 @@ import "sync/atomic"
|
||||
import "time"
|
||||
import "unsafe"
|
||||
|
||||
import "golang.org/x/net/websocket"
|
||||
import "google.golang.org/grpc"
|
||||
import "google.golang.org/grpc/codes"
|
||||
import "google.golang.org/grpc/credentials"
|
||||
@ -56,6 +58,10 @@ type ClientConnConfigActive struct {
|
||||
ClientConnConfig
|
||||
}
|
||||
|
||||
type ClientConnNoticeHandler interface {
|
||||
Handle(cts* ClientConn, text string)
|
||||
}
|
||||
|
||||
type ClientConfig struct {
|
||||
CtlAddrs []string
|
||||
CtlTls *tls.Config
|
||||
@ -71,16 +77,32 @@ type ClientConfig struct {
|
||||
Token string // to send to the server for identification
|
||||
}
|
||||
|
||||
type ClientConnNoticeHandler interface {
|
||||
Handle(cts *ClientConn, text string)
|
||||
type ClientEventKind int
|
||||
const (
|
||||
CLIENT_EVENT_CONN_ADDED = iota
|
||||
CLIENT_EVENT_CONN_UPDATED
|
||||
CLIENT_EVENT_CONN_DELETED
|
||||
)
|
||||
|
||||
type ClientEvent struct {
|
||||
Kind ClientEventKind `json:"type"`
|
||||
Data interface{} `json:"data"`
|
||||
}
|
||||
|
||||
type ClientEventBulletin = Bulletin[*ClientEvent]
|
||||
type ClientEventSubscription = BulletinSubscription[*ClientEvent]
|
||||
|
||||
type Client struct {
|
||||
Named
|
||||
|
||||
Ctx context.Context
|
||||
CtxCancel context.CancelFunc
|
||||
|
||||
wg sync.WaitGroup
|
||||
stop_req atomic.Bool
|
||||
stop_chan chan bool
|
||||
token string
|
||||
|
||||
ext_mtx sync.Mutex
|
||||
ext_svcs []Service
|
||||
|
||||
@ -94,21 +116,24 @@ type Client struct {
|
||||
ctl []*http.Server // control server
|
||||
|
||||
ptc_tmout time.Duration // timeout seconds to connect to peer
|
||||
ptc_limit int // global maximum number of peers
|
||||
cts_limit int
|
||||
cts_mtx sync.Mutex
|
||||
cts_next_id ConnId
|
||||
cts_map ClientConnMap
|
||||
|
||||
wg sync.WaitGroup
|
||||
stop_req atomic.Bool
|
||||
stop_chan chan bool
|
||||
token string
|
||||
ptc_limit int // global maximum number of peers
|
||||
ptc_mtx sync.Mutex
|
||||
ptc_list *list.List
|
||||
|
||||
route_mtx sync.Mutex
|
||||
route_list *list.List
|
||||
|
||||
log Logger
|
||||
conn_notice_handlers []ClientConnNoticeHandler
|
||||
route_persister ClientRoutePersister
|
||||
|
||||
bulletin *ClientEventBulletin
|
||||
|
||||
promreg *prometheus.Registry
|
||||
stats struct {
|
||||
conns atomic.Int64
|
||||
@ -131,6 +156,7 @@ type ClientConn struct {
|
||||
cfg ClientConnConfigActive
|
||||
Id ConnId
|
||||
Sid string // id rendered in string
|
||||
Created time.Time
|
||||
State atomic.Int32 // ClientConnState
|
||||
Token Atom[string]
|
||||
|
||||
@ -151,6 +177,9 @@ type ClientConn struct {
|
||||
route_map ClientRouteMap
|
||||
route_wg sync.WaitGroup
|
||||
|
||||
ptc_mtx sync.Mutex
|
||||
ptc_list *list.List
|
||||
|
||||
stop_req atomic.Bool
|
||||
stop_chan chan bool
|
||||
|
||||
@ -160,8 +189,11 @@ type ClientConn struct {
|
||||
type ClientRoute struct {
|
||||
cts *ClientConn
|
||||
Id RouteId
|
||||
Created time.Time
|
||||
Static bool
|
||||
|
||||
node_in_client *list.Element
|
||||
|
||||
PeerAddr string
|
||||
PeerName string
|
||||
PeerOption RouteOption
|
||||
@ -191,6 +223,10 @@ type ClientPeerConn struct {
|
||||
route *ClientRoute
|
||||
conn_id PeerId
|
||||
conn *net.TCPConn
|
||||
Created time.Time
|
||||
|
||||
node_in_client *list.Element
|
||||
node_in_conn *list.Element
|
||||
|
||||
pts_laddr string // server-local addreess of the server-side peer
|
||||
pts_raddr string // address of the server-side peer
|
||||
@ -235,6 +271,7 @@ func NewClientRoute(cts *ClientConn, id RouteId, static bool, client_peer_addr s
|
||||
|
||||
r.cts = cts
|
||||
r.Id = id
|
||||
r.Created = time.Now()
|
||||
r.Static = static
|
||||
r.ptc_map = make(ClientPeerConnMap)
|
||||
r.ptc_cancel_map = make(ClientPeerCancelFuncMap)
|
||||
@ -263,6 +300,14 @@ func (r *ClientRoute) AddNewClientPeerConn(c *net.TCPConn, pts_id PeerId, pts_ra
|
||||
r.cts.C.stats.peers.Add(1)
|
||||
r.ptc_mtx.Unlock()
|
||||
|
||||
r.cts.C.ptc_mtx.Lock()
|
||||
ptc.node_in_client = r.cts.C.ptc_list.PushBack(ptc)
|
||||
r.cts.C.ptc_mtx.Unlock()
|
||||
|
||||
r.cts.ptc_mtx.Lock()
|
||||
ptc.node_in_conn = r.cts.ptc_list.PushBack(ptc)
|
||||
r.cts.ptc_mtx.Unlock()
|
||||
|
||||
r.cts.C.log.Write(r.cts.Sid, LOG_INFO, "Added client-side peer(%d,%d,%s,%s)", r.Id, ptc.conn_id, ptc.conn.RemoteAddr().String(), ptc.conn.LocalAddr().String())
|
||||
return ptc, nil
|
||||
}
|
||||
@ -422,6 +467,13 @@ done:
|
||||
}
|
||||
|
||||
r.cts.RemoveClientRoute(r)
|
||||
|
||||
|
||||
r.cts.C.route_mtx.Lock()
|
||||
r.cts.C.route_list.Remove(r.node_in_client)
|
||||
r.node_in_client = nil
|
||||
r.cts.C.route_mtx.Unlock()
|
||||
// TODO: fire ROUTE_DELETED event
|
||||
}
|
||||
|
||||
func (r *ClientRoute) ReqStop() {
|
||||
@ -723,11 +775,13 @@ func NewClientConn(c *Client, cfg *ClientConnConfig) *ClientConn {
|
||||
var i int
|
||||
|
||||
cts.C = c
|
||||
cts.Created = time.Now()
|
||||
cts.route_map = make(ClientRouteMap)
|
||||
cts.route_next_id = 1
|
||||
cts.cfg.ClientConnConfig = *cfg
|
||||
cts.stop_req.Store(false)
|
||||
cts.stop_chan = make(chan bool, 8)
|
||||
cts.ptc_list = list.New()
|
||||
|
||||
for i, _ = range cts.cfg.Routes {
|
||||
// override it to static regardless of the value passed in
|
||||
@ -783,6 +837,11 @@ func (cts *ClientConn) AddNewClientRoute(rc *ClientRouteConfig) (*ClientRoute, e
|
||||
if cts.C.route_persister != nil { cts.C.route_persister.Save(cts, r) }
|
||||
cts.route_mtx.Unlock()
|
||||
|
||||
cts.C.route_mtx.Lock()
|
||||
r.node_in_client = cts.C.route_list.PushBack(r)
|
||||
cts.C.route_mtx.Unlock()
|
||||
// TODO: fire ROUTE_ADDED event
|
||||
|
||||
cts.C.log.Write(cts.Sid, LOG_INFO, "Added route(%d,%s)", r.Id, r.PeerAddr)
|
||||
|
||||
cts.route_wg.Add(1)
|
||||
@ -1385,6 +1444,12 @@ type ClientHttpHandler interface {
|
||||
ServeHTTP (w http.ResponseWriter, req *http.Request) (int, error)
|
||||
}
|
||||
|
||||
type ClientWebsocketHandler interface {
|
||||
Id() string
|
||||
ServeWebsocket(ws *websocket.Conn) (int, error)
|
||||
}
|
||||
|
||||
|
||||
func (c *Client) wrap_http_handler(handler ClientHttpHandler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
||||
var status_code int
|
||||
@ -1439,14 +1504,43 @@ func (c *Client) wrap_http_handler(handler ClientHttpHandler) http.Handler {
|
||||
})
|
||||
}
|
||||
|
||||
func (s *Client) WrapWebsocketHandler(handler ClientWebsocketHandler) websocket.Handler {
|
||||
return websocket.Handler(func(ws *websocket.Conn) {
|
||||
var status_code int
|
||||
var err error
|
||||
var start_time time.Time
|
||||
var time_taken time.Duration
|
||||
var req *http.Request
|
||||
|
||||
req = ws.Request()
|
||||
s.log.Write(handler.Id(), LOG_INFO, "[%s] %s %s [ws]", req.RemoteAddr, req.Method, req.URL.String())
|
||||
|
||||
start_time = time.Now()
|
||||
status_code, err = handler.ServeWebsocket(ws)
|
||||
time_taken = time.Now().Sub(start_time)
|
||||
|
||||
if status_code > 0 {
|
||||
if err != nil {
|
||||
s.log.Write(handler.Id(), LOG_INFO, "[%s] %s %s [ws] %d %.9f - %s", req.RemoteAddr, req.Method, req.URL.String(), status_code, time_taken.Seconds(), err.Error())
|
||||
} else {
|
||||
s.log.Write(handler.Id(), LOG_INFO, "[%s] %s %s [ws] %d %.9f", req.RemoteAddr, req.Method, req.URL.String(), status_code, time_taken.Seconds())
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func NewClient(ctx context.Context, name string, logger Logger, cfg *ClientConfig) *Client {
|
||||
var c Client
|
||||
var i int
|
||||
var hs_log *log.Logger
|
||||
|
||||
c.name = name
|
||||
c.log = logger
|
||||
c.token = cfg.Token
|
||||
c.Ctx, c.CtxCancel = context.WithCancel(ctx)
|
||||
c.ext_svcs = make([]Service, 0, 1)
|
||||
c.route_list = list.New()
|
||||
c.ptc_list = list.New()
|
||||
c.ptc_tmout = cfg.PeerConnTmout
|
||||
c.ptc_limit = cfg.PeerConnMax
|
||||
c.cts_limit = cfg.RpcConnMax
|
||||
@ -1454,8 +1548,7 @@ func NewClient(ctx context.Context, name string, logger Logger, cfg *ClientConfi
|
||||
c.cts_map = make(ClientConnMap)
|
||||
c.stop_req.Store(false)
|
||||
c.stop_chan = make(chan bool, 8)
|
||||
c.log = logger
|
||||
c.token = cfg.Token
|
||||
c.bulletin = NewBulletin[*ClientEvent](&c, 1024)
|
||||
|
||||
c.rpc_tls = cfg.RpcTls
|
||||
c.ctl_auth = cfg.CtlAuth
|
||||
@ -1477,6 +1570,15 @@ func NewClient(ctx context.Context, name string, logger Logger, cfg *ClientConfi
|
||||
c.wrap_http_handler(&client_ctl_client_conns_id_routes_id_peers{client_ctl{c: &c, id: HS_ID_CTL}}))
|
||||
c.ctl_mux.Handle(c.ctl_prefix + "/_ctl/client-conns/{conn_id}/routes/{route_id}/peers/{peer_id}",
|
||||
c.wrap_http_handler(&client_ctl_client_conns_id_routes_id_peers_id{client_ctl{c: &c, id: HS_ID_CTL}}))
|
||||
|
||||
c.ctl_mux.Handle(c.ctl_prefix + "/_ctl/client-conns/{conn_id}/peers",
|
||||
c.wrap_http_handler(&client_ctl_client_conns_id_peers{client_ctl{c: &c, id: HS_ID_CTL}}))
|
||||
|
||||
c.ctl_mux.Handle(c.ctl_prefix + "/_ctl/client-routes",
|
||||
c.wrap_http_handler(&client_ctl_client_routes{client_ctl{c: &c, id: HS_ID_CTL}}))
|
||||
c.ctl_mux.Handle(c.ctl_prefix + "/_ctl/client-peers",
|
||||
c.wrap_http_handler(&client_ctl_client_peers{client_ctl{c: &c, id: HS_ID_CTL}}))
|
||||
|
||||
c.ctl_mux.Handle(c.ctl_prefix + "/_ctl/notices",
|
||||
c.wrap_http_handler(&client_ctl_notices{client_ctl{c: &c, id: HS_ID_CTL}}))
|
||||
c.ctl_mux.Handle(c.ctl_prefix + "/_ctl/notices/{conn_id}",
|
||||
@ -1493,6 +1595,8 @@ func NewClient(ctx context.Context, name string, logger Logger, cfg *ClientConfi
|
||||
c.ctl_mux.Handle(c.ctl_prefix + "/_ctl/metrics",
|
||||
promhttp.HandlerFor(c.promreg, promhttp.HandlerOpts{ EnableOpenMetrics: true }))
|
||||
|
||||
c.ctl_mux.Handle("/_ctl/events",
|
||||
c.WrapWebsocketHandler(&client_ctl_ws{client_ctl{c: &c, id: HS_ID_CTL}}))
|
||||
|
||||
c.ctl_addr = make([]string, len(cfg.CtlAddrs))
|
||||
c.ctl = make([]*http.Server, len(cfg.CtlAddrs))
|
||||
@ -1786,6 +1890,8 @@ func (c *Client) ReqStop() {
|
||||
var cts *ClientConn
|
||||
var ctl *http.Server
|
||||
|
||||
c.bulletin.Block()
|
||||
|
||||
c.CtxCancel()
|
||||
for _, ctl = range c.ctl {
|
||||
ctl.Shutdown(c.Ctx) // to break c.ctl.ListenAndServe()
|
||||
@ -1887,6 +1993,9 @@ func (c *Client) StartService(data interface{}) {
|
||||
var cfg *ClientConnConfig
|
||||
var ok bool
|
||||
|
||||
c.wg.Add(1)
|
||||
go c.bulletin.RunTask(&c.wg)
|
||||
|
||||
cfg, ok = data.(*ClientConnConfig)
|
||||
if !ok {
|
||||
c.log.Write("", LOG_ERROR, "Failed to start service - invalid configuration - %v", data)
|
||||
@ -1916,6 +2025,7 @@ func (c *Client) StartExtService(svc Service, data interface{}) {
|
||||
func (c *Client) StopServices() {
|
||||
var ext_svc Service
|
||||
c.ReqStop()
|
||||
c.bulletin.ReqStop()
|
||||
for _, ext_svc = range c.ext_svcs {
|
||||
ext_svc.StopServices()
|
||||
}
|
||||
|
Reference in New Issue
Block a user