diff --git a/Makefile b/Makefile index 2460349..49b7cc4 100644 --- a/Makefile +++ b/Makefile @@ -28,6 +28,7 @@ SRCS=\ server-peer.go \ server-pty.go \ server-pxy.go \ + server-rpx.go \ system.go \ transform.go \ diff --git a/client-pty.go b/client-pty.go index 87219eb..280604e 100644 --- a/client-pty.go +++ b/client-pty.go @@ -58,7 +58,7 @@ func (pty *client_pty_ws) ServeWebsocket(ws *websocket.Conn) (int, error) { conn_ready = <-conn_ready_chan if conn_ready { // connected - var poll_fds []unix.PollFd; + var poll_fds []unix.PollFd var buf []byte var n int var err error @@ -83,7 +83,7 @@ func (pty *client_pty_ws) ServeWebsocket(ws *websocket.Conn) (int, error) { if (poll_fds[0].Revents & (unix.POLLERR | unix.POLLHUP | unix.POLLNVAL)) != 0 { c.log.Write(pty.Id, LOG_DEBUG, "[%s] EOF detected on pty stdout", req.RemoteAddr) - break; + break } if (poll_fds[0].Revents & unix.POLLIN) != 0 { diff --git a/client.go b/client.go index 8e63c7d..dfdb6b9 100644 --- a/client.go +++ b/client.go @@ -661,7 +661,7 @@ func (r *ClientRoute) ReportPacket(pts_id PeerId, packet_type PACKET_KIND, event r.cts.C.log.Write(r.cts.Sid, LOG_INFO, "Ingested route_started(%d,%s,%s) for route(%d,%s,%v,%s,%s)", rd.RouteId, rd.TargetAddrStr, rd.ServiceNetStr, - r.Id, r.PeerAddr, r.ServerPeerOption, r.ReqServerPeerSvcAddr, r.ReqServerPeerSvcNet); + r.Id, r.PeerAddr, r.ServerPeerOption, r.ReqServerPeerSvcAddr, r.ReqServerPeerSvcNet) } } @@ -680,7 +680,7 @@ func (r *ClientRoute) ReportPacket(pts_id PeerId, packet_type PACKET_KIND, event r.cts.C.log.Write(r.cts.Sid, LOG_INFO, "Ingested route_stopped(%d,%s,%s) for route(%d,%s,%v,%s,%s)", rd.RouteId, rd.TargetAddrStr, rd.ServiceNetStr, - r.Id, r.PeerAddr, r.ServerPeerOption, r.ReqServerPeerSvcAddr, r.ReqServerPeerSvcNet); + r.Id, r.PeerAddr, r.ServerPeerOption, r.ReqServerPeerSvcAddr, r.ReqServerPeerSvcNet) } r.ReqStop() @@ -1423,7 +1423,7 @@ func (cts *ClientConn) ReportPacket(route_id RouteId, pts_id PeerId, packet_type func (cts *ClientConn) ReadRptyLoop(crp *ClientRpty, wg *sync.WaitGroup) { - var poll_fds []unix.PollFd; + var poll_fds []unix.PollFd var buf []byte var n int var err error diff --git a/cmd/config.go b/cmd/config.go index 8244366..90dab9b 100644 --- a/cmd/config.go +++ b/cmd/config.go @@ -69,6 +69,10 @@ type CTLServiceConfig struct { Auth HttpAuthConfig `yaml:"auth"` } +type RPXServiceConfig struct { + Addrs []string `yaml:"addresses"` +} + type PXYServiceConfig struct { Addrs []string `yaml:"addresses"` } @@ -122,6 +126,11 @@ type ServerConfig struct { TLS ServerTLSConfig `yaml:"tls"` } `yaml:"ctl"` + RPX struct { + Service RPXServiceConfig `yaml:"service"` + TLS ServerTLSConfig `yaml:"tls"` + } `yaml:"rpx"` + PXY struct { Service PXYServiceConfig `yaml:"service"` TLS ServerTLSConfig `yaml:"tls"` diff --git a/cmd/main.go b/cmd/main.go index eee4e5d..ba3f429 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -90,7 +90,7 @@ func (sh *signal_handler) WriteLog(id string, level hodu.LogLevel, fmt string, a // -------------------------------------------------------------------- -func server_main(ctl_addrs []string, rpc_addrs []string, pxy_addrs []string, wpx_addrs []string, logfile string, cfg *ServerConfig) error { +func server_main(ctl_addrs []string, rpc_addrs []string, rpx_addrs[] string, pxy_addrs []string, wpx_addrs []string, logfile string, cfg *ServerConfig) error { var s *hodu.Server var config *hodu.ServerConfig var logger *AppLogger @@ -108,6 +108,7 @@ func server_main(ctl_addrs []string, rpc_addrs []string, pxy_addrs []string, wpx config = &hodu.ServerConfig{ CtlAddrs: ctl_addrs, RpcAddrs: rpc_addrs, + RpxAddrs: rpx_addrs, PxyAddrs: pxy_addrs, WpxAddrs: wpx_addrs, } @@ -117,6 +118,8 @@ func server_main(ctl_addrs []string, rpc_addrs []string, pxy_addrs []string, wpx if err != nil { return err } config.RpcTls, err = make_tls_server_config(&cfg.RPC.TLS) if err != nil { return err } + config.RpxTls, err = make_tls_server_config(&cfg.RPX.TLS) + if err != nil { return err } config.PxyTls, err = make_tls_server_config(&cfg.PXY.TLS) if err != nil { return err } config.WpxTls, err = make_tls_server_config(&cfg.WPX.TLS) @@ -124,6 +127,7 @@ func server_main(ctl_addrs []string, rpc_addrs []string, pxy_addrs []string, wpx if len(config.CtlAddrs) <= 0 { config.CtlAddrs = cfg.CTL.Service.Addrs } if len(config.RpcAddrs) <= 0 { config.RpcAddrs = cfg.RPC.Service.Addrs } + if len(config.RpxAddrs) <= 0 { config.RpxAddrs = cfg.RPX.Service.Addrs } if len(config.PxyAddrs) <= 0 { config.PxyAddrs = cfg.PXY.Service.Addrs } if len(config.WpxAddrs) <= 0 { config.WpxAddrs = cfg.WPX.Service.Addrs } @@ -178,6 +182,7 @@ func server_main(ctl_addrs []string, rpc_addrs []string, pxy_addrs []string, wpx s.StartService(nil) s.StartCtlService() + s.StartRpxService() s.StartPxyService() s.StartWpxService() s.StartExtService(&signal_handler{svc:s}, nil) @@ -364,6 +369,7 @@ func main() { if strings.EqualFold(os.Args[1], "server") { var rpc_addrs []string var ctl_addrs []string + var rpx_addrs []string var pxy_addrs []string var wpx_addrs []string var cfgfile string @@ -373,6 +379,7 @@ func main() { ctl_addrs = make([]string, 0) rpc_addrs = make([]string, 0) + rpx_addrs = make([]string, 0) pxy_addrs = make([]string, 0) wpx_addrs = make([]string, 0) @@ -385,6 +392,10 @@ func main() { rpc_addrs = append(rpc_addrs, v) return nil }) + flgs.Func("rpx-on", "specify a rpx listening address", func(v string) error { + rpx_addrs = append(rpx_addrs, v) + return nil + }) flgs.Func("pxy-on", "specify a proxy listening address", func(v string) error { pxy_addrs = append(pxy_addrs, v) return nil @@ -440,7 +451,7 @@ func main() { } } - err = server_main(ctl_addrs, rpc_addrs, pxy_addrs, wpx_addrs, logfile, &cfg) + err = server_main(ctl_addrs, rpc_addrs, rpx_addrs, pxy_addrs, wpx_addrs, logfile, &cfg) if err != nil { fmt.Fprintf(os.Stderr, "ERROR: server error - %s\n", err.Error()) goto oops diff --git a/hodu.pb.go b/hodu.pb.go index 7252802..9d6bae3 100644 --- a/hodu.pb.go +++ b/hodu.pb.go @@ -28,10 +28,9 @@ const ( ROUTE_OPTION_TCP ROUTE_OPTION = 1 ROUTE_OPTION_TCP4 ROUTE_OPTION = 2 ROUTE_OPTION_TCP6 ROUTE_OPTION = 4 - ROUTE_OPTION_UNUSED ROUTE_OPTION = 8 - ROUTE_OPTION_HTTP ROUTE_OPTION = 16 - ROUTE_OPTION_HTTPS ROUTE_OPTION = 32 - ROUTE_OPTION_SSH ROUTE_OPTION = 64 + ROUTE_OPTION_HTTP ROUTE_OPTION = 8 + ROUTE_OPTION_HTTPS ROUTE_OPTION = 16 + ROUTE_OPTION_SSH ROUTE_OPTION = 32 ) // Enum value maps for ROUTE_OPTION. @@ -41,20 +40,18 @@ var ( 1: "TCP", 2: "TCP4", 4: "TCP6", - 8: "UNUSED", - 16: "HTTP", - 32: "HTTPS", - 64: "SSH", + 8: "HTTP", + 16: "HTTPS", + 32: "SSH", } ROUTE_OPTION_value = map[string]int32{ "UNSPEC": 0, "TCP": 1, "TCP4": 2, "TCP6": 4, - "UNUSED": 8, - "HTTP": 16, - "HTTPS": 32, - "SSH": 64, + "HTTP": 8, + "HTTPS": 16, + "SSH": 32, } ) @@ -866,18 +863,16 @@ const file_hodu_proto_rawDesc = "" + "\bConnNoti\x18\a \x01(\v2\v.ConnNoticeH\x00R\bConnNoti\x12&\n" + "\aRptyEvt\x18\b \x01(\v2\n" + ".RptyEventH\x00R\aRptyEvtB\x03\n" + - "\x01U*a\n" + + "\x01U*U\n" + "\fROUTE_OPTION\x12\n" + "\n" + "\x06UNSPEC\x10\x00\x12\a\n" + "\x03TCP\x10\x01\x12\b\n" + "\x04TCP4\x10\x02\x12\b\n" + - "\x04TCP6\x10\x04\x12\n" + - "\n" + - "\x06UNUSED\x10\b\x12\b\n" + - "\x04HTTP\x10\x10\x12\t\n" + - "\x05HTTPS\x10 \x12\a\n" + - "\x03SSH\x10@*\xa2\x02\n" + + "\x04TCP6\x10\x04\x12\b\n" + + "\x04HTTP\x10\b\x12\t\n" + + "\x05HTTPS\x10\x10\x12\a\n" + + "\x03SSH\x10 *\xa2\x02\n" + "\vPACKET_KIND\x12\f\n" + "\bRESERVED\x10\x00\x12\x0f\n" + "\vROUTE_START\x10\x01\x12\x0e\n" + diff --git a/hodu.proto b/hodu.proto index 870a4cf..29f644c 100644 --- a/hodu.proto +++ b/hodu.proto @@ -23,10 +23,9 @@ enum ROUTE_OPTION { TCP = 1; TCP4 = 2; TCP6 = 4; - UNUSED = 8; - HTTP = 16; - HTTPS = 32; - SSH = 64; + HTTP = 8; + HTTPS = 16; + SSH = 32; }; message RouteDesc { diff --git a/pty.go b/pty.go index c713069..7b3344b 100644 --- a/pty.go +++ b/pty.go @@ -21,7 +21,7 @@ func connect_pty(pty_shell string, pty_user string) (*exec.Cmd, *os.File, error) return nil, nil, fmt.Errorf("blank pty shell") } - cmd = exec.Command(pty_shell); + cmd = exec.Command(pty_shell) if pty_user != "" { var uid int var gid int @@ -55,8 +55,8 @@ func connect_pty(pty_shell string, pty_user string) (*exec.Cmd, *os.File, error) return nil, nil, err } - //syscall.SetNonblock(int(tty.Fd()), true); - unix.SetNonblock(int(tty.Fd()), true); + //syscall.SetNonblock(int(tty.Fd()), true) + unix.SetNonblock(int(tty.Fd()), true) return cmd, tty, nil } diff --git a/server-pty.go b/server-pty.go index 5efac75..9542161 100644 --- a/server-pty.go +++ b/server-pty.go @@ -66,7 +66,7 @@ func (pty *server_pty_ws) ServeWebsocket(ws *websocket.Conn) (int, error) { conn_ready = <-conn_ready_chan if conn_ready { // connected - var poll_fds []unix.PollFd; + var poll_fds []unix.PollFd var buf []byte var n int var err error diff --git a/server.go b/server.go index 0a2f5b2..427e507 100644 --- a/server.go +++ b/server.go @@ -32,8 +32,9 @@ const CTS_LIMIT int = 16384 type PortId uint16 const PORT_ID_MARKER string = "_" const HS_ID_CTL string = "ctl" +const HS_ID_RPX string = "pxy" +const HS_ID_PXY string = "rpx" const HS_ID_WPX string = "wpx" -const HS_ID_PXY string = "pxy" type ServerConnMapByAddr map[net.Addr]*ServerConn type ServerConnMapByClientToken map[string]*ServerConn @@ -64,6 +65,9 @@ type ServerConfig struct { CtlAuth *HttpAuthConfig CtlCors bool + RpxAddrs []string + RpxTls *tls.Config + PxyAddrs []string PxyTls *tls.Config @@ -108,6 +112,11 @@ type Server struct { ext_svcs []Service ext_closed bool + rpx_mux *http.ServeMux + rpx []*http.Server // proxy server + rpx_addrs_mtx sync.Mutex + rpx_addrs *list.List // of net.Addr + pxy_mux *http.ServeMux pxy []*http.Server // proxy server pxy_addrs_mtx sync.Mutex @@ -1514,6 +1523,7 @@ func NewServer(ctx context.Context, name string, logger Logger, cfg *ServerConfi s.bulletin = NewBulletin[*ServerEvent](&s, 1024) s.ctl_addrs = list.New() + s.rpx_addrs = list.New() s.pxy_addrs = list.New() s.wpx_addrs = list.New() @@ -1624,6 +1634,23 @@ func NewServer(ctx context.Context, name string, logger Logger, cfg *ServerConfi // --------------------------------------------------------- + s.rpx_mux = http.NewServeMux() // TODO: make /_init,_ssh,_ssh/ws,_http configurable... + s.rpx_mux.Handle("/", s.WrapHttpHandler(&server_rpx{ S: &s, Id: HS_ID_RPX })) + + s.rpx = make([]*http.Server, len(cfg.RpxAddrs)) + + for i = 0; i < len(cfg.RpxAddrs); i++ { + s.rpx[i] = &http.Server{ + Addr: cfg.RpxAddrs[i], + Handler: s.rpx_mux, + TLSConfig: cfg.RpxTls, + ErrorLog: hs_log, + // TODO: more settings + } + } + + // --------------------------------------------------------- + s.pxy_mux = http.NewServeMux() // TODO: make /_init,_ssh,_ssh/ws,_http configurable... s.pxy_mux.Handle("/_ssh/{conn_id}/", @@ -1892,6 +1919,59 @@ func (s *Server) RunCtlTask(wg *sync.WaitGroup) { l_wg.Wait() } +func (s *Server) RunRpxTask(wg *sync.WaitGroup) { + var err error + var rpx *http.Server + var idx int + var l_wg sync.WaitGroup + + defer wg.Done() + + for idx, rpx = range s.rpx { + l_wg.Add(1) + go func(i int, cs *http.Server) { + var l net.Listener + + s.log.Write("", LOG_INFO, "rpx channel[%d] started on %s", i, s.Cfg.RpxAddrs[i]) + + if s.stop_req.Load() == false { + l, err = net.Listen(TcpAddrStrClass(cs.Addr), cs.Addr) + if err == nil { + if s.stop_req.Load() == false { + var node *list.Element + + s.rpx_addrs_mtx.Lock() + node = s.rpx_addrs.PushBack(l.Addr().(*net.TCPAddr)) + s.rpx_addrs_mtx.Unlock() + + if s.Cfg.RpxTls == nil { // TODO: change this + err = cs.Serve(l) + } else { + err = cs.ServeTLS(l, "", "") // s.Cfg.RpxTls must provide a certificate and a key + } + + s.rpx_addrs_mtx.Lock() + s.rpx_addrs.Remove(node) + s.rpx_addrs_mtx.Unlock() + } else { + err = fmt.Errorf("stop requested") + } + l.Close() + } + } else { + err = fmt.Errorf("stop requested") + } + if errors.Is(err, http.ErrServerClosed) { + s.log.Write("", LOG_INFO, "rpx channel[%d] ended", i) + } else { + s.log.Write("", LOG_ERROR, "rpx channel[%d] error - %s", i, err.Error()) + } + l_wg.Done() + }(idx, rpx) + } + l_wg.Wait() +} + func (s *Server) RunPxyTask(wg *sync.WaitGroup) { var err error var pxy *http.Server @@ -2015,6 +2095,10 @@ func (s *Server) ReqStop() { hs.Shutdown(s.Ctx) // to break s.ctl.Serve() } + for _, hs = range s.rpx { + hs.Shutdown(s.Ctx) // to break s.rpx.Serve() + } + for _, hs = range s.pxy { hs.Shutdown(s.Ctx) // to break s.pxy.Serve() } @@ -2384,6 +2468,11 @@ func (s *Server) StartCtlService() { go s.RunCtlTask(&s.wg) } +func (s *Server) StartRpxService() { + s.wg.Add(1) + go s.RunRpxTask(&s.wg) +} + func (s *Server) StartPxyService() { s.wg.Add(1) go s.RunPxyTask(&s.wg)