hodu/server.go

1050 lines
25 KiB
Go
Raw Normal View History

package hodu
import "context"
import "crypto/tls"
2024-11-17 14:57:56 +09:00
import "errors"
import "fmt"
import "io"
import "math/rand"
import "net"
2024-11-21 01:11:01 +09:00
import "net/http"
import "os"
import "sync"
import "sync/atomic"
import "google.golang.org/grpc"
2024-11-13 02:20:25 +09:00
//import "google.golang.org/grpc/metadata"
import "google.golang.org/grpc/peer"
import "google.golang.org/grpc/stats"
const PTS_LIMIT = 8192
2024-12-03 00:55:19 +09:00
type ServerConnMapByAddr = map[net.Addr]*ServerConn
type ServerConnMap = map[uint32]*ServerConn
type ServerPeerConnMap = map[uint32]*ServerPeerConn
type ServerRouteMap = map[uint32]*ServerRoute
type Server struct {
2024-12-03 00:55:19 +09:00
ctx context.Context
ctx_cancel context.CancelFunc
tlscfg *tls.Config
2024-11-24 20:39:51 +09:00
2024-12-03 00:55:19 +09:00
wg sync.WaitGroup
stop_req atomic.Bool
stop_chan chan bool
2024-11-18 22:25:59 +09:00
ext_mtx sync.Mutex
ext_svcs []Service
ctl_addr []string
2024-12-03 00:55:19 +09:00
ctl_prefix string
ctl_mux *http.ServeMux
ctl []*http.Server // control server
2024-11-21 01:11:01 +09:00
rpc []*net.TCPListener // main listener for grpc
rpc_wg sync.WaitGroup
gs *grpc.Server
2024-12-03 00:55:19 +09:00
cts_mtx sync.Mutex
cts_map ServerConnMap
cts_map_by_addr ServerConnMapByAddr
cts_wg sync.WaitGroup
2024-12-03 00:55:19 +09:00
log Logger
UnimplementedHoduServer
}
// connection from client.
// client connect to the server, the server accept it, and makes a tunnel request
type ServerConn struct {
svr *Server
2024-12-02 02:19:50 +09:00
id uint32
sid string // for logging
raddr net.Addr // client address that created this structure
laddr net.Addr // local address that the client is connected to
pss *GuardedPacketStreamServer
route_mtx sync.Mutex
2024-11-18 22:25:59 +09:00
route_map ServerRouteMap
route_wg sync.WaitGroup
wg sync.WaitGroup
stop_req atomic.Bool
stop_chan chan bool
}
type ServerRoute struct {
cts *ServerConn
l *net.TCPListener
svcaddr *net.TCPAddr
2024-12-02 02:19:50 +09:00
ptc_addr string
id uint32
pts_mtx sync.Mutex
pts_map ServerPeerConnMap
pts_limit int
pts_last_id uint32
pts_wg sync.WaitGroup
stop_req atomic.Bool
2024-11-18 22:25:59 +09:00
}
type GuardedPacketStreamServer struct {
mtx sync.Mutex
//pss Hodu_PacketStreamServer
Hodu_PacketStreamServer // let's embed it to avoid reimplement Recv() and Context()
}
// ------------------------------------
2024-11-18 22:25:59 +09:00
func (g *GuardedPacketStreamServer) Send(data *Packet) error {
// while Recv() on a stream is called from the same gorountine all the time,
// Send() is called from multiple places. let's guard it as grpc-go
// doesn't provide concurrency safety in this case.
// https://github.com/grpc/grpc-go/blob/master/Documentation/concurrency.md
g.mtx.Lock()
defer g.mtx.Unlock()
return g.Hodu_PacketStreamServer.Send(data)
}
/*
func (g *GuardedPacketStreamServer) Recv() (*Packet, error) {
return g.pss.Recv()
}
func (g *GuardedPacketStreamServer) Context() context.Context {
return g.pss.Context()
}*/
// ------------------------------------
2024-12-02 02:19:50 +09:00
func NewServerRoute(cts *ServerConn, id uint32, proto ROUTE_PROTO, ptc_addr string) (*ServerRoute, error) {
2024-11-18 22:25:59 +09:00
var r ServerRoute
var l *net.TCPListener
var svcaddr *net.TCPAddr
2024-11-18 22:25:59 +09:00
var err error
l, svcaddr, err = cts.make_route_listener(proto)
2024-11-18 22:25:59 +09:00
if err != nil {
return nil, err
}
r.cts = cts
r.id = id
r.l = l
r.svcaddr = svcaddr
2024-12-02 02:19:50 +09:00
r.ptc_addr = ptc_addr
2024-11-18 22:25:59 +09:00
r.pts_limit = PTS_LIMIT
r.pts_map = make(ServerPeerConnMap)
r.pts_last_id = 0
r.stop_req.Store(false)
2024-11-23 14:49:04 +09:00
return &r, nil
2024-11-18 22:25:59 +09:00
}
func (r *ServerRoute) AddNewServerPeerConn(c *net.TCPConn) (*ServerPeerConn, error) {
var pts *ServerPeerConn
var ok bool
var start_id uint32
r.pts_mtx.Lock()
defer r.pts_mtx.Unlock()
if len(r.pts_map) >= r.pts_limit {
return nil, fmt.Errorf("peer-to-server connection table full")
}
start_id = r.pts_last_id
for {
_, ok = r.pts_map[r.pts_last_id]
if !ok {
break
}
r.pts_last_id++
if r.pts_last_id == start_id {
// unlikely to happen but it cycled through the whole range.
return nil, fmt.Errorf("failed to assign peer-to-server connection id")
}
}
pts = NewServerPeerConn(r, c, r.pts_last_id)
r.pts_map[pts.conn_id] = pts
r.pts_last_id++
return pts, nil
}
func (r *ServerRoute) RemoveServerPeerConn(pts *ServerPeerConn) {
r.pts_mtx.Lock()
delete(r.pts_map, pts.conn_id)
r.pts_mtx.Unlock()
2024-11-23 14:49:04 +09:00
r.cts.svr.log.Write("", LOG_DEBUG, "Removed server-side peer connection %s", pts.conn.RemoteAddr().String())
}
2024-11-18 22:25:59 +09:00
func (r *ServerRoute) RunTask(wg *sync.WaitGroup) {
var err error
var conn *net.TCPConn
var pts *ServerPeerConn
var log_id string
2024-11-18 22:25:59 +09:00
defer wg.Done()
log_id = fmt.Sprintf("%s,%d", r.cts.raddr.String(), r.id)
2024-11-18 22:25:59 +09:00
for {
conn, err = r.l.AcceptTCP()
if err != nil {
2024-11-17 14:57:56 +09:00
if errors.Is(err, net.ErrClosed) {
2024-11-24 20:39:51 +09:00
r.cts.svr.log.Write(log_id, LOG_INFO, "Server-side peer listener closed")
2024-11-17 14:57:56 +09:00
} else {
r.cts.svr.log.Write(log_id, LOG_INFO, "Server-side peer listener error - %s", err.Error())
2024-11-17 14:57:56 +09:00
}
break
}
pts, err = r.AddNewServerPeerConn(conn)
if err != nil {
r.cts.svr.log.Write(log_id, LOG_ERROR, "Failed to add new server-side peer %s - %s", conn.RemoteAddr().String(), err.Error())
conn.Close()
} else {
r.cts.svr.log.Write(log_id, LOG_DEBUG, "Added new server-side peer %s", conn.RemoteAddr().String())
r.pts_wg.Add(1)
2024-11-18 22:25:59 +09:00
go pts.RunTask(&r.pts_wg)
}
}
2024-11-24 20:39:51 +09:00
r.ReqStop()
2024-11-18 22:25:59 +09:00
r.pts_wg.Wait()
r.cts.svr.log.Write(log_id, LOG_DEBUG, "All service-side peer handlers completed")
2024-11-24 20:39:51 +09:00
r.cts.RemoveServerRoute(r) // final phase...
}
2024-11-18 22:25:59 +09:00
func (r *ServerRoute) ReqStop() {
fmt.Printf("requesting to stop route taak..\n")
2024-11-18 22:25:59 +09:00
if r.stop_req.CompareAndSwap(false, true) {
var pts *ServerPeerConn
for _, pts = range r.pts_map {
pts.ReqStop()
}
2024-11-23 14:49:04 +09:00
r.l.Close()
2024-11-18 22:25:59 +09:00
}
fmt.Printf("requiested to stopp route taak..\n")
}
func (r *ServerRoute) ReportEvent(pts_id uint32, event_type PACKET_KIND, event_data []byte) error {
var spc *ServerPeerConn
var ok bool
r.pts_mtx.Lock()
spc, ok = r.pts_map[pts_id]
if !ok {
2024-11-23 14:49:04 +09:00
r.pts_mtx.Unlock()
return fmt.Errorf("non-existent peer id - %u", pts_id)
}
2024-11-23 14:49:04 +09:00
r.pts_mtx.Unlock()
return spc.ReportEvent(event_type, event_data)
}
// ------------------------------------
func (cts *ServerConn) make_route_listener(proto ROUTE_PROTO) (*net.TCPListener, *net.TCPAddr, error) {
var l *net.TCPListener
var err error
var svcaddr *net.TCPAddr
var port int
var tries int = 0
var nw string
switch proto {
case ROUTE_PROTO_TCP:
nw = "tcp"
case ROUTE_PROTO_TCP4:
nw = "tcp4"
case ROUTE_PROTO_TCP6:
nw = "tcp6"
}
for {
port = rand.Intn(65535-32000+1) + 32000
svcaddr, err = net.ResolveTCPAddr(nw, fmt.Sprintf(":%d", port))
if err == nil {
l, err = net.ListenTCP(nw, svcaddr) // make the binding address configurable. support multiple binding addresses???
if err == nil {
fmt.Printf("listening .... on ... %d\n", port)
return l, svcaddr, nil
}
}
// TODO: implement max retries..
tries++
if tries >= 1000 {
err = fmt.Errorf("unable to allocate port")
break
}
}
return nil, nil, err
}
2024-12-02 02:19:50 +09:00
func (cts *ServerConn) AddNewServerRoute(route_id uint32, proto ROUTE_PROTO, ptc_addr string) (*ServerRoute, error) {
var r *ServerRoute
var err error
cts.route_mtx.Lock()
2024-11-18 22:25:59 +09:00
if cts.route_map[route_id] != nil {
cts.route_mtx.Unlock()
return nil, fmt.Errorf("existent route id - %d", route_id)
}
2024-12-02 02:19:50 +09:00
r, err = NewServerRoute(cts, route_id, proto, ptc_addr)
if err != nil {
cts.route_mtx.Unlock()
return nil, err
}
2024-11-23 14:49:04 +09:00
cts.route_map[route_id] = r
cts.route_mtx.Unlock()
2024-11-18 22:25:59 +09:00
cts.route_wg.Add(1)
go r.RunTask(&cts.route_wg)
return r, nil
}
func (cts *ServerConn) RemoveServerRoute(route *ServerRoute) error {
2024-11-24 20:39:51 +09:00
var r *ServerRoute
var ok bool
cts.route_mtx.Lock()
r, ok = cts.route_map[route.id]
if !ok {
2024-11-24 20:39:51 +09:00
cts.route_mtx.Unlock()
return fmt.Errorf("non-existent route id - %d", route.id)
2024-11-24 20:39:51 +09:00
}
if r != route {
2024-11-24 20:39:51 +09:00
cts.route_mtx.Unlock()
return fmt.Errorf("non-existent route - %d", route.id)
2024-11-24 20:39:51 +09:00
}
delete(cts.route_map, route.id)
cts.route_mtx.Unlock()
r.ReqStop()
return nil
}
func (cts *ServerConn) RemoveServerRouteById(route_id uint32) (*ServerRoute, error) {
var r *ServerRoute
var ok bool
cts.route_mtx.Lock()
2024-11-18 22:25:59 +09:00
r, ok = cts.route_map[route_id]
if !ok {
cts.route_mtx.Unlock()
return nil, fmt.Errorf("non-existent route id - %d", route_id)
}
2024-11-18 22:25:59 +09:00
delete(cts.route_map, route_id)
cts.route_mtx.Unlock()
2024-11-24 20:39:51 +09:00
r.ReqStop()
return r, nil
}
func (cts *ServerConn) ReportEvent(route_id uint32, pts_id uint32, event_type PACKET_KIND, event_data []byte) error {
var r *ServerRoute
var ok bool
cts.route_mtx.Lock()
2024-11-18 22:25:59 +09:00
r, ok = cts.route_map[route_id]
if (!ok) {
cts.route_mtx.Unlock()
return fmt.Errorf("non-existent route id - %d", route_id)
}
cts.route_mtx.Unlock()
return r.ReportEvent(pts_id, event_type, event_data)
}
func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) {
var pkt *Packet
var err error
defer wg.Done()
for {
2024-11-18 22:25:59 +09:00
pkt, err = cts.pss.Recv()
2024-11-17 14:57:56 +09:00
if errors.Is(err, io.EOF) {
cts.svr.log.Write("", LOG_INFO, "GRPC stream closed for client %s", cts.raddr)
2024-11-18 22:25:59 +09:00
goto done
}
if err != nil {
cts.svr.log.Write("", LOG_ERROR, "GRPC stream error for client %s - %s", cts.raddr, err.Error())
2024-11-18 22:25:59 +09:00
goto done
}
switch pkt.Kind {
case PACKET_KIND_ROUTE_START:
var x *Packet_Route
var ok bool
x, ok = pkt.U.(*Packet_Route)
if ok {
var r *ServerRoute
2024-11-24 20:39:51 +09:00
2024-12-02 02:19:50 +09:00
r, err = cts.AddNewServerRoute(x.Route.RouteId, x.Route.Proto, x.Route.AddrStr)
if err != nil {
cts.svr.log.Write("", LOG_ERROR, "Failed to add server route for client %s peer %s", cts.raddr, x.Route.AddrStr)
} else {
cts.svr.log.Write("", LOG_INFO, "Added server route(id=%d) for client %s peer %s to cts(id=%d)", r.id, cts.raddr, x.Route.AddrStr, cts.id)
err = cts.pss.Send(MakeRouteStartedPacket(r.id, x.Route.Proto, r.svcaddr.String()))
if err != nil {
2024-11-24 20:39:51 +09:00
r.ReqStop()
cts.svr.log.Write("", LOG_ERROR, "Failed to inform client %s of server route started for peer %s", cts.raddr, x.Route.AddrStr)
2024-11-24 20:39:51 +09:00
goto done
}
}
} else {
cts.svr.log.Write("", LOG_INFO, "Received invalid packet from %s", cts.raddr)
2024-11-24 20:39:51 +09:00
// TODO: need to abort this client?
}
case PACKET_KIND_ROUTE_STOP:
var x *Packet_Route
var ok bool
x, ok = pkt.U.(*Packet_Route)
if ok {
var r *ServerRoute
2024-11-24 20:39:51 +09:00
r, err = cts.RemoveServerRouteById(x.Route.RouteId)
if err != nil {
cts.svr.log.Write("", LOG_ERROR, "Failed to delete server route(id=%d) for client %s peer %s", x.Route.RouteId, cts.raddr, x.Route.AddrStr)
} else {
cts.svr.log.Write("", LOG_ERROR, "Deleted server route(id=%d) for client %s peer %s", x.Route.RouteId, cts.raddr, x.Route.AddrStr)
2024-11-18 22:25:59 +09:00
err = cts.pss.Send(MakeRouteStoppedPacket(x.Route.RouteId, x.Route.Proto))
if err != nil {
2024-11-24 20:39:51 +09:00
r.ReqStop()
cts.svr.log.Write("", LOG_ERROR, "Failed to inform client %s of server route(id=%d) stopped for peer %s", cts.raddr, x.Route.RouteId, x.Route.AddrStr)
2024-11-24 20:39:51 +09:00
goto done
}
}
} else {
cts.svr.log.Write("", LOG_INFO, "Received invalid packet from %s", cts.raddr)
2024-11-24 20:39:51 +09:00
// TODO: need to abort this client?
}
case PACKET_KIND_PEER_STARTED:
// the connection from the client to a peer has been established
var x *Packet_Peer
var ok bool
x, ok = pkt.U.(*Packet_Peer)
if ok {
err = cts.ReportEvent(x.Peer.RouteId, x.Peer.PeerId, PACKET_KIND_PEER_STARTED, nil)
if err != nil {
// TODO:
fmt.Printf("Failed to report PEER_STARTED Event")
} else {
// TODO:
}
} else {
// TODO
}
2024-11-20 00:48:02 +09:00
case PACKET_KIND_PEER_ABORTED:
fallthrough
case PACKET_KIND_PEER_STOPPED:
// the connection from the client to a peer has been established
var x *Packet_Peer
var ok bool
x, ok = pkt.U.(*Packet_Peer)
if ok {
err = cts.ReportEvent(x.Peer.RouteId, x.Peer.PeerId, PACKET_KIND_PEER_STOPPED, nil)
if err != nil {
// TODO:
fmt.Printf("Failed to report PEER_STOPPED Event")
} else {
// TODO:
}
} else {
// TODO
}
case PACKET_KIND_PEER_DATA:
// the connection from the client to a peer has been established
var x *Packet_Data
var ok bool
x, ok = pkt.U.(*Packet_Data)
if ok {
err = cts.ReportEvent(x.Data.RouteId, x.Data.PeerId, PACKET_KIND_PEER_DATA, x.Data.Data)
if err != nil {
// TODO:
fmt.Printf("Failed to report PEER_DATA Event")
} else {
// TODO:
}
} else {
// TODO
}
}
}
2024-11-18 22:25:59 +09:00
done:
fmt.Printf("************ stream receiver finished....\n")
2024-11-18 22:25:59 +09:00
}
func (cts *ServerConn) RunTask(wg *sync.WaitGroup) {
2024-11-18 22:25:59 +09:00
var strm *GuardedPacketStreamServer
var ctx context.Context
defer wg.Done()
strm = cts.pss
ctx = strm.Context()
// it looks like the only proper way to interrupt the blocking Recv
// call on the grpc streaming server is exit from the service handler
// which is this function invoked from PacketStream().
// there is no cancel function or whatever that can interrupt it.
// so start the Recv() loop in a separte goroutine and let this
// function be the channel waiter only.
// increment on the wait group is for the caller to wait for
// these detached goroutines to finish.
wg.Add(1)
go cts.receive_from_stream(wg)
2024-11-18 22:25:59 +09:00
for {
// exit if context is done
// or continue
select {
case <-ctx.Done(): // the stream context is done
2024-11-24 20:39:51 +09:00
fmt.Printf("grpc server done - %s\n", ctx.Err().Error())
2024-11-18 22:25:59 +09:00
goto done
case <- cts.stop_chan:
2024-11-24 20:39:51 +09:00
// get out of the loop to eventually to exit from
// this handler to let the main grpc server to
// close this specific client connection.
2024-11-18 22:25:59 +09:00
goto done
//default:
2024-11-18 22:25:59 +09:00
// no other case is ready.
// without the default case, the select construct would block
}
}
done:
fmt.Printf("^^^^^^^^^^^^^^^^^ waiting for reoute_wg...\n")
2024-11-24 20:39:51 +09:00
cts.ReqStop() // just in case
2024-11-18 22:25:59 +09:00
cts.route_wg.Wait()
fmt.Printf("^^^^^^^^^^^^^^^^^ waited for reoute_wg...\n")
2024-11-18 22:25:59 +09:00
}
func (cts *ServerConn) ReqStop() {
2024-11-18 22:25:59 +09:00
if cts.stop_req.CompareAndSwap(false, true) {
var r *ServerRoute
for _, r = range cts.route_map {
r.ReqStop()
}
2024-11-24 20:39:51 +09:00
// there is no good way to break a specific connection client to
// the grpc server. while the global grpc server is closed in
// ReqStop() for Server, the individuation connection is closed
// by returing from the grpc handler goroutine. See the comment
// RunTask() for ServerConn.
2024-11-18 22:25:59 +09:00
cts.stop_chan <- true
}
}
// --------------------------------------------------------------------
func (s *Server) GetSeed(ctx context.Context, c_seed *Seed) (*Seed, error) {
var s_seed Seed
// seed exchange is for furture expansion of the protocol
// there is nothing to do much about it for now.
s_seed.Version = HODU_VERSION
s_seed.Flags = 0
// we create no ServerConn structure associated with the connection
// at this phase for the server. it doesn't track the client version and
// features. we delegate protocol selection solely to the client.
return &s_seed, nil
}
2024-11-18 22:25:59 +09:00
func (s *Server) PacketStream(strm Hodu_PacketStreamServer) error {
var ctx context.Context
var p *peer.Peer
var ok bool
var err error
var cts *ServerConn
2024-11-18 22:25:59 +09:00
ctx = strm.Context()
p, ok = peer.FromContext(ctx)
if !ok {
2024-11-18 22:25:59 +09:00
return fmt.Errorf("failed to get peer from packet stream context")
}
2024-12-02 02:19:50 +09:00
cts, err = s.AddNewServerConn(&p.Addr, &p.LocalAddr, strm)
2024-11-18 22:25:59 +09:00
if err != nil {
return fmt.Errorf("unable to add client %s - %s", p.Addr.String(), err.Error())
}
2024-11-18 22:25:59 +09:00
// Don't detached the cts task as a go-routine as this function
// is invoked as a go-routine by the grpc server.
s.cts_wg.Add(1)
cts.RunTask(&s.cts_wg)
return nil
}
// ------------------------------------
type ConnCatcher struct {
server *Server
}
func (cc *ConnCatcher) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
return ctx
}
func (cc *ConnCatcher) HandleRPC(ctx context.Context, s stats.RPCStats) {
}
func (cc *ConnCatcher) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context {
2024-11-23 14:49:04 +09:00
return ctx
//return context.TODO()
}
func (cc *ConnCatcher) HandleConn(ctx context.Context, cs stats.ConnStats) {
// fmt.Println(ctx.Value("user_id")) // Returns nil, can't access the value
var p *peer.Peer
var ok bool
var addr string
p, ok = peer.FromContext(ctx)
if !ok {
addr = ""
} else {
addr = p.Addr.String()
}
2024-11-13 02:20:25 +09:00
/*
md,ok:=metadata.FromIncomingContext(ctx)
fmt.Printf("%+v%+v\n",md,ok)
if ok {
2024-11-13 02:20:25 +09:00
}*/
switch cs.(type) {
case *stats.ConnBegin:
fmt.Printf("**** client connected - [%s]\n", addr)
case *stats.ConnEnd:
fmt.Printf("**** client disconnected - [%s]\n", addr)
cc.server.RemoveServerConnByAddr(p.Addr)
}
}
// wrappedStream wraps around the embedded grpc.ServerStream, and intercepts the RecvMsg and
// SendMsg method call.
type wrappedStream struct {
grpc.ServerStream
}
func (w *wrappedStream) RecvMsg(m any) error {
//fmt.Printf("Receive a message (Type: %T) at %s\n", m, time.Now().Format(time.RFC3339))
return w.ServerStream.RecvMsg(m)
}
func (w *wrappedStream) SendMsg(m any) error {
//fmt.Printf("Send a message (Type: %T) at %v\n", m, time.Now().Format(time.RFC3339))
return w.ServerStream.SendMsg(m)
}
func newWrappedStream(s grpc.ServerStream) grpc.ServerStream {
return &wrappedStream{s}
}
func streamInterceptor(srv any, ss grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
// authentication (token verification)
/*
md, ok := metadata.FromIncomingContext(ss.Context())
if !ok {
return errMissingMetadata
}
if !valid(md["authorization"]) {
return errInvalidToken
}
*/
err := handler(srv, newWrappedStream(ss))
if err != nil {
fmt.Printf("RPC failed with error: %v\n", err)
}
return err
}
func unaryInterceptor(ctx context.Context, req any, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) {
// authentication (token verification)
/*
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, errMissingMetadata
}
if !valid(md["authorization"]) {
// return nil, errInvalidToken
}
*/
m, err := handler(ctx, req)
if err != nil {
fmt.Printf("RPC failed with error: %v\n", err)
}
2024-11-21 01:11:01 +09:00
return m, err
}
func NewServer(ctx context.Context, ctl_addrs []string, rpc_addrs []string, logger Logger, tlscfg *tls.Config) (*Server, error) {
var s Server
var l *net.TCPListener
var rpcaddr *net.TCPAddr
var err error
var addr string
var gl *net.TCPListener
var i int
var cwd string
if len(rpc_addrs) <= 0 {
2024-11-24 20:39:51 +09:00
return nil, fmt.Errorf("no server addresses provided")
}
2024-11-24 20:39:51 +09:00
s.ctx, s.ctx_cancel = context.WithCancel(ctx)
2024-11-21 01:11:01 +09:00
s.log = logger
/* create the specified number of listeners */
s.rpc = make([]*net.TCPListener, 0)
for _, addr = range rpc_addrs {
rpcaddr, err = net.ResolveTCPAddr(NET_TYPE_TCP, addr) // Make this interruptable???
if err != nil {
goto oops
}
l, err = net.ListenTCP(NET_TYPE_TCP, rpcaddr)
if err != nil {
goto oops
}
s.rpc = append(s.rpc, l)
}
s.tlscfg = tlscfg
s.ext_svcs = make([]Service, 0, 1)
2024-12-03 00:55:19 +09:00
s.cts_map = make(ServerConnMap)
s.cts_map_by_addr = make(ServerConnMapByAddr)
2024-11-24 20:39:51 +09:00
s.stop_chan = make(chan bool, 8)
s.stop_req.Store(false)
/*
creds, err := credentials.NewServerTLSFromFile(data.Path("x509/server_cert.pem"), data.Path("x509/server_key.pem"))
if err != nil {
log.Fatalf("failed to create credentials: %v", err)
}
gs = grpc.NewServer(grpc.Creds(creds))
*/
s.gs = grpc.NewServer(
grpc.UnaryInterceptor(unaryInterceptor),
grpc.StreamInterceptor(streamInterceptor),
grpc.StatsHandler(&ConnCatcher{server: &s}),
) // TODO: have this outside the server struct?
RegisterHoduServer(s.gs, &s)
s.ctl_prefix = "" // TODO:
s.ctl_mux = http.NewServeMux()
cwd, _ = os.Getwd()
s.ctl_mux.Handle(s.ctl_prefix + "/ui/", http.StripPrefix(s.ctl_prefix, http.FileServer(http.Dir(cwd)))) // TODO: proper directory. it must not use the current working directory...
2024-12-02 02:19:50 +09:00
s.ctl_mux.Handle(s.ctl_prefix + "/ws/tty", new_server_ctl_ws_tty(&s))
s.ctl_mux.Handle(s.ctl_prefix + "/server-conns", &server_ctl_server_conns{s: &s})
2024-12-03 00:55:19 +09:00
s.ctl_mux.Handle(s.ctl_prefix + "/server-conns/{conn_id}", &server_ctl_server_conns_id{s: &s})
s.ctl_addr = make([]string, len(ctl_addrs))
s.ctl = make([]*http.Server, len(ctl_addrs))
copy(s.ctl_addr, ctl_addrs)
for i = 0; i < len(ctl_addrs); i++ {
s.ctl[i] = &http.Server{
Addr: ctl_addrs[i],
Handler: s.ctl_mux,
// TODO: more settings
}
}
return &s, nil
oops:
/* TODO: check if gs needs to be closed... */
if gl != nil {
gl.Close()
}
for _, l = range s.rpc {
l.Close()
}
s.rpc = make([]*net.TCPListener, 0)
return nil, err
}
2024-11-18 22:25:59 +09:00
func (s *Server) run_grpc_server(idx int, wg *sync.WaitGroup) error {
var l *net.TCPListener
var err error
2024-11-23 14:49:04 +09:00
defer wg.Done()
2024-11-18 22:25:59 +09:00
l = s.rpc[idx]
// it seems to be safe to call a single grpc server on differnt listening sockets multiple times
s.log.Write("", LOG_ERROR, "Starting GRPC server on %s", l.Addr().String())
2024-11-23 14:49:04 +09:00
err = s.gs.Serve(l)
if err != nil {
2024-11-21 01:11:01 +09:00
if errors.Is(err, net.ErrClosed) {
s.log.Write("", LOG_ERROR, "GRPC server on %s closed", l.Addr().String())
2024-11-21 01:11:01 +09:00
} else {
s.log.Write("", LOG_ERROR, "Error from GRPC server on %s - %s", l.Addr().String(), err.Error())
2024-11-21 01:11:01 +09:00
}
return err
}
return nil
}
2024-11-18 22:25:59 +09:00
func (s *Server) RunTask(wg *sync.WaitGroup) {
var idx int
2024-11-18 22:25:59 +09:00
defer wg.Done()
2024-11-13 02:20:25 +09:00
for idx, _ = range s.rpc {
s.rpc_wg.Add(1)
go s.run_grpc_server(idx, &s.rpc_wg)
}
2024-11-24 20:39:51 +09:00
// most the work is done by in separate goroutines (s.run_grp_server)
// this loop serves as a placeholder to prevent the logic flow from
// descening down to s.ReqStop()
task_loop:
for {
select {
case <-s.stop_chan:
break task_loop
}
}
s.ReqStop()
s.rpc_wg.Wait()
s.log.Write("", LOG_DEBUG, "All GRPC listeners completed")
2024-11-24 20:39:51 +09:00
2024-11-18 22:25:59 +09:00
s.cts_wg.Wait()
s.log.Write("", LOG_DEBUG, "All CTS handlers completed")
2024-11-21 01:11:01 +09:00
// stop the main grpc server after all the other tasks are finished.
s.gs.Stop()
}
2024-11-21 01:11:01 +09:00
func (s *Server) RunCtlTask(wg *sync.WaitGroup) {
var err error
var ctl *http.Server
var idx int
var l_wg sync.WaitGroup
2024-11-21 01:11:01 +09:00
defer wg.Done()
for idx, ctl = range s.ctl {
l_wg.Add(1)
go func(i int, cs *http.Server) {
s.log.Write ("", LOG_INFO, "Control channel[%d] started on %s", i, s.ctl_addr[i])
err = cs.ListenAndServe()
if errors.Is(err, http.ErrServerClosed) {
s.log.Write("", LOG_DEBUG, "Control channel[%d] ended", i)
} else {
s.log.Write("", LOG_ERROR, "Control channel[%d] error - %s", i, err.Error())
}
l_wg.Done()
}(idx, ctl)
2024-11-21 01:11:01 +09:00
}
l_wg.Wait()
2024-11-21 01:11:01 +09:00
}
func (s *Server) ReqStop() {
if s.stop_req.CompareAndSwap(false, true) {
var l *net.TCPListener
var cts *ServerConn
var ctl *http.Server
for _, ctl = range s.ctl {
ctl.Shutdown(s.ctx) // to break c.ctl.ListenAndServe()
2024-11-24 20:39:51 +09:00
}
//s.gs.GracefulStop()
//s.gs.Stop()
for _, l = range s.rpc {
l.Close()
}
s.cts_mtx.Lock() // TODO: this mya create dead-lock. check possibility of dead lock???
for _, cts = range s.cts_map {
cts.ReqStop() // request to stop connections from/to peer held in the cts structure
}
s.cts_mtx.Unlock()
2024-11-24 20:39:51 +09:00
s.stop_chan <- true
s.ctx_cancel()
}
}
2024-12-02 02:19:50 +09:00
func (s *Server) AddNewServerConn(remote_addr *net.Addr, local_addr *net.Addr, pss Hodu_PacketStreamServer) (*ServerConn, error) {
var cts ServerConn
2024-12-03 00:55:19 +09:00
var id uint32
var ok bool
cts.svr = s
2024-11-18 22:25:59 +09:00
cts.route_map = make(ServerRouteMap)
cts.raddr = *remote_addr
cts.laddr = *local_addr
2024-11-18 22:25:59 +09:00
cts.pss = &GuardedPacketStreamServer{Hodu_PacketStreamServer: pss}
cts.stop_req.Store(false)
2024-11-24 20:39:51 +09:00
cts.stop_chan = make(chan bool, 8)
s.cts_mtx.Lock()
defer s.cts_mtx.Unlock()
2024-12-03 00:55:19 +09:00
id = rand.Uint32()
for {
_, ok = s.cts_map[id]
if !ok { break }
id++
}
cts.id = id
cts.sid = fmt.Sprintf("%d", id) // id in string used for logging
2024-12-03 00:55:19 +09:00
_, ok = s.cts_map_by_addr[cts.raddr]
if ok {
return nil, fmt.Errorf("existing client - %s", cts.raddr.String())
}
s.cts_map_by_addr[cts.raddr] = &cts
2024-12-03 00:55:19 +09:00
s.cts_map[id] = &cts;
s.log.Write("", LOG_DEBUG, "Added client connection from %s", cts.raddr.String())
return &cts, nil
}
func (s *Server) ReqStopAllServerConns() {
var cts *ServerConn
s.cts_mtx.Lock()
defer s.cts_mtx.Unlock()
for _, cts = range s.cts_map {
cts.ReqStop()
}
}
2024-12-03 00:55:19 +09:00
func (s *Server) RemoveServerConn(cts *ServerConn) error {
var conn *ServerConn
var ok bool
s.cts_mtx.Lock()
conn, ok = s.cts_map[cts.id]
if !ok {
s.cts_mtx.Unlock()
return fmt.Errorf("non-existent connection id - %d", cts.id)
}
if conn != cts {
s.cts_mtx.Unlock()
return fmt.Errorf("non-existent connection id - %d", cts.id)
}
delete(s.cts_map, cts.id)
delete(s.cts_map_by_addr, cts.raddr)
2024-12-03 00:55:19 +09:00
s.cts_mtx.Unlock()
cts.ReqStop()
return nil
}
func (s *Server) RemoveServerConnByAddr(addr net.Addr) error {
var cts *ServerConn
var ok bool
s.cts_mtx.Lock()
2024-12-03 00:55:19 +09:00
cts, ok = s.cts_map_by_addr[addr]
if !ok {
s.cts_mtx.Unlock()
return fmt.Errorf("non-existent connection address - %s", addr.String())
}
delete(s.cts_map, cts.id)
delete(s.cts_map_by_addr, cts.raddr)
s.cts_mtx.Unlock()
2024-12-03 00:55:19 +09:00
cts.ReqStop()
return nil
}
2024-12-03 00:55:19 +09:00
func (s* Server) FindServerConnById(id uint32) *ServerConn {
var cts *ServerConn
var ok bool
s.cts_mtx.Lock()
defer s.cts_mtx.Unlock()
2024-12-03 00:55:19 +09:00
cts, ok = s.cts_map[id]
if !ok {
return nil
}
2024-12-03 00:55:19 +09:00
return cts
}
func (s *Server) FindServerConnByAddr(addr net.Addr) *ServerConn {
var cts *ServerConn
var ok bool
s.cts_mtx.Lock()
defer s.cts_mtx.Unlock()
2024-12-03 00:55:19 +09:00
cts, ok = s.cts_map_by_addr[addr]
if !ok {
return nil
}
return cts
}
func (s *Server) StartService(cfg interface{}) {
s.wg.Add(1)
go s.RunTask(&s.wg)
}
func (s *Server) StartExtService(svc Service, data interface{}) {
s.ext_mtx.Lock()
s.ext_svcs = append(s.ext_svcs, svc)
s.ext_mtx.Unlock()
s.wg.Add(1)
go svc.RunTask(&s.wg)
}
func (s *Server) StartCtlService() {
s.wg.Add(1)
go s.RunCtlTask(&s.wg)
}
func (s *Server) StopServices() {
var ext_svc Service
s.ReqStop()
for _, ext_svc = range s.ext_svcs {
ext_svc.StopServices()
}
}
func (s *Server) WaitForTermination() {
2024-11-13 02:20:25 +09:00
s.wg.Wait()
}
2024-11-23 14:49:04 +09:00
func (s *Server) WriteLog(id string, level LogLevel, fmtstr string, args ...interface{}) {
s.log.Write(id, level, fmtstr, args...)
2024-11-23 14:49:04 +09:00
}