separating http handler to separate structs

using the pattern supported by http.ServeMux since go 1.22
This commit is contained in:
2024-11-25 19:46:18 +09:00
parent 903e4cf6d3
commit dcdadbeb20
8 changed files with 261 additions and 209 deletions

124
server.go
View File

@ -19,7 +19,7 @@ import "google.golang.org/grpc/stats"
const PTS_LIMIT = 8192
type ClientConnMap = map[net.Addr]*ClientConn
type ServerConnMap = map[net.Addr]*ServerConn
type ServerPeerConnMap = map[uint32]*ServerPeerConn
type ServerRouteMap = map[uint32]*ServerRoute
@ -39,7 +39,7 @@ type Server struct {
l_wg sync.WaitGroup
cts_mtx sync.Mutex
cts_map ClientConnMap
cts_map ServerConnMap
cts_wg sync.WaitGroup
gs *grpc.Server
@ -48,9 +48,9 @@ type Server struct {
UnimplementedHoduServer
}
// client connection to server.
// connection from client.
// client connect to the server, the server accept it, and makes a tunnel request
type ClientConn struct {
type ServerConn struct {
svr *Server
caddr net.Addr // client address that created this structure
pss *GuardedPacketStreamServer
@ -65,7 +65,7 @@ type ClientConn struct {
}
type ServerRoute struct {
cts *ClientConn
cts *ServerConn
l *net.TCPListener
laddr *net.TCPAddr
id uint32
@ -107,7 +107,7 @@ func (g *GuardedPacketStreamServer) Context() context.Context {
// ------------------------------------
func NewServerRoute(cts *ClientConn, id uint32, proto ROUTE_PROTO) (*ServerRoute, error) {
func NewServerRoute(cts *ServerConn, id uint32, proto ROUTE_PROTO) (*ServerRoute, error) {
var r ServerRoute
var l *net.TCPListener
var laddr *net.TCPAddr
@ -209,7 +209,7 @@ func (r *ServerRoute) RunTask(wg *sync.WaitGroup) {
}
func (r *ServerRoute) ReqStop() {
fmt.Printf ("requesting to stop route taak..\n")
fmt.Printf("requesting to stop route taak..\n")
if r.stop_req.CompareAndSwap(false, true) {
var pts *ServerPeerConn
@ -220,10 +220,10 @@ func (r *ServerRoute) ReqStop() {
r.l.Close()
}
fmt.Printf ("requiested to stopp route taak..\n")
fmt.Printf("requiested to stopp route taak..\n")
}
func (r *ServerRoute) ReportEvent (pts_id uint32, event_type PACKET_KIND, event_data []byte) error {
func (r *ServerRoute) ReportEvent(pts_id uint32, event_type PACKET_KIND, event_data []byte) error {
var spc *ServerPeerConn
var ok bool
@ -239,7 +239,7 @@ func (r *ServerRoute) ReportEvent (pts_id uint32, event_type PACKET_KIND, event_
}
// ------------------------------------
func (cts *ClientConn) make_route_listener(proto ROUTE_PROTO) (*net.TCPListener, *net.TCPAddr, error) {
func (cts *ServerConn) make_route_listener(proto ROUTE_PROTO) (*net.TCPListener, *net.TCPAddr, error) {
var l *net.TCPListener
var err error
var laddr *net.TCPAddr
@ -279,14 +279,14 @@ func (cts *ClientConn) make_route_listener(proto ROUTE_PROTO) (*net.TCPListener,
return nil, nil, err
}
func (cts *ClientConn) AddNewServerRoute(route_id uint32, proto ROUTE_PROTO) (*ServerRoute, error) {
func (cts *ServerConn) AddNewServerRoute(route_id uint32, proto ROUTE_PROTO) (*ServerRoute, error) {
var r *ServerRoute
var err error
cts.route_mtx.Lock()
if cts.route_map[route_id] != nil {
cts.route_mtx.Unlock()
return nil, fmt.Errorf ("existent route id - %d", route_id)
return nil, fmt.Errorf("existent route id - %d", route_id)
}
r, err = NewServerRoute(cts, route_id, proto)
if err != nil {
@ -301,19 +301,19 @@ func (cts *ClientConn) AddNewServerRoute(route_id uint32, proto ROUTE_PROTO) (*S
return r, nil
}
func (cts *ClientConn) RemoveServerRoute (route* ServerRoute) error {
func (cts *ServerConn) RemoveServerRoute(route *ServerRoute) error {
var r *ServerRoute
var ok bool
cts.route_mtx.Lock()
r, ok = cts.route_map[route.id]
if (!ok) {
if !ok {
cts.route_mtx.Unlock()
return fmt.Errorf ("non-existent route id - %d", route.id)
return fmt.Errorf("non-existent route id - %d", route.id)
}
if (r != route) {
if r != route {
cts.route_mtx.Unlock()
return fmt.Errorf ("non-existent route - %d", route.id)
return fmt.Errorf("non-existent route - %d", route.id)
}
delete(cts.route_map, route.id)
cts.route_mtx.Unlock()
@ -322,15 +322,15 @@ func (cts *ClientConn) RemoveServerRoute (route* ServerRoute) error {
return nil
}
func (cts *ClientConn) RemoveServerRouteById (route_id uint32) (*ServerRoute, error) {
func (cts *ServerConn) RemoveServerRouteById(route_id uint32) (*ServerRoute, error) {
var r *ServerRoute
var ok bool
cts.route_mtx.Lock()
r, ok = cts.route_map[route_id]
if (!ok) {
if !ok {
cts.route_mtx.Unlock()
return nil, fmt.Errorf ("non-existent route id - %d", route_id)
return nil, fmt.Errorf("non-existent route id - %d", route_id)
}
delete(cts.route_map, route_id)
cts.route_mtx.Unlock()
@ -339,7 +339,7 @@ func (cts *ClientConn) RemoveServerRouteById (route_id uint32) (*ServerRoute, er
return r, nil
}
func (cts *ClientConn) ReportEvent (route_id uint32, pts_id uint32, event_type PACKET_KIND, event_data []byte) error {
func (cts *ServerConn) ReportEvent(route_id uint32, pts_id uint32, event_type PACKET_KIND, event_data []byte) error {
var r *ServerRoute
var ok bool
@ -347,14 +347,14 @@ func (cts *ClientConn) ReportEvent (route_id uint32, pts_id uint32, event_type P
r, ok = cts.route_map[route_id]
if (!ok) {
cts.route_mtx.Unlock()
return fmt.Errorf ("non-existent route id - %d", route_id)
return fmt.Errorf("non-existent route id - %d", route_id)
}
cts.route_mtx.Unlock()
return r.ReportEvent(pts_id, event_type, event_data)
}
func (cts *ClientConn) receive_from_stream(wg *sync.WaitGroup) {
func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) {
var pkt *Packet
var err error
@ -377,7 +377,7 @@ func (cts *ClientConn) receive_from_stream(wg *sync.WaitGroup) {
var ok bool
x, ok = pkt.U.(*Packet_Route)
if ok {
var r* ServerRoute
var r *ServerRoute
r, err = cts.AddNewServerRoute(x.Route.RouteId, x.Route.Proto)
if err != nil {
@ -401,7 +401,7 @@ func (cts *ClientConn) receive_from_stream(wg *sync.WaitGroup) {
var ok bool
x, ok = pkt.U.(*Packet_Route)
if ok {
var r* ServerRoute
var r *ServerRoute
r, err = cts.RemoveServerRouteById(x.Route.RouteId)
if err != nil {
@ -429,7 +429,7 @@ func (cts *ClientConn) receive_from_stream(wg *sync.WaitGroup) {
err = cts.ReportEvent(x.Peer.RouteId, x.Peer.PeerId, PACKET_KIND_PEER_STARTED, nil)
if err != nil {
// TODO:
fmt.Printf ("Failed to report PEER_STARTED Event")
fmt.Printf("Failed to report PEER_STARTED Event")
} else {
// TODO:
}
@ -448,7 +448,7 @@ func (cts *ClientConn) receive_from_stream(wg *sync.WaitGroup) {
err = cts.ReportEvent(x.Peer.RouteId, x.Peer.PeerId, PACKET_KIND_PEER_STOPPED, nil)
if err != nil {
// TODO:
fmt.Printf ("Failed to report PEER_STOPPED Event")
fmt.Printf("Failed to report PEER_STOPPED Event")
} else {
// TODO:
}
@ -465,7 +465,7 @@ func (cts *ClientConn) receive_from_stream(wg *sync.WaitGroup) {
err = cts.ReportEvent(x.Data.RouteId, x.Data.PeerId, PACKET_KIND_PEER_DATA, x.Data.Data)
if err != nil {
// TODO:
fmt.Printf ("Failed to report PEER_DATA Event")
fmt.Printf("Failed to report PEER_DATA Event")
} else {
// TODO:
}
@ -476,10 +476,10 @@ func (cts *ClientConn) receive_from_stream(wg *sync.WaitGroup) {
}
done:
fmt.Printf ("************ stream receiver finished....\n")
fmt.Printf("************ stream receiver finished....\n")
}
func (cts *ClientConn) RunTask(wg *sync.WaitGroup) {
func (cts *ServerConn) RunTask(wg *sync.WaitGroup) {
var strm *GuardedPacketStreamServer
var ctx context.Context
@ -520,13 +520,13 @@ fmt.Printf("grpc server done - %s\n", ctx.Err().Error())
}
done:
fmt.Printf ("^^^^^^^^^^^^^^^^^ waiting for reoute_wg...\n")
fmt.Printf("^^^^^^^^^^^^^^^^^ waiting for reoute_wg...\n")
cts.ReqStop() // just in case
cts.route_wg.Wait()
fmt.Printf ("^^^^^^^^^^^^^^^^^ waited for reoute_wg...\n")
fmt.Printf("^^^^^^^^^^^^^^^^^ waited for reoute_wg...\n")
}
func (cts *ClientConn) ReqStop() {
func (cts *ServerConn) ReqStop() {
if cts.stop_req.CompareAndSwap(false, true) {
var r *ServerRoute
@ -538,14 +538,14 @@ func (cts *ClientConn) ReqStop() {
// the grpc server. while the global grpc server is closed in
// ReqStop() for Server, the individuation connection is closed
// by returing from the grpc handler goroutine. See the comment
// RunTask() for ClientConn.
// RunTask() for ServerConn.
cts.stop_chan <- true
}
}
// --------------------------------------------------------------------
func (s *Server) GetSeed (ctx context.Context, c_seed *Seed) (*Seed, error) {
func (s *Server) GetSeed(ctx context.Context, c_seed *Seed) (*Seed, error) {
var s_seed Seed
// seed exchange is for furture expansion of the protocol
@ -554,7 +554,7 @@ func (s *Server) GetSeed (ctx context.Context, c_seed *Seed) (*Seed, error) {
s_seed.Version = HODU_VERSION
s_seed.Flags = 0
// we create no ClientConn structure associated with the connection
// we create no ServerConn structure associated with the connection
// at this phase for the server. it doesn't track the client version and
// features. we delegate protocol selection solely to the client.
@ -566,15 +566,15 @@ func (s *Server) PacketStream(strm Hodu_PacketStreamServer) error {
var p *peer.Peer
var ok bool
var err error
var cts *ClientConn
var cts *ServerConn
ctx = strm.Context()
p, ok = peer.FromContext(ctx)
if (!ok) {
if !ok {
return fmt.Errorf("failed to get peer from packet stream context")
}
cts, err = s.AddNewClientConn(p.Addr, strm)
cts, err = s.AddNewServerConn(p.Addr, strm)
if err != nil {
return fmt.Errorf("unable to add client %s - %s", p.Addr.String(), err.Error())
}
@ -593,7 +593,7 @@ type ConnCatcher struct {
}
func (cc *ConnCatcher) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
return ctx
return ctx
}
func (cc *ConnCatcher) HandleRPC(ctx context.Context, s stats.RPCStats) {
@ -601,7 +601,7 @@ func (cc *ConnCatcher) HandleRPC(ctx context.Context, s stats.RPCStats) {
func (cc *ConnCatcher) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context {
return ctx
//return context.TODO()
//return context.TODO()
}
func (cc *ConnCatcher) HandleConn(ctx context.Context, cs stats.ConnStats) {
@ -611,7 +611,7 @@ func (cc *ConnCatcher) HandleConn(ctx context.Context, cs stats.ConnStats) {
var addr string
p, ok = peer.FromContext(ctx)
if (!ok) {
if !ok {
addr = ""
} else {
addr = p.Addr.String()
@ -626,8 +626,8 @@ if ok {
fmt.Printf("**** client connected - [%s]\n", addr)
case *stats.ConnEnd:
fmt.Printf("**** client disconnected - [%s]\n", addr)
cc.server.RemoveClientConnByAddr(p.Addr)
}
cc.server.RemoveServerConnByAddr(p.Addr)
}
}
// wrappedStream wraps around the embedded grpc.ServerStream, and intercepts the RecvMsg and
@ -720,7 +720,7 @@ func NewServer(ctx context.Context, laddrs []string, logger Logger, tlscfg *tls.
s.tlscfg = tlscfg
s.ext_svcs = make([]Service, 0, 1)
s.cts_map = make(ClientConnMap) // TODO: make it configurable...
s.cts_map = make(ServerConnMap) // TODO: make it configurable...
s.stop_chan = make(chan bool, 8)
s.stop_req.Store(false)
/*
@ -733,9 +733,9 @@ func NewServer(ctx context.Context, laddrs []string, logger Logger, tlscfg *tls.
s.gs = grpc.NewServer(
grpc.UnaryInterceptor(unaryInterceptor),
grpc.StreamInterceptor(streamInterceptor),
grpc.StatsHandler(&ConnCatcher{ server: &s }),
grpc.StatsHandler(&ConnCatcher{server: &s}),
) // TODO: have this outside the server struct?
RegisterHoduServer (s.gs, &s)
RegisterHoduServer(s.gs, &s)
return &s, nil
@ -760,13 +760,13 @@ func (s *Server) run_grpc_server(idx int, wg *sync.WaitGroup) error {
l = s.l[idx]
// it seems to be safe to call a single grpc server on differnt listening sockets multiple times
s.log.Write ("", LOG_ERROR, "Starting GRPC server listening on %s", l.Addr().String())
s.log.Write("", LOG_ERROR, "Starting GRPC server listening on %s", l.Addr().String())
err = s.gs.Serve(l)
if err != nil {
if errors.Is(err, net.ErrClosed) {
s.log.Write ("", LOG_ERROR, "GRPC server listening on %s closed", l.Addr().String())
s.log.Write("", LOG_ERROR, "GRPC server listening on %s closed", l.Addr().String())
} else {
s.log.Write ("", LOG_ERROR, "Error from GRPC server listening on %s - %s", l.Addr().String(), err.Error())
s.log.Write("", LOG_ERROR, "Error from GRPC server listening on %s - %s", l.Addr().String(), err.Error())
}
return err
}
@ -814,18 +814,18 @@ func (s *Server) RunCtlTask(wg *sync.WaitGroup) {
err = s.ctl.ListenAndServe()
if errors.Is(err, http.ErrServerClosed) {
fmt.Printf ("------------http server error - %s\n", err.Error())
fmt.Printf("------------http server error - %s\n", err.Error())
} else {
fmt.Printf ("********* http server ended\n")
fmt.Printf("********* http server ended\n")
}
}
func (s *Server) ReqStop() {
if s.stop_req.CompareAndSwap(false, true) {
var l *net.TCPListener
var cts *ClientConn
var cts *ServerConn
if (s.ctl != nil) {
if s.ctl != nil {
// shutdown the control server if ever started.
s.ctl.Shutdown(s.ctx)
}
@ -847,8 +847,8 @@ func (s *Server) ReqStop() {
}
}
func (s *Server) AddNewClientConn(addr net.Addr, pss Hodu_PacketStreamServer) (*ClientConn, error) {
var cts ClientConn
func (s *Server) AddNewServerConn(addr net.Addr, pss Hodu_PacketStreamServer) (*ServerConn, error) {
var cts ServerConn
var ok bool
cts.svr = s
@ -871,15 +871,15 @@ func (s *Server) AddNewClientConn(addr net.Addr, pss Hodu_PacketStreamServer) (*
return &cts, nil
}
func (s *Server) RemoveClientConn(cts *ClientConn) {
func (s *Server) RemoveServerConn(cts *ServerConn) {
s.cts_mtx.Lock()
delete(s.cts_map, cts.caddr)
s.log.Write("", LOG_DEBUG, "Removed client connection from %s", cts.caddr.String())
s.cts_mtx.Unlock()
}
func (s *Server) RemoveClientConnByAddr(addr net.Addr) {
var cts *ClientConn
func (s *Server) RemoveServerConnByAddr(addr net.Addr) {
var cts *ServerConn
var ok bool
s.cts_mtx.Lock()
@ -892,8 +892,8 @@ func (s *Server) RemoveClientConnByAddr(addr net.Addr) {
}
}
func (s *Server) FindClientConnByAddr (addr net.Addr) *ClientConn {
var cts *ClientConn
func (s *Server) FindServerConnByAddr(addr net.Addr) *ServerConn {
var cts *ServerConn
var ok bool
s.cts_mtx.Lock()
@ -930,6 +930,6 @@ func (s *Server) WaitForTermination() {
s.wg.Wait()
}
func (s *Server) WriteLog (id string, level LogLevel, fmtstr string, args ...interface{}) {
func (s *Server) WriteLog(id string, level LogLevel, fmtstr string, args ...interface{}) {
s.log.Write(id, level, fmtstr, args...)
}