Compare commits

...

4 Commits

Author SHA1 Message Date
31a4223aab minotr message fix 2025-08-12 16:40:18 +09:00
41cd725c1c added server-rpx.go 2025-08-12 16:30:56 +09:00
6200bc5460 removed UNUSED from the proto file 2025-08-12 16:29:44 +09:00
7fb4fbaae2 updated http logging to include the query string part 2025-08-12 12:17:33 +09:00
13 changed files with 205 additions and 52 deletions

View File

@ -28,6 +28,7 @@ SRCS=\
server-peer.go \
server-pty.go \
server-pxy.go \
server-rpx.go \
system.go \
transform.go \

View File

@ -58,7 +58,7 @@ func (pty *client_pty_ws) ServeWebsocket(ws *websocket.Conn) (int, error) {
conn_ready = <-conn_ready_chan
if conn_ready { // connected
var poll_fds []unix.PollFd;
var poll_fds []unix.PollFd
var buf []byte
var n int
var err error
@ -83,7 +83,7 @@ func (pty *client_pty_ws) ServeWebsocket(ws *websocket.Conn) (int, error) {
if (poll_fds[0].Revents & (unix.POLLERR | unix.POLLHUP | unix.POLLNVAL)) != 0 {
c.log.Write(pty.Id, LOG_DEBUG, "[%s] EOF detected on pty stdout", req.RemoteAddr)
break;
break
}
if (poll_fds[0].Revents & unix.POLLIN) != 0 {

View File

@ -661,7 +661,7 @@ func (r *ClientRoute) ReportPacket(pts_id PeerId, packet_type PACKET_KIND, event
r.cts.C.log.Write(r.cts.Sid, LOG_INFO,
"Ingested route_started(%d,%s,%s) for route(%d,%s,%v,%s,%s)",
rd.RouteId, rd.TargetAddrStr, rd.ServiceNetStr,
r.Id, r.PeerAddr, r.ServerPeerOption, r.ReqServerPeerSvcAddr, r.ReqServerPeerSvcNet);
r.Id, r.PeerAddr, r.ServerPeerOption, r.ReqServerPeerSvcAddr, r.ReqServerPeerSvcNet)
}
}
@ -680,7 +680,7 @@ func (r *ClientRoute) ReportPacket(pts_id PeerId, packet_type PACKET_KIND, event
r.cts.C.log.Write(r.cts.Sid, LOG_INFO,
"Ingested route_stopped(%d,%s,%s) for route(%d,%s,%v,%s,%s)",
rd.RouteId, rd.TargetAddrStr, rd.ServiceNetStr,
r.Id, r.PeerAddr, r.ServerPeerOption, r.ReqServerPeerSvcAddr, r.ReqServerPeerSvcNet);
r.Id, r.PeerAddr, r.ServerPeerOption, r.ReqServerPeerSvcAddr, r.ReqServerPeerSvcNet)
}
r.ReqStop()
@ -1423,7 +1423,7 @@ func (cts *ClientConn) ReportPacket(route_id RouteId, pts_id PeerId, packet_type
func (cts *ClientConn) ReadRptyLoop(crp *ClientRpty, wg *sync.WaitGroup) {
var poll_fds []unix.PollFd;
var poll_fds []unix.PollFd
var buf []byte
var n int
var err error
@ -1700,9 +1700,9 @@ func (c *Client) WrapHttpHandler(handler ClientHttpHandler) http.Handler {
if status_code > 0 {
if err != nil {
c.log.Write(handler.Identity(), LOG_INFO, "[%s] %s %s %d %.9f - %s", req.RemoteAddr, req.Method, req.URL.String(), status_code, time_taken.Seconds(), err.Error())
c.log.Write(handler.Identity(), LOG_INFO, "[%s] %s %s %d %.9f - %s", req.RemoteAddr, req.Method, get_raw_url_path(req), status_code, time_taken.Seconds(), err.Error())
} else {
c.log.Write(handler.Identity(), LOG_INFO, "[%s] %s %s %d %.9f", req.RemoteAddr, req.Method, req.URL.String(), status_code, time_taken.Seconds())
c.log.Write(handler.Identity(), LOG_INFO, "[%s] %s %s %d %.9f", req.RemoteAddr, req.Method, get_raw_url_path(req), status_code, time_taken.Seconds())
}
}
})
@ -1714,7 +1714,7 @@ func (c *Client) SafeWrapWebsocketHandler(handler websocket.Handler) http.Handle
!strings.Contains(strings.ToLower(req.Header.Get("Connection")), "upgrade") {
var status_code int
status_code = WriteEmptyRespHeader(w, http.StatusBadRequest)
c.log.Write("", LOG_INFO, "[%s] %s %s %d[non-websocket]", req.RemoteAddr, req.Method, req.URL.String(), status_code)
c.log.Write("", LOG_INFO, "[%s] %s %s %d[non-websocket]", req.RemoteAddr, req.Method, get_raw_url_path(req), status_code)
return
}
handler.ServeHTTP(w, req)
@ -1728,9 +1728,11 @@ func (c *Client) WrapWebsocketHandler(handler ClientWebsocketHandler) websocket.
var start_time time.Time
var time_taken time.Duration
var req *http.Request
var raw_url_path string
req = ws.Request()
c.log.Write(handler.Identity(), LOG_INFO, "[%s] %s %s [ws]", req.RemoteAddr, req.Method, req.URL.String())
raw_url_path = get_raw_url_path(req)
c.log.Write(handler.Identity(), LOG_INFO, "[%s] %s %s [ws]", req.RemoteAddr, req.Method, raw_url_path)
start_time = time.Now()
status_code, err = handler.ServeWebsocket(ws)
@ -1738,9 +1740,9 @@ func (c *Client) WrapWebsocketHandler(handler ClientWebsocketHandler) websocket.
if status_code > 0 {
if err != nil {
c.log.Write(handler.Identity(), LOG_INFO, "[%s] %s %s [ws] %d %.9f - %s", req.RemoteAddr, req.Method, req.URL.String(), status_code, time_taken.Seconds(), err.Error())
c.log.Write(handler.Identity(), LOG_INFO, "[%s] %s %s [ws] %d %.9f - %s", req.RemoteAddr, req.Method, raw_url_path, status_code, time_taken.Seconds(), err.Error())
} else {
c.log.Write(handler.Identity(), LOG_INFO, "[%s] %s %s [ws] %d %.9f", req.RemoteAddr, req.Method, req.URL.String(), status_code, time_taken.Seconds())
c.log.Write(handler.Identity(), LOG_INFO, "[%s] %s %s [ws] %d %.9f", req.RemoteAddr, req.Method, raw_url_path, status_code, time_taken.Seconds())
}
}
})

View File

@ -69,6 +69,10 @@ type CTLServiceConfig struct {
Auth HttpAuthConfig `yaml:"auth"`
}
type RPXServiceConfig struct {
Addrs []string `yaml:"addresses"`
}
type PXYServiceConfig struct {
Addrs []string `yaml:"addresses"`
}
@ -122,6 +126,11 @@ type ServerConfig struct {
TLS ServerTLSConfig `yaml:"tls"`
} `yaml:"ctl"`
RPX struct {
Service RPXServiceConfig `yaml:"service"`
TLS ServerTLSConfig `yaml:"tls"`
} `yaml:"rpx"`
PXY struct {
Service PXYServiceConfig `yaml:"service"`
TLS ServerTLSConfig `yaml:"tls"`

View File

@ -90,7 +90,7 @@ func (sh *signal_handler) WriteLog(id string, level hodu.LogLevel, fmt string, a
// --------------------------------------------------------------------
func server_main(ctl_addrs []string, rpc_addrs []string, pxy_addrs []string, wpx_addrs []string, logfile string, cfg *ServerConfig) error {
func server_main(ctl_addrs []string, rpc_addrs []string, rpx_addrs[] string, pxy_addrs []string, wpx_addrs []string, logfile string, cfg *ServerConfig) error {
var s *hodu.Server
var config *hodu.ServerConfig
var logger *AppLogger
@ -108,6 +108,7 @@ func server_main(ctl_addrs []string, rpc_addrs []string, pxy_addrs []string, wpx
config = &hodu.ServerConfig{
CtlAddrs: ctl_addrs,
RpcAddrs: rpc_addrs,
RpxAddrs: rpx_addrs,
PxyAddrs: pxy_addrs,
WpxAddrs: wpx_addrs,
}
@ -117,6 +118,8 @@ func server_main(ctl_addrs []string, rpc_addrs []string, pxy_addrs []string, wpx
if err != nil { return err }
config.RpcTls, err = make_tls_server_config(&cfg.RPC.TLS)
if err != nil { return err }
config.RpxTls, err = make_tls_server_config(&cfg.RPX.TLS)
if err != nil { return err }
config.PxyTls, err = make_tls_server_config(&cfg.PXY.TLS)
if err != nil { return err }
config.WpxTls, err = make_tls_server_config(&cfg.WPX.TLS)
@ -124,6 +127,7 @@ func server_main(ctl_addrs []string, rpc_addrs []string, pxy_addrs []string, wpx
if len(config.CtlAddrs) <= 0 { config.CtlAddrs = cfg.CTL.Service.Addrs }
if len(config.RpcAddrs) <= 0 { config.RpcAddrs = cfg.RPC.Service.Addrs }
if len(config.RpxAddrs) <= 0 { config.RpxAddrs = cfg.RPX.Service.Addrs }
if len(config.PxyAddrs) <= 0 { config.PxyAddrs = cfg.PXY.Service.Addrs }
if len(config.WpxAddrs) <= 0 { config.WpxAddrs = cfg.WPX.Service.Addrs }
@ -178,6 +182,7 @@ func server_main(ctl_addrs []string, rpc_addrs []string, pxy_addrs []string, wpx
s.StartService(nil)
s.StartCtlService()
s.StartRpxService()
s.StartPxyService()
s.StartWpxService()
s.StartExtService(&signal_handler{svc:s}, nil)
@ -364,6 +369,7 @@ func main() {
if strings.EqualFold(os.Args[1], "server") {
var rpc_addrs []string
var ctl_addrs []string
var rpx_addrs []string
var pxy_addrs []string
var wpx_addrs []string
var cfgfile string
@ -373,6 +379,7 @@ func main() {
ctl_addrs = make([]string, 0)
rpc_addrs = make([]string, 0)
rpx_addrs = make([]string, 0)
pxy_addrs = make([]string, 0)
wpx_addrs = make([]string, 0)
@ -385,6 +392,10 @@ func main() {
rpc_addrs = append(rpc_addrs, v)
return nil
})
flgs.Func("rpx-on", "specify a rpx listening address", func(v string) error {
rpx_addrs = append(rpx_addrs, v)
return nil
})
flgs.Func("pxy-on", "specify a proxy listening address", func(v string) error {
pxy_addrs = append(pxy_addrs, v)
return nil
@ -440,7 +451,7 @@ func main() {
}
}
err = server_main(ctl_addrs, rpc_addrs, pxy_addrs, wpx_addrs, logfile, &cfg)
err = server_main(ctl_addrs, rpc_addrs, rpx_addrs, pxy_addrs, wpx_addrs, logfile, &cfg)
if err != nil {
fmt.Fprintf(os.Stderr, "ERROR: server error - %s\n", err.Error())
goto oops
@ -525,7 +536,7 @@ func main() {
os.Exit(0)
wrong_usage:
fmt.Fprintf(os.Stderr, "USAGE: %s server --rpc-on=addr:port --ctl-on=addr:port --pxy-on=addr:port --wpx-on=addr:port [--config-file=file] [--config-file-pattern=pattern]\n", os.Args[0])
fmt.Fprintf(os.Stderr, "USAGE: %s server --rpc-on=addr:port --ctl-on=addr:port --rpx-on=addr:port --pxy-on=addr:port --wpx-on=addr:port [--config-file=file] [--config-file-pattern=pattern]\n", os.Args[0])
fmt.Fprintf(os.Stderr, " %s client --rpc-to=addr:port --ctl-on=addr:port [--config-file=file] [--config-file-pattern=pattern] [peer-addr:peer-port ...]\n", os.Args[0])
fmt.Fprintf(os.Stderr, " %s version\n", os.Args[0])
os.Exit(1)

12
hodu.go
View File

@ -225,7 +225,7 @@ func (option RouteOption) String() string {
func dump_call_frame_and_exit(log Logger, req *http.Request, err interface{}) {
var buf []byte
buf = make([]byte, 65536); buf = buf[:min(65536, runtime.Stack(buf, false))]
log.Write("", LOG_ERROR, "[%s] %s %s - %v\n%s", req.RemoteAddr, req.Method, req.URL.String(), err, string(buf))
log.Write("", LOG_ERROR, "[%s] %s %s - %v\n%s", req.RemoteAddr, req.Method, get_raw_url_path(req), err, string(buf))
log.Close()
os.Exit(99) // fatal error. treat panic() as a fatal runtime error
}
@ -466,3 +466,13 @@ func (auth *HttpAuthConfig) Authenticate(req *http.Request) (int, string) {
return http.StatusOK, ""
}
// ------------------------------------
func get_raw_url_path(req *http.Request) string {
var path string
path = req.URL.Path
if req.URL.RawQuery != "" { path += "?" + req.URL.RawQuery }
return path
}

View File

@ -28,10 +28,9 @@ const (
ROUTE_OPTION_TCP ROUTE_OPTION = 1
ROUTE_OPTION_TCP4 ROUTE_OPTION = 2
ROUTE_OPTION_TCP6 ROUTE_OPTION = 4
ROUTE_OPTION_UNUSED ROUTE_OPTION = 8
ROUTE_OPTION_HTTP ROUTE_OPTION = 16
ROUTE_OPTION_HTTPS ROUTE_OPTION = 32
ROUTE_OPTION_SSH ROUTE_OPTION = 64
ROUTE_OPTION_HTTP ROUTE_OPTION = 8
ROUTE_OPTION_HTTPS ROUTE_OPTION = 16
ROUTE_OPTION_SSH ROUTE_OPTION = 32
)
// Enum value maps for ROUTE_OPTION.
@ -41,20 +40,18 @@ var (
1: "TCP",
2: "TCP4",
4: "TCP6",
8: "UNUSED",
16: "HTTP",
32: "HTTPS",
64: "SSH",
8: "HTTP",
16: "HTTPS",
32: "SSH",
}
ROUTE_OPTION_value = map[string]int32{
"UNSPEC": 0,
"TCP": 1,
"TCP4": 2,
"TCP6": 4,
"UNUSED": 8,
"HTTP": 16,
"HTTPS": 32,
"SSH": 64,
"HTTP": 8,
"HTTPS": 16,
"SSH": 32,
}
)
@ -866,18 +863,16 @@ const file_hodu_proto_rawDesc = "" +
"\bConnNoti\x18\a \x01(\v2\v.ConnNoticeH\x00R\bConnNoti\x12&\n" +
"\aRptyEvt\x18\b \x01(\v2\n" +
".RptyEventH\x00R\aRptyEvtB\x03\n" +
"\x01U*a\n" +
"\x01U*U\n" +
"\fROUTE_OPTION\x12\n" +
"\n" +
"\x06UNSPEC\x10\x00\x12\a\n" +
"\x03TCP\x10\x01\x12\b\n" +
"\x04TCP4\x10\x02\x12\b\n" +
"\x04TCP6\x10\x04\x12\n" +
"\n" +
"\x06UNUSED\x10\b\x12\b\n" +
"\x04HTTP\x10\x10\x12\t\n" +
"\x05HTTPS\x10 \x12\a\n" +
"\x03SSH\x10@*\xa2\x02\n" +
"\x04TCP6\x10\x04\x12\b\n" +
"\x04HTTP\x10\b\x12\t\n" +
"\x05HTTPS\x10\x10\x12\a\n" +
"\x03SSH\x10 *\xa2\x02\n" +
"\vPACKET_KIND\x12\f\n" +
"\bRESERVED\x10\x00\x12\x0f\n" +
"\vROUTE_START\x10\x01\x12\x0e\n" +

View File

@ -23,10 +23,9 @@ enum ROUTE_OPTION {
TCP = 1;
TCP4 = 2;
TCP6 = 4;
UNUSED = 8;
HTTP = 16;
HTTPS = 32;
SSH = 64;
HTTP = 8;
HTTPS = 16;
SSH = 32;
};
message RouteDesc {

6
pty.go
View File

@ -21,7 +21,7 @@ func connect_pty(pty_shell string, pty_user string) (*exec.Cmd, *os.File, error)
return nil, nil, fmt.Errorf("blank pty shell")
}
cmd = exec.Command(pty_shell);
cmd = exec.Command(pty_shell)
if pty_user != "" {
var uid int
var gid int
@ -55,8 +55,8 @@ func connect_pty(pty_shell string, pty_user string) (*exec.Cmd, *os.File, error)
return nil, nil, err
}
//syscall.SetNonblock(int(tty.Fd()), true);
unix.SetNonblock(int(tty.Fd()), true);
//syscall.SetNonblock(int(tty.Fd()), true)
unix.SetNonblock(int(tty.Fd()), true)
return cmd, tty, nil
}

View File

@ -66,7 +66,7 @@ func (pty *server_pty_ws) ServeWebsocket(ws *websocket.Conn) (int, error) {
conn_ready = <-conn_ready_chan
if conn_ready { // connected
var poll_fds []unix.PollFd;
var poll_fds []unix.PollFd
var buf []byte
var n int
var err error

View File

@ -375,7 +375,7 @@ func (pxy *server_pxy_http_main) ServeHTTP(w http.ResponseWriter, req *http.Requ
}
proxy_url = pxy.req_to_proxy_url(req, pi)
s.log.Write(pxy.Id, LOG_INFO, "[%s] %s %s -> %+v", req.RemoteAddr, req.Method, req.URL.String(), proxy_url)
s.log.Write(pxy.Id, LOG_INFO, "[%s] %s %s -> %+v", req.RemoteAddr, req.Method, get_raw_url_path(req), proxy_url)
proxy_req, err = http.NewRequestWithContext(s.Ctx, req.Method, proxy_url.String(), req.Body)
if err != nil {
@ -401,7 +401,7 @@ func (pxy *server_pxy_http_main) ServeHTTP(w http.ResponseWriter, req *http.Requ
} else {
status_code = resp.StatusCode
if upgrade_required && resp.StatusCode == http.StatusSwitchingProtocols {
s.log.Write(pxy.Id, LOG_INFO, "[%s] %s %s %d", req.RemoteAddr, req.Method, req.URL.String(), status_code)
s.log.Write(pxy.Id, LOG_INFO, "[%s] %s %s %d", req.RemoteAddr, req.Method, get_raw_url_path(req), status_code)
err = pxy.serve_upgraded(w, req, resp)
if err != nil { goto oops }
return 0, nil// print the log mesage before calling serve_upgraded() and exit here
@ -426,7 +426,7 @@ func (pxy *server_pxy_http_main) ServeHTTP(w http.ResponseWriter, req *http.Requ
_, err = io.Copy(w, resp_body)
if err != nil {
s.log.Write(pxy.Id, LOG_WARN, "[%s] %s %s %s", req.RemoteAddr, req.Method, req.URL.String(), err.Error())
s.log.Write(pxy.Id, LOG_WARN, "[%s] %s %s %s", req.RemoteAddr, req.Method, get_raw_url_path(req), err.Error())
}
// TODO: handle trailers

35
server-rpx.go Normal file
View File

@ -0,0 +1,35 @@
package hodu
import "net/http"
type server_rpx struct {
S *Server
Id string
}
// ------------------------------------
func (pxy *server_rpx) Identity() string {
return pxy.Id
}
func (pxy *server_rpx) Cors(req *http.Request) bool {
return false
}
func (pxy *server_rpx) Authenticate(req *http.Request) (int, string) {
return http.StatusOK, ""
}
func (pxy *server_rpx) ServeHTTP(w http.ResponseWriter, req *http.Request) (int, error) {
var status_code int
// var err error
status_code = http.StatusOK
//done:
return status_code, nil
//oops:
// return status_code, err
}

105
server.go
View File

@ -32,8 +32,9 @@ const CTS_LIMIT int = 16384
type PortId uint16
const PORT_ID_MARKER string = "_"
const HS_ID_CTL string = "ctl"
const HS_ID_RPX string = "pxy"
const HS_ID_PXY string = "rpx"
const HS_ID_WPX string = "wpx"
const HS_ID_PXY string = "pxy"
type ServerConnMapByAddr map[net.Addr]*ServerConn
type ServerConnMapByClientToken map[string]*ServerConn
@ -64,6 +65,9 @@ type ServerConfig struct {
CtlAuth *HttpAuthConfig
CtlCors bool
RpxAddrs []string
RpxTls *tls.Config
PxyAddrs []string
PxyTls *tls.Config
@ -108,6 +112,11 @@ type Server struct {
ext_svcs []Service
ext_closed bool
rpx_mux *http.ServeMux
rpx []*http.Server // proxy server
rpx_addrs_mtx sync.Mutex
rpx_addrs *list.List // of net.Addr
pxy_mux *http.ServeMux
pxy []*http.Server // proxy server
pxy_addrs_mtx sync.Mutex
@ -1417,9 +1426,9 @@ func (s *Server) WrapHttpHandler(handler ServerHttpHandler) http.Handler {
if status_code > 0 {
if err != nil {
s.log.Write(handler.Identity(), LOG_INFO, "[%s] %s %s %d %.9f - %s", req.RemoteAddr, req.Method, req.URL.String(), status_code, time_taken.Seconds(), err.Error())
s.log.Write(handler.Identity(), LOG_INFO, "[%s] %s %s %d %.9f - %s", req.RemoteAddr, req.Method, get_raw_url_path(req), status_code, time_taken.Seconds(), err.Error())
} else {
s.log.Write(handler.Identity(), LOG_INFO, "[%s] %s %s %d %.9f", req.RemoteAddr, req.Method, req.URL.String(), status_code, time_taken.Seconds())
s.log.Write(handler.Identity(), LOG_INFO, "[%s] %s %s %d %.9f", req.RemoteAddr, req.Method, get_raw_url_path(req), status_code, time_taken.Seconds())
}
}
})
@ -1431,7 +1440,7 @@ func (s *Server) SafeWrapWebsocketHandler(handler websocket.Handler) http.Handle
!strings.Contains(strings.ToLower(req.Header.Get("Connection")), "upgrade") {
var status_code int
status_code = WriteEmptyRespHeader(w, http.StatusBadRequest)
s.log.Write("", LOG_INFO, "[%s] %s %s %d[non-websocket]", req.RemoteAddr, req.Method, req.URL.String(), status_code)
s.log.Write("", LOG_INFO, "[%s] %s %s %d[non-websocket]", req.RemoteAddr, req.Method, get_raw_url_path(req), status_code)
return
}
handler.ServeHTTP(w, req)
@ -1445,9 +1454,11 @@ func (s *Server) WrapWebsocketHandler(handler ServerWebsocketHandler) websocket.
var start_time time.Time
var time_taken time.Duration
var req *http.Request
var raw_url_path string
req = ws.Request()
s.log.Write(handler.Identity(), LOG_INFO, "[%s] %s %s [ws]", req.RemoteAddr, req.Method, req.URL.String())
raw_url_path = get_raw_url_path(req)
s.log.Write(handler.Identity(), LOG_INFO, "[%s] %s %s [ws]", req.RemoteAddr, req.Method, raw_url_path)
start_time = time.Now()
status_code, err = handler.ServeWebsocket(ws)
@ -1455,9 +1466,9 @@ func (s *Server) WrapWebsocketHandler(handler ServerWebsocketHandler) websocket.
if status_code > 0 {
if err != nil {
s.log.Write(handler.Identity(), LOG_INFO, "[%s] %s %s [ws] %d %.9f - %s", req.RemoteAddr, req.Method, req.URL.String(), status_code, time_taken.Seconds(), err.Error())
s.log.Write(handler.Identity(), LOG_INFO, "[%s] %s %s [ws] %d %.9f - %s", req.RemoteAddr, req.Method, raw_url_path, status_code, time_taken.Seconds(), err.Error())
} else {
s.log.Write(handler.Identity(), LOG_INFO, "[%s] %s %s [ws] %d %.9f", req.RemoteAddr, req.Method, req.URL.String(), status_code, time_taken.Seconds())
s.log.Write(handler.Identity(), LOG_INFO, "[%s] %s %s [ws] %d %.9f", req.RemoteAddr, req.Method, raw_url_path, status_code, time_taken.Seconds())
}
}
})
@ -1512,6 +1523,7 @@ func NewServer(ctx context.Context, name string, logger Logger, cfg *ServerConfi
s.bulletin = NewBulletin[*ServerEvent](&s, 1024)
s.ctl_addrs = list.New()
s.rpx_addrs = list.New()
s.pxy_addrs = list.New()
s.wpx_addrs = list.New()
@ -1622,6 +1634,23 @@ func NewServer(ctx context.Context, name string, logger Logger, cfg *ServerConfi
// ---------------------------------------------------------
s.rpx_mux = http.NewServeMux() // TODO: make /_init,_ssh,_ssh/ws,_http configurable...
s.rpx_mux.Handle("/", s.WrapHttpHandler(&server_rpx{ S: &s, Id: HS_ID_RPX }))
s.rpx = make([]*http.Server, len(cfg.RpxAddrs))
for i = 0; i < len(cfg.RpxAddrs); i++ {
s.rpx[i] = &http.Server{
Addr: cfg.RpxAddrs[i],
Handler: s.rpx_mux,
TLSConfig: cfg.RpxTls,
ErrorLog: hs_log,
// TODO: more settings
}
}
// ---------------------------------------------------------
s.pxy_mux = http.NewServeMux() // TODO: make /_init,_ssh,_ssh/ws,_http configurable...
s.pxy_mux.Handle("/_ssh/{conn_id}/",
@ -1890,6 +1919,59 @@ func (s *Server) RunCtlTask(wg *sync.WaitGroup) {
l_wg.Wait()
}
func (s *Server) RunRpxTask(wg *sync.WaitGroup) {
var err error
var rpx *http.Server
var idx int
var l_wg sync.WaitGroup
defer wg.Done()
for idx, rpx = range s.rpx {
l_wg.Add(1)
go func(i int, cs *http.Server) {
var l net.Listener
s.log.Write("", LOG_INFO, "RPX channel[%d] started on %s", i, s.Cfg.RpxAddrs[i])
if s.stop_req.Load() == false {
l, err = net.Listen(TcpAddrStrClass(cs.Addr), cs.Addr)
if err == nil {
if s.stop_req.Load() == false {
var node *list.Element
s.rpx_addrs_mtx.Lock()
node = s.rpx_addrs.PushBack(l.Addr().(*net.TCPAddr))
s.rpx_addrs_mtx.Unlock()
if s.Cfg.RpxTls == nil { // TODO: change this
err = cs.Serve(l)
} else {
err = cs.ServeTLS(l, "", "") // s.Cfg.RpxTls must provide a certificate and a key
}
s.rpx_addrs_mtx.Lock()
s.rpx_addrs.Remove(node)
s.rpx_addrs_mtx.Unlock()
} else {
err = fmt.Errorf("stop requested")
}
l.Close()
}
} else {
err = fmt.Errorf("stop requested")
}
if errors.Is(err, http.ErrServerClosed) {
s.log.Write("", LOG_INFO, "RPX channel[%d] ended", i)
} else {
s.log.Write("", LOG_ERROR, "RPX channel[%d] error - %s", i, err.Error())
}
l_wg.Done()
}(idx, rpx)
}
l_wg.Wait()
}
func (s *Server) RunPxyTask(wg *sync.WaitGroup) {
var err error
var pxy *http.Server
@ -2013,6 +2095,10 @@ func (s *Server) ReqStop() {
hs.Shutdown(s.Ctx) // to break s.ctl.Serve()
}
for _, hs = range s.rpx {
hs.Shutdown(s.Ctx) // to break s.rpx.Serve()
}
for _, hs = range s.pxy {
hs.Shutdown(s.Ctx) // to break s.pxy.Serve()
}
@ -2382,6 +2468,11 @@ func (s *Server) StartCtlService() {
go s.RunCtlTask(&s.wg)
}
func (s *Server) StartRpxService() {
s.wg.Add(1)
go s.RunRpxTask(&s.wg)
}
func (s *Server) StartPxyService() {
s.wg.Add(1)
go s.RunPxyTask(&s.wg)