created event firing functions under server and client to replace directy bulleting enqueing
This commit is contained in:
132
client.go
132
client.go
@ -79,9 +79,15 @@ type ClientConfig struct {
|
||||
|
||||
type ClientEventKind int
|
||||
const (
|
||||
CLIENT_EVENT_CONN_ADDED = iota
|
||||
CLIENT_EVENT_CONN_STARTED = iota
|
||||
CLIENT_EVENT_CONN_UPDATED
|
||||
CLIENT_EVENT_CONN_DELETED
|
||||
CLIENT_EVENT_CONN_STOPPED
|
||||
CLIENT_EVENT_ROUTE_STARTED
|
||||
CLIENT_EVENT_ROUTE_UPDATED
|
||||
CLIENT_EVENT_ROUTE_STOPPED
|
||||
CLIENT_EVENT_PEER_STARTED
|
||||
CLIENT_EVENT_PEER_UPDATED
|
||||
CLIENT_EVENT_PEER_STOPPED
|
||||
)
|
||||
|
||||
type ClientEvent struct {
|
||||
@ -201,8 +207,8 @@ type ClientRoute struct {
|
||||
ReqServerPeerSvcAddr string // requested server-side service address
|
||||
ReqServerPeerSvcNet string // requested server-side service address
|
||||
ServerPeerListenAddr *net.TCPAddr // actual service-side service address
|
||||
ServerPeerSvcAddr string // actual server-side service address
|
||||
ServerPeerSvcNet string // actual server-side network
|
||||
ServerPeerSvcAddr Atom[string] // actual server-side service address
|
||||
ServerPeerSvcNet Atom[string] // actual server-side network
|
||||
ServerPeerOption RouteOption
|
||||
|
||||
ptc_mtx sync.Mutex
|
||||
@ -405,6 +411,8 @@ func (r *ClientRoute) RunTask(wg *sync.WaitGroup) {
|
||||
// it merely implements some timeout if set.
|
||||
defer wg.Done()
|
||||
|
||||
r.cts.C.FireRouteEvent(CLIENT_EVENT_ROUTE_STARTED, r)
|
||||
|
||||
err = r.cts.psc.Send(MakeRouteStartPacket(r.Id, r.ServerPeerOption, r.PeerAddr, r.PeerName, r.ReqServerPeerSvcAddr, r.ReqServerPeerSvcNet))
|
||||
if err != nil {
|
||||
r.cts.C.log.Write(r.cts.Sid, LOG_DEBUG,
|
||||
@ -468,12 +476,12 @@ done:
|
||||
|
||||
r.cts.RemoveClientRoute(r)
|
||||
|
||||
|
||||
r.cts.C.route_mtx.Lock()
|
||||
r.cts.C.route_list.Remove(r.node_in_client)
|
||||
r.node_in_client = nil
|
||||
r.cts.C.route_mtx.Unlock()
|
||||
// TODO: fire ROUTE_DELETED event
|
||||
|
||||
r.cts.C.FireRouteEvent(CLIENT_EVENT_ROUTE_STOPPED, r)
|
||||
}
|
||||
|
||||
func (r *ClientRoute) ReqStop() {
|
||||
@ -614,8 +622,10 @@ func (r *ClientRoute) ReportPacket(pts_id PeerId, packet_type PACKET_KIND, event
|
||||
} else {
|
||||
// received the server-side addresses
|
||||
r.ServerPeerListenAddr = addr
|
||||
r.ServerPeerSvcAddr = rd.TargetAddrStr
|
||||
r.ServerPeerSvcNet = rd.ServiceNetStr
|
||||
r.ServerPeerSvcAddr.Set(rd.TargetAddrStr)
|
||||
r.ServerPeerSvcNet.Set(rd.ServiceNetStr)
|
||||
|
||||
r.cts.C.FireRouteEvent(CLIENT_EVENT_ROUTE_UPDATED, r)
|
||||
|
||||
r.cts.C.log.Write(r.cts.Sid, LOG_INFO,
|
||||
"Ingested route_started(%d,%s,%s) for route(%d,%s,%v,%s,%s)",
|
||||
@ -840,7 +850,6 @@ func (cts *ClientConn) AddNewClientRoute(rc *ClientRouteConfig) (*ClientRoute, e
|
||||
cts.C.route_mtx.Lock()
|
||||
r.node_in_client = cts.C.route_list.PushBack(r)
|
||||
cts.C.route_mtx.Unlock()
|
||||
// TODO: fire ROUTE_ADDED event
|
||||
|
||||
cts.C.log.Write(cts.Sid, LOG_INFO, "Added route(%d,%s)", r.Id, r.PeerAddr)
|
||||
|
||||
@ -1052,6 +1061,8 @@ func (cts *ClientConn) RunTask(wg *sync.WaitGroup) {
|
||||
|
||||
defer wg.Done() // arrange to call at the end of this function
|
||||
|
||||
cts.C.FireConnEvent(CLIENT_EVENT_CONN_STARTED, cts)
|
||||
|
||||
start_over:
|
||||
cts.State.Store(CLIENT_CONN_CONNECTING)
|
||||
cts.cfg.Index = (cts.cfg.Index + 1) % len(cts.cfg.ServerAddrs)
|
||||
@ -1110,12 +1121,14 @@ start_over:
|
||||
|
||||
cts.C.log.Write(cts.Sid, LOG_INFO, "Got packet stream from server[%d] %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index])
|
||||
|
||||
cts.State.Store(CLIENT_CONN_CONNECTED)
|
||||
cts.Token.Set(cts.cfg.ClientToken)
|
||||
if cts.Token.Get() == "" { cts.Token.Set(cts.C.token) }
|
||||
cts.State.Store(CLIENT_CONN_CONNECTED)
|
||||
cts.C.FireConnEvent(CLIENT_EVENT_CONN_UPDATED, cts)
|
||||
|
||||
cts.psc = &GuardedPacketStreamClient{Hodu_PacketStreamClient: psc}
|
||||
|
||||
|
||||
if cts.Token.Get() != "" {
|
||||
err = cts.psc.Send(MakeConnDescPacket(cts.Token.Get()))
|
||||
if err != nil {
|
||||
@ -1333,6 +1346,8 @@ req_stop_and_wait_for_termination:
|
||||
wait_for_termination:
|
||||
cts.route_wg.Wait() // wait until all route tasks are finished
|
||||
cts.C.RemoveClientConn(cts)
|
||||
|
||||
cts.C.FireConnEvent(CLIENT_EVENT_CONN_STOPPED, cts)
|
||||
return
|
||||
|
||||
reconnect_to_server:
|
||||
@ -1341,6 +1356,7 @@ reconnect_to_server:
|
||||
cts.State.Store(CLIENT_CONN_DISCONNECTING)
|
||||
cts.disconnect_from_server(true)
|
||||
cts.State.Store(CLIENT_CONN_DISCONNECTED)
|
||||
cts.C.FireConnEvent(CLIENT_EVENT_CONN_UPDATED, cts)
|
||||
|
||||
// wait for 2 seconds
|
||||
slpctx, cancel_sleep = context.WithTimeout(cts.C.Ctx, 2 * time.Second)
|
||||
@ -2059,3 +2075,99 @@ func (c *Client) AddCtlMetricsCollector(col prometheus.Collector) error {
|
||||
func (c *Client) RemoveCtlMetricsCollector(col prometheus.Collector) bool {
|
||||
return c.promreg.Unregister(col)
|
||||
}
|
||||
|
||||
func (c *Client) FireConnEvent(event_kind ClientEventKind, cts *ClientConn) {
|
||||
if event_kind == CLIENT_EVENT_CONN_STOPPED {
|
||||
c.bulletin.Enqueue(
|
||||
&ClientEvent{
|
||||
Kind: event_kind,
|
||||
Data: &json_out_client_conn{ CId: cts.Id },
|
||||
},
|
||||
)
|
||||
} else {
|
||||
c.bulletin.Enqueue(
|
||||
&ClientEvent{
|
||||
Kind: event_kind,
|
||||
Data: &json_out_client_conn{
|
||||
CId: cts.Id,
|
||||
ReqServerAddrs: cts.cfg.ServerAddrs,
|
||||
CurrentServerIndex: cts.cfg.Index,
|
||||
ServerAddr: cts.remote_addr.Get(),
|
||||
ClientAddr: cts.local_addr.Get(),
|
||||
ClientToken: cts.Token.Get(),
|
||||
CreatedMilli: cts.Created.UnixMilli(),
|
||||
},
|
||||
},
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) FireRouteEvent(event_kind ClientEventKind, r *ClientRoute) {
|
||||
if event_kind == CLIENT_EVENT_ROUTE_STOPPED {
|
||||
c.bulletin.Enqueue(
|
||||
&ClientEvent{
|
||||
Kind: event_kind,
|
||||
Data: &json_out_client_route_id {
|
||||
CId: r.cts.Id, RId: r.Id,
|
||||
},
|
||||
},
|
||||
)
|
||||
} else {
|
||||
var lftsta time.Time
|
||||
var lftdur time.Duration
|
||||
|
||||
lftsta, lftdur = r.GetLifetimeInfo()
|
||||
c.bulletin.Enqueue(
|
||||
&ClientEvent{
|
||||
Kind: event_kind,
|
||||
Data: &json_out_client_route{
|
||||
CId: r.cts.Id,
|
||||
RId: r.Id,
|
||||
ClientPeerAddr: r.PeerAddr,
|
||||
ClientPeerName: r.PeerName,
|
||||
ServerPeerSvcAddr: r.ServerPeerSvcAddr.Get(),
|
||||
ServerPeerSvcNet: r.ServerPeerSvcNet.Get(),
|
||||
ServerPeerOption: r.ServerPeerOption.String(),
|
||||
Lifetime: DurationToSecString(lftdur),
|
||||
LifetimeStart: lftsta.Unix(),
|
||||
CreatedMilli: r.Created.UnixMilli(),
|
||||
},
|
||||
},
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) FirePeerEvent(event_kind ClientEventKind, ptc *ClientPeerConn) {
|
||||
if event_kind == CLIENT_EVENT_PEER_STOPPED {
|
||||
c.bulletin.Enqueue(
|
||||
&ClientEvent{
|
||||
Kind: event_kind,
|
||||
Data: &json_out_client_peer_id{
|
||||
CId: ptc.route.cts.Id,
|
||||
RId: ptc.route.Id,
|
||||
PId: ptc.conn_id,
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
} else {
|
||||
var r *ClientRoute
|
||||
|
||||
r = ptc.route
|
||||
c.bulletin.Enqueue(
|
||||
&ClientEvent{
|
||||
Kind: event_kind,
|
||||
Data: &json_out_client_peer{
|
||||
CId: r.cts.Id,
|
||||
RId: r.Id,
|
||||
PId: ptc.conn_id,
|
||||
ClientPeerAddr: ptc.conn.RemoteAddr().String(),
|
||||
ClientLocalAddr: ptc.conn.LocalAddr().String(),
|
||||
ServerPeerAddr: ptc.pts_raddr,
|
||||
ServerLocalAddr: ptc.pts_laddr,
|
||||
CreatedMilli: ptc.Created.UnixMilli(),
|
||||
},
|
||||
},
|
||||
)
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user