2024-11-23 12:30:23 +09:00
package hodu
2024-11-12 22:59:37 +09:00
import "context"
import "crypto/tls"
2024-11-17 14:57:56 +09:00
import "errors"
2024-11-12 22:59:37 +09:00
import "fmt"
2024-12-07 22:18:07 +09:00
import "log"
2024-11-25 19:46:18 +09:00
import "math/rand"
2024-11-12 22:59:37 +09:00
import "net"
2024-11-16 00:03:42 +09:00
import "net/http"
2024-11-12 22:59:37 +09:00
import "sync"
import "sync/atomic"
2024-11-13 23:14:43 +09:00
import "time"
2024-11-12 22:59:37 +09:00
import "google.golang.org/grpc"
2024-11-24 22:33:19 +09:00
import "google.golang.org/grpc/codes"
2024-12-07 16:57:00 +09:00
import "google.golang.org/grpc/credentials"
2024-11-12 22:59:37 +09:00
import "google.golang.org/grpc/credentials/insecure"
2024-12-02 02:19:50 +09:00
import "google.golang.org/grpc/peer"
2024-11-24 22:33:19 +09:00
import "google.golang.org/grpc/status"
2024-11-12 22:59:37 +09:00
type PacketStreamClient grpc . BidiStreamingClient [ Packet , Packet ]
2024-11-30 18:59:36 +09:00
type ClientConnMap = map [ uint32 ] * ClientConn
2024-11-12 22:59:37 +09:00
type ClientPeerConnMap = map [ uint32 ] * ClientPeerConn
type ClientRouteMap = map [ uint32 ] * ClientRoute
2024-11-20 02:47:58 +09:00
type ClientPeerCancelFuncMap = map [ uint32 ] context . CancelFunc
2024-11-12 22:59:37 +09:00
// --------------------------------------------------------------------
// ClientConfig carries the caller-supplied settings of a client connection.
type ClientConfig struct {
	// ServerAddrs lists the candidate server addresses; the connection
	// task walks through them by index when it (re)connects.
	ServerAddrs []string

	// PeerAddrs lists client-side peer addresses; one route is added for
	// each address once the packet stream is up.
	PeerAddrs []string

	// ServerSeedTimeout is the timeout in seconds applied to the GetSeed
	// call. It takes effect only when greater than zero.
	ServerSeedTimeout int

	// ServerAuthority overrides the http2 :authority header.
	ServerAuthority string
}
2024-11-25 22:55:03 +09:00
type ClientConfigActive struct {
Id uint32
2024-12-08 16:06:18 +09:00
Index int
2024-11-25 22:55:03 +09:00
ClientConfig
}
2024-11-12 22:59:37 +09:00
type Client struct {
2024-11-16 00:03:42 +09:00
ctx context . Context
ctx_cancel context . CancelFunc
2024-12-07 16:57:00 +09:00
ctltlscfg * tls . Config
rpctlscfg * tls . Config
2024-12-03 11:52:46 +09:00
2024-11-30 20:24:30 +09:00
ext_mtx sync . Mutex
2024-11-23 12:30:23 +09:00
ext_svcs [ ] Service
2024-12-03 11:52:46 +09:00
ctl_addr [ ] string
ctl_prefix string
2024-12-01 21:47:11 +09:00
ctl_mux * http . ServeMux
2024-12-03 11:52:46 +09:00
ctl [ ] * http . Server // control server
2024-11-12 22:59:37 +09:00
2024-11-16 00:03:42 +09:00
cts_mtx sync . Mutex
2024-11-25 19:46:18 +09:00
cts_map ClientConnMap
2024-11-12 22:59:37 +09:00
2024-11-16 00:03:42 +09:00
wg sync . WaitGroup
stop_req atomic . Bool
stop_chan chan bool
2024-11-23 14:49:04 +09:00
log Logger
2024-12-08 12:13:36 +09:00
stats struct {
conns atomic . Int64
routes atomic . Int64
peers atomic . Int64
}
2024-11-16 00:03:42 +09:00
}
2024-11-12 22:59:37 +09:00
// client connection to server
2024-11-25 19:46:18 +09:00
type ClientConn struct {
2024-12-02 02:19:50 +09:00
cli * Client
cfg ClientConfigActive
id uint32
2024-12-03 11:52:46 +09:00
sid string // id rendered in string
2024-11-13 23:14:43 +09:00
2024-12-02 02:19:50 +09:00
local_addr string
remote_addr string
conn * grpc . ClientConn // grpc connection to the server
hdc HoduClient
psc * GuardedPacketStreamClient // guarded grpc stream
2024-11-12 22:59:37 +09:00
2024-12-02 02:19:50 +09:00
s_seed Seed
c_seed Seed
2024-11-20 00:31:14 +09:00
2024-12-02 02:19:50 +09:00
route_mtx sync . Mutex
route_map ClientRouteMap
route_wg sync . WaitGroup
2024-11-12 22:59:37 +09:00
2024-12-02 02:19:50 +09:00
stop_req atomic . Bool
stop_chan chan bool
2024-11-12 22:59:37 +09:00
}
type ClientRoute struct {
2024-11-25 19:46:18 +09:00
cts * ClientConn
2024-11-12 22:59:37 +09:00
id uint32
2024-11-30 02:53:47 +09:00
peer_addr string
2024-11-28 01:29:02 +09:00
server_peer_listen_addr * net . TCPAddr
2024-12-05 18:24:42 +09:00
server_peer_net string
server_peer_proto ROUTE_PROTO
2024-11-12 22:59:37 +09:00
2024-11-20 02:47:58 +09:00
ptc_mtx sync . Mutex
ptc_map ClientPeerConnMap
ptc_cancel_map ClientPeerCancelFuncMap
2024-11-12 22:59:37 +09:00
ptc_wg sync . WaitGroup
2024-11-13 23:14:43 +09:00
stop_req atomic . Bool
stop_chan chan bool
2024-11-12 22:59:37 +09:00
}
2024-11-30 20:06:59 +09:00
type ClientPeerConn struct {
route * ClientRoute
conn_id uint32
conn * net . TCPConn
2024-12-01 17:20:16 +09:00
pts_laddr string // server-local addreess of the server-side peer
pts_raddr string // address of the server-side peer
pts_eof atomic . Bool
2024-11-30 20:06:59 +09:00
stop_chan chan bool
stop_req atomic . Bool
}
2024-11-18 22:25:59 +09:00
// GuardedPacketStreamClient embeds the generated packet stream client and
// adds a mutex which its Send method holds for the duration of a send.
type GuardedPacketStreamClient struct {
	mtx sync.Mutex
	Hodu_PacketStreamClient
}
// ------------------------------------
// Send forwards data to the embedded stream's Send while holding the guard
// mutex, so that calls made through this wrapper never overlap.
func (g *GuardedPacketStreamClient) Send(data *Packet) error {
	g.mtx.Lock()
	defer g.mtx.Unlock()

	// The embedded type is named explicitly; a plain g.Send would recurse.
	return g.Hodu_PacketStreamClient.Send(data)
}
/ * func ( g * GuardedPacketStreamClient ) Recv ( ) ( * Packet , error ) {
return g . psc . Recv ( )
}
func ( g * GuardedPacketStreamClient ) Context ( ) context . Context {
return g . psc . Context ( )
} * /
2024-11-12 22:59:37 +09:00
// --------------------------------------------------------------------
2024-12-05 18:24:42 +09:00
func NewClientRoute ( cts * ClientConn , id uint32 , client_peer_addr string , server_peer_net string , server_peer_proto ROUTE_PROTO ) * ClientRoute {
2024-11-12 22:59:37 +09:00
var r ClientRoute
r . cts = cts
r . id = id
r . ptc_map = make ( ClientPeerConnMap )
2024-11-20 02:47:58 +09:00
r . ptc_cancel_map = make ( ClientPeerCancelFuncMap )
2024-12-05 18:24:42 +09:00
r . peer_addr = client_peer_addr // client-side peer
r . server_peer_net = server_peer_net // permitted network for server-side peer
r . server_peer_proto = server_peer_proto
2024-11-13 23:14:43 +09:00
r . stop_req . Store ( false )
2024-11-24 20:39:51 +09:00
r . stop_chan = make ( chan bool , 8 )
2024-11-12 22:59:37 +09:00
2024-11-23 14:49:04 +09:00
return & r
2024-11-12 22:59:37 +09:00
}
2024-12-01 17:20:16 +09:00
func ( r * ClientRoute ) AddNewClientPeerConn ( c * net . TCPConn , pts_id uint32 , pts_raddr string , pts_laddr string ) ( * ClientPeerConn , error ) {
var ptc * ClientPeerConn
r . ptc_mtx . Lock ( )
ptc = NewClientPeerConn ( r , c , pts_id , pts_raddr , pts_laddr )
r . ptc_map [ ptc . conn_id ] = ptc
2024-12-08 12:13:36 +09:00
r . cts . cli . stats . peers . Add ( 1 )
2024-12-05 01:26:44 +09:00
r . ptc_mtx . Unlock ( )
2024-12-01 17:20:16 +09:00
2024-12-05 01:26:44 +09:00
r . cts . cli . log . Write ( r . cts . sid , LOG_INFO , "Added client-side peer(%d,%d,%s,%s)" , r . id , ptc . conn_id , ptc . conn . RemoteAddr ( ) . String ( ) , ptc . conn . LocalAddr ( ) . String ( ) )
2024-12-01 17:20:16 +09:00
return ptc , nil
}
func ( r * ClientRoute ) RemoveClientPeerConn ( ptc * ClientPeerConn ) error {
var c * ClientPeerConn
var ok bool
r . ptc_mtx . Lock ( )
c , ok = r . ptc_map [ ptc . conn_id ]
if ! ok {
r . ptc_mtx . Unlock ( )
return fmt . Errorf ( "non-existent peer id - %d" , ptc . conn_id )
}
if c != ptc {
r . ptc_mtx . Unlock ( )
2024-12-05 01:26:44 +09:00
return fmt . Errorf ( "conflicting peer id - %d" , ptc . conn_id )
2024-12-01 17:20:16 +09:00
}
delete ( r . ptc_map , ptc . conn_id )
2024-12-08 12:13:36 +09:00
r . cts . cli . stats . peers . Add ( - 1 )
2024-12-01 17:20:16 +09:00
r . ptc_mtx . Unlock ( )
2024-12-05 01:26:44 +09:00
r . cts . cli . log . Write ( r . cts . sid , LOG_INFO , "Removed client-side peer(%d,%d,%s,%s)" , r . id , ptc . conn_id , ptc . conn . RemoteAddr ( ) . String ( ) , ptc . conn . LocalAddr ( ) . String ( ) )
2024-12-01 17:20:16 +09:00
ptc . ReqStop ( )
return nil
}
2024-12-08 12:13:36 +09:00
/ * func ( r * ClientRoute ) RemoveAllClientPeerConns ( ) {
2024-12-01 19:11:12 +09:00
var c * ClientPeerConn
r . ptc_mtx . Lock ( )
defer r . ptc_mtx . Unlock ( )
for _ , c = range r . ptc_map {
delete ( r . ptc_map , c . conn_id )
2024-12-08 12:13:36 +09:00
r . cts . cli . stats . peers . Add ( - 1 )
2024-12-01 19:11:12 +09:00
c . ReqStop ( )
}
2024-12-08 12:13:36 +09:00
} * /
2024-12-01 19:11:12 +09:00
// ReqStopAllClientPeerConns requests every registered peer connection of the
// route to stop. The peer map is locked for the whole walk and the peers stay
// registered.
func (r *ClientRoute) ReqStopAllClientPeerConns() {
	r.ptc_mtx.Lock()
	defer r.ptc_mtx.Unlock()

	for _, peer_conn := range r.ptc_map {
		peer_conn.ReqStop()
	}
}
2024-12-01 17:20:16 +09:00
// FindClientPeerConnById returns the peer connection registered under conn_id,
// or nil when there is none.
func (r *ClientRoute) FindClientPeerConnById(conn_id uint32) *ClientPeerConn {
	r.ptc_mtx.Lock()
	defer r.ptc_mtx.Unlock()

	// A missing key yields the zero value of the map element, i.e. nil.
	return r.ptc_map[conn_id]
}
2024-11-17 00:47:15 +09:00
func ( r * ClientRoute ) RunTask ( wg * sync . WaitGroup ) {
2024-11-24 22:33:19 +09:00
var err error
2024-11-12 22:59:37 +09:00
// this task on the route object isn't actually necessary.
2024-11-13 23:14:43 +09:00
// most useful works are triggered by ReportEvent() and done by ConnectToPeer()
2024-11-17 00:47:15 +09:00
defer wg . Done ( )
2024-11-13 23:14:43 +09:00
2024-12-05 18:24:42 +09:00
err = r . cts . psc . Send ( MakeRouteStartPacket ( r . id , r . server_peer_proto , r . peer_addr , r . server_peer_net ) )
2024-11-24 22:33:19 +09:00
if err != nil {
2024-12-05 18:24:42 +09:00
r . cts . cli . log . Write ( r . cts . sid , LOG_DEBUG ,
"Failed to send route_start for route(%d,%s,%v,%v) to %s" ,
r . id , r . peer_addr , r . server_peer_proto , r . server_peer_net , r . cts . remote_addr )
2024-11-25 19:46:18 +09:00
goto done
2024-12-05 23:05:47 +09:00
} else {
r . cts . cli . log . Write ( r . cts . sid , LOG_DEBUG ,
"Sent route_start for route(%d,%s,%v,%v) to %s" ,
r . id , r . peer_addr , r . server_peer_proto , r . server_peer_net , r . cts . remote_addr )
2024-11-24 22:33:19 +09:00
}
2024-11-13 23:14:43 +09:00
main_loop :
for {
select {
2024-11-25 19:46:18 +09:00
case <- r . stop_chan :
2024-11-13 23:14:43 +09:00
break main_loop
}
}
2024-11-17 00:47:15 +09:00
2024-11-24 22:33:19 +09:00
done :
r . ReqStop ( )
2024-11-17 00:47:15 +09:00
r . ptc_wg . Wait ( ) // wait for all peer tasks are finished
2024-11-24 22:33:19 +09:00
2024-12-05 23:05:47 +09:00
err = r . cts . psc . Send ( MakeRouteStopPacket ( r . id , r . server_peer_proto , r . peer_addr , r . server_peer_net ) )
if err != nil {
r . cts . cli . log . Write ( r . cts . sid , LOG_DEBUG ,
"Failed to route_stop for route(%d,%s,%v,%v) to %s - %s" ,
r . id , r . peer_addr , r . server_peer_proto , r . server_peer_net , r . cts . remote_addr , err . Error ( ) )
} else {
r . cts . cli . log . Write ( r . cts . sid , LOG_DEBUG ,
"Sent route_stop for route(%d,%s,%v,%v) to %s" ,
r . id , r . peer_addr , r . server_peer_proto , r . server_peer_net , r . cts . remote_addr )
}
2024-12-05 01:26:44 +09:00
2024-11-24 22:33:19 +09:00
r . cts . RemoveClientRoute ( r )
2024-11-12 22:59:37 +09:00
}
2024-11-13 23:14:43 +09:00
func ( r * ClientRoute ) ReqStop ( ) {
if r . stop_req . CompareAndSwap ( false , true ) {
var ptc * ClientPeerConn
for _ , ptc = range r . ptc_map {
ptc . ReqStop ( )
}
r . stop_chan <- true
}
2024-11-12 22:59:37 +09:00
}
2024-12-01 17:20:16 +09:00
func ( r * ClientRoute ) ConnectToPeer ( pts_id uint32 , pts_raddr string , pts_laddr string , wg * sync . WaitGroup ) {
2024-11-12 22:59:37 +09:00
var err error
2024-11-13 23:14:43 +09:00
var conn net . Conn
2024-11-20 00:35:58 +09:00
var real_conn * net . TCPConn
2024-12-05 01:26:44 +09:00
var real_conn_raddr string
var real_conn_laddr string
2024-11-12 22:59:37 +09:00
var ptc * ClientPeerConn
2024-11-13 23:14:43 +09:00
var d net . Dialer
var ctx context . Context
2024-11-20 02:47:58 +09:00
var cancel context . CancelFunc
2024-11-20 00:35:58 +09:00
var ok bool
2024-11-13 23:14:43 +09:00
2024-11-20 02:47:58 +09:00
defer wg . Done ( )
2024-11-30 02:53:47 +09:00
// TODO: make timeout value configurable
2024-11-13 23:14:43 +09:00
// TODO: fire the cancellation function upon stop request???
2024-11-20 02:47:58 +09:00
ctx , cancel = context . WithTimeout ( r . cts . cli . ctx , 10 * time . Second )
r . ptc_mtx . Lock ( )
r . ptc_cancel_map [ pts_id ] = cancel
r . ptc_mtx . Unlock ( )
2024-11-13 23:14:43 +09:00
d . LocalAddr = nil // TOOD: use this if local address is specified
2024-11-30 02:53:47 +09:00
conn , err = d . DialContext ( ctx , "tcp" , r . peer_addr )
2024-11-20 02:47:58 +09:00
r . ptc_mtx . Lock ( )
delete ( r . ptc_cancel_map , pts_id )
r . ptc_mtx . Unlock ( )
2024-11-12 22:59:37 +09:00
if err != nil {
2024-12-05 01:26:44 +09:00
r . cts . cli . log . Write ( r . cts . sid , LOG_ERROR ,
"Failed to connect to %s for route(%d,%d,%s,%s) - %s" ,
r . peer_addr , r . id , pts_id , pts_raddr , pts_laddr , err . Error ( ) )
2024-11-20 00:48:02 +09:00
goto peer_aborted
2024-11-12 22:59:37 +09:00
}
2024-11-20 00:35:58 +09:00
real_conn , ok = conn . ( * net . TCPConn )
if ! ok {
2024-12-05 01:26:44 +09:00
r . cts . cli . log . Write ( r . cts . sid , LOG_ERROR ,
"Failed to get connection information to %s for route(%d,%d,%s,%s) - %s" ,
r . peer_addr , r . id , pts_id , pts_raddr , pts_laddr , err . Error ( ) )
2024-11-20 00:48:02 +09:00
goto peer_aborted
2024-11-20 00:35:58 +09:00
}
2024-12-05 01:26:44 +09:00
real_conn_raddr = real_conn . RemoteAddr ( ) . String ( )
real_conn_laddr = real_conn . LocalAddr ( ) . String ( )
2024-12-01 17:20:16 +09:00
ptc , err = r . AddNewClientPeerConn ( real_conn , pts_id , pts_raddr , pts_laddr )
2024-11-12 22:59:37 +09:00
if err != nil {
2024-12-05 01:26:44 +09:00
r . cts . cli . log . Write ( r . cts . sid , LOG_ERROR ,
"Failed to add client peer %s for route(%d,%d,%s,%s) - %s" ,
r . peer_addr , r . id , pts_id , pts_raddr , pts_laddr , err . Error ( ) )
2024-11-20 00:48:02 +09:00
goto peer_aborted
2024-11-12 22:59:37 +09:00
}
2024-12-03 20:28:04 +09:00
2024-12-05 01:26:44 +09:00
// ptc.conn is equal to pts_id as assigned in r.AddNewClientPeerConn()
err = r . cts . psc . Send ( MakePeerStartedPacket ( r . id , ptc . conn_id , real_conn_raddr , real_conn_laddr ) )
2024-11-17 14:57:56 +09:00
if err != nil {
2024-12-05 01:26:44 +09:00
r . cts . cli . log . Write ( r . cts . sid , LOG_ERROR ,
"Failed to send peer_start(%d,%d,%s,%s) for route(%d,%d,%s,%s) - %s" ,
r . id , ptc . conn_id , real_conn_raddr , real_conn_laddr ,
r . id , pts_id , pts_raddr , pts_laddr , err . Error ( ) )
2024-11-20 00:48:02 +09:00
goto peer_aborted
2024-11-17 14:57:56 +09:00
}
2024-11-12 22:59:37 +09:00
2024-11-20 02:47:58 +09:00
wg . Add ( 1 )
go ptc . RunTask ( wg )
2024-11-20 00:48:02 +09:00
return
peer_aborted :
2024-12-05 01:26:44 +09:00
// real_conn_radd and real_conn_laddr may be empty depending on when the jump to here is made.
err = r . cts . psc . Send ( MakePeerAbortedPacket ( r . id , pts_id , real_conn_raddr , real_conn_laddr ) )
if err != nil {
r . cts . cli . log . Write ( r . cts . sid , LOG_ERROR ,
"Failed to send peer_aborted(%d,%d) for route(%d,%d,%s,%s) - %s" ,
r . id , pts_id , r . id , pts_id , pts_raddr , pts_laddr , err . Error ( ) )
}
2024-11-20 00:48:02 +09:00
if conn != nil {
conn . Close ( )
}
2024-11-12 22:59:37 +09:00
}
2024-12-08 17:47:08 +09:00
func ( r * ClientRoute ) DisconnectFromPeer ( ptc * ClientPeerConn ) error {
2024-12-05 01:26:44 +09:00
var p * ClientPeerConn
2024-11-20 02:47:58 +09:00
var cancel context . CancelFunc
2024-11-20 00:31:14 +09:00
var ok bool
r . ptc_mtx . Lock ( )
2024-12-05 01:26:44 +09:00
p , ok = r . ptc_map [ ptc . conn_id ]
if ok && p == ptc {
cancel , ok = r . ptc_cancel_map [ ptc . conn_id ]
if ok {
cancel ( )
}
2024-11-20 00:31:14 +09:00
}
r . ptc_mtx . Unlock ( )
ptc . ReqStop ( )
return nil
}
2024-11-28 01:29:02 +09:00
func ( r * ClientRoute ) ReportEvent ( pts_id uint32 , event_type PACKET_KIND , event_data interface { } ) error {
2024-11-20 00:31:14 +09:00
var err error
2024-11-12 22:59:37 +09:00
switch event_type {
2024-11-28 01:29:02 +09:00
case PACKET_KIND_ROUTE_STARTED :
var ok bool
2024-12-05 01:26:44 +09:00
var rd * RouteDesc
rd , ok = event_data . ( * RouteDesc )
2024-11-28 01:29:02 +09:00
if ! ok {
2024-12-05 01:26:44 +09:00
r . cts . cli . log . Write ( r . cts . sid , LOG_ERROR , "Protocol error - invalid data in route_started event(%d)" , r . id )
r . ReqStop ( )
2024-11-28 01:29:02 +09:00
} else {
var addr * net . TCPAddr
2024-12-05 01:31:59 +09:00
addr , err = net . ResolveTCPAddr ( "tcp" , rd . TargetAddrStr )
2024-11-28 01:29:02 +09:00
if err != nil {
2024-12-05 01:31:59 +09:00
r . cts . cli . log . Write ( r . cts . sid , LOG_ERROR , "Protocol error - invalid service address(%s) for server peer in route_started event(%d)" , rd . TargetAddrStr , r . id )
2024-12-05 01:26:44 +09:00
r . ReqStop ( )
2024-11-28 01:29:02 +09:00
} else {
r . server_peer_listen_addr = addr
2024-12-05 18:24:42 +09:00
r . server_peer_net = rd . ServiceNetStr
2024-11-28 01:29:02 +09:00
}
}
case PACKET_KIND_ROUTE_STOPPED :
2024-12-05 23:05:47 +09:00
// NOTE:
// this event can be sent by the server in response to failed ROUTE_START or successful ROUTE_STOP.
// in case of the failed ROUTE_START, r.ReqStop() may trigger another ROUTE_STOP sent to the server.
// but the server must be able to handle this case as invalid route.
var ok bool
_ , ok = event_data . ( * RouteDesc )
if ! ok {
r . cts . cli . log . Write ( r . cts . sid , LOG_ERROR , "Protocol error - invalid data in route_started event(%d)" , r . id )
r . ReqStop ( )
} else {
r . ReqStop ( )
}
2024-11-28 01:29:02 +09:00
2024-11-12 22:59:37 +09:00
case PACKET_KIND_PEER_STARTED :
2024-12-01 17:20:16 +09:00
var ok bool
var pd * PeerDesc
pd , ok = event_data . ( * PeerDesc )
if ! ok {
2024-12-05 01:26:44 +09:00
r . cts . cli . log . Write ( r . cts . sid , LOG_ERROR ,
"Protocol error - invalid data in peer_started event(%d,%d)" , r . id , pts_id )
r . ReqStop ( )
2024-12-01 17:20:16 +09:00
} else {
r . ptc_wg . Add ( 1 )
go r . ConnectToPeer ( pts_id , pd . RemoteAddrStr , pd . LocalAddrStr , & r . ptc_wg )
}
2024-11-12 22:59:37 +09:00
2024-11-20 00:48:02 +09:00
case PACKET_KIND_PEER_ABORTED :
2024-12-05 01:26:44 +09:00
var ptc * ClientPeerConn
ptc = r . FindClientPeerConnById ( pts_id )
if ptc != nil {
var ok bool
var pd * PeerDesc
pd , ok = event_data . ( * PeerDesc )
if ! ok {
r . cts . cli . log . Write ( r . cts . sid , LOG_ERROR ,
"Protocol error - invalid data in peer_aborted event(%d,%d)" , r . id , pts_id )
r . ReqStop ( )
} else {
err = r . DisconnectFromPeer ( ptc )
if err != nil {
r . cts . cli . log . Write ( r . cts . sid , LOG_ERROR ,
"Failed to disconnect from peer(%d,%d,%s,%s) - %s" ,
r . id , pts_id , pd . RemoteAddrStr , pd . LocalAddrStr , err . Error ( ) )
ptc . ReqStop ( )
}
}
}
2024-11-20 00:31:14 +09:00
case PACKET_KIND_PEER_STOPPED :
2024-12-05 01:26:44 +09:00
var ptc * ClientPeerConn
ptc = r . FindClientPeerConnById ( pts_id )
if ptc != nil {
var ok bool
var pd * PeerDesc
pd , ok = event_data . ( * PeerDesc )
if ! ok {
r . cts . cli . log . Write ( r . cts . sid , LOG_ERROR ,
"Protocol error - invalid data in peer_stopped event(%d,%d)" ,
r . id , pts_id )
ptc . ReqStop ( )
} else {
err = r . DisconnectFromPeer ( ptc )
if err != nil {
r . cts . cli . log . Write ( r . cts . sid , LOG_WARN ,
"Failed to disconnect from peer(%d,%d,%s,%s) - %s" ,
r . id , pts_id , pd . RemoteAddrStr , pd . LocalAddrStr , err . Error ( ) )
ptc . ReqStop ( )
}
}
2024-11-20 00:31:14 +09:00
}
case PACKET_KIND_PEER_EOF :
2024-12-05 01:26:44 +09:00
var ptc * ClientPeerConn
ptc = r . FindClientPeerConnById ( pts_id )
if ptc != nil {
var ok bool
_ , ok = event_data . ( * PeerDesc )
if ! ok {
r . cts . cli . log . Write ( r . cts . sid , LOG_ERROR ,
"Protocol error - invalid data in peer_eof event(%d,%d)" ,
r . id , pts_id )
ptc . ReqStop ( )
} else {
ptc . CloseWrite ( )
}
2024-11-20 00:31:14 +09:00
}
2024-11-17 14:57:56 +09:00
case PACKET_KIND_PEER_DATA :
var ptc * ClientPeerConn
2024-11-28 01:29:02 +09:00
2024-12-05 01:26:44 +09:00
ptc = r . FindClientPeerConnById ( pts_id )
if ptc != nil {
var ok bool
var data [ ] byte
2024-11-28 01:29:02 +09:00
data , ok = event_data . ( [ ] byte )
2024-12-05 01:26:44 +09:00
if ! ok {
r . cts . cli . log . Write ( r . cts . sid , LOG_ERROR ,
"Protocol error - invalid data in peer_data event(%d,%d)" ,
r . id , pts_id )
ptc . ReqStop ( )
2024-11-28 01:29:02 +09:00
} else {
2024-12-05 01:26:44 +09:00
_ , err = ptc . conn . Write ( data )
if err != nil {
2024-12-07 21:31:06 +09:00
r . cts . cli . log . Write ( r . cts . sid , LOG_ERROR ,
"Failed to write to peer(%d,%d,%s,%s) - %s" ,
r . id , pts_id , ptc . conn . RemoteAddr ( ) . String ( ) , ptc . conn . LocalAddr ( ) . String ( ) , err . Error ( ) )
2024-12-05 01:26:44 +09:00
ptc . ReqStop ( )
}
2024-11-28 01:29:02 +09:00
}
2024-11-17 14:57:56 +09:00
}
2024-12-05 01:26:44 +09:00
default :
// ignore all others
2024-11-12 22:59:37 +09:00
}
return nil
}
// --------------------------------------------------------------------
2024-11-30 02:53:47 +09:00
func NewClientConn ( c * Client , cfg * ClientConfig ) * ClientConn {
2024-11-25 19:46:18 +09:00
var cts ClientConn
2024-11-17 00:47:15 +09:00
cts . cli = c
cts . route_map = make ( ClientRouteMap )
2024-11-25 22:55:03 +09:00
cts . cfg . ClientConfig = * cfg
2024-11-17 00:47:15 +09:00
cts . stop_req . Store ( false )
2024-11-24 20:39:51 +09:00
cts . stop_chan = make ( chan bool , 8 )
2024-11-17 00:47:15 +09:00
// the actual connection to the server is established in the main task function
// The cts.conn, cts.hdc, cts.psc fields are left unassigned here.
return & cts
}
2024-11-12 22:59:37 +09:00
2024-12-05 18:24:42 +09:00
func ( cts * ClientConn ) AddNewClientRoute ( addr string , server_peer_net string , proto ROUTE_PROTO ) ( * ClientRoute , error ) {
2024-11-12 22:59:37 +09:00
var r * ClientRoute
2024-11-28 01:29:02 +09:00
var id uint32
var ok bool
2024-11-12 22:59:37 +09:00
cts . route_mtx . Lock ( )
2024-11-28 01:29:02 +09:00
id = rand . Uint32 ( )
for {
_ , ok = cts . route_map [ id ]
if ! ok { break }
id ++
2024-11-12 22:59:37 +09:00
}
2024-11-28 01:29:02 +09:00
//if cts.route_map[route_id] != nil {
// cts.route_mtx.Unlock()
// return nil, fmt.Errorf("existent route id - %d", route_id)
//}
2024-12-05 18:24:42 +09:00
r = NewClientRoute ( cts , id , addr , server_peer_net , proto )
2024-11-28 01:29:02 +09:00
cts . route_map [ id ] = r
2024-12-08 12:13:36 +09:00
cts . cli . stats . routes . Add ( 1 )
2024-11-12 22:59:37 +09:00
cts . route_mtx . Unlock ( )
2024-12-05 01:26:44 +09:00
cts . cli . log . Write ( cts . sid , LOG_INFO , "Added route(%d,%s)" , id , addr )
2024-11-17 00:47:15 +09:00
cts . route_wg . Add ( 1 )
go r . RunTask ( & cts . route_wg )
2024-11-12 22:59:37 +09:00
return r , nil
}
2024-11-30 18:59:36 +09:00
func ( cts * ClientConn ) ReqStopAllClientRoutes ( ) {
2024-11-30 02:53:47 +09:00
var r * ClientRoute
cts . route_mtx . Lock ( )
2024-11-30 18:59:36 +09:00
defer cts . route_mtx . Unlock ( )
for _ , r = range cts . route_map {
r . ReqStop ( )
}
}
2024-12-08 12:13:36 +09:00
/ *
2024-11-30 18:59:36 +09:00
func ( cts * ClientConn ) RemoveAllClientRoutes ( ) {
var r * ClientRoute
cts . route_mtx . Lock ( )
defer cts . route_mtx . Unlock ( )
2024-11-30 02:53:47 +09:00
for _ , r = range cts . route_map {
delete ( cts . route_map , r . id )
2024-12-08 12:13:36 +09:00
cts . cli . stats . routes . Add ( - 1 )
2024-11-30 02:53:47 +09:00
r . ReqStop ( )
}
2024-12-08 12:13:36 +09:00
} * /
2024-11-30 02:53:47 +09:00
2024-11-25 19:46:18 +09:00
func ( cts * ClientConn ) RemoveClientRoute ( route * ClientRoute ) error {
2024-11-12 22:59:37 +09:00
var r * ClientRoute
var ok bool
cts . route_mtx . Lock ( )
2024-11-24 20:39:51 +09:00
r , ok = cts . route_map [ route . id ]
2024-11-25 19:46:18 +09:00
if ! ok {
2024-11-12 22:59:37 +09:00
cts . route_mtx . Unlock ( )
2024-11-25 19:46:18 +09:00
return fmt . Errorf ( "non-existent route id - %d" , route . id )
2024-11-12 22:59:37 +09:00
}
2024-11-24 20:39:51 +09:00
if r != route {
cts . route_mtx . Unlock ( )
2024-12-05 01:26:44 +09:00
return fmt . Errorf ( "conflicting route id - %d" , route . id )
2024-11-24 20:39:51 +09:00
}
delete ( cts . route_map , route . id )
2024-12-08 12:13:36 +09:00
cts . cli . stats . routes . Add ( - 1 )
2024-11-12 22:59:37 +09:00
cts . route_mtx . Unlock ( )
2024-12-05 01:26:44 +09:00
cts . cli . log . Write ( cts . sid , LOG_INFO , "Removed route(%d,%s)" , route . id , route . peer_addr )
2024-11-24 20:39:51 +09:00
r . ReqStop ( )
2024-11-23 14:49:04 +09:00
return nil
2024-11-12 22:59:37 +09:00
}
2024-11-25 19:46:18 +09:00
func ( cts * ClientConn ) RemoveClientRouteById ( route_id uint32 ) error {
2024-11-17 00:47:15 +09:00
var r * ClientRoute
2024-11-24 20:39:51 +09:00
var ok bool
2024-11-17 00:47:15 +09:00
cts . route_mtx . Lock ( )
2024-11-24 20:39:51 +09:00
r , ok = cts . route_map [ route_id ]
2024-11-25 19:46:18 +09:00
if ! ok {
2024-11-24 20:39:51 +09:00
cts . route_mtx . Unlock ( )
2024-11-25 19:46:18 +09:00
return fmt . Errorf ( "non-existent route id - %d" , route_id )
2024-11-17 00:47:15 +09:00
}
2024-11-24 20:39:51 +09:00
delete ( cts . route_map , route_id )
2024-12-08 12:13:36 +09:00
cts . cli . stats . routes . Add ( - 1 )
2024-11-17 00:47:15 +09:00
cts . route_mtx . Unlock ( )
2024-11-24 20:39:51 +09:00
2024-12-05 01:26:44 +09:00
cts . cli . log . Write ( cts . sid , LOG_INFO , "Removed route(%d,%s)" , r . id , r . peer_addr )
2024-11-24 20:39:51 +09:00
r . ReqStop ( )
return nil
2024-11-17 00:47:15 +09:00
}
2024-11-30 00:19:39 +09:00
// FindClientRouteById returns the route registered under route_id, or nil
// when there is none.
func (cts *ClientConn) FindClientRouteById(route_id uint32) *ClientRoute {
	cts.route_mtx.Lock()
	defer cts.route_mtx.Unlock()

	// A missing key yields the zero value of the map element, i.e. nil.
	return cts.route_map[route_id]
}
2024-11-25 19:46:18 +09:00
func ( cts * ClientConn ) AddClientRoutes ( peer_addrs [ ] string ) error {
2024-11-12 22:59:37 +09:00
var v string
var err error
2024-11-28 01:29:02 +09:00
for _ , v = range peer_addrs {
2024-12-05 18:24:42 +09:00
_ , err = cts . AddNewClientRoute ( v , "" , ROUTE_PROTO_TCP )
2024-11-12 22:59:37 +09:00
if err != nil {
2024-11-30 02:53:47 +09:00
return fmt . Errorf ( "unable to add client route for %s - %s" , v , err . Error ( ) )
2024-11-12 22:59:37 +09:00
}
}
2024-11-23 14:49:04 +09:00
return nil
2024-11-12 22:59:37 +09:00
}
2024-11-25 19:46:18 +09:00
func ( cts * ClientConn ) disconnect_from_server ( ) {
2024-11-24 22:33:19 +09:00
if cts . conn != nil {
2024-11-25 19:46:18 +09:00
var r * ClientRoute
2024-11-24 20:39:51 +09:00
2024-11-24 22:33:19 +09:00
cts . route_mtx . Lock ( )
for _ , r = range cts . route_map {
r . ReqStop ( )
}
cts . route_mtx . Unlock ( )
2024-11-24 20:39:51 +09:00
2024-11-24 22:33:19 +09:00
cts . conn . Close ( )
// don't reset cts.conn to nil here
// if this function is called from RunTask()
// for reconnection, it will be set to a new value
// immediately after the start_over lable in it.
// if it's called from ReqStop(), we don't really
// need to care about it.
2024-12-08 16:06:18 +09:00
cts . local_addr = ""
cts . remote_addr = ""
2024-11-24 20:39:51 +09:00
}
}
2024-11-25 19:46:18 +09:00
func ( cts * ClientConn ) ReqStop ( ) {
2024-11-13 23:14:43 +09:00
if cts . stop_req . CompareAndSwap ( false , true ) {
2024-11-24 22:33:19 +09:00
cts . disconnect_from_server ( )
2024-11-13 23:14:43 +09:00
cts . stop_chan <- true
2024-11-12 22:59:37 +09:00
}
}
2024-11-13 02:20:25 +09:00
2024-12-08 16:48:23 +09:00
func timed_interceptor ( tmout_sec int ) grpc . UnaryClientInterceptor {
// The client calls GetSeed() as the first call to the server.
// To simulate a kind of connect timeout to the server and find out an unresponsive server,
// Place a unary intercepter that places a new context with a timeout on the GetSeed() call.
2024-12-08 17:47:08 +09:00
return func ( ctx context . Context , method string , req , reply interface { } , cc * grpc . ClientConn , invoker grpc . UnaryInvoker , opts ... grpc . CallOption ) error {
2024-12-08 16:48:23 +09:00
var cancel context . CancelFunc
if tmout_sec > 0 && method == Hodu_GetSeed_FullMethodName {
ctx , cancel = context . WithTimeout ( ctx , time . Duration ( tmout_sec ) * time . Second )
defer cancel ( )
}
return invoker ( ctx , method , req , reply , cc , opts ... )
2024-12-08 17:47:08 +09:00
}
2024-12-08 16:48:23 +09:00
}
2024-11-25 19:46:18 +09:00
func ( cts * ClientConn ) RunTask ( wg * sync . WaitGroup ) {
2024-11-13 23:14:43 +09:00
var psc PacketStreamClient
2024-11-17 00:47:15 +09:00
var slpctx context . Context
2024-11-20 00:31:14 +09:00
var c_seed Seed
var s_seed * Seed
2024-12-02 02:19:50 +09:00
var p * peer . Peer
var ok bool
2024-11-12 22:59:37 +09:00
var err error
2024-12-08 13:34:47 +09:00
var opts [ ] grpc . DialOption
2024-11-12 22:59:37 +09:00
2024-11-16 00:03:42 +09:00
defer wg . Done ( ) // arrange to call at the end of this function
2024-11-13 02:20:25 +09:00
2024-11-17 00:47:15 +09:00
start_over :
2024-12-08 16:06:18 +09:00
cts . cfg . Index = ( cts . cfg . Index + 1 ) % len ( cts . cfg . ServerAddrs )
2024-12-08 16:48:23 +09:00
cts . cli . log . Write ( cts . sid , LOG_INFO , "Connecting to server[%d] %s" , cts . cfg . Index , cts . cfg . ServerAddrs [ cts . cfg . Index ] )
2024-12-07 16:57:00 +09:00
if cts . cli . rpctlscfg == nil {
2024-12-08 13:34:47 +09:00
opts = append ( opts , grpc . WithTransportCredentials ( insecure . NewCredentials ( ) ) )
2024-12-08 16:48:23 +09:00
if cts . cfg . ServerAuthority != "" { opts = append ( opts , grpc . WithAuthority ( cts . cfg . ServerAuthority ) ) }
2024-12-07 16:57:00 +09:00
} else {
2024-12-08 13:34:47 +09:00
opts = append ( opts , grpc . WithTransportCredentials ( credentials . NewTLS ( cts . cli . rpctlscfg ) ) )
// set the http2 :authority header with tls server name defined.
2024-12-08 16:48:23 +09:00
if cts . cfg . ServerAuthority != "" {
opts = append ( opts , grpc . WithAuthority ( cts . cfg . ServerAuthority ) )
} else if cts . cli . rpctlscfg . ServerName != "" {
opts = append ( opts , grpc . WithAuthority ( cts . cli . rpctlscfg . ServerName ) )
}
}
if cts . cfg . ServerSeedTimeout > 0 {
opts = append ( opts , grpc . WithUnaryInterceptor ( timed_interceptor ( cts . cfg . ServerSeedTimeout ) ) )
2024-12-07 16:57:00 +09:00
}
2024-12-08 16:48:23 +09:00
2024-12-08 16:06:18 +09:00
cts . conn , err = grpc . NewClient ( cts . cfg . ServerAddrs [ cts . cfg . Index ] , opts ... )
2024-11-12 22:59:37 +09:00
if err != nil {
2024-12-08 16:48:23 +09:00
cts . cli . log . Write ( cts . sid , LOG_ERROR , "Failed to make client to server[%d] %s - %s" , cts . cfg . Index , cts . cfg . ServerAddrs [ cts . cfg . Index ] , err . Error ( ) )
2024-11-17 00:47:15 +09:00
goto reconnect_to_server
2024-11-12 22:59:37 +09:00
}
2024-11-24 22:33:19 +09:00
cts . hdc = NewHoduClient ( cts . conn )
2024-11-12 22:59:37 +09:00
2024-11-24 22:33:19 +09:00
// TODO: HANDLE connection timeout.. may have to run GetSeed or PacketStream in anther goroutnine
// ctx, _/*cancel*/ := context.WithTimeout(context.Background(), time.Second)
2024-11-20 00:31:14 +09:00
// seed exchange is for furture expansion of the protocol
// there is nothing to do much about it for now.
2024-12-07 21:24:06 +09:00
c_seed . Version = HODU_RPC_VERSION
2024-11-20 00:31:14 +09:00
c_seed . Flags = 0
2024-11-24 22:33:19 +09:00
s_seed , err = cts . hdc . GetSeed ( cts . cli . ctx , & c_seed )
2024-11-20 00:31:14 +09:00
if err != nil {
2024-12-08 16:06:18 +09:00
cts . cli . log . Write ( cts . sid , LOG_ERROR , "Failed to get seed from server[%d] %s - %s" , cts . cfg . Index , cts . cfg . ServerAddrs [ cts . cfg . Index ] , err . Error ( ) )
2024-11-20 00:31:14 +09:00
goto reconnect_to_server
}
cts . s_seed = * s_seed
cts . c_seed = c_seed
2024-12-08 16:06:18 +09:00
cts . cli . log . Write ( cts . sid , LOG_INFO , "Got seed from server[%d] %s - ver=%#x" , cts . cfg . Index , cts . cfg . ServerAddrs [ cts . cfg . Index ] , cts . s_seed . Version )
2024-11-24 22:33:19 +09:00
psc , err = cts . hdc . PacketStream ( cts . cli . ctx )
2024-11-12 22:59:37 +09:00
if err != nil {
2024-12-08 16:06:18 +09:00
cts . cli . log . Write ( cts . sid , LOG_ERROR , "Failed to get packet stream from server[%d] %s - %s" , cts . cfg . Index , cts . cfg . ServerAddrs [ cts . cfg . Index ] , err . Error ( ) )
2024-11-17 00:47:15 +09:00
goto reconnect_to_server
2024-11-12 22:59:37 +09:00
}
2024-12-02 02:19:50 +09:00
p , ok = peer . FromContext ( psc . Context ( ) )
if ok {
cts . remote_addr = p . Addr . String ( )
cts . local_addr = p . LocalAddr . String ( )
}
2024-12-08 16:06:18 +09:00
cts . cli . log . Write ( cts . sid , LOG_INFO , "Got packet stream from server[%d] %s" , cts . cfg . Index , cts . cfg . ServerAddrs [ cts . cfg . Index ] )
2024-11-24 20:39:51 +09:00
2024-11-18 22:25:59 +09:00
cts . psc = & GuardedPacketStreamClient { Hodu_PacketStreamClient : psc }
2024-11-12 22:59:37 +09:00
2024-12-05 01:26:44 +09:00
if len ( cts . cfg . PeerAddrs ) > 0 {
// the connection structure to a server is ready.
// let's add routes to the client-side peers if given
err = cts . AddClientRoutes ( cts . cfg . PeerAddrs )
if err != nil {
2024-12-08 16:06:18 +09:00
cts . cli . log . Write ( cts . sid , LOG_ERROR , "Failed to add routes to server[%s] %s for %v - %s" , cts . cfg . Index , cts . cfg . ServerAddrs [ cts . cfg . Index ] , cts . cfg . PeerAddrs , err . Error ( ) )
2024-12-05 01:26:44 +09:00
goto done
}
2024-11-12 22:59:37 +09:00
}
2024-11-24 20:39:51 +09:00
2024-11-12 22:59:37 +09:00
for {
var pkt * Packet
select {
2024-11-13 23:14:43 +09:00
case <- cts . cli . ctx . Done ( ) :
2024-12-05 01:26:44 +09:00
// need to log cts.cli.ctx.Err().Error()?
2024-11-17 00:47:15 +09:00
goto done
2024-11-13 23:14:43 +09:00
case <- cts . stop_chan :
2024-11-17 00:47:15 +09:00
goto done
2024-11-13 23:14:43 +09:00
2024-11-12 22:59:37 +09:00
default :
2024-11-17 14:57:56 +09:00
// no other case is ready. run the code below select.
2024-11-12 22:59:37 +09:00
// without the default case, the select construct would block
}
2024-11-13 23:14:43 +09:00
pkt , err = psc . Recv ( )
2024-11-12 22:59:37 +09:00
if err != nil {
2024-11-24 22:33:19 +09:00
if status . Code ( err ) == codes . Canceled || errors . Is ( err , net . ErrClosed ) {
2024-11-24 20:39:51 +09:00
goto reconnect_to_server
} else {
2024-12-08 16:06:18 +09:00
cts . cli . log . Write ( cts . sid , LOG_INFO , "Failed to receive packet from %s - %s" , cts . remote_addr , err . Error ( ) )
2024-11-24 20:39:51 +09:00
goto reconnect_to_server
}
2024-11-12 22:59:37 +09:00
}
switch pkt . Kind {
case PACKET_KIND_ROUTE_STARTED :
2024-11-13 23:14:43 +09:00
// the server side managed to set up the route the client requested
2024-11-12 22:59:37 +09:00
var x * Packet_Route
var ok bool
x , ok = pkt . U . ( * Packet_Route )
if ok {
2024-12-05 01:26:44 +09:00
err = cts . ReportEvent ( x . Route . RouteId , 0 , pkt . Kind , x . Route )
2024-11-12 22:59:37 +09:00
if err != nil {
2024-12-05 01:26:44 +09:00
cts . cli . log . Write ( cts . sid , LOG_ERROR ,
"Failed to handle route_started event(%d,%s) from %s - %s" ,
2024-12-05 01:31:59 +09:00
x . Route . RouteId , x . Route . TargetAddrStr , cts . remote_addr , err . Error ( ) )
2024-11-12 22:59:37 +09:00
} else {
2024-12-05 01:26:44 +09:00
cts . cli . log . Write ( cts . sid , LOG_DEBUG ,
"Handled route_started event(%d,%s) from %s" ,
2024-12-05 01:31:59 +09:00
x . Route . RouteId , x . Route . TargetAddrStr , cts . remote_addr )
2024-11-12 22:59:37 +09:00
}
} else {
2024-12-05 01:26:44 +09:00
cts . cli . log . Write ( cts . sid , LOG_ERROR , "Invalid route_started event from %s" , cts . remote_addr )
2024-11-12 22:59:37 +09:00
}
case PACKET_KIND_ROUTE_STOPPED :
var x * Packet_Route
var ok bool
x , ok = pkt . U . ( * Packet_Route )
if ok {
2024-12-05 01:26:44 +09:00
err = cts . ReportEvent ( x . Route . RouteId , 0 , pkt . Kind , x . Route )
2024-11-12 22:59:37 +09:00
if err != nil {
2024-12-05 01:26:44 +09:00
cts . cli . log . Write ( cts . sid , LOG_ERROR ,
"Failed to handle route_stopped event(%d,%s) from %s - %s" ,
2024-12-05 01:31:59 +09:00
x . Route . RouteId , x . Route . TargetAddrStr , cts . remote_addr , err . Error ( ) )
2024-11-12 22:59:37 +09:00
} else {
2024-12-05 01:26:44 +09:00
cts . cli . log . Write ( cts . sid , LOG_DEBUG ,
"Handled route_stopped event(%d,%s) from %s" ,
2024-12-05 01:31:59 +09:00
x . Route . RouteId , x . Route . TargetAddrStr , cts . remote_addr )
2024-11-12 22:59:37 +09:00
}
} else {
2024-12-05 01:26:44 +09:00
cts . cli . log . Write ( cts . sid , LOG_ERROR , "Invalid route_stopped event from %s" , cts . remote_addr )
2024-11-12 22:59:37 +09:00
}
case PACKET_KIND_PEER_STARTED :
// the connection from the client to a peer has been established
var x * Packet_Peer
var ok bool
x , ok = pkt . U . ( * Packet_Peer )
if ok {
2024-12-01 17:20:16 +09:00
err = cts . ReportEvent ( x . Peer . RouteId , x . Peer . PeerId , PACKET_KIND_PEER_STARTED , x . Peer )
2024-11-12 22:59:37 +09:00
if err != nil {
2024-12-05 01:26:44 +09:00
cts . cli . log . Write ( cts . sid , LOG_ERROR ,
"Failed to handle peer_started event from %s for peer(%d,%d,%s,%s) - %s" ,
cts . remote_addr , x . Peer . RouteId , x . Peer . PeerId , x . Peer . LocalAddrStr , x . Peer . RemoteAddrStr , err . Error ( ) )
2024-11-12 22:59:37 +09:00
} else {
2024-12-05 01:26:44 +09:00
cts . cli . log . Write ( cts . sid , LOG_DEBUG ,
"Handled peer_started event from %s for peer(%d,%d,%s,%s)" ,
cts . remote_addr , x . Peer . RouteId , x . Peer . PeerId , x . Peer . LocalAddrStr , x . Peer . RemoteAddrStr )
2024-11-12 22:59:37 +09:00
}
} else {
2024-12-05 01:26:44 +09:00
cts . cli . log . Write ( cts . sid , LOG_ERROR , "Invalid peer_started event from %s" , cts . remote_addr )
2024-11-12 22:59:37 +09:00
}
2024-12-05 01:26:44 +09:00
// PACKET_KIND_PEER_ABORTED is never sent by server to client.
// the code here doesn't handle the event.
2024-11-12 22:59:37 +09:00
case PACKET_KIND_PEER_STOPPED :
// the connection from the client to a peer has been established
var x * Packet_Peer
var ok bool
x , ok = pkt . U . ( * Packet_Peer )
if ok {
2024-12-05 01:26:44 +09:00
err = cts . ReportEvent ( x . Peer . RouteId , x . Peer . PeerId , PACKET_KIND_PEER_STOPPED , x . Peer )
2024-11-12 22:59:37 +09:00
if err != nil {
2024-12-05 01:26:44 +09:00
cts . cli . log . Write ( cts . sid , LOG_ERROR ,
"Failed to handle peer_stopped event from %s for peer(%d,%d,%s,%s) - %s" ,
cts . remote_addr , x . Peer . RouteId , x . Peer . PeerId , x . Peer . LocalAddrStr , x . Peer . RemoteAddrStr , err . Error ( ) )
2024-11-12 22:59:37 +09:00
} else {
2024-12-05 01:26:44 +09:00
cts . cli . log . Write ( cts . sid , LOG_DEBUG ,
"Handled peer_stopped event from %s for peer(%d,%d,%s,%s)" ,
cts . remote_addr , x . Peer . RouteId , x . Peer . PeerId , x . Peer . LocalAddrStr , x . Peer . RemoteAddrStr )
2024-11-12 22:59:37 +09:00
}
} else {
2024-12-05 01:26:44 +09:00
cts . cli . log . Write ( cts . sid , LOG_ERROR , "Invalid peer_stopped event from %s" , cts . remote_addr )
2024-11-12 22:59:37 +09:00
}
2024-11-20 00:31:14 +09:00
case PACKET_KIND_PEER_EOF :
var x * Packet_Peer
var ok bool
x , ok = pkt . U . ( * Packet_Peer )
if ok {
2024-12-05 01:26:44 +09:00
err = cts . ReportEvent ( x . Peer . RouteId , x . Peer . PeerId , PACKET_KIND_PEER_EOF , x . Peer )
2024-11-20 00:31:14 +09:00
if err != nil {
2024-12-05 01:26:44 +09:00
cts . cli . log . Write ( cts . sid , LOG_ERROR ,
"Failed to handle peer_eof event from %s for peer(%d,%d,%s,%s) - %s" ,
cts . remote_addr , x . Peer . RouteId , x . Peer . PeerId , x . Peer . LocalAddrStr , x . Peer . RemoteAddrStr , err . Error ( ) )
2024-11-20 00:31:14 +09:00
} else {
2024-12-05 01:26:44 +09:00
cts . cli . log . Write ( cts . sid , LOG_DEBUG ,
"Handled peer_eof event from %s for peer(%d,%d,%s,%s)" ,
cts . remote_addr , x . Peer . RouteId , x . Peer . PeerId , x . Peer . LocalAddrStr , x . Peer . RemoteAddrStr )
2024-11-20 00:31:14 +09:00
}
} else {
2024-12-05 01:26:44 +09:00
cts . cli . log . Write ( cts . sid , LOG_ERROR , "Invalid peer_eof event from %s" , cts . remote_addr )
2024-11-20 00:31:14 +09:00
}
2024-11-12 22:59:37 +09:00
case PACKET_KIND_PEER_DATA :
// the connection from the client to a peer has been established
var x * Packet_Data
var ok bool
x , ok = pkt . U . ( * Packet_Data )
if ok {
err = cts . ReportEvent ( x . Data . RouteId , x . Data . PeerId , PACKET_KIND_PEER_DATA , x . Data . Data )
if err != nil {
2024-12-05 01:26:44 +09:00
cts . cli . log . Write ( cts . sid , LOG_ERROR ,
"Failed to handle peer_data event from %s for peer(%d,%d) - %s" ,
cts . remote_addr , x . Data . RouteId , x . Data . PeerId , err . Error ( ) )
2024-11-12 22:59:37 +09:00
} else {
2024-12-05 01:26:44 +09:00
cts . cli . log . Write ( cts . sid , LOG_DEBUG ,
"Handled peer_data event from %s for peer(%d,%d)" ,
cts . remote_addr , x . Data . RouteId , x . Data . PeerId )
2024-11-12 22:59:37 +09:00
}
} else {
2024-12-05 01:26:44 +09:00
cts . cli . log . Write ( cts . sid , LOG_ERROR , "Invalid peer_data event from %s" , cts . remote_addr )
2024-11-12 22:59:37 +09:00
}
2024-12-05 01:26:44 +09:00
default :
// do nothing. ignore the rest
2024-11-12 22:59:37 +09:00
}
}
2024-11-13 23:14:43 +09:00
done :
2024-12-08 16:06:18 +09:00
cts . cli . log . Write ( cts . sid , LOG_INFO , "Disconnected from server[%d] %s" , cts . cfg . Index , cts . cfg . ServerAddrs [ cts . cfg . Index ] )
2024-11-30 03:08:20 +09:00
//cts.RemoveClientRoutes() // this isn't needed as each task removes itself from cts upon its termination
2024-11-24 22:33:19 +09:00
cts . ReqStop ( )
2024-12-05 01:26:44 +09:00
2024-11-24 22:33:19 +09:00
wait_for_termination :
2024-11-17 00:47:15 +09:00
cts . route_wg . Wait ( ) // wait until all route tasks are finished
2024-11-25 19:46:18 +09:00
cts . cli . RemoveClientConn ( cts )
2024-11-17 00:47:15 +09:00
return
reconnect_to_server :
2024-11-24 22:33:19 +09:00
cts . disconnect_from_server ( )
2024-11-24 20:39:51 +09:00
// wait for 2 seconds
slpctx , _ = context . WithTimeout ( cts . cli . ctx , 2 * time . Second )
2024-11-17 00:47:15 +09:00
select {
case <- cts . cli . ctx . Done ( ) :
2024-12-05 01:26:44 +09:00
// need to log cts.cli.ctx.Err().Error()?
2024-11-17 00:47:15 +09:00
goto done
case <- cts . stop_chan :
2024-11-24 22:33:19 +09:00
// this signal indicates that ReqStop() has been called
// so jump to the waiting label
goto wait_for_termination
2024-11-25 19:46:18 +09:00
case <- slpctx . Done ( ) :
2024-11-17 00:47:15 +09:00
// do nothing
}
2024-11-24 20:39:51 +09:00
goto start_over // and reconnect
2024-11-13 23:14:43 +09:00
}
2024-11-13 02:20:25 +09:00
2024-12-08 17:47:08 +09:00
func ( cts * ClientConn ) ReportEvent ( route_id uint32 , pts_id uint32 , event_type PACKET_KIND , event_data interface { } ) error {
2024-11-13 23:14:43 +09:00
var r * ClientRoute
var ok bool
cts . route_mtx . Lock ( )
r , ok = cts . route_map [ route_id ]
2024-11-25 19:46:18 +09:00
if ! ok {
2024-11-13 23:14:43 +09:00
cts . route_mtx . Unlock ( )
2024-12-08 17:47:08 +09:00
return fmt . Errorf ( "non-existent route id - %d" , route_id )
2024-11-13 23:14:43 +09:00
}
cts . route_mtx . Unlock ( )
return r . ReportEvent ( pts_id , event_type , event_data )
}
2024-11-17 00:47:15 +09:00
2024-11-13 23:14:43 +09:00
// --------------------------------------------------------------------
2024-12-07 22:18:07 +09:00
// client_ctl_log_writer is an io.Writer that redirects everything written
// to it to the logger of the client it refers to.
type client_ctl_log_writer struct {
	cli *Client
}

// Write logs p at the LOG_INFO level with an empty id and reports that all
// of p has been consumed. It never returns an error.
func (hlw *client_ctl_log_writer) Write(p []byte) (n int, err error) {
	// the standard http.Server always requires *log.Logger
	// use this iowriter to create a logger to pass it to the http server.
	//
	// the text comes from outside and may contain '%'. pass it as an
	// argument to "%s" rather than as the format string itself so that
	// it is never interpreted as formatting directives.
	hlw.cli.log.Write("", LOG_INFO, "%s", string(p))
	return len(p), nil
}
2024-12-08 12:13:36 +09:00
func NewClient ( ctx context . Context , ctl_addrs [ ] string , logger Logger , ctl_prefix string , ctltlscfg * tls . Config , rpctlscfg * tls . Config ) * Client {
2024-11-13 23:14:43 +09:00
var c Client
2024-12-03 11:52:46 +09:00
var i int
2024-12-07 22:18:07 +09:00
var hs_log * log . Logger
2024-11-13 23:14:43 +09:00
c . ctx , c . ctx_cancel = context . WithCancel ( ctx )
2024-12-07 16:57:00 +09:00
c . ctltlscfg = ctltlscfg
c . rpctlscfg = rpctlscfg
2024-11-23 12:30:23 +09:00
c . ext_svcs = make ( [ ] Service , 0 , 1 )
2024-11-25 19:46:18 +09:00
c . cts_map = make ( ClientConnMap )
2024-11-13 23:14:43 +09:00
c . stop_req . Store ( false )
2024-11-24 20:39:51 +09:00
c . stop_chan = make ( chan bool , 8 )
2024-11-23 14:49:04 +09:00
c . log = logger
2024-12-08 12:13:36 +09:00
c . ctl_prefix = ctl_prefix
2024-12-01 21:47:11 +09:00
c . ctl_mux = http . NewServeMux ( )
c . ctl_mux . Handle ( c . ctl_prefix + "/client-conns" , & client_ctl_client_conns { c : & c } )
c . ctl_mux . Handle ( c . ctl_prefix + "/client-conns/{conn_id}" , & client_ctl_client_conns_id { c : & c } )
c . ctl_mux . Handle ( c . ctl_prefix + "/client-conns/{conn_id}/routes" , & client_ctl_client_conns_id_routes { c : & c } )
c . ctl_mux . Handle ( c . ctl_prefix + "/client-conns/{conn_id}/routes/{route_id}" , & client_ctl_client_conns_id_routes_id { c : & c } )
c . ctl_mux . Handle ( c . ctl_prefix + "/client-conns/{conn_id}/routes/{route_id}/peers" , & client_ctl_client_conns_id_routes_id_peers { c : & c } )
c . ctl_mux . Handle ( c . ctl_prefix + "/client-conns/{conn_id}/routes/{route_id}/peers/{peer_id}" , & client_ctl_client_conns_id_routes_id_peers_id { c : & c } )
2024-12-08 01:10:49 +09:00
c . ctl_mux . Handle ( c . ctl_prefix + "/stats" , & client_ctl_stats { c : & c } )
2024-11-25 19:46:18 +09:00
2024-12-03 11:52:46 +09:00
c . ctl_addr = make ( [ ] string , len ( ctl_addrs ) )
c . ctl = make ( [ ] * http . Server , len ( ctl_addrs ) )
copy ( c . ctl_addr , ctl_addrs )
2024-12-07 22:18:07 +09:00
hs_log = log . New ( & client_ctl_log_writer { cli : & c } , "" , 0 ) ;
2024-12-03 11:52:46 +09:00
for i = 0 ; i < len ( ctl_addrs ) ; i ++ {
c . ctl [ i ] = & http . Server {
Addr : ctl_addrs [ i ] ,
Handler : c . ctl_mux ,
2024-12-07 16:57:00 +09:00
TLSConfig : c . ctltlscfg ,
2024-12-07 22:18:07 +09:00
ErrorLog : hs_log ,
2024-12-03 11:52:46 +09:00
// TODO: more settings
}
2024-11-16 00:03:42 +09:00
}
2024-12-08 12:13:36 +09:00
c . stats . conns . Store ( 0 )
c . stats . routes . Store ( 0 )
c . stats . peers . Store ( 0 )
2024-11-13 23:14:43 +09:00
return & c
}
2024-11-30 02:53:47 +09:00
func ( c * Client ) AddNewClientConn ( cfg * ClientConfig ) ( * ClientConn , error ) {
2024-11-25 19:46:18 +09:00
var cts * ClientConn
2024-11-13 23:14:43 +09:00
var ok bool
2024-11-25 19:46:18 +09:00
var id uint32
2024-11-13 23:14:43 +09:00
2024-12-09 15:43:29 +09:00
if len ( cfg . ServerAddrs ) <= 0 {
return nil , fmt . Errorf ( "no server rpc address specified" )
}
2024-11-30 02:53:47 +09:00
cts = NewClientConn ( c , cfg )
2024-11-13 23:14:43 +09:00
c . cts_mtx . Lock ( )
2024-11-25 19:46:18 +09:00
id = rand . Uint32 ( )
for {
2024-11-30 18:59:36 +09:00
_ , ok = c . cts_map [ id ]
2024-11-25 19:46:18 +09:00
if ! ok { break }
id ++
}
cts . id = id
2024-11-25 22:55:03 +09:00
cts . cfg . Id = id // store it again in the active configuration for easy access via control channel
2024-12-03 11:52:46 +09:00
cts . sid = fmt . Sprintf ( "%d" , id ) // id in string used for logging
2024-11-25 19:46:18 +09:00
2024-11-30 18:59:36 +09:00
c . cts_map [ id ] = cts
2024-12-08 12:13:36 +09:00
c . stats . conns . Add ( 1 )
2024-12-05 01:26:44 +09:00
c . cts_mtx . Unlock ( )
2024-12-08 17:47:08 +09:00
c . log . Write ( "" , LOG_INFO , "Added client connection(%d) to %v" , cts . id , cfg . ServerAddrs )
2024-11-17 00:47:15 +09:00
return cts , nil
2024-11-13 23:14:43 +09:00
}
2024-12-08 17:47:08 +09:00
func ( c * Client ) ReqStopAllClientConns ( ) {
2024-11-30 18:59:36 +09:00
var cts * ClientConn
c . cts_mtx . Lock ( )
defer c . cts_mtx . Unlock ( )
for _ , cts = range c . cts_map {
cts . ReqStop ( )
}
}
2024-12-08 12:13:36 +09:00
/ *
2024-11-30 18:59:36 +09:00
func ( c * Client ) RemoveAllClientConns ( ) {
var cts * ClientConn
c . cts_mtx . Lock ( )
defer c . cts_mtx . Unlock ( )
for _ , cts = range c . cts_map {
delete ( c . cts_map_by_addr , cts . cfg . ServerAddr )
delete ( c . cts_map , cts . id )
2024-12-08 12:13:36 +09:00
c . stats . conns . Store ( int64 ( len ( c . cts_map ) ) )
2024-11-30 18:59:36 +09:00
cts . ReqStop ( )
}
}
2024-12-08 12:13:36 +09:00
* /
2024-11-30 18:59:36 +09:00
2024-11-30 03:08:20 +09:00
func ( c * Client ) RemoveClientConn ( cts * ClientConn ) error {
var conn * ClientConn
var ok bool
2024-11-13 23:14:43 +09:00
c . cts_mtx . Lock ( )
2024-11-30 03:08:20 +09:00
2024-11-30 18:59:36 +09:00
conn , ok = c . cts_map [ cts . id ]
2024-11-30 03:08:20 +09:00
if ! ok {
c . cts_mtx . Unlock ( )
return fmt . Errorf ( "non-existent connection id - %d" , cts . id )
}
if conn != cts {
c . cts_mtx . Unlock ( )
2024-12-05 01:26:44 +09:00
return fmt . Errorf ( "conflicting connection id - %d" , cts . id )
2024-11-30 03:08:20 +09:00
}
2024-11-30 18:59:36 +09:00
delete ( c . cts_map , cts . id )
2024-12-08 12:13:36 +09:00
c . stats . conns . Store ( int64 ( len ( c . cts_map ) ) )
2024-11-13 23:14:43 +09:00
c . cts_mtx . Unlock ( )
2024-11-30 03:08:20 +09:00
2024-12-08 17:47:08 +09:00
c . log . Write ( "" , LOG_INFO , "Removed client connection(%d) to %v" , cts . id , cts . cfg . ServerAddrs )
2024-12-05 01:26:44 +09:00
2024-12-03 00:55:19 +09:00
cts . ReqStop ( )
2024-11-30 03:08:20 +09:00
return nil
}
func ( c * Client ) RemoveClientConnById ( conn_id uint32 ) error {
var cts * ClientConn
var ok bool
c . cts_mtx . Lock ( )
2024-11-30 18:59:36 +09:00
cts , ok = c . cts_map [ conn_id ]
2024-11-30 03:08:20 +09:00
if ! ok {
c . cts_mtx . Unlock ( )
return fmt . Errorf ( "non-existent connection id - %d" , conn_id )
}
2024-11-30 13:24:29 +09:00
// NOTE: removal by id doesn't perform identity check
2024-11-30 03:08:20 +09:00
2024-11-30 18:59:36 +09:00
delete ( c . cts_map , cts . id )
2024-12-08 12:13:36 +09:00
c . stats . conns . Store ( int64 ( len ( c . cts_map ) ) )
2024-11-30 03:08:20 +09:00
c . cts_mtx . Unlock ( )
2024-12-08 17:47:08 +09:00
c . log . Write ( "" , LOG_INFO , "Removed client connection(%d) to %v" , cts . id , cts . cfg . ServerAddrs )
2024-11-30 03:08:20 +09:00
cts . ReqStop ( )
return nil
2024-11-12 22:59:37 +09:00
}
2024-11-26 09:41:15 +09:00
func ( c * Client ) FindClientConnById ( id uint32 ) * ClientConn {
var cts * ClientConn
var ok bool
c . cts_mtx . Lock ( )
defer c . cts_mtx . Unlock ( )
2024-11-30 18:59:36 +09:00
cts , ok = c . cts_map [ id ]
2024-11-26 09:41:15 +09:00
if ! ok {
return nil
}
return cts
}
2024-11-30 20:06:59 +09:00
// FindClientRouteById looks up the connection registered under conn_id and
// returns what its FindClientRouteById() yields for route_id. It returns
// nil if the connection itself is not registered.
func (c *Client) FindClientRouteById(conn_id uint32, route_id uint32) *ClientRoute {
	c.cts_mtx.Lock()
	defer c.cts_mtx.Unlock()

	if conn, found := c.cts_map[conn_id]; found {
		return conn.FindClientRouteById(route_id)
	}
	return nil
}
2024-12-01 17:20:16 +09:00
func ( c * Client ) FindClientPeerConnById ( conn_id uint32 , route_id uint32 , peer_id uint32 ) * ClientPeerConn {
var cts * ClientConn
2024-12-08 17:47:08 +09:00
var r * ClientRoute
2024-12-01 17:20:16 +09:00
var ok bool
c . cts_mtx . Lock ( )
defer c . cts_mtx . Unlock ( )
cts , ok = c . cts_map [ conn_id ]
if ! ok {
return nil
}
cts . route_mtx . Lock ( )
defer cts . route_mtx . Unlock ( )
r , ok = cts . route_map [ route_id ]
if ! ok {
return nil
}
return r . FindClientPeerConnById ( peer_id )
}
2024-11-12 22:59:37 +09:00
func ( c * Client ) ReqStop ( ) {
if c . stop_req . CompareAndSwap ( false , true ) {
2024-11-25 19:46:18 +09:00
var cts * ClientConn
2024-12-03 11:52:46 +09:00
var ctl * http . Server
2024-11-16 00:03:42 +09:00
2024-12-03 11:52:46 +09:00
for _ , ctl = range c . ctl {
ctl . Shutdown ( c . ctx ) // to break c.ctl.ListenAndServe()
2024-11-24 20:39:51 +09:00
}
2024-11-16 00:03:42 +09:00
2024-11-13 23:14:43 +09:00
for _ , cts = range c . cts_map {
cts . ReqStop ( )
}
c . stop_chan <- true
c . ctx_cancel ( )
2024-11-12 22:59:37 +09:00
}
2024-11-16 00:03:42 +09:00
}
2024-11-21 01:11:01 +09:00
func ( c * Client ) RunCtlTask ( wg * sync . WaitGroup ) {
2024-11-16 00:03:42 +09:00
var err error
2024-12-03 11:52:46 +09:00
var ctl * http . Server
var idx int
var l_wg sync . WaitGroup
2024-11-16 00:03:42 +09:00
2024-11-21 01:11:01 +09:00
defer wg . Done ( )
2024-11-16 00:03:42 +09:00
2024-12-03 11:52:46 +09:00
for idx , ctl = range c . ctl {
l_wg . Add ( 1 )
go func ( i int , cs * http . Server ) {
2024-12-07 23:03:23 +09:00
var l net . Listener
2024-12-08 17:47:08 +09:00
c . log . Write ( "" , LOG_INFO , "Control channel[%d] started on %s" , i , c . ctl_addr [ i ] )
2024-12-07 23:03:23 +09:00
// defeat hard-coded "tcp" in ListenAndServe() and ListenAndServeTLS()
// by creating the listener explicitly.
// err = cs.ListenAndServe()
// err = cs.ListenAndServeTLS("", "") // c.tlscfg must provide a certificate and a key
l , err = net . Listen ( tcp_addr_str_class ( cs . Addr ) , cs . Addr )
if err == nil {
if c . ctltlscfg == nil {
err = cs . Serve ( l )
} else {
err = cs . ServeTLS ( l , "" , "" ) // c.ctltlscfg must provide a certificate and a key
}
l . Close ( )
2024-12-06 00:52:33 +09:00
}
2024-12-03 11:52:46 +09:00
if errors . Is ( err , http . ErrServerClosed ) {
2024-12-09 01:51:04 +09:00
c . log . Write ( "" , LOG_INFO , "Control channel[%d] ended" , i )
2024-12-03 11:52:46 +09:00
} else {
c . log . Write ( "" , LOG_ERROR , "Control channel[%d] error - %s" , i , err . Error ( ) )
}
l_wg . Done ( )
} ( idx , ctl )
2024-11-16 00:03:42 +09:00
}
2024-12-03 11:52:46 +09:00
l_wg . Wait ( )
2024-11-16 00:03:42 +09:00
}
2024-11-23 12:30:23 +09:00
// StartCtlService launches RunCtlTask() in a new goroutine after adding it
// to the client's wait group.
func (c *Client) StartCtlService() {
	task_wg := &c.wg
	task_wg.Add(1)
	go c.RunCtlTask(task_wg)
}
func ( c * Client ) RunTask ( wg * sync . WaitGroup ) {
// just a place holder to pacify the Service interface
2024-12-03 11:52:46 +09:00
// StartService() calls cts.RunTask() instead. it is not called.
// so no call to wg.Done()
2024-11-23 12:30:23 +09:00
}
2024-12-09 15:43:29 +09:00
func ( c * Client ) start_service ( cfg * ClientConfig ) ( * ClientConn , error ) {
2024-11-25 19:46:18 +09:00
var cts * ClientConn
2024-11-13 23:14:43 +09:00
var err error
2024-11-30 02:53:47 +09:00
cts , err = c . AddNewClientConn ( cfg )
2024-11-13 23:14:43 +09:00
if err != nil {
2024-12-08 16:06:18 +09:00
err = fmt . Errorf ( "unable to add server connection structure to %v - %s" , cfg . ServerAddrs , err . Error ( ) )
2024-11-25 22:55:03 +09:00
return nil , err
2024-11-13 23:14:43 +09:00
}
c . wg . Add ( 1 )
go cts . RunTask ( & c . wg )
2024-11-25 22:55:03 +09:00
return cts , nil
}
func ( c * Client ) StartService ( data interface { } ) {
2024-12-09 15:43:29 +09:00
var cfg * ClientConfig
var ok bool
2024-11-25 22:55:03 +09:00
2024-12-09 15:43:29 +09:00
cfg , ok = data . ( * ClientConfig )
if ! ok {
c . log . Write ( "" , LOG_ERROR , "Failed to start service - invalid configuration - %v" , data )
2024-11-25 22:55:03 +09:00
} else {
2024-12-09 15:43:29 +09:00
var cts * ClientConn
var err error
if len ( cfg . ServerAddrs ) > 0 {
cts , err = c . start_service ( cfg )
if err != nil {
c . log . Write ( "" , LOG_ERROR , "Failed to start service - %s" , err . Error ( ) )
} else {
c . log . Write ( "" , LOG_INFO , "Started service for %v [%d]" , cts . cfg . ServerAddrs , cts . cfg . Id )
}
}
2024-11-25 22:55:03 +09:00
}
2024-11-13 23:14:43 +09:00
}
2024-11-23 12:30:23 +09:00
func ( c * Client ) StartExtService ( svc Service , data interface { } ) {
2024-11-30 20:24:30 +09:00
c . ext_mtx . Lock ( )
2024-11-23 12:30:23 +09:00
c . ext_svcs = append ( c . ext_svcs , svc )
2024-11-30 20:24:30 +09:00
c . ext_mtx . Unlock ( )
2024-11-23 12:30:23 +09:00
c . wg . Add ( 1 )
go svc . RunTask ( & c . wg )
2024-11-13 23:14:43 +09:00
}
2024-11-13 02:20:25 +09:00
2024-11-23 12:30:23 +09:00
func ( c * Client ) StopServices ( ) {
var ext_svc Service
c . ReqStop ( )
for _ , ext_svc = range c . ext_svcs {
ext_svc . StopServices ( )
2024-11-13 02:20:25 +09:00
}
}
2024-11-23 12:30:23 +09:00
func ( c * Client ) WaitForTermination ( ) {
c . wg . Wait ( )
2024-11-12 22:59:37 +09:00
}
2024-11-23 14:49:04 +09:00
2024-11-25 19:46:18 +09:00
func ( c * Client ) WriteLog ( id string , level LogLevel , fmtstr string , args ... interface { } ) {
2024-11-23 20:13:07 +09:00
c . log . Write ( id , level , fmtstr , args ... )
2024-11-23 14:49:04 +09:00
}