2024-11-23 12:30:23 +09:00
package hodu
2024-11-12 22:59:37 +09:00
import "context"
import "crypto/tls"
2024-11-17 14:57:56 +09:00
import "errors"
2024-11-12 22:59:37 +09:00
import "fmt"
import "io"
2024-12-07 22:18:07 +09:00
import "log"
2024-11-12 22:59:37 +09:00
import "math/rand"
import "net"
2024-11-21 01:11:01 +09:00
import "net/http"
2024-12-05 18:24:42 +09:00
import "net/netip"
2024-12-01 21:47:11 +09:00
import "os"
2024-11-12 22:59:37 +09:00
import "sync"
import "sync/atomic"
import "google.golang.org/grpc"
2024-12-07 16:57:00 +09:00
import "google.golang.org/grpc/credentials"
2024-11-13 02:20:25 +09:00
//import "google.golang.org/grpc/metadata"
2024-11-12 22:59:37 +09:00
import "google.golang.org/grpc/peer"
import "google.golang.org/grpc/stats"
const PTS_LIMIT = 8192
2024-12-03 00:55:19 +09:00
type ServerConnMapByAddr = map [ net . Addr ] * ServerConn
type ServerConnMap = map [ uint32 ] * ServerConn
2024-11-12 22:59:37 +09:00
type ServerPeerConnMap = map [ uint32 ] * ServerPeerConn
type ServerRouteMap = map [ uint32 ] * ServerRoute
type Server struct {
2024-12-03 00:55:19 +09:00
ctx context . Context
ctx_cancel context . CancelFunc
2024-12-07 16:57:00 +09:00
ctltlscfg * tls . Config
rpctlscfg * tls . Config
2024-11-24 20:39:51 +09:00
2024-12-03 00:55:19 +09:00
wg sync . WaitGroup
stop_req atomic . Bool
stop_chan chan bool
2024-11-18 22:25:59 +09:00
2024-12-03 11:52:46 +09:00
ext_mtx sync . Mutex
ext_svcs [ ] Service
ctl_addr [ ] string
2024-12-03 00:55:19 +09:00
ctl_prefix string
ctl_mux * http . ServeMux
2024-12-03 11:52:46 +09:00
ctl [ ] * http . Server // control server
2024-11-21 01:11:01 +09:00
2024-12-03 11:52:46 +09:00
rpc [ ] * net . TCPListener // main listener for grpc
rpc_wg sync . WaitGroup
2024-12-03 20:28:04 +09:00
rpc_svr * grpc . Server
2024-11-12 22:59:37 +09:00
2024-12-03 00:55:19 +09:00
cts_mtx sync . Mutex
cts_map ServerConnMap
cts_map_by_addr ServerConnMapByAddr
cts_wg sync . WaitGroup
2024-11-12 22:59:37 +09:00
2024-12-03 00:55:19 +09:00
log Logger
2024-11-20 00:31:14 +09:00
2024-11-12 22:59:37 +09:00
UnimplementedHoduServer
}
2024-11-25 19:46:18 +09:00
// connection from client.
2024-11-12 22:59:37 +09:00
// client connect to the server, the server accept it, and makes a tunnel request
2024-11-25 19:46:18 +09:00
type ServerConn struct {
2024-12-03 20:28:04 +09:00
svr * Server
id uint32
sid string // for logging
2024-12-03 11:52:46 +09:00
2024-12-03 20:28:04 +09:00
remote_addr net . Addr // client address that created this structure
local_addr net . Addr // local address that the client is connected to
pss * GuardedPacketStreamServer
2024-11-12 22:59:37 +09:00
2024-12-03 20:28:04 +09:00
route_mtx sync . Mutex
route_map ServerRouteMap
route_wg sync . WaitGroup
2024-11-12 22:59:37 +09:00
2024-12-03 20:28:04 +09:00
wg sync . WaitGroup
stop_req atomic . Bool
stop_chan chan bool
2024-11-12 22:59:37 +09:00
}
type ServerRoute struct {
2024-11-25 19:46:18 +09:00
cts * ServerConn
2024-11-20 00:31:14 +09:00
l * net . TCPListener
2024-12-03 20:28:04 +09:00
svc_addr * net . TCPAddr // listening address
2024-12-05 18:24:42 +09:00
svc_permitted_net netip . Prefix
svc_proto ROUTE_PROTO
2024-12-02 02:19:50 +09:00
ptc_addr string
2024-11-20 00:31:14 +09:00
id uint32
2024-11-12 22:59:37 +09:00
pts_mtx sync . Mutex
pts_map ServerPeerConnMap
pts_limit int
pts_last_id uint32
2024-11-20 00:31:14 +09:00
pts_wg sync . WaitGroup
stop_req atomic . Bool
2024-11-18 22:25:59 +09:00
}
type GuardedPacketStreamServer struct {
mtx sync . Mutex
//pss Hodu_PacketStreamServer
Hodu_PacketStreamServer // let's embed it to avoid reimplement Recv() and Context()
2024-11-12 22:59:37 +09:00
}
// ------------------------------------
2024-11-18 22:25:59 +09:00
func ( g * GuardedPacketStreamServer ) Send ( data * Packet ) error {
// while Recv() on a stream is called from the same gorountine all the time,
// Send() is called from multiple places. let's guard it as grpc-go
// doesn't provide concurrency safety in this case.
// https://github.com/grpc/grpc-go/blob/master/Documentation/concurrency.md
g . mtx . Lock ( )
defer g . mtx . Unlock ( )
return g . Hodu_PacketStreamServer . Send ( data )
}
/ *
func ( g * GuardedPacketStreamServer ) Recv ( ) ( * Packet , error ) {
return g . pss . Recv ( )
}
func ( g * GuardedPacketStreamServer ) Context ( ) context . Context {
return g . pss . Context ( )
} * /
// ------------------------------------
2024-12-05 18:24:42 +09:00
func NewServerRoute ( cts * ServerConn , id uint32 , proto ROUTE_PROTO , ptc_addr string , svc_permitted_net string ) ( * ServerRoute , error ) {
2024-11-18 22:25:59 +09:00
var r ServerRoute
var l * net . TCPListener
2024-12-03 11:52:46 +09:00
var svcaddr * net . TCPAddr
2024-12-05 18:24:42 +09:00
var svcnet netip . Prefix
2024-11-18 22:25:59 +09:00
var err error
2024-12-05 18:24:42 +09:00
if svc_permitted_net != "" {
svcnet , err = netip . ParsePrefix ( svc_permitted_net )
if err != nil {
return nil , err
}
}
2024-12-03 20:28:04 +09:00
l , svcaddr , err = cts . make_route_listener ( id , proto )
2024-11-18 22:25:59 +09:00
if err != nil {
return nil , err
}
2024-12-05 18:24:42 +09:00
if svc_permitted_net == "" {
if svcaddr . IP . To4 ( ) != nil {
svcnet , _ = netip . ParsePrefix ( "0.0.0.0/0" )
} else {
svcnet , _ = netip . ParsePrefix ( "::/0" )
}
}
2024-11-18 22:25:59 +09:00
r . cts = cts
r . id = id
r . l = l
2024-12-03 20:28:04 +09:00
r . svc_addr = svcaddr
2024-12-05 18:24:42 +09:00
r . svc_permitted_net = svcnet
r . svc_proto = proto
2024-12-02 02:19:50 +09:00
r . ptc_addr = ptc_addr
2024-11-18 22:25:59 +09:00
r . pts_limit = PTS_LIMIT
r . pts_map = make ( ServerPeerConnMap )
r . pts_last_id = 0
r . stop_req . Store ( false )
2024-11-23 14:49:04 +09:00
return & r , nil
2024-11-18 22:25:59 +09:00
}
2024-11-12 22:59:37 +09:00
func ( r * ServerRoute ) AddNewServerPeerConn ( c * net . TCPConn ) ( * ServerPeerConn , error ) {
var pts * ServerPeerConn
var ok bool
var start_id uint32
r . pts_mtx . Lock ( )
defer r . pts_mtx . Unlock ( )
if len ( r . pts_map ) >= r . pts_limit {
return nil , fmt . Errorf ( "peer-to-server connection table full" )
}
start_id = r . pts_last_id
for {
_ , ok = r . pts_map [ r . pts_last_id ]
if ! ok {
break
}
r . pts_last_id ++
if r . pts_last_id == start_id {
// unlikely to happen but it cycled through the whole range.
return nil , fmt . Errorf ( "failed to assign peer-to-server connection id" )
}
}
pts = NewServerPeerConn ( r , c , r . pts_last_id )
r . pts_map [ pts . conn_id ] = pts
r . pts_last_id ++
return pts , nil
}
func ( r * ServerRoute ) RemoveServerPeerConn ( pts * ServerPeerConn ) {
r . pts_mtx . Lock ( )
delete ( r . pts_map , pts . conn_id )
r . pts_mtx . Unlock ( )
2024-12-03 20:28:04 +09:00
r . cts . svr . log . Write ( r . cts . sid , LOG_DEBUG , "Removed server-side peer connection %s from route(%d)" , pts . conn . RemoteAddr ( ) . String ( ) , r . id )
2024-11-12 22:59:37 +09:00
}
2024-11-18 22:25:59 +09:00
func ( r * ServerRoute ) RunTask ( wg * sync . WaitGroup ) {
2024-11-12 22:59:37 +09:00
var err error
var conn * net . TCPConn
var pts * ServerPeerConn
2024-12-05 18:24:42 +09:00
var raddr * net . TCPAddr
var iaddr netip . Addr
2024-11-12 22:59:37 +09:00
2024-11-18 22:25:59 +09:00
defer wg . Done ( )
2024-11-12 22:59:37 +09:00
for {
conn , err = r . l . AcceptTCP ( )
if err != nil {
2024-11-17 14:57:56 +09:00
if errors . Is ( err , net . ErrClosed ) {
2024-12-03 20:28:04 +09:00
r . cts . svr . log . Write ( r . cts . sid , LOG_INFO , "Server-side peer listener closed on route(%d)" , r . id )
2024-11-17 14:57:56 +09:00
} else {
2024-12-03 20:28:04 +09:00
r . cts . svr . log . Write ( r . cts . sid , LOG_INFO , "Server-side peer listener error on route(%d) - %s" , r . id , err . Error ( ) )
2024-11-17 14:57:56 +09:00
}
2024-11-12 22:59:37 +09:00
break
}
2024-12-05 18:24:42 +09:00
raddr = conn . RemoteAddr ( ) . ( * net . TCPAddr )
iaddr , _ = netip . AddrFromSlice ( raddr . IP )
if ! r . svc_permitted_net . Contains ( iaddr ) {
r . cts . svr . log . Write ( r . cts . sid , LOG_DEBUG , "Rejected server-side peer %s to route(%d) - allowed range %v" , raddr . String ( ) , r . id , r . svc_permitted_net )
conn . Close ( )
}
2024-11-12 22:59:37 +09:00
pts , err = r . AddNewServerPeerConn ( conn )
if err != nil {
2024-12-05 18:24:42 +09:00
r . cts . svr . log . Write ( r . cts . sid , LOG_ERROR , "Failed to add server-side peer %s to route(%d) - %s" , r . id , raddr . String ( ) , r . id , err . Error ( ) )
2024-11-12 22:59:37 +09:00
conn . Close ( )
} else {
2024-12-05 18:24:42 +09:00
r . cts . svr . log . Write ( r . cts . sid , LOG_DEBUG , "Added server-side peer %s to route(%d)" , raddr . String ( ) , r . id )
2024-11-12 22:59:37 +09:00
r . pts_wg . Add ( 1 )
2024-11-18 22:25:59 +09:00
go pts . RunTask ( & r . pts_wg )
2024-11-12 22:59:37 +09:00
}
}
2024-11-13 23:14:43 +09:00
2024-11-24 20:39:51 +09:00
r . ReqStop ( )
2024-11-18 22:25:59 +09:00
r . pts_wg . Wait ( )
2024-12-03 20:28:04 +09:00
r . cts . svr . log . Write ( r . cts . sid , LOG_DEBUG , "All service-side peer handlers completed on route(%d)" , r . id )
2024-11-24 20:39:51 +09:00
r . cts . RemoveServerRoute ( r ) // final phase...
2024-11-12 22:59:37 +09:00
}
2024-11-18 22:25:59 +09:00
func ( r * ServerRoute ) ReqStop ( ) {
if r . stop_req . CompareAndSwap ( false , true ) {
var pts * ServerPeerConn
for _ , pts = range r . pts_map {
pts . ReqStop ( )
}
2024-11-23 14:49:04 +09:00
r . l . Close ( )
2024-11-18 22:25:59 +09:00
}
2024-11-12 22:59:37 +09:00
}
2024-12-03 20:28:04 +09:00
func ( r * ServerRoute ) ReportEvent ( pts_id uint32 , event_type PACKET_KIND , event_data interface { } ) error {
2024-11-12 22:59:37 +09:00
var spc * ServerPeerConn
var ok bool
r . pts_mtx . Lock ( )
spc , ok = r . pts_map [ pts_id ]
if ! ok {
2024-11-23 14:49:04 +09:00
r . pts_mtx . Unlock ( )
2024-12-05 18:24:42 +09:00
return fmt . Errorf ( "non-existent peer id - %d" , pts_id )
2024-11-12 22:59:37 +09:00
}
2024-11-23 14:49:04 +09:00
r . pts_mtx . Unlock ( )
2024-11-12 22:59:37 +09:00
return spc . ReportEvent ( event_type , event_data )
}
// ------------------------------------
2024-12-03 20:28:04 +09:00
func ( cts * ServerConn ) make_route_listener ( id uint32 , proto ROUTE_PROTO ) ( * net . TCPListener , * net . TCPAddr , error ) {
2024-11-12 22:59:37 +09:00
var l * net . TCPListener
var err error
2024-12-03 11:52:46 +09:00
var svcaddr * net . TCPAddr
2024-11-12 22:59:37 +09:00
var port int
var tries int = 0
var nw string
2024-12-05 23:05:47 +09:00
var ip string
2024-11-12 22:59:37 +09:00
switch proto {
case ROUTE_PROTO_TCP :
nw = "tcp"
2024-12-05 23:05:47 +09:00
ip = ""
2024-11-12 22:59:37 +09:00
case ROUTE_PROTO_TCP4 :
nw = "tcp4"
2024-12-05 23:05:47 +09:00
ip = "0.0.0.0"
2024-11-12 22:59:37 +09:00
case ROUTE_PROTO_TCP6 :
nw = "tcp6"
2024-12-05 23:05:47 +09:00
ip = "[::]"
default :
return nil , nil , fmt . Errorf ( "invalid protocol number %d" , proto )
2024-11-12 22:59:37 +09:00
}
for {
2024-12-05 23:05:47 +09:00
port = rand . Intn ( 65535 - 32000 + 1 ) + 32000 // TODO: configurable port range
2024-11-12 22:59:37 +09:00
2024-12-05 23:05:47 +09:00
svcaddr , err = net . ResolveTCPAddr ( nw , fmt . Sprintf ( "%s:%d" , ip , port ) )
2024-11-12 22:59:37 +09:00
if err == nil {
2024-12-03 11:52:46 +09:00
l , err = net . ListenTCP ( nw , svcaddr ) // make the binding address configurable. support multiple binding addresses???
2024-11-12 22:59:37 +09:00
if err == nil {
2024-12-03 20:28:04 +09:00
cts . svr . log . Write ( cts . sid , LOG_DEBUG , "Route(%d) listening on %d" , id , port )
2024-12-03 11:52:46 +09:00
return l , svcaddr , nil
2024-11-12 22:59:37 +09:00
}
}
tries ++
if tries >= 1000 {
err = fmt . Errorf ( "unable to allocate port" )
break
}
}
return nil , nil , err
}
2024-12-05 18:24:42 +09:00
func ( cts * ServerConn ) AddNewServerRoute ( route_id uint32 , proto ROUTE_PROTO , ptc_addr string , svc_permitted_net string ) ( * ServerRoute , error ) {
2024-11-12 22:59:37 +09:00
var r * ServerRoute
var err error
cts . route_mtx . Lock ( )
2024-11-18 22:25:59 +09:00
if cts . route_map [ route_id ] != nil {
2024-11-12 22:59:37 +09:00
cts . route_mtx . Unlock ( )
2024-11-25 19:46:18 +09:00
return nil , fmt . Errorf ( "existent route id - %d" , route_id )
2024-11-12 22:59:37 +09:00
}
2024-12-05 18:24:42 +09:00
r , err = NewServerRoute ( cts , route_id , proto , ptc_addr , svc_permitted_net )
2024-11-12 22:59:37 +09:00
if err != nil {
cts . route_mtx . Unlock ( )
return nil , err
}
2024-11-23 14:49:04 +09:00
cts . route_map [ route_id ] = r
2024-11-12 22:59:37 +09:00
cts . route_mtx . Unlock ( )
2024-11-18 22:25:59 +09:00
cts . route_wg . Add ( 1 )
go r . RunTask ( & cts . route_wg )
2024-11-12 22:59:37 +09:00
return r , nil
}
2024-11-25 19:46:18 +09:00
func ( cts * ServerConn ) RemoveServerRoute ( route * ServerRoute ) error {
2024-11-24 20:39:51 +09:00
var r * ServerRoute
var ok bool
cts . route_mtx . Lock ( )
r , ok = cts . route_map [ route . id ]
2024-11-25 19:46:18 +09:00
if ! ok {
2024-11-24 20:39:51 +09:00
cts . route_mtx . Unlock ( )
2024-11-25 19:46:18 +09:00
return fmt . Errorf ( "non-existent route id - %d" , route . id )
2024-11-24 20:39:51 +09:00
}
2024-11-25 19:46:18 +09:00
if r != route {
2024-11-24 20:39:51 +09:00
cts . route_mtx . Unlock ( )
2024-11-25 19:46:18 +09:00
return fmt . Errorf ( "non-existent route - %d" , route . id )
2024-11-24 20:39:51 +09:00
}
delete ( cts . route_map , route . id )
cts . route_mtx . Unlock ( )
r . ReqStop ( )
return nil
}
2024-11-25 19:46:18 +09:00
func ( cts * ServerConn ) RemoveServerRouteById ( route_id uint32 ) ( * ServerRoute , error ) {
2024-11-12 22:59:37 +09:00
var r * ServerRoute
var ok bool
cts . route_mtx . Lock ( )
2024-11-18 22:25:59 +09:00
r , ok = cts . route_map [ route_id ]
2024-11-25 19:46:18 +09:00
if ! ok {
2024-11-12 22:59:37 +09:00
cts . route_mtx . Unlock ( )
2024-11-25 19:46:18 +09:00
return nil , fmt . Errorf ( "non-existent route id - %d" , route_id )
2024-11-12 22:59:37 +09:00
}
2024-11-18 22:25:59 +09:00
delete ( cts . route_map , route_id )
2024-11-12 22:59:37 +09:00
cts . route_mtx . Unlock ( )
2024-11-24 20:39:51 +09:00
r . ReqStop ( )
return r , nil
2024-11-12 22:59:37 +09:00
}
2024-12-03 20:28:04 +09:00
func ( cts * ServerConn ) ReportEvent ( route_id uint32 , pts_id uint32 , event_type PACKET_KIND , event_data interface { } ) error {
2024-11-12 22:59:37 +09:00
var r * ServerRoute
var ok bool
2024-11-13 23:14:43 +09:00
2024-11-12 22:59:37 +09:00
cts . route_mtx . Lock ( )
2024-11-18 22:25:59 +09:00
r , ok = cts . route_map [ route_id ]
2024-11-12 22:59:37 +09:00
if ( ! ok ) {
cts . route_mtx . Unlock ( )
2024-11-25 19:46:18 +09:00
return fmt . Errorf ( "non-existent route id - %d" , route_id )
2024-11-12 22:59:37 +09:00
}
cts . route_mtx . Unlock ( )
2024-11-13 23:14:43 +09:00
2024-11-12 22:59:37 +09:00
return r . ReportEvent ( pts_id , event_type , event_data )
}
2024-11-25 19:46:18 +09:00
func ( cts * ServerConn ) receive_from_stream ( wg * sync . WaitGroup ) {
2024-11-12 22:59:37 +09:00
var pkt * Packet
var err error
2024-11-13 23:14:43 +09:00
2024-11-20 00:31:14 +09:00
defer wg . Done ( )
for {
2024-11-18 22:25:59 +09:00
pkt , err = cts . pss . Recv ( )
2024-11-17 14:57:56 +09:00
if errors . Is ( err , io . EOF ) {
2024-12-03 20:28:04 +09:00
cts . svr . log . Write ( cts . sid , LOG_INFO , "RPC stream closed for client %s" , cts . remote_addr )
2024-11-18 22:25:59 +09:00
goto done
2024-11-12 22:59:37 +09:00
}
if err != nil {
2024-12-03 20:28:04 +09:00
cts . svr . log . Write ( cts . sid , LOG_ERROR , "RPC stream error for client %s - %s" , cts . remote_addr , err . Error ( ) )
2024-11-18 22:25:59 +09:00
goto done
2024-11-12 22:59:37 +09:00
}
switch pkt . Kind {
case PACKET_KIND_ROUTE_START :
var x * Packet_Route
var ok bool
x , ok = pkt . U . ( * Packet_Route )
if ok {
2024-11-25 19:46:18 +09:00
var r * ServerRoute
2024-11-24 20:39:51 +09:00
2024-12-05 18:24:42 +09:00
r , err = cts . AddNewServerRoute ( x . Route . RouteId , x . Route . ServiceProto , x . Route . TargetAddrStr , x . Route . ServiceNetStr )
2024-11-12 22:59:37 +09:00
if err != nil {
2024-12-05 18:24:42 +09:00
cts . svr . log . Write ( cts . sid , LOG_ERROR ,
2024-12-05 23:05:47 +09:00
"Failed to add route(%d,%s) for %s - %s" ,
x . Route . RouteId , x . Route . TargetAddrStr , cts . remote_addr , err . Error ( ) )
err = cts . pss . Send ( MakeRouteStoppedPacket ( x . Route . RouteId , x . Route . ServiceProto , x . Route . TargetAddrStr , x . Route . ServiceNetStr ) )
if err != nil {
cts . svr . log . Write ( cts . sid , LOG_ERROR ,
"Failed to send route_stopped event(%d,%s,%v,%s) to client %s - %s" ,
x . Route . RouteId , x . Route . TargetAddrStr , x . Route . ServiceProto , x . Route . ServiceNetStr , cts . remote_addr , err . Error ( ) )
goto done
} else {
cts . svr . log . Write ( cts . sid , LOG_DEBUG ,
"Sent route_stopped event(%d,%s,%v,%s) to client %s" ,
x . Route . RouteId , x . Route . TargetAddrStr , x . Route . ServiceProto , x . Route . ServiceNetStr , cts . remote_addr )
}
2024-11-12 22:59:37 +09:00
} else {
2024-12-05 18:24:42 +09:00
cts . svr . log . Write ( cts . sid , LOG_INFO ,
"Added route(%d,%s,%s,%v,%v) for client %s to cts(%d)" ,
r . id , r . ptc_addr , r . svc_addr . String ( ) , r . svc_proto , r . svc_permitted_net , cts . remote_addr , cts . id )
err = cts . pss . Send ( MakeRouteStartedPacket ( r . id , r . svc_proto , r . svc_addr . String ( ) , r . svc_permitted_net . String ( ) ) )
2024-11-12 22:59:37 +09:00
if err != nil {
2024-11-24 20:39:51 +09:00
r . ReqStop ( )
2024-12-05 18:24:42 +09:00
cts . svr . log . Write ( cts . sid , LOG_ERROR ,
2024-12-05 23:05:47 +09:00
"Failed to send route_started event(%d,%s,%s,%s%v,%v) to client %s - %s" ,
r . id , r . ptc_addr , r . svc_addr . String ( ) , r . svc_proto , r . svc_permitted_net , cts . remote_addr , err . Error ( ) )
2024-11-24 20:39:51 +09:00
goto done
2024-11-12 22:59:37 +09:00
}
}
} else {
2024-12-03 20:28:04 +09:00
cts . svr . log . Write ( cts . sid , LOG_INFO , "Received invalid packet from %s" , cts . remote_addr )
2024-11-24 20:39:51 +09:00
// TODO: need to abort this client?
2024-11-12 22:59:37 +09:00
}
case PACKET_KIND_ROUTE_STOP :
var x * Packet_Route
var ok bool
x , ok = pkt . U . ( * Packet_Route )
if ok {
2024-11-25 19:46:18 +09:00
var r * ServerRoute
2024-11-24 20:39:51 +09:00
r , err = cts . RemoveServerRouteById ( x . Route . RouteId )
2024-11-12 22:59:37 +09:00
if err != nil {
2024-12-05 18:24:42 +09:00
cts . svr . log . Write ( cts . sid , LOG_ERROR ,
2024-12-05 23:05:47 +09:00
"Failed to delete route(%d,%s) for client %s - %s" ,
x . Route . RouteId , x . Route . TargetAddrStr , cts . remote_addr , err . Error ( ) )
2024-11-12 22:59:37 +09:00
} else {
2024-12-05 18:24:42 +09:00
cts . svr . log . Write ( cts . sid , LOG_ERROR ,
"Deleted route(%d,%s,%s,%v,%v) for client %s" ,
r . id , r . ptc_addr , r . svc_addr . String ( ) , r . svc_proto , r . svc_permitted_net . String ( ) , cts . remote_addr )
err = cts . pss . Send ( MakeRouteStoppedPacket ( r . id , r . svc_proto , r . ptc_addr , r . svc_permitted_net . String ( ) ) )
2024-11-12 22:59:37 +09:00
if err != nil {
2024-11-24 20:39:51 +09:00
r . ReqStop ( )
2024-12-05 18:24:42 +09:00
cts . svr . log . Write ( cts . sid , LOG_ERROR ,
2024-12-05 23:05:47 +09:00
"Failed to send route_stopped event(%d,%s,%s,%v.%v) to client %s - %s" ,
r . id , r . ptc_addr , r . svc_addr . String ( ) , r . svc_proto , r . svc_permitted_net . String ( ) , cts . remote_addr , err . Error ( ) )
2024-11-24 20:39:51 +09:00
goto done
2024-11-12 22:59:37 +09:00
}
}
} else {
2024-12-03 20:28:04 +09:00
cts . svr . log . Write ( cts . sid , LOG_ERROR , "Invalid route_stop event from %s" , cts . remote_addr )
2024-11-12 22:59:37 +09:00
}
2024-11-13 23:14:43 +09:00
2024-11-12 22:59:37 +09:00
case PACKET_KIND_PEER_STARTED :
// the connection from the client to a peer has been established
var x * Packet_Peer
var ok bool
x , ok = pkt . U . ( * Packet_Peer )
if ok {
2024-12-03 20:28:04 +09:00
err = cts . ReportEvent ( x . Peer . RouteId , x . Peer . PeerId , PACKET_KIND_PEER_STARTED , x . Peer )
2024-11-12 22:59:37 +09:00
if err != nil {
2024-12-03 20:28:04 +09:00
cts . svr . log . Write ( cts . sid , LOG_ERROR ,
"Failed to handle peer_started event from %s for peer(%d,%d,%s,%s) - %s" ,
cts . remote_addr , x . Peer . RouteId , x . Peer . PeerId , x . Peer . LocalAddrStr , x . Peer . RemoteAddrStr , err . Error ( ) )
2024-11-12 22:59:37 +09:00
} else {
2024-12-03 20:28:04 +09:00
cts . svr . log . Write ( cts . sid , LOG_DEBUG ,
"Handled peer_started event from %s for peer(%d,%d,%s,%s)" ,
cts . remote_addr , x . Peer . RouteId , x . Peer . PeerId , x . Peer . LocalAddrStr , x . Peer . RemoteAddrStr )
2024-11-12 22:59:37 +09:00
}
} else {
2024-12-03 20:28:04 +09:00
// invalid event data
cts . svr . log . Write ( cts . sid , LOG_ERROR , "Invalid peer_started event from %s" , cts . remote_addr )
2024-11-12 22:59:37 +09:00
}
2024-11-13 23:14:43 +09:00
2024-11-20 00:48:02 +09:00
case PACKET_KIND_PEER_ABORTED :
2024-12-05 01:26:44 +09:00
var x * Packet_Peer
var ok bool
x , ok = pkt . U . ( * Packet_Peer )
if ok {
err = cts . ReportEvent ( x . Peer . RouteId , x . Peer . PeerId , PACKET_KIND_PEER_ABORTED , x . Peer )
if err != nil {
cts . svr . log . Write ( cts . sid , LOG_ERROR ,
"Failed to handle peer_aborted event from %s for peer(%d,%d,%s,%s) - %s" ,
cts . remote_addr , x . Peer . RouteId , x . Peer . PeerId , x . Peer . LocalAddrStr , x . Peer . RemoteAddrStr , err . Error ( ) )
} else {
cts . svr . log . Write ( cts . sid , LOG_DEBUG ,
"Handled peer_aborted event from %s for peer(%d,%d,%s,%s)" ,
cts . remote_addr , x . Peer . RouteId , x . Peer . PeerId , x . Peer . LocalAddrStr , x . Peer . RemoteAddrStr )
}
} else {
// invalid event data
cts . svr . log . Write ( cts . sid , LOG_ERROR , "Invalid peer_aborted event from %s" , cts . remote_addr )
}
2024-11-12 22:59:37 +09:00
case PACKET_KIND_PEER_STOPPED :
// the connection from the client to a peer has been established
var x * Packet_Peer
var ok bool
x , ok = pkt . U . ( * Packet_Peer )
if ok {
2024-12-05 01:26:44 +09:00
err = cts . ReportEvent ( x . Peer . RouteId , x . Peer . PeerId , PACKET_KIND_PEER_STOPPED , x . Peer )
2024-11-12 22:59:37 +09:00
if err != nil {
2024-12-03 20:28:04 +09:00
cts . svr . log . Write ( cts . sid , LOG_ERROR ,
2024-12-05 01:26:44 +09:00
"Failed to handle peer_stopped event from %s for peer(%d,%d,%s,%s) - %s" ,
2024-12-03 20:28:04 +09:00
cts . remote_addr , x . Peer . RouteId , x . Peer . PeerId , x . Peer . LocalAddrStr , x . Peer . RemoteAddrStr , err . Error ( ) )
2024-11-12 22:59:37 +09:00
} else {
2024-12-03 20:28:04 +09:00
cts . svr . log . Write ( cts . sid , LOG_DEBUG ,
"Handled peer_stopped event from %s for peer(%d,%d,%s,%s)" ,
cts . remote_addr , x . Peer . RouteId , x . Peer . PeerId , x . Peer . LocalAddrStr , x . Peer . RemoteAddrStr )
2024-11-12 22:59:37 +09:00
}
} else {
2024-12-03 20:28:04 +09:00
// invalid event data
cts . svr . log . Write ( cts . sid , LOG_ERROR , "Invalid peer_stopped event from %s" , cts . remote_addr )
2024-11-12 22:59:37 +09:00
}
case PACKET_KIND_PEER_DATA :
// the connection from the client to a peer has been established
var x * Packet_Data
var ok bool
x , ok = pkt . U . ( * Packet_Data )
if ok {
err = cts . ReportEvent ( x . Data . RouteId , x . Data . PeerId , PACKET_KIND_PEER_DATA , x . Data . Data )
if err != nil {
2024-12-03 20:28:04 +09:00
cts . svr . log . Write ( cts . sid , LOG_ERROR ,
"Failed to handle peer_data event from %s for peer(%d,%d) - %s" ,
cts . remote_addr , x . Data . RouteId , x . Data . PeerId , err . Error ( ) )
2024-11-12 22:59:37 +09:00
} else {
2024-12-03 20:28:04 +09:00
cts . svr . log . Write ( cts . sid , LOG_DEBUG ,
"Handled peer_data event from %s for peer(%d,%d)" ,
cts . remote_addr , x . Data . RouteId , x . Data . PeerId )
2024-11-12 22:59:37 +09:00
}
} else {
2024-12-03 20:28:04 +09:00
// invalid event data
cts . svr . log . Write ( cts . sid , LOG_ERROR , "Invalid peer_data event from %s" , cts . remote_addr )
2024-11-12 22:59:37 +09:00
}
}
2024-11-20 00:31:14 +09:00
}
2024-11-18 22:25:59 +09:00
done :
2024-12-03 20:28:04 +09:00
cts . svr . log . Write ( cts . sid , LOG_INFO , "RPC stream receiver ended" )
2024-11-18 22:25:59 +09:00
}
2024-11-25 19:46:18 +09:00
func ( cts * ServerConn ) RunTask ( wg * sync . WaitGroup ) {
2024-11-18 22:25:59 +09:00
var strm * GuardedPacketStreamServer
var ctx context . Context
defer wg . Done ( )
strm = cts . pss
ctx = strm . Context ( )
2024-11-20 00:31:14 +09:00
// it looks like the only proper way to interrupt the blocking Recv
// call on the grpc streaming server is exit from the service handler
// which is this function invoked from PacketStream().
// there is no cancel function or whatever that can interrupt it.
// so start the Recv() loop in a separte goroutine and let this
// function be the channel waiter only.
// increment on the wait group is for the caller to wait for
// these detached goroutines to finish.
wg . Add ( 1 )
go cts . receive_from_stream ( wg )
2024-11-18 22:25:59 +09:00
for {
// exit if context is done
// or continue
select {
case <- ctx . Done ( ) : // the stream context is done
2024-12-03 20:28:04 +09:00
cts . svr . log . Write ( cts . sid , LOG_INFO , "RPC stream done - %s" , ctx . Err ( ) . Error ( ) )
2024-11-18 22:25:59 +09:00
goto done
case <- cts . stop_chan :
2024-11-24 20:39:51 +09:00
// get out of the loop to eventually to exit from
// this handler to let the main grpc server to
// close this specific client connection.
2024-11-18 22:25:59 +09:00
goto done
2024-11-20 00:31:14 +09:00
//default:
2024-11-18 22:25:59 +09:00
// no other case is ready.
// without the default case, the select construct would block
}
}
done :
2024-11-24 20:39:51 +09:00
cts . ReqStop ( ) // just in case
2024-11-18 22:25:59 +09:00
cts . route_wg . Wait ( )
}
2024-11-25 19:46:18 +09:00
func ( cts * ServerConn ) ReqStop ( ) {
2024-11-18 22:25:59 +09:00
if cts . stop_req . CompareAndSwap ( false , true ) {
var r * ServerRoute
for _ , r = range cts . route_map {
r . ReqStop ( )
}
2024-11-24 20:39:51 +09:00
// there is no good way to break a specific connection client to
// the grpc server. while the global grpc server is closed in
// ReqStop() for Server, the individuation connection is closed
// by returing from the grpc handler goroutine. See the comment
2024-11-25 19:46:18 +09:00
// RunTask() for ServerConn.
2024-11-18 22:25:59 +09:00
cts . stop_chan <- true
}
}
// --------------------------------------------------------------------
2024-11-25 19:46:18 +09:00
func ( s * Server ) GetSeed ( ctx context . Context , c_seed * Seed ) ( * Seed , error ) {
2024-11-20 00:31:14 +09:00
var s_seed Seed
// seed exchange is for furture expansion of the protocol
// there is nothing to do much about it for now.
2024-12-07 21:24:06 +09:00
s_seed . Version = HODU_RPC_VERSION
2024-11-20 00:31:14 +09:00
s_seed . Flags = 0
2024-11-25 19:46:18 +09:00
// we create no ServerConn structure associated with the connection
2024-11-20 00:31:14 +09:00
// at this phase for the server. it doesn't track the client version and
// features. we delegate protocol selection solely to the client.
return & s_seed , nil
}
2024-11-18 22:25:59 +09:00
func ( s * Server ) PacketStream ( strm Hodu_PacketStreamServer ) error {
var ctx context . Context
var p * peer . Peer
var ok bool
var err error
2024-11-25 19:46:18 +09:00
var cts * ServerConn
2024-11-18 22:25:59 +09:00
ctx = strm . Context ( )
p , ok = peer . FromContext ( ctx )
2024-11-25 19:46:18 +09:00
if ! ok {
2024-11-18 22:25:59 +09:00
return fmt . Errorf ( "failed to get peer from packet stream context" )
}
2024-12-02 02:19:50 +09:00
cts , err = s . AddNewServerConn ( & p . Addr , & p . LocalAddr , strm )
2024-11-18 22:25:59 +09:00
if err != nil {
return fmt . Errorf ( "unable to add client %s - %s" , p . Addr . String ( ) , err . Error ( ) )
2024-11-12 22:59:37 +09:00
}
2024-11-18 22:25:59 +09:00
// Don't detached the cts task as a go-routine as this function
// is invoked as a go-routine by the grpc server.
s . cts_wg . Add ( 1 )
cts . RunTask ( & s . cts_wg )
return nil
2024-11-12 22:59:37 +09:00
}
// ------------------------------------
type ConnCatcher struct {
server * Server
}
func ( cc * ConnCatcher ) TagRPC ( ctx context . Context , info * stats . RPCTagInfo ) context . Context {
2024-11-25 19:46:18 +09:00
return ctx
2024-11-12 22:59:37 +09:00
}
func ( cc * ConnCatcher ) HandleRPC ( ctx context . Context , s stats . RPCStats ) {
}
func ( cc * ConnCatcher ) TagConn ( ctx context . Context , info * stats . ConnTagInfo ) context . Context {
2024-11-23 14:49:04 +09:00
return ctx
2024-11-25 19:46:18 +09:00
//return context.TODO()
2024-11-12 22:59:37 +09:00
}
func ( cc * ConnCatcher ) HandleConn ( ctx context . Context , cs stats . ConnStats ) {
var p * peer . Peer
var ok bool
var addr string
p , ok = peer . FromContext ( ctx )
2024-11-25 19:46:18 +09:00
if ! ok {
2024-11-12 22:59:37 +09:00
addr = ""
} else {
addr = p . Addr . String ( )
}
2024-12-03 20:28:04 +09:00
/ *
md , ok := metadata . FromIncomingContext ( ctx )
if ok {
} * /
2024-11-12 22:59:37 +09:00
switch cs . ( type ) {
case * stats . ConnBegin :
2024-12-03 20:28:04 +09:00
cc . server . log . Write ( "" , LOG_INFO , "Client connected - %s" , addr )
2024-11-12 22:59:37 +09:00
case * stats . ConnEnd :
2024-12-03 20:28:04 +09:00
cc . server . log . Write ( "" , LOG_INFO , "Client disconnected - %s" , addr )
2024-11-25 19:46:18 +09:00
cc . server . RemoveServerConnByAddr ( p . Addr )
}
2024-11-12 22:59:37 +09:00
}
2024-12-03 20:28:04 +09:00
// ------------------------------------
2024-11-12 22:59:37 +09:00
type wrappedStream struct {
grpc . ServerStream
}
2024-12-03 20:28:04 +09:00
func ( w * wrappedStream ) RecvMsg ( msg interface { } ) error {
return w . ServerStream . RecvMsg ( msg )
2024-11-12 22:59:37 +09:00
}
2024-12-03 20:28:04 +09:00
func ( w * wrappedStream ) SendMsg ( msg interface { } ) error {
return w . ServerStream . SendMsg ( msg )
2024-11-12 22:59:37 +09:00
}
func newWrappedStream ( s grpc . ServerStream ) grpc . ServerStream {
return & wrappedStream { s }
}
2024-12-03 20:28:04 +09:00
func streamInterceptor ( srv interface { } , ss grpc . ServerStream , info * grpc . StreamServerInfo , handler grpc . StreamHandler ) error {
var err error
2024-11-12 22:59:37 +09:00
// authentication (token verification)
/ *
2024-12-03 20:28:04 +09:00
md , ok = metadata . FromIncomingContext ( ss . Context ( ) )
2024-11-12 22:59:37 +09:00
if ! ok {
return errMissingMetadata
}
if ! valid ( md [ "authorization" ] ) {
return errInvalidToken
}
* /
2024-12-03 20:28:04 +09:00
err = handler ( srv , newWrappedStream ( ss ) )
2024-11-12 22:59:37 +09:00
if err != nil {
2024-12-03 20:28:04 +09:00
// TODO: LOGGING
2024-11-12 22:59:37 +09:00
}
return err
}
2024-12-03 20:28:04 +09:00
func unaryInterceptor ( ctx context . Context , req interface { } , info * grpc . UnaryServerInfo , handler grpc . UnaryHandler ) ( interface { } , error ) {
var v interface { }
var err error
2024-11-12 22:59:37 +09:00
// authentication (token verification)
/ *
2024-12-03 20:28:04 +09:00
md , ok = metadata . FromIncomingContext ( ctx )
2024-11-12 22:59:37 +09:00
if ! ok {
return nil , errMissingMetadata
}
if ! valid ( md [ "authorization" ] ) {
// return nil, errInvalidToken
}
* /
2024-12-03 20:28:04 +09:00
v , err = handler ( ctx , req )
2024-11-12 22:59:37 +09:00
if err != nil {
2024-12-03 20:28:04 +09:00
//fmt.Printf("RPC failed with error: %v\n", err)
// TODO: Logging?
2024-11-12 22:59:37 +09:00
}
2024-11-21 01:11:01 +09:00
2024-12-03 20:28:04 +09:00
return v , err
2024-11-12 22:59:37 +09:00
}
2024-12-07 22:18:07 +09:00
type server_ctl_log_writer struct {
svr * Server
}
func ( hlw * server_ctl_log_writer ) Write ( p [ ] byte ) ( n int , err error ) {
// the standard http.Server always requires *log.Logger
// use this iowriter to create a logger to pass it to the http server.
hlw . svr . log . Write ( "" , LOG_INFO , string ( p ) )
return len ( p ) , nil
}
2024-12-07 16:57:00 +09:00
func NewServer ( ctx context . Context , ctl_addrs [ ] string , rpc_addrs [ ] string , logger Logger , ctltlscfg * tls . Config , rpctlscfg * tls . Config ) ( * Server , error ) {
2024-11-12 22:59:37 +09:00
var s Server
var l * net . TCPListener
2024-12-03 11:52:46 +09:00
var rpcaddr * net . TCPAddr
2024-11-12 22:59:37 +09:00
var err error
var addr string
var gl * net . TCPListener
2024-12-03 11:52:46 +09:00
var i int
2024-12-01 21:47:11 +09:00
var cwd string
2024-12-07 22:18:07 +09:00
var hs_log * log . Logger
2024-11-12 22:59:37 +09:00
2024-12-03 11:52:46 +09:00
if len ( rpc_addrs ) <= 0 {
2024-11-24 20:39:51 +09:00
return nil , fmt . Errorf ( "no server addresses provided" )
2024-11-12 22:59:37 +09:00
}
2024-11-24 20:39:51 +09:00
s . ctx , s . ctx_cancel = context . WithCancel ( ctx )
2024-11-21 01:11:01 +09:00
s . log = logger
2024-11-12 22:59:37 +09:00
/* create the specified number of listeners */
2024-12-03 11:52:46 +09:00
s . rpc = make ( [ ] * net . TCPListener , 0 )
for _ , addr = range rpc_addrs {
rpcaddr , err = net . ResolveTCPAddr ( NET_TYPE_TCP , addr ) // Make this interruptable???
2024-11-12 22:59:37 +09:00
if err != nil {
goto oops
}
2024-12-03 11:52:46 +09:00
l , err = net . ListenTCP ( NET_TYPE_TCP , rpcaddr )
2024-11-12 22:59:37 +09:00
if err != nil {
goto oops
}
2024-12-03 11:52:46 +09:00
s . rpc = append ( s . rpc , l )
2024-11-12 22:59:37 +09:00
}
2024-12-07 16:57:00 +09:00
s . ctltlscfg = ctltlscfg
s . rpctlscfg = rpctlscfg
2024-11-23 12:30:23 +09:00
s . ext_svcs = make ( [ ] Service , 0 , 1 )
2024-12-03 00:55:19 +09:00
s . cts_map = make ( ServerConnMap )
s . cts_map_by_addr = make ( ServerConnMapByAddr )
2024-11-24 20:39:51 +09:00
s . stop_chan = make ( chan bool , 8 )
2024-11-12 22:59:37 +09:00
s . stop_req . Store ( false )
2024-12-07 16:57:00 +09:00
2024-11-12 22:59:37 +09:00
/ *
creds , err := credentials . NewServerTLSFromFile ( data . Path ( "x509/server_cert.pem" ) , data . Path ( "x509/server_key.pem" ) )
if err != nil {
log . Fatalf ( "failed to create credentials: %v" , err )
}
gs = grpc . NewServer ( grpc . Creds ( creds ) )
* /
2024-12-07 16:57:00 +09:00
if s . rpctlscfg == nil {
s . rpc_svr = grpc . NewServer (
//grpc.UnaryInterceptor(unaryInterceptor),
//grpc.StreamInterceptor(streamInterceptor),
grpc . StatsHandler ( & ConnCatcher { server : & s } ) ,
)
} else {
s . rpc_svr = grpc . NewServer (
grpc . Creds ( credentials . NewTLS ( s . rpctlscfg ) ) ,
//grpc.UnaryInterceptor(unaryInterceptor),
//grpc.StreamInterceptor(streamInterceptor),
grpc . StatsHandler ( & ConnCatcher { server : & s } ) ,
)
}
2024-12-03 20:28:04 +09:00
RegisterHoduServer ( s . rpc_svr , & s )
2024-11-12 22:59:37 +09:00
2024-12-01 21:47:11 +09:00
s . ctl_prefix = "" // TODO:
s . ctl_mux = http . NewServeMux ( )
cwd , _ = os . Getwd ( )
s . ctl_mux . Handle ( s . ctl_prefix + "/ui/" , http . StripPrefix ( s . ctl_prefix , http . FileServer ( http . Dir ( cwd ) ) ) ) // TODO: proper directory. it must not use the current working directory...
2024-12-02 02:19:50 +09:00
s . ctl_mux . Handle ( s . ctl_prefix + "/ws/tty" , new_server_ctl_ws_tty ( & s ) )
s . ctl_mux . Handle ( s . ctl_prefix + "/server-conns" , & server_ctl_server_conns { s : & s } )
2024-12-03 00:55:19 +09:00
s . ctl_mux . Handle ( s . ctl_prefix + "/server-conns/{conn_id}" , & server_ctl_server_conns_id { s : & s } )
2024-12-06 00:52:33 +09:00
s . ctl_mux . Handle ( s . ctl_prefix + "/server-conns/{conn_id}/routes" , & server_ctl_server_conns_id_routes { s : & s } )
s . ctl_mux . Handle ( s . ctl_prefix + "/server-conns/{conn_id}/routes/{route_id}" , & server_ctl_server_conns_id_routes_id { s : & s } )
2024-12-01 21:47:11 +09:00
2024-12-03 11:52:46 +09:00
s . ctl_addr = make ( [ ] string , len ( ctl_addrs ) )
s . ctl = make ( [ ] * http . Server , len ( ctl_addrs ) )
copy ( s . ctl_addr , ctl_addrs )
2024-12-07 22:18:07 +09:00
hs_log = log . New ( & server_ctl_log_writer { svr : & s } , "" , 0 ) ;
2024-12-03 11:52:46 +09:00
for i = 0 ; i < len ( ctl_addrs ) ; i ++ {
s . ctl [ i ] = & http . Server {
Addr : ctl_addrs [ i ] ,
Handler : s . ctl_mux ,
2024-12-07 16:57:00 +09:00
TLSConfig : s . ctltlscfg ,
2024-12-07 22:18:07 +09:00
ErrorLog : hs_log ,
2024-12-03 11:52:46 +09:00
// TODO: more settings
}
2024-12-01 21:47:11 +09:00
}
2024-11-12 22:59:37 +09:00
return & s , nil
oops :
2024-12-03 20:28:04 +09:00
// TODO: check if rpc_svr needs to be closed. probably not. closing the listen may be good enough
2024-11-12 22:59:37 +09:00
if gl != nil {
gl . Close ( )
}
2024-12-03 11:52:46 +09:00
for _ , l = range s . rpc {
2024-11-12 22:59:37 +09:00
l . Close ( )
}
2024-12-03 11:52:46 +09:00
s . rpc = make ( [ ] * net . TCPListener , 0 )
2024-11-12 22:59:37 +09:00
return nil , err
}
2024-11-18 22:25:59 +09:00
func ( s * Server ) run_grpc_server ( idx int , wg * sync . WaitGroup ) error {
2024-11-12 22:59:37 +09:00
var l * net . TCPListener
var err error
2024-11-23 14:49:04 +09:00
defer wg . Done ( )
2024-11-18 22:25:59 +09:00
2024-12-03 11:52:46 +09:00
l = s . rpc [ idx ]
2024-11-12 22:59:37 +09:00
// it seems to be safe to call a single grpc server on differnt listening sockets multiple times
2024-12-03 20:28:04 +09:00
s . log . Write ( "" , LOG_ERROR , "Starting RPC server on %s" , l . Addr ( ) . String ( ) )
err = s . rpc_svr . Serve ( l )
2024-11-12 22:59:37 +09:00
if err != nil {
2024-11-21 01:11:01 +09:00
if errors . Is ( err , net . ErrClosed ) {
2024-12-03 20:28:04 +09:00
s . log . Write ( "" , LOG_ERROR , "RPC server on %s closed" , l . Addr ( ) . String ( ) )
2024-11-21 01:11:01 +09:00
} else {
2024-12-03 20:28:04 +09:00
s . log . Write ( "" , LOG_ERROR , "Error from RPC server on %s - %s" , l . Addr ( ) . String ( ) , err . Error ( ) )
2024-11-21 01:11:01 +09:00
}
return err
2024-11-12 22:59:37 +09:00
}
return nil
}
2024-11-18 22:25:59 +09:00
func ( s * Server ) RunTask ( wg * sync . WaitGroup ) {
2024-11-12 22:59:37 +09:00
var idx int
2024-11-18 22:25:59 +09:00
defer wg . Done ( )
2024-11-13 02:20:25 +09:00
2024-12-03 11:52:46 +09:00
for idx , _ = range s . rpc {
s . rpc_wg . Add ( 1 )
go s . run_grpc_server ( idx , & s . rpc_wg )
2024-11-12 22:59:37 +09:00
}
2024-11-24 20:39:51 +09:00
// most the work is done by in separate goroutines (s.run_grp_server)
// this loop serves as a placeholder to prevent the logic flow from
// descening down to s.ReqStop()
task_loop :
for {
select {
case <- s . stop_chan :
break task_loop
}
}
s . ReqStop ( )
2024-12-03 11:52:46 +09:00
s . rpc_wg . Wait ( )
2024-12-03 20:28:04 +09:00
s . log . Write ( "" , LOG_DEBUG , "All RPC listeners completed" )
2024-11-24 20:39:51 +09:00
2024-11-18 22:25:59 +09:00
s . cts_wg . Wait ( )
2024-11-23 17:20:53 +09:00
s . log . Write ( "" , LOG_DEBUG , "All CTS handlers completed" )
2024-11-21 01:11:01 +09:00
2024-11-20 00:31:14 +09:00
// stop the main grpc server after all the other tasks are finished.
2024-12-03 20:28:04 +09:00
s . rpc_svr . Stop ( )
2024-11-12 22:59:37 +09:00
}
2024-11-21 01:11:01 +09:00
func ( s * Server ) RunCtlTask ( wg * sync . WaitGroup ) {
var err error
2024-12-03 11:52:46 +09:00
var ctl * http . Server
var idx int
var l_wg sync . WaitGroup
2024-11-21 01:11:01 +09:00
defer wg . Done ( )
2024-12-03 11:52:46 +09:00
for idx , ctl = range s . ctl {
l_wg . Add ( 1 )
go func ( i int , cs * http . Server ) {
s . log . Write ( "" , LOG_INFO , "Control channel[%d] started on %s" , i , s . ctl_addr [ i ] )
2024-12-07 16:57:00 +09:00
if s . ctltlscfg == nil {
2024-12-06 00:52:33 +09:00
err = cs . ListenAndServe ( )
} else {
2024-12-07 16:57:00 +09:00
err = cs . ListenAndServeTLS ( "" , "" ) // c.ctltlscfg must provide a certificate and a key
2024-12-06 00:52:33 +09:00
}
2024-12-03 11:52:46 +09:00
if errors . Is ( err , http . ErrServerClosed ) {
s . log . Write ( "" , LOG_DEBUG , "Control channel[%d] ended" , i )
} else {
s . log . Write ( "" , LOG_ERROR , "Control channel[%d] error - %s" , i , err . Error ( ) )
}
l_wg . Done ( )
} ( idx , ctl )
2024-11-21 01:11:01 +09:00
}
2024-12-03 11:52:46 +09:00
l_wg . Wait ( )
2024-11-21 01:11:01 +09:00
}
2024-11-12 22:59:37 +09:00
func ( s * Server ) ReqStop ( ) {
if s . stop_req . CompareAndSwap ( false , true ) {
var l * net . TCPListener
2024-11-25 19:46:18 +09:00
var cts * ServerConn
2024-12-03 11:52:46 +09:00
var ctl * http . Server
2024-11-12 22:59:37 +09:00
2024-12-03 11:52:46 +09:00
for _ , ctl = range s . ctl {
ctl . Shutdown ( s . ctx ) // to break c.ctl.ListenAndServe()
2024-11-24 20:39:51 +09:00
}
2024-12-05 01:26:44 +09:00
//s.rpc_svr.GracefulStop()
//s.rpc_svr.Stop()
2024-12-03 11:52:46 +09:00
for _ , l = range s . rpc {
2024-11-12 22:59:37 +09:00
l . Close ( )
}
2024-12-03 20:28:04 +09:00
s . cts_mtx . Lock ( )
2024-11-12 22:59:37 +09:00
for _ , cts = range s . cts_map {
cts . ReqStop ( ) // request to stop connections from/to peer held in the cts structure
}
s . cts_mtx . Unlock ( )
2024-11-24 20:39:51 +09:00
s . stop_chan <- true
s . ctx_cancel ( )
2024-11-12 22:59:37 +09:00
}
}
2024-12-02 02:19:50 +09:00
func ( s * Server ) AddNewServerConn ( remote_addr * net . Addr , local_addr * net . Addr , pss Hodu_PacketStreamServer ) ( * ServerConn , error ) {
2024-11-25 19:46:18 +09:00
var cts ServerConn
2024-12-03 00:55:19 +09:00
var id uint32
2024-11-12 22:59:37 +09:00
var ok bool
cts . svr = s
2024-11-18 22:25:59 +09:00
cts . route_map = make ( ServerRouteMap )
2024-12-03 20:28:04 +09:00
cts . remote_addr = * remote_addr
cts . local_addr = * local_addr
2024-11-18 22:25:59 +09:00
cts . pss = & GuardedPacketStreamServer { Hodu_PacketStreamServer : pss }
2024-11-12 22:59:37 +09:00
cts . stop_req . Store ( false )
2024-11-24 20:39:51 +09:00
cts . stop_chan = make ( chan bool , 8 )
2024-11-12 22:59:37 +09:00
s . cts_mtx . Lock ( )
defer s . cts_mtx . Unlock ( )
2024-12-03 00:55:19 +09:00
id = rand . Uint32 ( )
for {
_ , ok = s . cts_map [ id ]
if ! ok { break }
id ++
}
cts . id = id
2024-12-03 11:52:46 +09:00
cts . sid = fmt . Sprintf ( "%d" , id ) // id in string used for logging
2024-12-03 00:55:19 +09:00
2024-12-03 20:28:04 +09:00
_ , ok = s . cts_map_by_addr [ cts . remote_addr ]
2024-11-12 22:59:37 +09:00
if ok {
2024-12-03 20:28:04 +09:00
return nil , fmt . Errorf ( "existing client - %s" , cts . remote_addr . String ( ) )
2024-11-12 22:59:37 +09:00
}
2024-12-03 20:28:04 +09:00
s . cts_map_by_addr [ cts . remote_addr ] = & cts
2024-12-03 00:55:19 +09:00
s . cts_map [ id ] = & cts ;
2024-12-03 20:28:04 +09:00
s . log . Write ( "" , LOG_DEBUG , "Added client connection from %s" , cts . remote_addr . String ( ) )
2024-11-12 22:59:37 +09:00
return & cts , nil
}
2024-12-02 09:46:10 +09:00
func ( s * Server ) ReqStopAllServerConns ( ) {
var cts * ServerConn
s . cts_mtx . Lock ( )
defer s . cts_mtx . Unlock ( )
for _ , cts = range s . cts_map {
cts . ReqStop ( )
}
}
2024-12-03 00:55:19 +09:00
func ( s * Server ) RemoveServerConn ( cts * ServerConn ) error {
var conn * ServerConn
var ok bool
s . cts_mtx . Lock ( )
conn , ok = s . cts_map [ cts . id ]
if ! ok {
s . cts_mtx . Unlock ( )
return fmt . Errorf ( "non-existent connection id - %d" , cts . id )
}
if conn != cts {
s . cts_mtx . Unlock ( )
return fmt . Errorf ( "non-existent connection id - %d" , cts . id )
}
delete ( s . cts_map , cts . id )
2024-12-03 20:28:04 +09:00
delete ( s . cts_map_by_addr , cts . remote_addr )
2024-12-03 00:55:19 +09:00
s . cts_mtx . Unlock ( )
cts . ReqStop ( )
return nil
}
func ( s * Server ) RemoveServerConnByAddr ( addr net . Addr ) error {
var cts * ServerConn
var ok bool
2024-11-12 22:59:37 +09:00
s . cts_mtx . Lock ( )
2024-12-03 00:55:19 +09:00
cts , ok = s . cts_map_by_addr [ addr ]
if ! ok {
s . cts_mtx . Unlock ( )
return fmt . Errorf ( "non-existent connection address - %s" , addr . String ( ) )
}
delete ( s . cts_map , cts . id )
2024-12-03 20:28:04 +09:00
delete ( s . cts_map_by_addr , cts . remote_addr )
2024-11-12 22:59:37 +09:00
s . cts_mtx . Unlock ( )
2024-12-03 00:55:19 +09:00
cts . ReqStop ( )
return nil
2024-11-12 22:59:37 +09:00
}
2024-12-03 00:55:19 +09:00
func ( s * Server ) FindServerConnById ( id uint32 ) * ServerConn {
2024-11-25 19:46:18 +09:00
var cts * ServerConn
2024-11-12 22:59:37 +09:00
var ok bool
s . cts_mtx . Lock ( )
defer s . cts_mtx . Unlock ( )
2024-12-03 00:55:19 +09:00
cts , ok = s . cts_map [ id ]
if ! ok {
return nil
2024-11-12 22:59:37 +09:00
}
2024-12-03 00:55:19 +09:00
return cts
2024-11-12 22:59:37 +09:00
}
2024-11-25 19:46:18 +09:00
func ( s * Server ) FindServerConnByAddr ( addr net . Addr ) * ServerConn {
var cts * ServerConn
2024-11-12 22:59:37 +09:00
var ok bool
s . cts_mtx . Lock ( )
defer s . cts_mtx . Unlock ( )
2024-12-03 00:55:19 +09:00
cts , ok = s . cts_map_by_addr [ addr ]
2024-11-12 22:59:37 +09:00
if ! ok {
2024-11-13 23:14:43 +09:00
return nil
2024-11-12 22:59:37 +09:00
}
return cts
}
2024-11-23 12:30:23 +09:00
func ( s * Server ) StartService ( cfg interface { } ) {
s . wg . Add ( 1 )
go s . RunTask ( & s . wg )
}
2024-11-12 22:59:37 +09:00
2024-11-23 12:30:23 +09:00
func ( s * Server ) StartExtService ( svc Service , data interface { } ) {
2024-11-30 20:24:30 +09:00
s . ext_mtx . Lock ( )
2024-11-23 12:30:23 +09:00
s . ext_svcs = append ( s . ext_svcs , svc )
2024-11-30 20:24:30 +09:00
s . ext_mtx . Unlock ( )
2024-11-23 12:30:23 +09:00
s . wg . Add ( 1 )
go svc . RunTask ( & s . wg )
}
2024-11-12 22:59:37 +09:00
2024-12-01 21:47:11 +09:00
func ( s * Server ) StartCtlService ( ) {
s . wg . Add ( 1 )
go s . RunCtlTask ( & s . wg )
}
2024-11-23 12:30:23 +09:00
func ( s * Server ) StopServices ( ) {
var ext_svc Service
s . ReqStop ( )
for _ , ext_svc = range s . ext_svcs {
ext_svc . StopServices ( )
2024-11-12 22:59:37 +09:00
}
2024-11-23 12:30:23 +09:00
}
2024-11-12 22:59:37 +09:00
2024-11-23 12:30:23 +09:00
func ( s * Server ) WaitForTermination ( ) {
2024-11-13 02:20:25 +09:00
s . wg . Wait ( )
2024-11-12 22:59:37 +09:00
}
2024-11-23 14:49:04 +09:00
2024-11-25 19:46:18 +09:00
func ( s * Server ) WriteLog ( id string , level LogLevel , fmtstr string , args ... interface { } ) {
2024-11-23 20:13:07 +09:00
s . log . Write ( id , level , fmtstr , args ... )
2024-11-23 14:49:04 +09:00
}