added the proxy service for http/https client peer on the server side. work in progress

This commit is contained in:
2024-12-12 21:09:16 +09:00
parent 41e7222b98
commit f6ea852e61
13 changed files with 538 additions and 252 deletions

243
server.go
View File

@ -6,7 +6,7 @@ import "errors"
import "fmt"
import "io"
import "log"
import "math/rand"
//import "math/rand"
import "net"
import "net/http"
import "net/netip"
@ -28,10 +28,10 @@ type ServerConnMap = map[ConnId]*ServerConn
type ServerRouteMap = map[RouteId]*ServerRoute
type ServerPeerConnMap = map[PeerId]*ServerPeerConn
type Server struct {
ctx context.Context
ctx_cancel context.CancelFunc
pxytlscfg *tls.Config
ctltlscfg *tls.Config
rpctlscfg *tls.Config
@ -42,6 +42,10 @@ type Server struct {
ext_mtx sync.Mutex
ext_svcs []Service
pxy_addr []string
pxy_mux *http.ServeMux
pxy []*http.Server // proxy server
ctl_addr []string
ctl_prefix string
ctl_mux *http.ServeMux
@ -91,10 +95,11 @@ type ServerConn struct {
type ServerRoute struct {
cts *ServerConn
l *net.TCPListener
svc_addr *net.TCPAddr // listening address
svc_l *net.TCPListener
svc_addr *net.TCPAddr // actual listening address
svc_requested_addr string
svc_permitted_net netip.Prefix
svc_proto ROUTE_PROTO
svc_option RouteOption
ptc_addr string
id RouteId
@ -136,7 +141,7 @@ func (g *GuardedPacketStreamServer) Context() context.Context {
// ------------------------------------
func NewServerRoute(cts *ServerConn, id RouteId, proto ROUTE_PROTO, ptc_addr string, svc_permitted_net string) (*ServerRoute, error) {
func NewServerRoute(cts *ServerConn, id RouteId, option RouteOption, ptc_addr string, svc_requested_addr string, svc_permitted_net string) (*ServerRoute, error) {
var r ServerRoute
var l *net.TCPListener
var svcaddr *net.TCPAddr
@ -144,31 +149,34 @@ func NewServerRoute(cts *ServerConn, id RouteId, proto ROUTE_PROTO, ptc_addr str
var err error
if svc_permitted_net != "" {
// parse the permitted network before creating a listener.
// the listener opened doesn't have to be closed when parsing fails.
svcnet, err = netip.ParsePrefix(svc_permitted_net)
if err != nil {
return nil , err
}
}
l, svcaddr, err = cts.make_route_listener(id, proto)
l, svcaddr, err = cts.make_route_listener(id, option, svc_requested_addr)
if err != nil {
return nil, err
}
if svc_permitted_net == "" {
if svcaddr.IP.To4() != nil {
svcnet, _ = netip.ParsePrefix("0.0.0.0/0")
svcnet = IPV4_PREFIX_ZERO
} else {
svcnet, _ = netip.ParsePrefix("::/0")
svcnet = IPV6_PREFIX_ZERO
}
}
r.cts = cts
r.id = id
r.l = l
r.svc_l = l
r.svc_addr = svcaddr
r.svc_requested_addr = svc_requested_addr
r.svc_permitted_net = svcnet
r.svc_proto = proto
r.svc_option = option
r.ptc_addr = ptc_addr
r.pts_limit = PTS_LIMIT
@ -230,7 +238,7 @@ func (r *ServerRoute) RunTask(wg *sync.WaitGroup) {
defer wg.Done()
for {
conn, err = r.l.AcceptTCP()
conn, err = r.svc_l.AcceptTCP() // this call is blocking...
if err != nil {
if errors.Is(err, net.ErrClosed) {
r.cts.svr.log.Write(r.cts.sid, LOG_INFO, "Server-side peer listener closed on route(%d)", r.id)
@ -280,7 +288,7 @@ func (r *ServerRoute) ReqStop() {
pts.ReqStop()
}
r.l.Close()
r.svc_l.Close()
}
}
@ -300,52 +308,53 @@ func (r *ServerRoute) ReportEvent(pts_id PeerId, event_type PACKET_KIND, event_d
}
// ------------------------------------
func (cts *ServerConn) make_route_listener(id RouteId, proto ROUTE_PROTO) (*net.TCPListener, *net.TCPAddr, error) {
func (cts *ServerConn) make_route_listener(id RouteId, option RouteOption, svc_requested_addr string) (*net.TCPListener, *net.TCPAddr, error) {
var l *net.TCPListener
var err error
var svcaddr *net.TCPAddr
var port int
var tries int = 0
var nw string
var ip string
var err error
switch proto {
case ROUTE_PROTO_TCP:
if svc_requested_addr != "" {
var ap netip.AddrPort
ap, err = netip.ParseAddrPort(svc_requested_addr)
if err != nil {
return nil, nil, fmt.Errorf("invalid service address %s - %s", svc_requested_addr, err.Error())
}
svcaddr = &net.TCPAddr{IP: ap.Addr().AsSlice(), Port: int(ap.Port())}
}
if option & RouteOption(ROUTE_OPTION_TCP) != 0 {
nw = "tcp"
ip = ""
case ROUTE_PROTO_TCP4:
nw = "tcp4"
ip = "0.0.0.0"
case ROUTE_PROTO_TCP6:
nw = "tcp6"
ip = "[::]"
default:
return nil, nil, fmt.Errorf("invalid protocol number %d", proto)
}
for {
port = rand.Intn(65535-32000+1) + 32000 // TODO: configurable port range
svcaddr, err = net.ResolveTCPAddr(nw, fmt.Sprintf("%s:%d", ip, port))
if err == nil {
l, err = net.ListenTCP(nw, svcaddr) // make the binding address configurable. support multiple binding addresses???
if err == nil {
cts.svr.log.Write(cts.sid, LOG_DEBUG, "Route(%d) listening on %d", id, port)
return l, svcaddr, nil
if svcaddr == nil {
svcaddr = &net.TCPAddr{Port: 0} // port 0 for automatic assignment.
}
}
tries++
if tries >= 1000 {
err = fmt.Errorf("unable to allocate port")
break
}
} else if option & RouteOption(ROUTE_OPTION_TCP4) != 0 {
nw = "tcp4"
if svcaddr == nil {
svcaddr = &net.TCPAddr{IP: net.IPv4zero, Port: 0} // port 0 for automatic assignment.
}
} else if option & RouteOption(ROUTE_OPTION_TCP6) != 0 {
nw = "tcp6"
if svcaddr == nil {
svcaddr = &net.TCPAddr{IP: net.IPv6zero, Port: 0} // port 0 for automatic assignment.
}
} else {
return nil, nil, fmt.Errorf("invalid route option value %d(%s)", option, option.string())
}
return nil, nil, err
l, err = net.ListenTCP(nw, svcaddr) // make the binding address configurable. support multiple binding addresses???
if err != nil {
return nil, nil, err
}
svcaddr = l.Addr().(*net.TCPAddr)
cts.svr.log.Write(cts.sid, LOG_DEBUG, "Route(%d) listening on %s", id, svcaddr.String())
return l, svcaddr, nil
}
func (cts *ServerConn) AddNewServerRoute(route_id RouteId, proto ROUTE_PROTO, ptc_addr string, svc_permitted_net string) (*ServerRoute, error) {
func (cts *ServerConn) AddNewServerRoute(route_id RouteId, proto RouteOption, ptc_addr string, svc_requested_addr string, svc_permitted_net string) (*ServerRoute, error) {
var r *ServerRoute
var err error
@ -358,7 +367,7 @@ func (cts *ServerConn) AddNewServerRoute(route_id RouteId, proto ROUTE_PROTO, pt
cts.route_mtx.Unlock()
return nil, fmt.Errorf("existent route id - %d", route_id)
}
r, err = NewServerRoute(cts, route_id, proto, ptc_addr, svc_permitted_net)
r, err = NewServerRoute(cts, route_id, proto, ptc_addr, svc_requested_addr, svc_permitted_net)
if err != nil {
cts.route_mtx.Unlock()
return nil, err
@ -478,34 +487,34 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) {
if ok {
var r *ServerRoute
r, err = cts.AddNewServerRoute(RouteId(x.Route.RouteId), x.Route.ServiceProto, x.Route.TargetAddrStr, x.Route.ServiceNetStr)
r, err = cts.AddNewServerRoute(RouteId(x.Route.RouteId), RouteOption(x.Route.ServiceOption), x.Route.TargetAddrStr, x.Route.ServiceAddrStr, x.Route.ServiceNetStr)
if err != nil {
cts.svr.log.Write(cts.sid, LOG_ERROR,
"Failed to add route(%d,%s) for %s - %s",
x.Route.RouteId, x.Route.TargetAddrStr, cts.remote_addr, err.Error())
err = cts.pss.Send(MakeRouteStoppedPacket(RouteId(x.Route.RouteId), x.Route.ServiceProto, x.Route.TargetAddrStr, x.Route.ServiceNetStr))
err = cts.pss.Send(MakeRouteStoppedPacket(RouteId(x.Route.RouteId), RouteOption(x.Route.ServiceOption), x.Route.TargetAddrStr, x.Route.ServiceAddrStr, x.Route.ServiceNetStr))
if err != nil {
cts.svr.log.Write(cts.sid, LOG_ERROR,
"Failed to send route_stopped event(%d,%s,%v,%s) to client %s - %s",
x.Route.RouteId, x.Route.TargetAddrStr, x.Route.ServiceProto, x.Route.ServiceNetStr, cts.remote_addr, err.Error())
x.Route.RouteId, x.Route.TargetAddrStr, x.Route.ServiceOption, x.Route.ServiceNetStr, cts.remote_addr, err.Error())
goto done
} else {
cts.svr.log.Write(cts.sid, LOG_DEBUG,
"Sent route_stopped event(%d,%s,%v,%s) to client %s",
x.Route.RouteId, x.Route.TargetAddrStr, x.Route.ServiceProto, x.Route.ServiceNetStr, cts.remote_addr)
x.Route.RouteId, x.Route.TargetAddrStr, x.Route.ServiceOption, x.Route.ServiceNetStr, cts.remote_addr)
}
} else {
cts.svr.log.Write(cts.sid, LOG_INFO,
"Added route(%d,%s,%s,%v,%v) for client %s to cts(%d)",
r.id, r.ptc_addr, r.svc_addr.String(), r.svc_proto, r.svc_permitted_net, cts.remote_addr, cts.id)
err = cts.pss.Send(MakeRouteStartedPacket(r.id, r.svc_proto, r.svc_addr.String(), r.svc_permitted_net.String()))
r.id, r.ptc_addr, r.svc_addr.String(), r.svc_option, r.svc_permitted_net, cts.remote_addr, cts.id)
err = cts.pss.Send(MakeRouteStartedPacket(r.id, r.svc_option, r.svc_addr.String(), r.svc_requested_addr, r.svc_permitted_net.String()))
if err != nil {
r.ReqStop()
cts.svr.log.Write(cts.sid, LOG_ERROR,
"Failed to send route_started event(%d,%s,%s,%s%v,%v) to client %s - %s",
r.id, r.ptc_addr, r.svc_addr.String(), r.svc_proto, r.svc_permitted_net, cts.remote_addr, err.Error())
r.id, r.ptc_addr, r.svc_addr.String(), r.svc_option, r.svc_permitted_net, cts.remote_addr, err.Error())
goto done
}
}
@ -529,13 +538,13 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) {
} else {
cts.svr.log.Write(cts.sid, LOG_ERROR,
"Deleted route(%d,%s,%s,%v,%v) for client %s",
r.id, r.ptc_addr, r.svc_addr.String(), r.svc_proto, r.svc_permitted_net.String(), cts.remote_addr)
err = cts.pss.Send(MakeRouteStoppedPacket(r.id, r.svc_proto, r.ptc_addr, r.svc_permitted_net.String()))
r.id, r.ptc_addr, r.svc_addr.String(), r.svc_option, r.svc_permitted_net.String(), cts.remote_addr)
err = cts.pss.Send(MakeRouteStoppedPacket(r.id, r.svc_option, r.ptc_addr, r.svc_requested_addr, r.svc_permitted_net.String()))
if err != nil {
r.ReqStop()
cts.svr.log.Write(cts.sid, LOG_ERROR,
"Failed to send route_stopped event(%d,%s,%s,%v.%v) to client %s - %s",
r.id, r.ptc_addr, r.svc_addr.String(), r.svc_proto, r.svc_permitted_net.String(), cts.remote_addr, err.Error())
r.id, r.ptc_addr, r.svc_addr.String(), r.svc_option, r.svc_permitted_net.String(), cts.remote_addr, err.Error())
goto done
}
}
@ -846,18 +855,18 @@ func unaryInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServ
}
type server_ctl_log_writer struct {
type server_http_log_writer struct {
svr *Server
}
func (hlw *server_ctl_log_writer) Write(p []byte) (n int, err error) {
func (hlw *server_http_log_writer) Write(p []byte) (n int, err error) {
// the standard http.Server always requires *log.Logger
// use this iowriter to create a logger to pass it to the http server.
hlw.svr.log.Write("", LOG_INFO, string(p))
return len(p), nil
}
func NewServer(ctx context.Context, logger Logger, ctl_addrs []string, rpc_addrs []string, ctl_prefix string, ctltlscfg *tls.Config, rpctlscfg *tls.Config, rpc_max int, peer_max int) (*Server, error) {
func NewServer(ctx context.Context, logger Logger, ctl_addrs []string, rpc_addrs []string, pxy_addrs []string, ctl_prefix string, ctltlscfg *tls.Config, rpctlscfg *tls.Config, pxytlscfg *tls.Config, rpc_max int, peer_max int) (*Server, error) {
var s Server
var l *net.TCPListener
var rpcaddr *net.TCPAddr
@ -893,6 +902,7 @@ func NewServer(ctx context.Context, logger Logger, ctl_addrs []string, rpc_addrs
s.ctltlscfg = ctltlscfg
s.rpctlscfg = rpctlscfg
s.pxytlscfg = pxytlscfg
s.ext_svcs = make([]Service, 0, 1)
s.pts_limit = peer_max
s.cts_limit = rpc_max
@ -916,10 +926,16 @@ func NewServer(ctx context.Context, logger Logger, ctl_addrs []string, rpc_addrs
s.rpc_svr = grpc.NewServer(opts...)
RegisterHoduServer(s.rpc_svr, &s)
s.ctl_prefix = ctl_prefix
// ---------------------------------------------------------
hs_log = log.New(&server_http_log_writer{svr: &s}, "", 0);
// ---------------------------------------------------------
s.ctl_prefix = ctl_prefix
s.ctl_mux = http.NewServeMux()
cwd, _ = os.Getwd()
cwd, _ = os.Getwd() // TODO:
s.ctl_mux.Handle(s.ctl_prefix + "/ui/", http.StripPrefix(s.ctl_prefix, http.FileServer(http.Dir(cwd)))) // TODO: proper directory. it must not use the current working directory...
s.ctl_mux.Handle(s.ctl_prefix + "/ws/tty", new_server_ctl_ws_tty(&s))
s.ctl_mux.Handle(s.ctl_prefix + "/server-conns", &server_ctl_server_conns{s: &s})
@ -931,7 +947,6 @@ func NewServer(ctx context.Context, logger Logger, ctl_addrs []string, rpc_addrs
s.ctl_addr = make([]string, len(ctl_addrs))
s.ctl = make([]*http.Server, len(ctl_addrs))
copy(s.ctl_addr, ctl_addrs)
hs_log = log.New(&server_ctl_log_writer{svr: &s}, "", 0);
for i = 0; i < len(ctl_addrs); i++ {
s.ctl[i] = &http.Server{
@ -943,6 +958,26 @@ func NewServer(ctx context.Context, logger Logger, ctl_addrs []string, rpc_addrs
}
}
// ---------------------------------------------------------
s.pxy_mux = http.NewServeMux() // TODO: make /_init configurable...
s.pxy_mux.Handle("/_init/{conn_id}/{route_id}/{trailer...}", &server_proxy_http_init{s: &s})
s.pxy_mux.Handle("/", &server_proxy_http_main{s: &s})
s.pxy_addr = make([]string, len(pxy_addrs))
s.pxy = make([]*http.Server, len(pxy_addrs))
copy(s.pxy_addr, pxy_addrs)
for i = 0; i < len(pxy_addrs); i++ {
s.pxy[i] = &http.Server{
Addr: pxy_addrs[i],
Handler: s.pxy_mux,
TLSConfig: s.pxytlscfg,
ErrorLog: hs_log,
// TODO: more settings
}
}
// ---------------------------------------------------------
s.stats.conns.Store(0)
s.stats.routes.Store(0)
s.stats.peers.Store(0)
@ -1032,7 +1067,6 @@ func (s *Server) RunCtlTask(wg *sync.WaitGroup) {
s.log.Write("", LOG_INFO, "Control channel[%d] started on %s", i, s.ctl_addr[i])
if s.stop_req.Load() == false {
// defeat hard-coded "tcp" in ListenAndServe() and ListenAndServeTLS()
// err = cs.ListenAndServe()
@ -1064,14 +1098,61 @@ func (s *Server) RunCtlTask(wg *sync.WaitGroup) {
l_wg.Wait()
}
func (s *Server) RunPxyTask(wg *sync.WaitGroup) {
var err error
var pxy *http.Server
var idx int
var l_wg sync.WaitGroup
defer wg.Done()
for idx, pxy = range s.pxy {
l_wg.Add(1)
go func(i int, cs *http.Server) {
var l net.Listener
s.log.Write("", LOG_INFO, "Proxy channel[%d] started on %s", i, s.pxy_addr[i])
if s.stop_req.Load() == false {
l, err = net.Listen(tcp_addr_str_class(cs.Addr), cs.Addr)
if err == nil {
if s.stop_req.Load() == false {
if s.pxytlscfg == nil { // TODO: change this
err = cs.Serve(l)
} else {
err = cs.ServeTLS(l, "", "") // s.pxytlscfg must provide a certificate and a key
}
} else {
err = fmt.Errorf("stop requested")
}
l.Close()
}
} else {
err = fmt.Errorf("stop requested")
}
if errors.Is(err, http.ErrServerClosed) {
s.log.Write("", LOG_INFO, "Proxy channel[%d] ended", i)
} else {
s.log.Write("", LOG_ERROR, "Proxy channel[%d] error - %s", i, err.Error())
}
l_wg.Done()
}(idx, pxy)
}
l_wg.Wait()
}
func (s *Server) ReqStop() {
if s.stop_req.CompareAndSwap(false, true) {
var l *net.TCPListener
var cts *ServerConn
var ctl *http.Server
var hs *http.Server
for _, ctl = range s.ctl {
ctl.Shutdown(s.ctx) // to break c.ctl.ListenAndServe()
for _, hs = range s.ctl {
hs.Shutdown(s.ctx) // to break s.ctl.Serve()
}
for _, hs = range s.pxy {
hs.Shutdown(s.ctx) // to break s.pxy.Serve()
}
//s.rpc_svr.GracefulStop()
@ -1222,6 +1303,23 @@ func (s *Server) FindServerConnByAddr(addr net.Addr) *ServerConn {
return cts
}
func (s *Server) FindServerRouteById(id ConnId, route_id RouteId) *ServerRoute {
var cts *ServerConn
var ok bool
s.cts_mtx.Lock()
defer s.cts_mtx.Unlock()
cts, ok = s.cts_map[id]
if !ok {
return nil
}
return cts.FindServerRouteById(route_id)
}
func (s *Server) StartService(cfg interface{}) {
s.wg.Add(1)
go s.RunTask(&s.wg)
@ -1240,6 +1338,11 @@ func (s *Server) StartCtlService() {
go s.RunCtlTask(&s.wg)
}
func (s *Server) StartPxyService() {
s.wg.Add(1)
go s.RunPxyTask(&s.wg)
}
func (s *Server) StopServices() {
var ext_svc Service
s.ReqStop()