diff --git a/client-ctl.go b/client-ctl.go index 9115f22..ad99fd0 100644 --- a/client-ctl.go +++ b/client-ctl.go @@ -1,11 +1,15 @@ package hodu +import "container/list" import "encoding/json" import "fmt" import "net/http" import "strings" +import "sync" import "time" +import "golang.org/x/net/websocket" + /* * POST GET PUT DELETE * /servers - create new server list all servers bulk update delete all servers @@ -56,7 +60,8 @@ type json_out_client_conn struct { ServerAddr string `json:"server-addr"` // actual server address ClientAddr string `json:"client-addr"` ClientToken string `json:"client-token"` - Routes []json_out_client_route `json:"routes"` + CreatedMilli int64 `json:"created-milli"` + Routes []json_out_client_route `json:"routes,omitempty"` } type json_out_client_route_id struct { @@ -78,6 +83,7 @@ type json_out_client_route struct { Lifetime string `json:"lifetime"` LifetimeStart int64 `json:"lifetime-start"` + CreatedMilli int64 `json:"created-milli"` } type json_out_client_peer struct { @@ -88,6 +94,7 @@ type json_out_client_peer struct { ClientLocalAddr string `json:"client-local-addr"` ServerPeerAddr string `json:"server-peer-addr"` ServerLocalAddr string `json:"server-local-addr"` + CreatedMilli int64 `json:"created-milli"` } type json_out_client_stats struct { @@ -101,6 +108,7 @@ type json_out_client_stats struct { type client_ctl struct { c *Client id string + noauth bool // override the auth configuration if true } type client_ctl_token struct { @@ -135,6 +143,18 @@ type client_ctl_client_conns_id_routes_id_peers_id struct { client_ctl } +type client_ctl_client_conns_id_peers struct { + client_ctl +} + +type client_ctl_client_routes struct { + client_ctl +} + +type client_ctl_client_peers struct { + client_ctl +} + type client_ctl_notices struct { client_ctl } @@ -147,6 +167,10 @@ type client_ctl_stats struct { client_ctl } +type client_ctl_ws struct { + client_ctl +} + // ------------------------------------ func (ctl *client_ctl) Id() string { @@ -228,13 +252,6 @@ func (ctl *client_ctl_client_conns) ServeHTTP(w http.ResponseWriter, req *http.R case http.MethodGet: var js []json_out_client_conn var ci ConnId -// var q url.Values - -// q = req.URL.Query() - -// TODO: brief listing vs full listing -// if q.Get("brief") == "true" { -// } js = make([]json_out_client_conn, 0) c.cts_mtx.Lock() @@ -264,6 +281,7 @@ func (ctl *client_ctl_client_conns) ServeHTTP(w http.ResponseWriter, req *http.R ServerPeerOption: r.ServerPeerOption.String(), Lifetime: DurationToSecString(lftdur), LifetimeStart: lftsta.Unix(), + CreatedMilli: r.Created.UnixMilli(), }) } cts.route_mtx.Unlock() @@ -275,6 +293,7 @@ func (ctl *client_ctl_client_conns) ServeHTTP(w http.ResponseWriter, req *http.R ServerAddr: cts.remote_addr.Get(), ClientAddr: cts.local_addr.Get(), ClientToken: cts.Token.Get(), + CreatedMilli: cts.Created.UnixMilli(), Routes: jsp, }) } @@ -380,6 +399,7 @@ func (ctl *client_ctl_client_conns_id) ServeHTTP(w http.ResponseWriter, req *htt ServerPeerOption: r.ServerPeerOption.String(), Lifetime: DurationToSecString(lftdur), LifetimeStart: lftsta.Unix(), + CreatedMilli: r.Created.UnixMilli(), }) } cts.route_mtx.Unlock() @@ -391,6 +411,7 @@ func (ctl *client_ctl_client_conns_id) ServeHTTP(w http.ResponseWriter, req *htt ServerAddr: cts.remote_addr.Get(), ClientAddr: cts.local_addr.Get(), ClientToken: cts.Token.Get(), + CreatedMilli: cts.Created.UnixMilli(), Routes: jsp, } @@ -459,6 +480,7 @@ func (ctl *client_ctl_client_conns_id_routes) ServeHTTP(w http.ResponseWriter, r ServerPeerOption: r.ServerPeerOption.String(), Lifetime: DurationToSecString(lftdur), LifetimeStart: lftsta.Unix(), + CreatedMilli: r.Created.UnixMilli(), }) } cts.route_mtx.Unlock() @@ -577,6 +599,7 @@ func (ctl *client_ctl_client_conns_id_routes_id) ServeHTTP(w http.ResponseWriter ServerPeerOption: r.ServerPeerOption.String(), Lifetime: DurationToSecString(lftdur), LifetimeStart: lftsta.Unix(), + CreatedMilli: r.Created.UnixMilli(), }) if err != nil { goto oops } @@ -753,6 +776,7 @@ func (ctl *client_ctl_client_conns_id_routes_id_peers) ServeHTTP(w http.Response ClientLocalAddr: p.conn.LocalAddr().String(), ServerPeerAddr: p.pts_raddr, ServerLocalAddr: p.pts_laddr, + CreatedMilli: p.Created.UnixMilli(), }) } r.ptc_mtx.Unlock() @@ -812,6 +836,7 @@ func (ctl *client_ctl_client_conns_id_routes_id_peers_id) ServeHTTP(w http.Respo ClientLocalAddr: p.conn.LocalAddr().String(), ServerPeerAddr: p.pts_raddr, ServerLocalAddr: p.pts_laddr, + CreatedMilli: p.Created.UnixMilli(), } status_code = WriteJsonRespHeader(w, http.StatusOK) @@ -834,6 +859,167 @@ oops: // ------------------------------------ +func (ctl *client_ctl_client_conns_id_peers) ServeHTTP(w http.ResponseWriter, req *http.Request) (int, error) { + var c *Client + var status_code int + var err error + var conn_id string + var je *json.Encoder + var cts *ClientConn + + c = ctl.c + je = json.NewEncoder(w) + + conn_id = req.PathValue("conn_id") + cts, err = c.FindClientConnByIdStr(conn_id) + if err != nil { + status_code = WriteJsonRespHeader(w, http.StatusNotFound) + je.Encode(JsonErrmsg{Text: err.Error()}) + goto oops + } + + switch req.Method { + case http.MethodGet: + var jsp []json_out_client_peer + var e *list.Element + + jsp = make([]json_out_client_peer, 0) + cts.ptc_mtx.Lock() + for e = cts.ptc_list.Front(); e != nil; e = e.Next() { + var ptc *ClientPeerConn + ptc = e.Value.(*ClientPeerConn) + jsp = append(jsp, json_out_client_peer{ + CId: ptc.route.cts.Id, + RId: ptc.route.Id, + PId: ptc.conn_id, + ClientPeerAddr: ptc.conn.RemoteAddr().String(), + ClientLocalAddr: ptc.conn.LocalAddr().String(), + ServerPeerAddr: ptc.pts_raddr, + ServerLocalAddr: ptc.pts_laddr, + CreatedMilli: ptc.Created.UnixMilli(), + }) + } + cts.ptc_mtx.Unlock() + + status_code = WriteJsonRespHeader(w, http.StatusOK) + if err = je.Encode(jsp); err != nil { goto oops } + + default: + status_code = WriteEmptyRespHeader(w, http.StatusMethodNotAllowed) + } + +//done: + return status_code, nil + +oops: + return status_code, err +} + +// ------------------------------------ + +func (ctl *client_ctl_client_routes) ServeHTTP(w http.ResponseWriter, req *http.Request) (int, error) { + var c *Client + var status_code int + var je *json.Encoder + var err error + + c = ctl.c + je = json.NewEncoder(w) + + switch req.Method { + case http.MethodGet: + var js []json_out_client_route + var e *list.Element + + js = make([]json_out_client_route, 0) + c.route_mtx.Lock() + for e = c.route_list.Front(); e != nil; e = e.Next() { + var r *ClientRoute + var lftsta time.Time + var lftdur time.Duration + + r = e.Value.(*ClientRoute) + + lftsta, lftdur = r.GetLifetimeInfo() + js = append(js, json_out_client_route{ + CId: r.cts.Id, + RId: r.Id, + ClientPeerAddr: r.PeerAddr, + ClientPeerName: r.PeerName, + ServerPeerSvcAddr: r.ServerPeerSvcAddr, + ServerPeerSvcNet: r.ServerPeerSvcNet, + ServerPeerOption: r.ServerPeerOption.String(), + Lifetime: DurationToSecString(lftdur), + LifetimeStart: lftsta.Unix(), + CreatedMilli: r.Created.UnixMilli(), + }) + } + c.route_mtx.Unlock() + + status_code = WriteJsonRespHeader(w, http.StatusOK) + if err = je.Encode(js); err != nil { goto oops } + + default: + status_code = WriteEmptyRespHeader(w, http.StatusMethodNotAllowed) + } + +//done: + return status_code, nil + +oops: + return status_code, err +} + +// ------------------------------------ + +func (ctl *client_ctl_client_peers) ServeHTTP(w http.ResponseWriter, req *http.Request) (int, error) { + var c *Client + var status_code int + var je *json.Encoder + var err error + + c = ctl.c + je = json.NewEncoder(w) + + switch req.Method { + case http.MethodGet: + var js []json_out_client_peer + var e *list.Element + + js = make([]json_out_client_peer, 0) + c.ptc_mtx.Lock() + for e = c.ptc_list.Front(); e != nil; e = e.Next() { + var ptc *ClientPeerConn + ptc = e.Value.(*ClientPeerConn) + js = append(js, json_out_client_peer{ + CId: ptc.route.cts.Id, + RId: ptc.route.Id, + PId: ptc.conn_id, + ClientPeerAddr: ptc.conn.RemoteAddr().String(), + ClientLocalAddr: ptc.conn.LocalAddr().String(), + ServerPeerAddr: ptc.pts_raddr, + ServerLocalAddr: ptc.pts_laddr, + CreatedMilli: ptc.Created.UnixMilli(), + }) + } + c.ptc_mtx.Unlock() + + status_code = WriteJsonRespHeader(w, http.StatusOK) + if err = je.Encode(js); err != nil { goto oops } + + default: + status_code = WriteEmptyRespHeader(w, http.StatusMethodNotAllowed) + } + +//done: + return status_code, nil + +oops: + return status_code, err +} + +// ------------------------------------ + func (ctl *client_ctl_notices) ServeHTTP(w http.ResponseWriter, req *http.Request) (int, error) { var c *Client var status_code int @@ -955,3 +1141,98 @@ func (ctl *client_ctl_stats) ServeHTTP(w http.ResponseWriter, req *http.Request) oops: return status_code, err } + +// ------------------------------------ + +func (ctl *client_ctl_ws) ServeWebsocket(ws *websocket.Conn) (int, error) { + var c *Client + var wg sync.WaitGroup + var sbsc *ClientEventSubscription + var status_code int + var err error + var xerr error + + c = ctl.c + + // handle authentication using the first message. + // end this task if authentication fails. + if !ctl.noauth && c.ctl_auth != nil { + var req *http.Request + + req = ws.Request() + if req.Header.Get("Authorization") == "" { + var token string + token = req.FormValue("token") + if token != "" { + // websocket doesn't actual have extra headers except a few fixed + // ones. add "Authorization" header from the query paramerer and + // compose a fake header to reuse the same Authentication() function + req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", token)) + } + } + + status_code, _ = c.ctl_auth.Authenticate(req) + if status_code != http.StatusOK { + goto done + } + } + + sbsc, err = c.bulletin.Subscribe("") + if err != nil { goto done } + + wg.Add(1) + go func() { + var c chan *ClientEvent + var err error + + defer wg.Done() + c = sbsc.C + + for c != nil { + var e *ClientEvent + var ok bool + var msg[] byte + + e, ok = <- c + if ok { + msg, err = json.Marshal(e) + if err != nil { + xerr = fmt.Errorf("failed to marshal event - %+v - %s", e, err.Error()) + c = nil + } else { + err = websocket.Message.Send(ws, msg) + if err != nil { + xerr = fmt.Errorf("failed to send message - %s", err.Error()) + c = nil + } + } + } else { + // most likely sbcs.C is closed. if not readable, break the loop + c = nil + } + } + + ws.Close() // hack to break the recv loop. don't care about double closes + }() + +ws_recv_loop: + for { + var msg []byte + err = websocket.Message.Receive(ws, &msg) + if err != nil { break ws_recv_loop } + + if len(msg) > 0 { + // do nothing. discard received messages + } + } + + // Ubsubscribe() to break the internal event reception + // goroutine as well as for cleanup + c.bulletin.Unsubscribe(sbsc) + +done: + ws.Close() + wg.Wait() + if err == nil && xerr != nil { err = xerr } + return http.StatusOK, err +} diff --git a/client-peer.go b/client-peer.go index 28f5ab3..40f337c 100644 --- a/client-peer.go +++ b/client-peer.go @@ -54,6 +54,17 @@ func (cpc *ClientPeerConn) RunTask(wg *sync.WaitGroup) error { cpc.route.cts.psc.Send(MakePeerStoppedPacket(cpc.route.Id, cpc.conn_id, cpc.conn.RemoteAddr().String(), cpc.conn.LocalAddr().String())) // nothing much to do upon failure. no error check here cpc.ReqStop() cpc.route.RemoveClientPeerConn(cpc) + + cpc.route.cts.C.ptc_mtx.Lock() + cpc.route.cts.C.ptc_list.Remove(cpc.node_in_client) + cpc.node_in_client = nil + cpc.route.cts.C.ptc_mtx.Unlock() + + cpc.route.cts.ptc_mtx.Lock() + cpc.route.cts.ptc_list.Remove(cpc.node_in_conn) + cpc.node_in_conn = nil + cpc.route.cts.ptc_mtx.Unlock() + return nil } diff --git a/client.go b/client.go index 5cc7653..ca12af2 100644 --- a/client.go +++ b/client.go @@ -1,5 +1,6 @@ package hodu +import "container/list" import "context" import "crypto/tls" import "errors" @@ -14,6 +15,7 @@ import "sync/atomic" import "time" import "unsafe" +import "golang.org/x/net/websocket" import "google.golang.org/grpc" import "google.golang.org/grpc/codes" import "google.golang.org/grpc/credentials" @@ -56,6 +58,10 @@ type ClientConnConfigActive struct { ClientConnConfig } +type ClientConnNoticeHandler interface { + Handle(cts* ClientConn, text string) +} + type ClientConfig struct { CtlAddrs []string CtlTls *tls.Config @@ -71,16 +77,32 @@ type ClientConfig struct { Token string // to send to the server for identification } -type ClientConnNoticeHandler interface { - Handle(cts *ClientConn, text string) +type ClientEventKind int +const ( + CLIENT_EVENT_CONN_ADDED = iota + CLIENT_EVENT_CONN_UPDATED + CLIENT_EVENT_CONN_DELETED +) + +type ClientEvent struct { + Kind ClientEventKind `json:"type"` + Data interface{} `json:"data"` } +type ClientEventBulletin = Bulletin[*ClientEvent] +type ClientEventSubscription = BulletinSubscription[*ClientEvent] + type Client struct { Named Ctx context.Context CtxCancel context.CancelFunc + wg sync.WaitGroup + stop_req atomic.Bool + stop_chan chan bool + token string + ext_mtx sync.Mutex ext_svcs []Service @@ -94,21 +116,24 @@ type Client struct { ctl []*http.Server // control server ptc_tmout time.Duration // timeout seconds to connect to peer - ptc_limit int // global maximum number of peers cts_limit int cts_mtx sync.Mutex cts_next_id ConnId cts_map ClientConnMap - wg sync.WaitGroup - stop_req atomic.Bool - stop_chan chan bool - token string + ptc_limit int // global maximum number of peers + ptc_mtx sync.Mutex + ptc_list *list.List + + route_mtx sync.Mutex + route_list *list.List log Logger conn_notice_handlers []ClientConnNoticeHandler route_persister ClientRoutePersister + bulletin *ClientEventBulletin + promreg *prometheus.Registry stats struct { conns atomic.Int64 @@ -131,6 +156,7 @@ type ClientConn struct { cfg ClientConnConfigActive Id ConnId Sid string // id rendered in string + Created time.Time State atomic.Int32 // ClientConnState Token Atom[string] @@ -151,6 +177,9 @@ type ClientConn struct { route_map ClientRouteMap route_wg sync.WaitGroup + ptc_mtx sync.Mutex + ptc_list *list.List + stop_req atomic.Bool stop_chan chan bool @@ -160,8 +189,11 @@ type ClientConn struct { type ClientRoute struct { cts *ClientConn Id RouteId + Created time.Time Static bool + node_in_client *list.Element + PeerAddr string PeerName string PeerOption RouteOption @@ -191,6 +223,10 @@ type ClientPeerConn struct { route *ClientRoute conn_id PeerId conn *net.TCPConn + Created time.Time + + node_in_client *list.Element + node_in_conn *list.Element pts_laddr string // server-local addreess of the server-side peer pts_raddr string // address of the server-side peer @@ -235,6 +271,7 @@ func NewClientRoute(cts *ClientConn, id RouteId, static bool, client_peer_addr s r.cts = cts r.Id = id + r.Created = time.Now() r.Static = static r.ptc_map = make(ClientPeerConnMap) r.ptc_cancel_map = make(ClientPeerCancelFuncMap) @@ -263,6 +300,14 @@ func (r *ClientRoute) AddNewClientPeerConn(c *net.TCPConn, pts_id PeerId, pts_ra r.cts.C.stats.peers.Add(1) r.ptc_mtx.Unlock() + r.cts.C.ptc_mtx.Lock() + ptc.node_in_client = r.cts.C.ptc_list.PushBack(ptc) + r.cts.C.ptc_mtx.Unlock() + + r.cts.ptc_mtx.Lock() + ptc.node_in_conn = r.cts.ptc_list.PushBack(ptc) + r.cts.ptc_mtx.Unlock() + r.cts.C.log.Write(r.cts.Sid, LOG_INFO, "Added client-side peer(%d,%d,%s,%s)", r.Id, ptc.conn_id, ptc.conn.RemoteAddr().String(), ptc.conn.LocalAddr().String()) return ptc, nil } @@ -422,6 +467,13 @@ done: } r.cts.RemoveClientRoute(r) + + + r.cts.C.route_mtx.Lock() + r.cts.C.route_list.Remove(r.node_in_client) + r.node_in_client = nil + r.cts.C.route_mtx.Unlock() +// TODO: fire ROUTE_DELETED event } func (r *ClientRoute) ReqStop() { @@ -723,11 +775,13 @@ func NewClientConn(c *Client, cfg *ClientConnConfig) *ClientConn { var i int cts.C = c + cts.Created = time.Now() cts.route_map = make(ClientRouteMap) cts.route_next_id = 1 cts.cfg.ClientConnConfig = *cfg cts.stop_req.Store(false) cts.stop_chan = make(chan bool, 8) + cts.ptc_list = list.New() for i, _ = range cts.cfg.Routes { // override it to static regardless of the value passed in @@ -783,6 +837,11 @@ func (cts *ClientConn) AddNewClientRoute(rc *ClientRouteConfig) (*ClientRoute, e if cts.C.route_persister != nil { cts.C.route_persister.Save(cts, r) } cts.route_mtx.Unlock() + cts.C.route_mtx.Lock() + r.node_in_client = cts.C.route_list.PushBack(r) + cts.C.route_mtx.Unlock() +// TODO: fire ROUTE_ADDED event + cts.C.log.Write(cts.Sid, LOG_INFO, "Added route(%d,%s)", r.Id, r.PeerAddr) cts.route_wg.Add(1) @@ -1385,6 +1444,12 @@ type ClientHttpHandler interface { ServeHTTP (w http.ResponseWriter, req *http.Request) (int, error) } +type ClientWebsocketHandler interface { + Id() string + ServeWebsocket(ws *websocket.Conn) (int, error) +} + + func (c *Client) wrap_http_handler(handler ClientHttpHandler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { var status_code int @@ -1439,14 +1504,43 @@ func (c *Client) wrap_http_handler(handler ClientHttpHandler) http.Handler { }) } +func (s *Client) WrapWebsocketHandler(handler ClientWebsocketHandler) websocket.Handler { + return websocket.Handler(func(ws *websocket.Conn) { + var status_code int + var err error + var start_time time.Time + var time_taken time.Duration + var req *http.Request + + req = ws.Request() + s.log.Write(handler.Id(), LOG_INFO, "[%s] %s %s [ws]", req.RemoteAddr, req.Method, req.URL.String()) + + start_time = time.Now() + status_code, err = handler.ServeWebsocket(ws) + time_taken = time.Now().Sub(start_time) + + if status_code > 0 { + if err != nil { + s.log.Write(handler.Id(), LOG_INFO, "[%s] %s %s [ws] %d %.9f - %s", req.RemoteAddr, req.Method, req.URL.String(), status_code, time_taken.Seconds(), err.Error()) + } else { + s.log.Write(handler.Id(), LOG_INFO, "[%s] %s %s [ws] %d %.9f", req.RemoteAddr, req.Method, req.URL.String(), status_code, time_taken.Seconds()) + } + } + }) +} + func NewClient(ctx context.Context, name string, logger Logger, cfg *ClientConfig) *Client { var c Client var i int var hs_log *log.Logger c.name = name + c.log = logger + c.token = cfg.Token c.Ctx, c.CtxCancel = context.WithCancel(ctx) c.ext_svcs = make([]Service, 0, 1) + c.route_list = list.New() + c.ptc_list = list.New() c.ptc_tmout = cfg.PeerConnTmout c.ptc_limit = cfg.PeerConnMax c.cts_limit = cfg.RpcConnMax @@ -1454,8 +1548,7 @@ func NewClient(ctx context.Context, name string, logger Logger, cfg *ClientConfi c.cts_map = make(ClientConnMap) c.stop_req.Store(false) c.stop_chan = make(chan bool, 8) - c.log = logger - c.token = cfg.Token + c.bulletin = NewBulletin[*ClientEvent](&c, 1024) c.rpc_tls = cfg.RpcTls c.ctl_auth = cfg.CtlAuth @@ -1477,6 +1570,15 @@ func NewClient(ctx context.Context, name string, logger Logger, cfg *ClientConfi c.wrap_http_handler(&client_ctl_client_conns_id_routes_id_peers{client_ctl{c: &c, id: HS_ID_CTL}})) c.ctl_mux.Handle(c.ctl_prefix + "/_ctl/client-conns/{conn_id}/routes/{route_id}/peers/{peer_id}", c.wrap_http_handler(&client_ctl_client_conns_id_routes_id_peers_id{client_ctl{c: &c, id: HS_ID_CTL}})) + + c.ctl_mux.Handle(c.ctl_prefix + "/_ctl/client-conns/{conn_id}/peers", + c.wrap_http_handler(&client_ctl_client_conns_id_peers{client_ctl{c: &c, id: HS_ID_CTL}})) + + c.ctl_mux.Handle(c.ctl_prefix + "/_ctl/client-routes", + c.wrap_http_handler(&client_ctl_client_routes{client_ctl{c: &c, id: HS_ID_CTL}})) + c.ctl_mux.Handle(c.ctl_prefix + "/_ctl/client-peers", + c.wrap_http_handler(&client_ctl_client_peers{client_ctl{c: &c, id: HS_ID_CTL}})) + c.ctl_mux.Handle(c.ctl_prefix + "/_ctl/notices", c.wrap_http_handler(&client_ctl_notices{client_ctl{c: &c, id: HS_ID_CTL}})) c.ctl_mux.Handle(c.ctl_prefix + "/_ctl/notices/{conn_id}", @@ -1493,6 +1595,8 @@ func NewClient(ctx context.Context, name string, logger Logger, cfg *ClientConfi c.ctl_mux.Handle(c.ctl_prefix + "/_ctl/metrics", promhttp.HandlerFor(c.promreg, promhttp.HandlerOpts{ EnableOpenMetrics: true })) + c.ctl_mux.Handle("/_ctl/events", + c.WrapWebsocketHandler(&client_ctl_ws{client_ctl{c: &c, id: HS_ID_CTL}})) c.ctl_addr = make([]string, len(cfg.CtlAddrs)) c.ctl = make([]*http.Server, len(cfg.CtlAddrs)) @@ -1786,6 +1890,8 @@ func (c *Client) ReqStop() { var cts *ClientConn var ctl *http.Server + c.bulletin.Block() + c.CtxCancel() for _, ctl = range c.ctl { ctl.Shutdown(c.Ctx) // to break c.ctl.ListenAndServe() @@ -1887,6 +1993,9 @@ func (c *Client) StartService(data interface{}) { var cfg *ClientConnConfig var ok bool + c.wg.Add(1) + go c.bulletin.RunTask(&c.wg) + cfg, ok = data.(*ClientConnConfig) if !ok { c.log.Write("", LOG_ERROR, "Failed to start service - invalid configuration - %v", data) @@ -1916,6 +2025,7 @@ func (c *Client) StartExtService(svc Service, data interface{}) { func (c *Client) StopServices() { var ext_svc Service c.ReqStop() + c.bulletin.ReqStop() for _, ext_svc = range c.ext_svcs { ext_svc.StopServices() } diff --git a/server-ctl.go b/server-ctl.go index fd465ab..fd7aa3e 100644 --- a/server-ctl.go +++ b/server-ctl.go @@ -920,7 +920,7 @@ func (ctl *server_ctl_ws) ServeWebsocket(ws *websocket.Conn) (int, error) { // handle authentication using the first message. // end this task if authentication fails. - if !ctl.noauth && ctl.s.Cfg.CtlAuth != nil { + if !ctl.noauth && s.Cfg.CtlAuth != nil { var req *http.Request req = ws.Request() @@ -935,7 +935,7 @@ func (ctl *server_ctl_ws) ServeWebsocket(ws *websocket.Conn) (int, error) { } } - status_code, _ = ctl.s.Cfg.CtlAuth.Authenticate(req) + status_code, _ = s.Cfg.CtlAuth.Authenticate(req) if status_code != http.StatusOK { goto done } diff --git a/server.go b/server.go index 52f41b6..1e56938 100644 --- a/server.go +++ b/server.go @@ -1070,6 +1070,9 @@ func (s *Server) PacketStream(strm Hodu_PacketStreamServer) error { } if s.stop_req.Load() { + // this check should still suffer race condition + // becuase this function itself runs as a goroutine and is fired + // from the grpc server code without synchronizing with hodu. return fmt.Errorf("new conneciton prohibited after stop - %s", p.Addr.String()) } @@ -1382,9 +1385,9 @@ func NewServer(ctx context.Context, name string, logger Logger, cfg *ServerConfi s.Cfg = cfg s.ext_svcs = make([]Service, 0, 1) - s.pts_limit = cfg.MaxPeers - s.pts_list = list.New() s.route_list = list.New() + s.pts_list = list.New() + s.pts_limit = cfg.MaxPeers s.cts_limit = cfg.RpcMaxConns s.cts_next_id = 1 s.cts_map = make(ServerConnMap)