From 76cba687edf70f7e9d2ca5943830f2676d75985d Mon Sep 17 00:00:00 2001 From: hyung-hwan Date: Fri, 28 Mar 2025 17:03:17 +0900 Subject: [PATCH] created event firing functions under server and client to replace directy bulleting enqueing --- client-ctl.go | 49 ++++++----- client-peer.go | 2 + client.go | 132 ++++++++++++++++++++++++++--- server-ctl.go | 15 ++++ server-peer.go | 27 +----- server.go | 220 ++++++++++++++++++++++--------------------------- 6 files changed, 270 insertions(+), 175 deletions(-) diff --git a/client-ctl.go b/client-ctl.go index ad99fd0..1fbd60d 100644 --- a/client-ctl.go +++ b/client-ctl.go @@ -49,10 +49,6 @@ type json_in_client_route_update struct { Lifetime string `json:"lifetime"` } -type json_out_client_conn_id struct { - CId ConnId `json:"conn-id"` -} - type json_out_client_conn struct { CId ConnId `json:"conn-id"` ReqServerAddrs []string `json:"req-server-addrs"` // server addresses requested. may include a domain name @@ -64,11 +60,6 @@ type json_out_client_conn struct { Routes []json_out_client_route `json:"routes,omitempty"` } -type json_out_client_route_id struct { - CId ConnId `json:"conn-id"` - RId RouteId `json:"route-id"` -} - type json_out_client_route struct { CId ConnId `json:"conn-id"` RId RouteId `json:"route-id"` @@ -97,6 +88,22 @@ type json_out_client_peer struct { CreatedMilli int64 `json:"created-milli"` } +type json_out_client_conn_id struct { + CId ConnId `json:"conn-id"` +} + +type json_out_client_route_id struct { + CId ConnId `json:"conn-id"` + RId RouteId `json:"route-id"` +} + +type json_out_client_peer_id struct { + CId ConnId `json:"conn-id"` + RId RouteId `json:"route-id"` + PId PeerId `json:"peer-id"` +} + + type json_out_client_stats struct { json_out_go_stats ClientConns int64 `json:"client-conns"` @@ -276,8 +283,8 @@ func (ctl *client_ctl_client_conns) ServeHTTP(w http.ResponseWriter, req *http.R RId: r.Id, ClientPeerAddr: r.PeerAddr, ClientPeerName: r.PeerName, - ServerPeerSvcAddr: r.ServerPeerSvcAddr, - ServerPeerSvcNet: r.ServerPeerSvcNet, + ServerPeerSvcAddr: r.ServerPeerSvcAddr.Get(), + ServerPeerSvcNet: r.ServerPeerSvcNet.Get(), ServerPeerOption: r.ServerPeerOption.String(), Lifetime: DurationToSecString(lftdur), LifetimeStart: lftsta.Unix(), @@ -394,8 +401,8 @@ func (ctl *client_ctl_client_conns_id) ServeHTTP(w http.ResponseWriter, req *htt RId: r.Id, ClientPeerAddr: r.PeerAddr, ClientPeerName: r.PeerName, - ServerPeerSvcAddr: r.ServerPeerSvcAddr, - ServerPeerSvcNet: r.ServerPeerSvcNet, + ServerPeerSvcAddr: r.ServerPeerSvcAddr.Get(), + ServerPeerSvcNet: r.ServerPeerSvcNet.Get(), ServerPeerOption: r.ServerPeerOption.String(), Lifetime: DurationToSecString(lftdur), LifetimeStart: lftsta.Unix(), @@ -475,8 +482,8 @@ func (ctl *client_ctl_client_conns_id_routes) ServeHTTP(w http.ResponseWriter, r RId: r.Id, ClientPeerAddr: r.PeerAddr, ClientPeerName: r.PeerName, - ServerPeerSvcAddr: r.ServerPeerSvcAddr, - ServerPeerSvcNet: r.ServerPeerSvcNet, + ServerPeerSvcAddr: r.ServerPeerSvcAddr.Get(), + ServerPeerSvcNet: r.ServerPeerSvcNet.Get(), ServerPeerOption: r.ServerPeerOption.String(), Lifetime: DurationToSecString(lftdur), LifetimeStart: lftsta.Unix(), @@ -594,8 +601,8 @@ func (ctl *client_ctl_client_conns_id_routes_id) ServeHTTP(w http.ResponseWriter RId: r.Id, ClientPeerAddr: r.PeerAddr, ClientPeerName: r.PeerName, - ServerPeerSvcAddr: r.ServerPeerSvcAddr, - ServerPeerSvcNet: r.ServerPeerSvcNet, + ServerPeerSvcAddr: r.ServerPeerSvcAddr.Get(), + ServerPeerSvcNet: r.ServerPeerSvcNet.Get(), ServerPeerOption: r.ServerPeerOption.String(), Lifetime: DurationToSecString(lftdur), LifetimeStart: lftsta.Unix(), @@ -683,8 +690,8 @@ func (ctl *client_ctl_client_conns_id_routes_spsp) ServeHTTP(w http.ResponseWrit RId: r.Id, ClientPeerAddr: r.PeerAddr, ClientPeerName: r.PeerName, - ServerPeerSvcAddr: r.ServerPeerSvcAddr, - ServerPeerSvcNet: r.ServerPeerSvcNet, + ServerPeerSvcAddr: r.ServerPeerSvcAddr.Get(), + ServerPeerSvcNet: r.ServerPeerSvcNet.Get(), ServerPeerOption: r.ServerPeerOption.String(), Lifetime: DurationToSecString(lftdur), LifetimeStart: lftsta.Unix(), @@ -946,8 +953,8 @@ func (ctl *client_ctl_client_routes) ServeHTTP(w http.ResponseWriter, req *http. RId: r.Id, ClientPeerAddr: r.PeerAddr, ClientPeerName: r.PeerName, - ServerPeerSvcAddr: r.ServerPeerSvcAddr, - ServerPeerSvcNet: r.ServerPeerSvcNet, + ServerPeerSvcAddr: r.ServerPeerSvcAddr.Get(), + ServerPeerSvcNet: r.ServerPeerSvcNet.Get(), ServerPeerOption: r.ServerPeerOption.String(), Lifetime: DurationToSecString(lftdur), LifetimeStart: lftsta.Unix(), diff --git a/client-peer.go b/client-peer.go index 40f337c..f436ed7 100644 --- a/client-peer.go +++ b/client-peer.go @@ -26,6 +26,7 @@ func (cpc *ClientPeerConn) RunTask(wg *sync.WaitGroup) error { var n int defer wg.Done() + cpc.route.cts.C.FirePeerEvent(CLIENT_EVENT_PEER_STARTED, cpc) for { n, err = cpc.conn.Read(buf[:]) @@ -65,6 +66,7 @@ func (cpc *ClientPeerConn) RunTask(wg *sync.WaitGroup) error { cpc.node_in_conn = nil cpc.route.cts.ptc_mtx.Unlock() + cpc.route.cts.C.FirePeerEvent(CLIENT_EVENT_PEER_STOPPED, cpc) return nil } diff --git a/client.go b/client.go index ca12af2..ca434cb 100644 --- a/client.go +++ b/client.go @@ -79,9 +79,15 @@ type ClientConfig struct { type ClientEventKind int const ( - CLIENT_EVENT_CONN_ADDED = iota + CLIENT_EVENT_CONN_STARTED = iota CLIENT_EVENT_CONN_UPDATED - CLIENT_EVENT_CONN_DELETED + CLIENT_EVENT_CONN_STOPPED + CLIENT_EVENT_ROUTE_STARTED + CLIENT_EVENT_ROUTE_UPDATED + CLIENT_EVENT_ROUTE_STOPPED + CLIENT_EVENT_PEER_STARTED + CLIENT_EVENT_PEER_UPDATED + CLIENT_EVENT_PEER_STOPPED ) type ClientEvent struct { @@ -201,8 +207,8 @@ type ClientRoute struct { ReqServerPeerSvcAddr string // requested server-side service address ReqServerPeerSvcNet string // requested server-side service address ServerPeerListenAddr *net.TCPAddr // actual service-side service address - ServerPeerSvcAddr string // actual server-side service address - ServerPeerSvcNet string // actual server-side network + ServerPeerSvcAddr Atom[string] // actual server-side service address + ServerPeerSvcNet Atom[string] // actual server-side network ServerPeerOption RouteOption ptc_mtx sync.Mutex @@ -405,6 +411,8 @@ func (r *ClientRoute) RunTask(wg *sync.WaitGroup) { // it merely implements some timeout if set. defer wg.Done() + r.cts.C.FireRouteEvent(CLIENT_EVENT_ROUTE_STARTED, r) + err = r.cts.psc.Send(MakeRouteStartPacket(r.Id, r.ServerPeerOption, r.PeerAddr, r.PeerName, r.ReqServerPeerSvcAddr, r.ReqServerPeerSvcNet)) if err != nil { r.cts.C.log.Write(r.cts.Sid, LOG_DEBUG, @@ -468,12 +476,12 @@ done: r.cts.RemoveClientRoute(r) - r.cts.C.route_mtx.Lock() r.cts.C.route_list.Remove(r.node_in_client) r.node_in_client = nil r.cts.C.route_mtx.Unlock() -// TODO: fire ROUTE_DELETED event + + r.cts.C.FireRouteEvent(CLIENT_EVENT_ROUTE_STOPPED, r) } func (r *ClientRoute) ReqStop() { @@ -614,8 +622,10 @@ func (r *ClientRoute) ReportPacket(pts_id PeerId, packet_type PACKET_KIND, event } else { // received the server-side addresses r.ServerPeerListenAddr = addr - r.ServerPeerSvcAddr = rd.TargetAddrStr - r.ServerPeerSvcNet = rd.ServiceNetStr + r.ServerPeerSvcAddr.Set(rd.TargetAddrStr) + r.ServerPeerSvcNet.Set(rd.ServiceNetStr) + + r.cts.C.FireRouteEvent(CLIENT_EVENT_ROUTE_UPDATED, r) r.cts.C.log.Write(r.cts.Sid, LOG_INFO, "Ingested route_started(%d,%s,%s) for route(%d,%s,%v,%s,%s)", @@ -840,7 +850,6 @@ func (cts *ClientConn) AddNewClientRoute(rc *ClientRouteConfig) (*ClientRoute, e cts.C.route_mtx.Lock() r.node_in_client = cts.C.route_list.PushBack(r) cts.C.route_mtx.Unlock() -// TODO: fire ROUTE_ADDED event cts.C.log.Write(cts.Sid, LOG_INFO, "Added route(%d,%s)", r.Id, r.PeerAddr) @@ -1052,6 +1061,8 @@ func (cts *ClientConn) RunTask(wg *sync.WaitGroup) { defer wg.Done() // arrange to call at the end of this function + cts.C.FireConnEvent(CLIENT_EVENT_CONN_STARTED, cts) + start_over: cts.State.Store(CLIENT_CONN_CONNECTING) cts.cfg.Index = (cts.cfg.Index + 1) % len(cts.cfg.ServerAddrs) @@ -1110,12 +1121,14 @@ start_over: cts.C.log.Write(cts.Sid, LOG_INFO, "Got packet stream from server[%d] %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index]) - cts.State.Store(CLIENT_CONN_CONNECTED) cts.Token.Set(cts.cfg.ClientToken) if cts.Token.Get() == "" { cts.Token.Set(cts.C.token) } + cts.State.Store(CLIENT_CONN_CONNECTED) + cts.C.FireConnEvent(CLIENT_EVENT_CONN_UPDATED, cts) cts.psc = &GuardedPacketStreamClient{Hodu_PacketStreamClient: psc} + if cts.Token.Get() != "" { err = cts.psc.Send(MakeConnDescPacket(cts.Token.Get())) if err != nil { @@ -1333,6 +1346,8 @@ req_stop_and_wait_for_termination: wait_for_termination: cts.route_wg.Wait() // wait until all route tasks are finished cts.C.RemoveClientConn(cts) + + cts.C.FireConnEvent(CLIENT_EVENT_CONN_STOPPED, cts) return reconnect_to_server: @@ -1341,6 +1356,7 @@ reconnect_to_server: cts.State.Store(CLIENT_CONN_DISCONNECTING) cts.disconnect_from_server(true) cts.State.Store(CLIENT_CONN_DISCONNECTED) + cts.C.FireConnEvent(CLIENT_EVENT_CONN_UPDATED, cts) // wait for 2 seconds slpctx, cancel_sleep = context.WithTimeout(cts.C.Ctx, 2 * time.Second) @@ -2059,3 +2075,99 @@ func (c *Client) AddCtlMetricsCollector(col prometheus.Collector) error { func (c *Client) RemoveCtlMetricsCollector(col prometheus.Collector) bool { return c.promreg.Unregister(col) } + +func (c *Client) FireConnEvent(event_kind ClientEventKind, cts *ClientConn) { + if event_kind == CLIENT_EVENT_CONN_STOPPED { + c.bulletin.Enqueue( + &ClientEvent{ + Kind: event_kind, + Data: &json_out_client_conn{ CId: cts.Id }, + }, + ) + } else { + c.bulletin.Enqueue( + &ClientEvent{ + Kind: event_kind, + Data: &json_out_client_conn{ + CId: cts.Id, + ReqServerAddrs: cts.cfg.ServerAddrs, + CurrentServerIndex: cts.cfg.Index, + ServerAddr: cts.remote_addr.Get(), + ClientAddr: cts.local_addr.Get(), + ClientToken: cts.Token.Get(), + CreatedMilli: cts.Created.UnixMilli(), + }, + }, + ) + } +} + +func (c *Client) FireRouteEvent(event_kind ClientEventKind, r *ClientRoute) { + if event_kind == CLIENT_EVENT_ROUTE_STOPPED { + c.bulletin.Enqueue( + &ClientEvent{ + Kind: event_kind, + Data: &json_out_client_route_id { + CId: r.cts.Id, RId: r.Id, + }, + }, + ) + } else { + var lftsta time.Time + var lftdur time.Duration + + lftsta, lftdur = r.GetLifetimeInfo() + c.bulletin.Enqueue( + &ClientEvent{ + Kind: event_kind, + Data: &json_out_client_route{ + CId: r.cts.Id, + RId: r.Id, + ClientPeerAddr: r.PeerAddr, + ClientPeerName: r.PeerName, + ServerPeerSvcAddr: r.ServerPeerSvcAddr.Get(), + ServerPeerSvcNet: r.ServerPeerSvcNet.Get(), + ServerPeerOption: r.ServerPeerOption.String(), + Lifetime: DurationToSecString(lftdur), + LifetimeStart: lftsta.Unix(), + CreatedMilli: r.Created.UnixMilli(), + }, + }, + ) + } +} + +func (c *Client) FirePeerEvent(event_kind ClientEventKind, ptc *ClientPeerConn) { + if event_kind == CLIENT_EVENT_PEER_STOPPED { + c.bulletin.Enqueue( + &ClientEvent{ + Kind: event_kind, + Data: &json_out_client_peer_id{ + CId: ptc.route.cts.Id, + RId: ptc.route.Id, + PId: ptc.conn_id, + }, + }, + ) + + } else { + var r *ClientRoute + + r = ptc.route + c.bulletin.Enqueue( + &ClientEvent{ + Kind: event_kind, + Data: &json_out_client_peer{ + CId: r.cts.Id, + RId: r.Id, + PId: ptc.conn_id, + ClientPeerAddr: ptc.conn.RemoteAddr().String(), + ClientLocalAddr: ptc.conn.LocalAddr().String(), + ServerPeerAddr: ptc.pts_raddr, + ServerLocalAddr: ptc.pts_laddr, + CreatedMilli: ptc.Created.UnixMilli(), + }, + }, + ) + } +} diff --git a/server-ctl.go b/server-ctl.go index fd7aa3e..a43e31f 100644 --- a/server-ctl.go +++ b/server-ctl.go @@ -53,6 +53,21 @@ type json_out_server_peer struct { CreatedMilli int64 `json:"created-milli"` } +type json_out_server_conn_id struct { + CId ConnId `json:"conn-id"` +} + +type json_out_server_route_id struct { + CId ConnId `json:"conn-id"` + RId RouteId `json:"route-id"` +} + +type json_out_server_peer_id struct { + CId ConnId `json:"conn-id"` + RId RouteId `json:"route-id"` + PId PeerId `json:"peer-id"` +} + type json_out_server_stats struct { json_out_go_stats diff --git a/server-peer.go b/server-peer.go index 518fdb7..cf38ffc 100644 --- a/server-peer.go +++ b/server-peer.go @@ -61,6 +61,8 @@ func (spc *ServerPeerConn) RunTask(wg *sync.WaitGroup) { defer wg.Done() + spc.route.Cts.S.FirePeerEvent(SERVER_EVENT_PEER_STARTED, spc) + conn_raddr = spc.conn.RemoteAddr().String() conn_laddr = spc.conn.LocalAddr().String() @@ -160,14 +162,7 @@ done_without_stop: spc.node_in_conn = nil spc.route.Cts.pts_mtx.Unlock() - spc.route.Cts.S.bulletin.Enqueue( - &ServerEvent{ - Kind: SERVER_EVENT_PEER_DELETED, - Data: &ServerEventPeerDeleted { - Conn: spc.route.Cts.Id, Route: spc.route.Id, Peer: spc.conn_id, - }, - }, - ) + spc.route.Cts.S.FirePeerEvent(SERVER_EVENT_PEER_STOPPED, spc) } func (spc *ServerPeerConn) ReqStop() { @@ -206,21 +201,7 @@ func (spc *ServerPeerConn) ReportPacket(packet_type PACKET_KIND, event_data inte spc.client_peer_status_chan <- true } - spc.route.Cts.S.bulletin.Enqueue( - &ServerEvent{ - Kind: SERVER_EVENT_PEER_UPDATED, - Data: &ServerEventPeerAdded{ - Conn: spc.route.Cts.Id, - Route: spc.route.Id, - Peer: spc.conn_id, - ServerPeerAddr: spc.conn.RemoteAddr().String(), - ServerLocalAddr: spc.conn.LocalAddr().String(), - ClientPeerAddr: spc.client_peer_raddr.Get(), - ClientLocalAddr: spc.client_peer_laddr.Get(), - CreatedMilli: spc.Created.UnixMilli(), - }, - }, - ) + spc.route.Cts.S.FirePeerEvent(SERVER_EVENT_PEER_UPDATED, spc) case PACKET_KIND_PEER_ABORTED: spc.ReqStop() diff --git a/server.go b/server.go index 1e56938..121a16a 100644 --- a/server.go +++ b/server.go @@ -70,15 +70,15 @@ type ServerConfig struct { type ServerEventKind int const ( - SERVER_EVENT_CONN_ADDED = iota + SERVER_EVENT_CONN_STARTED = iota SERVER_EVENT_CONN_UPDATED - SERVER_EVENT_CONN_DELETED - SERVER_EVENT_ROUTE_ADDED + SERVER_EVENT_CONN_STOPPED + SERVER_EVENT_ROUTE_STARTED SERVER_EVENT_ROUTE_UPDATED - SERVER_EVENT_ROUTE_DELETED - SERVER_EVENT_PEER_ADDED + SERVER_EVENT_ROUTE_STOPPED + SERVER_EVENT_PEER_STARTED SERVER_EVENT_PEER_UPDATED - SERVER_EVENT_PEER_DELETED + SERVER_EVENT_PEER_STOPPED ) type ServerEvent struct { @@ -86,51 +86,6 @@ type ServerEvent struct { Data interface{} `json:"data"` } -type ServerEventConnAdded struct { - Conn ConnId `json:"conn-id"` - ServerAddr string `json:"server-addr"` - ClientAddr string `json:"client-addr"` - ClientToken string `json:"client-token"` - CreatedMilli int64 `json:"created-milli"` -} - -type ServerEventConnDeleted struct { - Conn ConnId `json:"conn-id"` -} - -type ServerEventRouteAdded struct { - Conn ConnId `json:"conn-id"` - Route RouteId `json:"route-id"` - ClientPeerAddr string `json:"client-peer-addr"` - ClientPeerName string `json:"client-peer-name"` - ServerPeerOption string `json:"server-peer-option"` - ServerPeerSvcAddr string `json:"server-peer-svc-addr"` - ServerPeerSvcNet string `json:"server-peer-svc-net"` - CreatedMilli int64 `json:"created-milli"` -} - -type ServerEventRouteDeleted struct { - Conn ConnId `json:"conn-id"` - Route RouteId `json:"route-id"` -} - -type ServerEventPeerAdded struct { - Conn ConnId `json:"conn-id"` - Route RouteId `json:"route-id"` - Peer PeerId `json:"peer-id"` - ServerPeerAddr string `json:"server-peer-addr"` - ServerLocalAddr string `json:"server-local-addr"` - ClientPeerAddr string `json:"client-peer-addr"` - ClientLocalAddr string `json:"client-local-addr"` - CreatedMilli int64 `json:"created-milli"` -} - -type ServerEventPeerDeleted struct { - Conn ConnId `json:"conn-id"` - Route RouteId `json:"route-id"` - Peer PeerId `json:"peer-id"` -} - type ServerEventBulletin = Bulletin[*ServerEvent] type ServerEventSubscription = BulletinSubscription[*ServerEvent] @@ -365,21 +320,6 @@ func (r *ServerRoute) AddNewServerPeerConn(c *net.TCPConn) (*ServerPeerConn, err pts.node_in_conn = r.Cts.pts_list.PushBack(pts) r.Cts.pts_mtx.Unlock() - r.Cts.S.bulletin.Enqueue( - &ServerEvent{ - Kind: SERVER_EVENT_PEER_ADDED, - Data: &ServerEventPeerAdded{ - Conn: r.Cts.Id, - Route: r.Id, - Peer: pts.conn_id, - ServerPeerAddr: pts.conn.RemoteAddr().String(), - ServerLocalAddr: pts.conn.LocalAddr().String(), - ClientPeerAddr: pts.client_peer_raddr.Get(), - ClientLocalAddr: pts.client_peer_laddr.Get(), - CreatedMilli: pts.Created.UnixMilli(), - }, - }, - ) return pts, nil } @@ -399,6 +339,7 @@ func (r *ServerRoute) RunTask(wg *sync.WaitGroup) { var iaddr netip.Addr defer wg.Done() + r.Cts.S.FireRouteEvent(SERVER_EVENT_ROUTE_STARTED, r) for { conn, err = r.svc_l.AcceptTCP() // this call is blocking... @@ -447,14 +388,7 @@ func (r *ServerRoute) RunTask(wg *sync.WaitGroup) { r.node_in_server = nil r.Cts.S.route_mtx.Unlock() - r.Cts.S.bulletin.Enqueue( - &ServerEvent{ - Kind: SERVER_EVENT_ROUTE_DELETED, - Data: &ServerEventRouteDeleted { - Conn: r.Cts.Id, Route: r.Id, - }, - }, - ) + r.Cts.S.FireRouteEvent(SERVER_EVENT_ROUTE_STOPPED, r) } func (r *ServerRoute) ReqStop() { @@ -595,22 +529,6 @@ func (cts *ServerConn) AddNewServerRoute(route_id RouteId, proto RouteOption, pt r.node_in_server = cts.S.route_list.PushBack(r) cts.S.route_mtx.Unlock() - cts.S.bulletin.Enqueue( - &ServerEvent{ - Kind: SERVER_EVENT_ROUTE_ADDED, - Data: &ServerEventRouteAdded{ - Conn: cts.Id, - Route: r.Id, - ClientPeerAddr: r.PtcAddr, - ClientPeerName: r.PtcName, - ServerPeerSvcAddr: r.SvcAddr.String(), - ServerPeerSvcNet: r.SvcPermNet.String(), - ServerPeerOption: r.SvcOption.String(), - CreatedMilli: r.Created.UnixMilli(), - }, - }, - ) - // Don't detached the cts task as a go-routine as this function cts.route_wg.Add(1) go r.RunTask(&cts.route_wg) @@ -919,18 +837,7 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) { cts.S.cts_mtx.Unlock() cts.S.log.Write(cts.Sid, LOG_INFO, "client(%d) %s - token set to '%s'", cts.Id, cts.RemoteAddr, x.Conn.Token) - cts.S.bulletin.Enqueue( - &ServerEvent{ - Kind: SERVER_EVENT_CONN_UPDATED, - Data: &ServerEventConnAdded{ - Conn: cts.Id, - ServerAddr: cts.LocalAddr.String(), - ClientAddr: cts.RemoteAddr.String(), - ClientToken: cts.ClientToken.Get(), - CreatedMilli: cts.Created.UnixMilli(), - }, - }, - ) + cts.S.FireConnEvent(SERVER_EVENT_CONN_UPDATED, cts) } } } @@ -965,6 +872,7 @@ func (cts *ServerConn) RunTask(wg *sync.WaitGroup) { var ctx context.Context defer wg.Done() + cts.S.FireConnEvent(SERVER_EVENT_CONN_STARTED, cts) strm = cts.pss ctx = strm.Context() @@ -1011,12 +919,8 @@ done: cts.ReqStop() // just in case cts.route_wg.Done() cts.route_wg.Wait() - cts.S.bulletin.Enqueue( - &ServerEvent{ - Kind: SERVER_EVENT_CONN_DELETED, - Data: &ServerEventConnDeleted{ Conn: cts.Id }, - }, - ) + cts.S.FireConnEvent(SERVER_EVENT_CONN_STOPPED, cts) + // Don't detached the cts task as a go-routine as this function cts.S.log.Write(cts.Sid, LOG_INFO, "End of connection task") } @@ -1081,19 +985,6 @@ func (s *Server) PacketStream(strm Hodu_PacketStreamServer) error { return fmt.Errorf("unable to add client %s - %s", p.Addr.String(), err.Error()) } - s.bulletin.Enqueue( - &ServerEvent{ - Kind: SERVER_EVENT_CONN_ADDED, - Data: &ServerEventConnAdded{ - Conn: cts.Id, - ServerAddr: cts.LocalAddr.String(), - ClientAddr: cts.RemoteAddr.String(), - ClientToken: cts.ClientToken.Get(), - CreatedMilli: cts.Created.UnixMilli(), - }, - }, - ) - // Don't detach the cts task as a go-routine as this function // is invoked as a go-routine by the grpc server. s.cts_wg.Add(1) @@ -2225,3 +2116,90 @@ func (s *Server) SendNotice(id_str string, text string) error { return nil } + +func (s *Server) FireConnEvent(event_kind ServerEventKind, cts *ServerConn) { + if event_kind == SERVER_EVENT_CONN_STOPPED { + s.bulletin.Enqueue( + &ServerEvent{ + Kind: event_kind, + Data: &json_out_server_conn_id{ + CId: cts.Id, + }, + }, + ) + } else { + s.bulletin.Enqueue( + &ServerEvent{ + Kind: event_kind, + Data: &json_out_server_conn{ + CId: cts.Id, + ServerAddr: cts.LocalAddr.String(), + ClientAddr: cts.RemoteAddr.String(), + ClientToken: cts.ClientToken.Get(), + CreatedMilli: cts.Created.UnixMilli(), + }, + }, + ) + } +} + +func (s *Server) FireRouteEvent(event_kind ServerEventKind, r *ServerRoute) { + if event_kind == SERVER_EVENT_ROUTE_STOPPED { + s.bulletin.Enqueue( + &ServerEvent{ + Kind: event_kind, + Data: &json_out_server_route_id { + CId: r.Cts.Id, + RId: r.Id, + }, + }, + ) + } else { + s.bulletin.Enqueue( + &ServerEvent{ + Kind: event_kind, + Data: &json_out_server_route{ + CId: r.Cts.Id, + RId: r.Id, + ClientPeerAddr: r.PtcAddr, + ClientPeerName: r.PtcName, + ServerPeerSvcAddr: r.SvcAddr.String(), + ServerPeerSvcNet: r.SvcPermNet.String(), + ServerPeerOption: r.SvcOption.String(), + CreatedMilli: r.Created.UnixMilli(), + }, + }, + ) + } +} + +func (s *Server) FirePeerEvent(event_kind ServerEventKind, pts *ServerPeerConn) { + if event_kind == SERVER_EVENT_PEER_STOPPED { + pts.route.Cts.S.bulletin.Enqueue( + &ServerEvent{ + Kind: event_kind, + Data: &json_out_server_peer_id { + CId: pts.route.Cts.Id, + RId: pts.route.Id, + PId: pts.conn_id, + }, + }, + ) + } else { + s.bulletin.Enqueue( + &ServerEvent{ + Kind: event_kind, + Data: &json_out_server_peer{ + CId: pts.route.Cts.Id, + RId: pts.route.Id, + PId: pts.conn_id, + ServerPeerAddr: pts.conn.RemoteAddr().String(), + ServerLocalAddr: pts.conn.LocalAddr().String(), + ClientPeerAddr: pts.client_peer_raddr.Get(), + ClientLocalAddr: pts.client_peer_laddr.Get(), + CreatedMilli: pts.Created.UnixMilli(), + }, + }, + ) + } +}