added default notice handling interface

This commit is contained in:
2025-02-18 14:44:45 +09:00
parent 81f7bb0c0d
commit 2b3a841299
6 changed files with 286 additions and 174 deletions

205
client.go
View File

@ -66,6 +66,10 @@ type ClientConfig struct {
PeerConnTmout time.Duration
}
type ClientConnNoticeHandler interface {
Handle(cts *ClientConn, text string)
}
type Client struct {
Named
@ -95,7 +99,8 @@ type Client struct {
stop_req atomic.Bool
stop_chan chan bool
log Logger
log Logger
conn_notice ClientConnNoticeHandler
route_persister ClientRoutePersister
promreg *prometheus.Registry
@ -117,7 +122,7 @@ const (
// client connection to server
type ClientConn struct {
cli *Client
C *Client
cfg ClientConnConfigActive
Id ConnId
Sid string // id rendered in string
@ -242,10 +247,10 @@ func (r *ClientRoute) AddNewClientPeerConn(c *net.TCPConn, pts_id PeerId, pts_ra
r.ptc_mtx.Lock()
ptc = NewClientPeerConn(r, c, pts_id, pts_raddr, pts_laddr)
r.ptc_map[ptc.conn_id] = ptc
r.cts.cli.stats.peers.Add(1)
r.cts.C.stats.peers.Add(1)
r.ptc_mtx.Unlock()
r.cts.cli.log.Write(r.cts.Sid, LOG_INFO, "Added client-side peer(%d,%d,%s,%s)", r.Id, ptc.conn_id, ptc.conn.RemoteAddr().String(), ptc.conn.LocalAddr().String())
r.cts.C.log.Write(r.cts.Sid, LOG_INFO, "Added client-side peer(%d,%d,%s,%s)", r.Id, ptc.conn_id, ptc.conn.RemoteAddr().String(), ptc.conn.LocalAddr().String())
return ptc, nil
}
@ -264,10 +269,10 @@ func (r *ClientRoute) RemoveClientPeerConn(ptc *ClientPeerConn) error {
return fmt.Errorf("conflicting peer id - %d", ptc.conn_id)
}
delete(r.ptc_map, ptc.conn_id)
r.cts.cli.stats.peers.Add(-1)
r.cts.C.stats.peers.Add(-1)
r.ptc_mtx.Unlock()
r.cts.cli.log.Write(r.cts.Sid, LOG_INFO, "Removed client-side peer(%d,%d,%s,%s)", r.Id, ptc.conn_id, ptc.conn.RemoteAddr().String(), ptc.conn.LocalAddr().String())
r.cts.C.log.Write(r.cts.Sid, LOG_INFO, "Removed client-side peer(%d,%d,%s,%s)", r.Id, ptc.conn_id, ptc.conn.RemoteAddr().String(), ptc.conn.LocalAddr().String())
ptc.ReqStop()
return nil
}
@ -306,7 +311,7 @@ func (r *ClientRoute) ExtendLifetime(lifetime time.Duration) error {
r.Lifetime = r.Lifetime + lifetime
expiry = r.LifetimeStart.Add(r.Lifetime)
r.lifetime_timer.Reset(expiry.Sub(time.Now()))
if r.cts.cli.route_persister != nil { r.cts.cli.route_persister.Save(r.cts, r) }
if r.cts.C.route_persister != nil { r.cts.C.route_persister.Save(r.cts, r) }
return nil
}
}
@ -323,7 +328,7 @@ func (r *ClientRoute) ResetLifetime(lifetime time.Duration) error {
r.Lifetime = lifetime
r.LifetimeStart = time.Now()
r.lifetime_timer.Reset(lifetime)
if r.cts.cli.route_persister != nil { r.cts.cli.route_persister.Save(r.cts, r) }
if r.cts.C.route_persister != nil { r.cts.C.route_persister.Save(r.cts, r) }
return nil
}
}
@ -338,12 +343,12 @@ func (r *ClientRoute) RunTask(wg *sync.WaitGroup) {
err = r.cts.psc.Send(MakeRouteStartPacket(r.Id, r.ServerPeerOption, r.PeerAddr, r.PeerName, r.ServerPeerAddr, r.ServerPeerNet))
if err != nil {
r.cts.cli.log.Write(r.cts.Sid, LOG_DEBUG,
r.cts.C.log.Write(r.cts.Sid, LOG_DEBUG,
"Failed to send route_start for route(%d,%s,%v,%v) to %s",
r.Id, r.PeerAddr, r.ServerPeerOption, r.ServerPeerNet, r.cts.remote_addr)
goto done
} else {
r.cts.cli.log.Write(r.cts.Sid, LOG_DEBUG,
r.cts.C.log.Write(r.cts.Sid, LOG_DEBUG,
"Sent route_start for route(%d,%s,%v,%v) to %s",
r.Id, r.PeerAddr, r.ServerPeerOption, r.ServerPeerNet, r.cts.remote_addr)
}
@ -363,7 +368,7 @@ main_loop:
break main_loop
case <-r.lifetime_timer.C:
r.cts.cli.log.Write(r.cts.Sid, LOG_INFO, "route(%d,%s,%v,%v) reached end of lifetime(%v)",
r.cts.C.log.Write(r.cts.Sid, LOG_INFO, "route(%d,%s,%v,%v) reached end of lifetime(%v)",
r.Id, r.PeerAddr, r.ServerPeerOption, r.ServerPeerNet, r.Lifetime)
break main_loop
}
@ -388,11 +393,11 @@ done:
err = r.cts.psc.Send(MakeRouteStopPacket(r.Id, r.ServerPeerOption, r.PeerAddr, r.PeerName, r.ServerPeerAddr, r.ServerPeerNet))
if err != nil {
r.cts.cli.log.Write(r.cts.Sid, LOG_DEBUG,
r.cts.C.log.Write(r.cts.Sid, LOG_DEBUG,
"Failed to route_stop for route(%d,%s,%v,%v) to %s - %s",
r.Id, r.PeerAddr, r.ServerPeerOption, r.ServerPeerNet, r.cts.remote_addr, err.Error())
} else {
r.cts.cli.log.Write(r.cts.Sid, LOG_DEBUG,
r.cts.C.log.Write(r.cts.Sid, LOG_DEBUG,
"Sent route_stop for route(%d,%s,%v,%v) to %s",
r.Id, r.PeerAddr, r.ServerPeerOption, r.ServerPeerNet, r.cts.remote_addr)
}
@ -431,9 +436,9 @@ func (r *ClientRoute) ConnectToPeer(pts_id PeerId, route_option RouteOption, pts
defer wg.Done()
tmout = time.Duration(r.cts.cli.ptc_tmout)
tmout = time.Duration(r.cts.C.ptc_tmout)
if tmout <= 0 { tmout = 5 * time.Second} // TODO: make this configurable...
waitctx, cancel_wait = context.WithTimeout(r.cts.cli.ctx, tmout)
waitctx, cancel_wait = context.WithTimeout(r.cts.C.ctx, tmout)
r.ptc_mtx.Lock()
r.ptc_cancel_map[pts_id] = cancel_wait
r.ptc_mtx.Unlock()
@ -447,7 +452,7 @@ func (r *ClientRoute) ConnectToPeer(pts_id PeerId, route_option RouteOption, pts
r.ptc_mtx.Unlock()
if err != nil {
r.cts.cli.log.Write(r.cts.Sid, LOG_ERROR,
r.cts.C.log.Write(r.cts.Sid, LOG_ERROR,
"Failed to connect to %s for route(%d,%d,%s,%s) - %s",
r.PeerAddr, r.Id, pts_id, pts_raddr, pts_laddr, err.Error())
goto peer_aborted
@ -455,7 +460,7 @@ func (r *ClientRoute) ConnectToPeer(pts_id PeerId, route_option RouteOption, pts
real_conn, ok = conn.(*net.TCPConn)
if !ok {
r.cts.cli.log.Write(r.cts.Sid, LOG_ERROR,
r.cts.C.log.Write(r.cts.Sid, LOG_ERROR,
"Failed to get connection information to %s for route(%d,%d,%s,%s) - %s",
r.PeerAddr, r.Id, pts_id, pts_raddr, pts_laddr, err.Error())
goto peer_aborted
@ -465,7 +470,7 @@ func (r *ClientRoute) ConnectToPeer(pts_id PeerId, route_option RouteOption, pts
real_conn_laddr = real_conn.LocalAddr().String()
ptc, err = r.AddNewClientPeerConn(real_conn, pts_id, pts_raddr, pts_laddr)
if err != nil {
r.cts.cli.log.Write(r.cts.Sid, LOG_ERROR,
r.cts.C.log.Write(r.cts.Sid, LOG_ERROR,
"Failed to add client peer %s for route(%d,%d,%s,%s) - %s",
r.PeerAddr, r.Id, pts_id, pts_raddr, pts_laddr, err.Error())
goto peer_aborted
@ -475,7 +480,7 @@ func (r *ClientRoute) ConnectToPeer(pts_id PeerId, route_option RouteOption, pts
err = r.cts.psc.Send(MakePeerStartedPacket(r.Id, ptc.conn_id, real_conn_raddr, real_conn_laddr))
if err != nil {
r.cts.cli.log.Write(r.cts.Sid, LOG_ERROR,
r.cts.C.log.Write(r.cts.Sid, LOG_ERROR,
"Failed to send peer_start(%d,%d,%s,%s) for route(%d,%d,%s,%s) - %s",
r.Id, ptc.conn_id, real_conn_raddr, real_conn_laddr,
r.Id, pts_id, pts_raddr, pts_laddr, err.Error())
@ -490,7 +495,7 @@ peer_aborted:
// real_conn_radd and real_conn_laddr may be empty depending on when the jump to here is made.
err = r.cts.psc.Send(MakePeerAbortedPacket(r.Id, pts_id, real_conn_raddr, real_conn_laddr))
if err != nil {
r.cts.cli.log.Write(r.cts.Sid, LOG_ERROR,
r.cts.C.log.Write(r.cts.Sid, LOG_ERROR,
"Failed to send peer_aborted(%d,%d) for route(%d,%d,%s,%s) - %s",
r.Id, pts_id, r.Id, pts_id, pts_raddr, pts_laddr, err.Error())
}
@ -525,13 +530,13 @@ func (r *ClientRoute) ReportEvent(pts_id PeerId, event_type PACKET_KIND, event_d
var rd *RouteDesc
rd, ok = event_data.(*RouteDesc)
if !ok {
r.cts.cli.log.Write(r.cts.Sid, LOG_ERROR, "Protocol error - invalid data in route_started event(%d)", r.Id)
r.cts.C.log.Write(r.cts.Sid, LOG_ERROR, "Protocol error - invalid data in route_started event(%d)", r.Id)
r.ReqStop()
} else {
var addr *net.TCPAddr
addr, err = net.ResolveTCPAddr(TcpAddrStrClass(rd.TargetAddrStr), rd.TargetAddrStr)
if err != nil {
r.cts.cli.log.Write(r.cts.Sid, LOG_ERROR, "Protocol error - invalid service address(%s) for server peer in route_started event(%d)", rd.TargetAddrStr, r.Id)
r.cts.C.log.Write(r.cts.Sid, LOG_ERROR, "Protocol error - invalid service address(%s) for server peer in route_started event(%d)", rd.TargetAddrStr, r.Id)
r.ReqStop()
} else {
r.server_peer_listen_addr = addr
@ -546,7 +551,7 @@ func (r *ClientRoute) ReportEvent(pts_id PeerId, event_type PACKET_KIND, event_d
// but the server must be able to handle this case as invalid route.
var ok bool
_, ok = event_data.(*RouteDesc)
if !ok { r.cts.cli.log.Write(r.cts.Sid, LOG_ERROR, "Protocol error - invalid data in route_started event(%d)", r.Id) }
if !ok { r.cts.C.log.Write(r.cts.Sid, LOG_ERROR, "Protocol error - invalid data in route_started event(%d)", r.Id) }
r.ReqStop()
case PACKET_KIND_PEER_STARTED:
@ -555,18 +560,18 @@ func (r *ClientRoute) ReportEvent(pts_id PeerId, event_type PACKET_KIND, event_d
pd, ok = event_data.(*PeerDesc)
if !ok {
r.cts.cli.log.Write(r.cts.Sid, LOG_WARN,
r.cts.C.log.Write(r.cts.Sid, LOG_WARN,
"Protocol error - invalid data in peer_started event(%d,%d)", r.Id, pts_id)
// ignore it. don't want to delete the whole route
} else {
if r.cts.cli.ptc_limit > 0 && int(r.cts.cli.stats.peers.Load()) >= r.cts.cli.ptc_limit {
r.cts.cli.log.Write(r.cts.Sid, LOG_ERROR,
if r.cts.C.ptc_limit > 0 && int(r.cts.C.stats.peers.Load()) >= r.cts.C.ptc_limit {
r.cts.C.log.Write(r.cts.Sid, LOG_ERROR,
"Rejecting to connect to peer(%s)for route(%d,%d) - allowed max %d",
r.PeerAddr, r.Id, pts_id, r.cts.cli.ptc_limit)
r.PeerAddr, r.Id, pts_id, r.cts.C.ptc_limit)
err = r.cts.psc.Send(MakePeerAbortedPacket(r.Id, pts_id, "", ""))
if err != nil {
r.cts.cli.log.Write(r.cts.Sid, LOG_ERROR,
r.cts.C.log.Write(r.cts.Sid, LOG_ERROR,
"Failed to send peer_aborted(%d,%d) for route(%d,%d,%s,%s) - %s",
r.Id, pts_id, r.Id, pts_id, "", "", err.Error())
}
@ -586,13 +591,13 @@ func (r *ClientRoute) ReportEvent(pts_id PeerId, event_type PACKET_KIND, event_d
pd, ok = event_data.(*PeerDesc)
if !ok {
r.cts.cli.log.Write(r.cts.Sid, LOG_ERROR,
r.cts.C.log.Write(r.cts.Sid, LOG_ERROR,
"Protocol error - invalid data in peer_aborted event(%d,%d)", r.Id, pts_id)
ptc.ReqStop()
} else {
err = r.DisconnectFromPeer(ptc)
if err != nil {
r.cts.cli.log.Write(r.cts.Sid, LOG_ERROR,
r.cts.C.log.Write(r.cts.Sid, LOG_ERROR,
"Failed to disconnect from peer(%d,%d,%s,%s) - %s",
r.Id, pts_id, pd.RemoteAddrStr, pd.LocalAddrStr, err.Error())
ptc.ReqStop()
@ -610,14 +615,14 @@ func (r *ClientRoute) ReportEvent(pts_id PeerId, event_type PACKET_KIND, event_d
pd, ok = event_data.(*PeerDesc)
if !ok {
r.cts.cli.log.Write(r.cts.Sid, LOG_ERROR,
r.cts.C.log.Write(r.cts.Sid, LOG_ERROR,
"Protocol error - invalid data in peer_stopped event(%d,%d)",
r.Id, pts_id)
ptc.ReqStop()
} else {
err = r.DisconnectFromPeer(ptc)
if err != nil {
r.cts.cli.log.Write(r.cts.Sid, LOG_WARN,
r.cts.C.log.Write(r.cts.Sid, LOG_WARN,
"Failed to disconnect from peer(%d,%d,%s,%s) - %s",
r.Id, pts_id, pd.RemoteAddrStr, pd.LocalAddrStr, err.Error())
ptc.ReqStop()
@ -634,7 +639,7 @@ func (r *ClientRoute) ReportEvent(pts_id PeerId, event_type PACKET_KIND, event_d
_, ok = event_data.(*PeerDesc)
if !ok {
r.cts.cli.log.Write(r.cts.Sid, LOG_ERROR,
r.cts.C.log.Write(r.cts.Sid, LOG_ERROR,
"Protocol error - invalid data in peer_eof event(%d,%d)",
r.Id, pts_id)
ptc.ReqStop()
@ -653,14 +658,14 @@ func (r *ClientRoute) ReportEvent(pts_id PeerId, event_type PACKET_KIND, event_d
data, ok = event_data.([]byte)
if !ok {
r.cts.cli.log.Write(r.cts.Sid, LOG_ERROR,
r.cts.C.log.Write(r.cts.Sid, LOG_ERROR,
"Protocol error - invalid data in peer_data event(%d,%d)",
r.Id, pts_id)
ptc.ReqStop()
} else {
_, err = ptc.conn.Write(data)
if err != nil {
r.cts.cli.log.Write(r.cts.Sid, LOG_ERROR,
r.cts.C.log.Write(r.cts.Sid, LOG_ERROR,
"Failed to write to peer(%d,%d,%s,%s) - %s",
r.Id, pts_id, ptc.conn.RemoteAddr().String(), ptc.conn.LocalAddr().String(), err.Error())
ptc.ReqStop()
@ -680,7 +685,7 @@ func NewClientConn(c *Client, cfg *ClientConnConfig) *ClientConn {
var cts ClientConn
var i int
cts.cli = c
cts.C = c
cts.route_map = make(ClientRouteMap)
cts.route_next_id = 1
cts.cfg.ClientConnConfig = *cfg
@ -737,11 +742,11 @@ func (cts *ClientConn) AddNewClientRoute(rc *ClientRouteConfig) (*ClientRoute, e
r = NewClientRoute(cts, assigned_id, rc.Static, rc.PeerAddr, rc.PeerName, rc.ServiceAddr, rc.ServiceNet, rc.Option, rc.Lifetime)
cts.route_map[r.Id] = r
cts.cli.stats.routes.Add(1)
if cts.cli.route_persister != nil { cts.cli.route_persister.Save(cts, r) }
cts.C.stats.routes.Add(1)
if cts.C.route_persister != nil { cts.C.route_persister.Save(cts, r) }
cts.route_mtx.Unlock()
cts.cli.log.Write(cts.Sid, LOG_INFO, "Added route(%d,%d) %s", cts.Id, r.Id, r.PeerAddr)
cts.C.log.Write(cts.Sid, LOG_INFO, "Added route(%d,%d) %s", cts.Id, r.Id, r.PeerAddr)
cts.route_wg.Add(1)
go r.RunTask(&cts.route_wg)
@ -764,7 +769,7 @@ func (cts *ClientConn) RemoveAllClientRoutes() {
for _, r = range cts.route_map {
delete(cts.route_map, r.Id)
cts.cli.stats.routes.Add(-1)
cts.C.stats.routes.Add(-1)
r.ReqStop()
}
}*/
@ -784,13 +789,13 @@ func (cts *ClientConn) RemoveClientRoute(route *ClientRoute) error {
return fmt.Errorf("conflicting route id - %d", route.Id)
}
delete(cts.route_map, route.Id)
cts.cli.stats.routes.Add(-1)
if cts.cli.route_persister != nil {
cts.cli.route_persister.Delete(cts, r)
cts.C.stats.routes.Add(-1)
if cts.C.route_persister != nil {
cts.C.route_persister.Delete(cts, r)
}
cts.route_mtx.Unlock()
cts.cli.log.Write(cts.Sid, LOG_INFO, "Removed route(%d,%s)", route.Id, route.PeerAddr)
cts.C.log.Write(cts.Sid, LOG_INFO, "Removed route(%d,%s)", route.Id, route.PeerAddr)
r.ReqStop()
return nil
@ -807,11 +812,11 @@ func (cts *ClientConn) RemoveClientRouteById(route_id RouteId) error {
return fmt.Errorf("non-existent route id - %d", route_id)
}
delete(cts.route_map, route_id)
cts.cli.stats.routes.Add(-1)
if cts.cli.route_persister != nil { cts.cli.route_persister.Delete(cts, r) }
cts.C.stats.routes.Add(-1)
if cts.C.route_persister != nil { cts.C.route_persister.Delete(cts, r) }
cts.route_mtx.Unlock()
cts.cli.log.Write(cts.Sid, LOG_INFO, "Removed route(%d,%s)", r.Id, r.PeerAddr)
cts.C.log.Write(cts.Sid, LOG_INFO, "Removed route(%d,%s)", r.Id, r.PeerAddr)
r.ReqStop()
return nil
@ -827,10 +832,10 @@ func (cts *ClientConn) RemoveClientRouteByServerPeerSvcPortId(port_id PortId) er
for _, r = range cts.route_map {
if r.server_peer_listen_addr.Port == int(port_id) {
delete(cts.route_map, r.Id)
cts.cli.stats.routes.Add(-1)
if cts.cli.route_persister != nil { cts.cli.route_persister.Delete(cts, r) }
cts.C.stats.routes.Add(-1)
if cts.C.route_persister != nil { cts.C.route_persister.Delete(cts, r) }
cts.route_mtx.Unlock()
cts.cli.log.Write(cts.Sid, LOG_INFO, "Removed route(%d,%s)", r.Id, r.PeerAddr)
cts.C.log.Write(cts.Sid, LOG_INFO, "Removed route(%d,%s)", r.Id, r.PeerAddr)
r.ReqStop()
return nil
}
@ -945,17 +950,17 @@ func (cts *ClientConn) RunTask(wg *sync.WaitGroup) {
start_over:
cts.State = CLIENT_CONN_CONNECTING
cts.cfg.Index = (cts.cfg.Index + 1) % len(cts.cfg.ServerAddrs)
cts.cli.log.Write(cts.Sid, LOG_INFO, "Connecting to server[%d] %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index])
if cts.cli.rpc_tls == nil {
cts.C.log.Write(cts.Sid, LOG_INFO, "Connecting to server[%d] %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index])
if cts.C.rpc_tls == nil {
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
if cts.cfg.ServerAuthority != "" { opts = append(opts, grpc.WithAuthority(cts.cfg.ServerAuthority)) }
} else {
opts = append(opts, grpc.WithTransportCredentials(credentials.NewTLS(cts.cli.rpc_tls)))
opts = append(opts, grpc.WithTransportCredentials(credentials.NewTLS(cts.C.rpc_tls)))
// set the http2 :authority header with tls server name defined.
if cts.cfg.ServerAuthority != "" {
opts = append(opts, grpc.WithAuthority(cts.cfg.ServerAuthority))
} else if cts.cli.rpc_tls.ServerName != "" {
opts = append(opts, grpc.WithAuthority(cts.cli.rpc_tls.ServerName))
} else if cts.C.rpc_tls.ServerName != "" {
opts = append(opts, grpc.WithAuthority(cts.C.rpc_tls.ServerName))
}
}
if cts.cfg.ServerSeedTmout > 0 {
@ -964,7 +969,7 @@ start_over:
cts.conn, err = grpc.NewClient(cts.cfg.ServerAddrs[cts.cfg.Index], opts...)
if err != nil {
cts.cli.log.Write(cts.Sid, LOG_ERROR, "Failed to make client to server[%d] %s - %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index], err.Error())
cts.C.log.Write(cts.Sid, LOG_ERROR, "Failed to make client to server[%d] %s - %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index], err.Error())
goto reconnect_to_server
}
cts.hdc = NewHoduClient(cts.conn)
@ -973,20 +978,20 @@ start_over:
// there is nothing to do much about it for now.
c_seed.Version = HODU_RPC_VERSION
c_seed.Flags = 0
s_seed, err = cts.hdc.GetSeed(cts.cli.ctx, &c_seed)
s_seed, err = cts.hdc.GetSeed(cts.C.ctx, &c_seed)
if err != nil {
cts.cli.log.Write(cts.Sid, LOG_ERROR, "Failed to get seed from server[%d] %s - %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index], err.Error())
cts.C.log.Write(cts.Sid, LOG_ERROR, "Failed to get seed from server[%d] %s - %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index], err.Error())
goto reconnect_to_server
}
cts.s_seed = *s_seed
cts.c_seed = c_seed
cts.route_next_id = 1 // reset this whenever a new connection is made. the number of routes must be zero.
cts.cli.log.Write(cts.Sid, LOG_INFO, "Got seed from server[%d] %s - ver=%#x", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index], cts.s_seed.Version)
cts.C.log.Write(cts.Sid, LOG_INFO, "Got seed from server[%d] %s - ver=%#x", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index], cts.s_seed.Version)
psc, err = cts.hdc.PacketStream(cts.cli.ctx)
psc, err = cts.hdc.PacketStream(cts.C.ctx)
if err != nil {
cts.cli.log.Write(cts.Sid, LOG_ERROR, "Failed to get packet stream from server[%d] %s - %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index], err.Error())
cts.C.log.Write(cts.Sid, LOG_ERROR, "Failed to get packet stream from server[%d] %s - %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index], err.Error())
goto reconnect_to_server
}
@ -996,7 +1001,7 @@ start_over:
cts.local_addr = p.LocalAddr.String()
}
cts.cli.log.Write(cts.Sid, LOG_INFO, "Got packet stream from server[%d] %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index])
cts.C.log.Write(cts.Sid, LOG_INFO, "Got packet stream from server[%d] %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index])
cts.State = CLIENT_CONN_CONNECTED
@ -1007,22 +1012,22 @@ start_over:
// let's add statically configured routes to the client-side peers
err = cts.add_client_routes(cts.cfg.Routes)
if err != nil {
cts.cli.log.Write(cts.Sid, LOG_ERROR, "Failed to add routes to server[%d] %s for %v - %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index], cts.cfg.Routes, err.Error())
cts.C.log.Write(cts.Sid, LOG_ERROR, "Failed to add routes to server[%d] %s for %v - %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index], cts.cfg.Routes, err.Error())
goto done
}
}
if cts.cli.route_persister != nil {
if cts.C.route_persister != nil {
// restore the client routes added and saved via the control channel
cts.cli.route_persister.LoadAll(cts)
cts.C.route_persister.LoadAll(cts)
}
for {
var pkt *Packet
select {
case <-cts.cli.ctx.Done():
// need to log cts.cli.ctx.Err().Error()?
case <-cts.C.ctx.Done():
// need to log cts.C.ctx.Err().Error()?
goto done
case <-cts.stop_chan:
@ -1038,7 +1043,7 @@ start_over:
if status.Code(err) == codes.Canceled || errors.Is(err, net.ErrClosed) {
goto reconnect_to_server
} else {
cts.cli.log.Write(cts.Sid, LOG_INFO, "Failed to receive packet from %s - %s", cts.remote_addr, err.Error())
cts.C.log.Write(cts.Sid, LOG_INFO, "Failed to receive packet from %s - %s", cts.remote_addr, err.Error())
goto reconnect_to_server
}
}
@ -1052,16 +1057,16 @@ start_over:
if ok {
err = cts.ReportEvent(RouteId(x.Route.RouteId), 0, pkt.Kind, x.Route)
if err != nil {
cts.cli.log.Write(cts.Sid, LOG_ERROR,
cts.C.log.Write(cts.Sid, LOG_ERROR,
"Failed to handle route_started event(%d,%s) from %s - %s",
x.Route.RouteId, x.Route.TargetAddrStr, cts.remote_addr, err.Error())
} else {
cts.cli.log.Write(cts.Sid, LOG_DEBUG,
cts.C.log.Write(cts.Sid, LOG_DEBUG,
"Handled route_started event(%d,%s) from %s",
x.Route.RouteId, x.Route.TargetAddrStr, cts.remote_addr)
}
} else {
cts.cli.log.Write(cts.Sid, LOG_ERROR, "Invalid route_started event from %s", cts.remote_addr)
cts.C.log.Write(cts.Sid, LOG_ERROR, "Invalid route_started event from %s", cts.remote_addr)
}
case PACKET_KIND_ROUTE_STOPPED:
@ -1071,16 +1076,16 @@ start_over:
if ok {
err = cts.ReportEvent(RouteId(x.Route.RouteId), 0, pkt.Kind, x.Route)
if err != nil {
cts.cli.log.Write(cts.Sid, LOG_ERROR,
cts.C.log.Write(cts.Sid, LOG_ERROR,
"Failed to handle route_stopped event(%d,%s) from %s - %s",
x.Route.RouteId, x.Route.TargetAddrStr, cts.remote_addr, err.Error())
} else {
cts.cli.log.Write(cts.Sid, LOG_DEBUG,
cts.C.log.Write(cts.Sid, LOG_DEBUG,
"Handled route_stopped event(%d,%s) from %s",
x.Route.RouteId, x.Route.TargetAddrStr, cts.remote_addr)
}
} else {
cts.cli.log.Write(cts.Sid, LOG_ERROR, "Invalid route_stopped event from %s", cts.remote_addr)
cts.C.log.Write(cts.Sid, LOG_ERROR, "Invalid route_stopped event from %s", cts.remote_addr)
}
case PACKET_KIND_PEER_STARTED:
@ -1091,16 +1096,16 @@ start_over:
if ok {
err = cts.ReportEvent(RouteId(x.Peer.RouteId), PeerId(x.Peer.PeerId), PACKET_KIND_PEER_STARTED, x.Peer)
if err != nil {
cts.cli.log.Write(cts.Sid, LOG_ERROR,
cts.C.log.Write(cts.Sid, LOG_ERROR,
"Failed to handle peer_started event from %s for peer(%d,%d,%s,%s) - %s",
cts.remote_addr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr, err.Error())
} else {
cts.cli.log.Write(cts.Sid, LOG_DEBUG,
cts.C.log.Write(cts.Sid, LOG_DEBUG,
"Handled peer_started event from %s for peer(%d,%d,%s,%s)",
cts.remote_addr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr)
}
} else {
cts.cli.log.Write(cts.Sid, LOG_ERROR, "Invalid peer_started event from %s", cts.remote_addr)
cts.C.log.Write(cts.Sid, LOG_ERROR, "Invalid peer_started event from %s", cts.remote_addr)
}
// PACKET_KIND_PEER_ABORTED is never sent by server to client.
@ -1114,16 +1119,16 @@ start_over:
if ok {
err = cts.ReportEvent(RouteId(x.Peer.RouteId), PeerId(x.Peer.PeerId), PACKET_KIND_PEER_STOPPED, x.Peer)
if err != nil {
cts.cli.log.Write(cts.Sid, LOG_ERROR,
cts.C.log.Write(cts.Sid, LOG_ERROR,
"Failed to handle peer_stopped event from %s for peer(%d,%d,%s,%s) - %s",
cts.remote_addr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr, err.Error())
} else {
cts.cli.log.Write(cts.Sid, LOG_DEBUG,
cts.C.log.Write(cts.Sid, LOG_DEBUG,
"Handled peer_stopped event from %s for peer(%d,%d,%s,%s)",
cts.remote_addr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr)
}
} else {
cts.cli.log.Write(cts.Sid, LOG_ERROR, "Invalid peer_stopped event from %s", cts.remote_addr)
cts.C.log.Write(cts.Sid, LOG_ERROR, "Invalid peer_stopped event from %s", cts.remote_addr)
}
case PACKET_KIND_PEER_EOF:
@ -1133,16 +1138,16 @@ start_over:
if ok {
err = cts.ReportEvent(RouteId(x.Peer.RouteId), PeerId(x.Peer.PeerId), PACKET_KIND_PEER_EOF, x.Peer)
if err != nil {
cts.cli.log.Write(cts.Sid, LOG_ERROR,
cts.C.log.Write(cts.Sid, LOG_ERROR,
"Failed to handle peer_eof event from %s for peer(%d,%d,%s,%s) - %s",
cts.remote_addr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr, err.Error())
} else {
cts.cli.log.Write(cts.Sid, LOG_DEBUG,
cts.C.log.Write(cts.Sid, LOG_DEBUG,
"Handled peer_eof event from %s for peer(%d,%d,%s,%s)",
cts.remote_addr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr)
}
} else {
cts.cli.log.Write(cts.Sid, LOG_ERROR, "Invalid peer_eof event from %s", cts.remote_addr)
cts.C.log.Write(cts.Sid, LOG_ERROR, "Invalid peer_eof event from %s", cts.remote_addr)
}
case PACKET_KIND_PEER_DATA:
@ -1153,16 +1158,16 @@ start_over:
if ok {
err = cts.ReportEvent(RouteId(x.Data.RouteId), PeerId(x.Data.PeerId), PACKET_KIND_PEER_DATA, x.Data.Data)
if err != nil {
cts.cli.log.Write(cts.Sid, LOG_ERROR,
cts.C.log.Write(cts.Sid, LOG_ERROR,
"Failed to handle peer_data event from %s for peer(%d,%d) - %s",
cts.remote_addr, x.Data.RouteId, x.Data.PeerId, err.Error())
} else {
cts.cli.log.Write(cts.Sid, LOG_DEBUG,
cts.C.log.Write(cts.Sid, LOG_DEBUG,
"Handled peer_data event from %s for peer(%d,%d)",
cts.remote_addr, x.Data.RouteId, x.Data.PeerId)
}
} else {
cts.cli.log.Write(cts.Sid, LOG_ERROR, "Invalid peer_data event from %s", cts.remote_addr)
cts.C.log.Write(cts.Sid, LOG_ERROR, "Invalid peer_data event from %s", cts.remote_addr)
}
case PACKET_KIND_CONN_NOTICE:
@ -1171,9 +1176,11 @@ start_over:
var ok bool
x, ok = pkt.U.(*Packet_Notice)
if ok {
fmt.Printf ("CONN NOTICE [%s] from %s\n", x.Notice.Text, cts.remote_addr)
if cts.C.conn_notice != nil {
cts.C.conn_notice.Handle(cts, x.Notice.Text)
}
} else {
cts.cli.log.Write(cts.Sid, LOG_ERROR, "Invalid conn_data event from %s", cts.remote_addr)
cts.C.log.Write(cts.Sid, LOG_ERROR, "Invalid conn_data event from %s", cts.remote_addr)
}
default:
@ -1182,7 +1189,7 @@ fmt.Printf ("CONN NOTICE [%s] from %s\n", x.Notice.Text, cts.remote_addr)
}
done:
cts.cli.log.Write(cts.Sid, LOG_INFO, "Disconnected from server[%d] %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index])
cts.C.log.Write(cts.Sid, LOG_INFO, "Disconnected from server[%d] %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index])
cts.State = CLIENT_CONN_DISCONNECTED
req_stop_and_wait_for_termination:
@ -1191,23 +1198,23 @@ req_stop_and_wait_for_termination:
wait_for_termination:
cts.route_wg.Wait() // wait until all route tasks are finished
cts.cli.RemoveClientConn(cts)
cts.C.RemoveClientConn(cts)
return
reconnect_to_server:
cts.State = CLIENT_CONN_DISCONNECTING
if cts.conn != nil {
cts.cli.log.Write(cts.Sid, LOG_INFO, "Disconnecting from server[%d] %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index])
cts.C.log.Write(cts.Sid, LOG_INFO, "Disconnecting from server[%d] %s", cts.cfg.Index, cts.cfg.ServerAddrs[cts.cfg.Index])
}
cts.disconnect_from_server()
cts.State = CLIENT_CONN_DISCONNECTED
// wait for 2 seconds
slpctx, cancel_sleep = context.WithTimeout(cts.cli.ctx, 2 * time.Second)
slpctx, cancel_sleep = context.WithTimeout(cts.C.ctx, 2 * time.Second)
select {
case <-cts.cli.ctx.Done():
// need to log cts.cli.ctx.Err().Error()?
case <-cts.C.ctx.Done():
// need to log cts.C.ctx.Err().Error()?
cancel_sleep()
goto req_stop_and_wait_for_termination
case <-cts.stop_chan:
@ -1348,8 +1355,10 @@ func NewClient(ctx context.Context, name string, logger Logger, cfg *ClientConfi
c.wrap_http_handler(&client_ctl_client_conns_id_routes_id_peers{client_ctl{c: &c, id: HS_ID_CTL}}))
c.ctl_mux.Handle(c.ctl_prefix + "/_ctl/client-conns/{conn_id}/routes/{route_id}/peers/{peer_id}",
c.wrap_http_handler(&client_ctl_client_conns_id_routes_id_peers_id{client_ctl{c: &c, id: HS_ID_CTL}}))
c.ctl_mux.Handle(c.ctl_prefix + "/_ctl/client-conns/{conn_id}/notices",
c.wrap_http_handler(&client_ctl_client_conns_id_notices{client_ctl{c: &c, id: HS_ID_CTL}}))
c.ctl_mux.Handle(c.ctl_prefix + "/_ctl/notices",
c.wrap_http_handler(&client_ctl_notices{client_ctl{c: &c, id: HS_ID_CTL}}))
c.ctl_mux.Handle(c.ctl_prefix + "/_ctl/notices/{conn_id}",
c.wrap_http_handler(&client_ctl_notices_id{client_ctl{c: &c, id: HS_ID_CTL}}))
c.ctl_mux.Handle(c.ctl_prefix + "/_ctl/stats",
c.wrap_http_handler(&client_ctl_stats{client_ctl{c: &c, id: HS_ID_CTL}}))
c.ctl_mux.Handle(c.ctl_prefix + "/_ctl/token",
@ -1742,6 +1751,10 @@ func (c *Client) WriteLog(id string, level LogLevel, fmtstr string, args ...inte
c.log.Write(id, level, fmtstr, args...)
}
func (c *Client) SetConnNoticeHandler(handler ClientConnNoticeHandler) {
c.conn_notice = handler
}
func (c *Client) SetRoutePersister(persister ClientRoutePersister) {
c.route_persister = persister
}