Compare commits

...

6 Commits

Author SHA1 Message Date
31a4223aab minotr message fix 2025-08-12 16:40:18 +09:00
41cd725c1c added server-rpx.go 2025-08-12 16:30:56 +09:00
6200bc5460 removed UNUSED from the proto file 2025-08-12 16:29:44 +09:00
7fb4fbaae2 updated http logging to include the query string part 2025-08-12 12:17:33 +09:00
d818acc53d rpty at least working 2025-08-12 02:50:10 +09:00
05cb0823b4 some code clean-up in handling grpc packets 2025-08-10 17:23:01 +09:00
19 changed files with 974 additions and 447 deletions

View File

@ -21,12 +21,14 @@ SRCS=\
hodu_grpc.pb.go \
jwt.go \
packet.go \
pty.go \
server.go \
server-ctl.go \
server-metrics.go \
server-peer.go \
server-pty.go \
server-pxy.go \
server-rpx.go \
system.go \
transform.go \

View File

@ -13,7 +13,7 @@ func (av* Atom[T]) Set(v T) {
func (av* Atom[T]) Get() T {
var v interface{}
v = av.val.Load()
if v == nil {
if v == nil {
var t T
return t // return the zero-value
}

View File

@ -2,16 +2,13 @@ package hodu
import "encoding/json"
import "errors"
import "fmt"
import "io"
import "net/http"
import "os"
import "os/exec"
import "os/user"
import "strconv"
import "strings"
import "sync"
import "syscall"
import "text/template"
import pts "github.com/creack/pty"
@ -35,74 +32,11 @@ func (pty *client_pty_ws) Identity() string {
return pty.Id
}
func (pty *client_pty_ws) send_ws_data(ws *websocket.Conn, type_val string, data string) error {
var msg []byte
var err error
msg, err = json.Marshal(json_xterm_ws_event{Type: type_val, Data: []string{ data } })
if err == nil { err = websocket.Message.Send(ws, msg) }
return err
}
func (pty *client_pty_ws) connect_pty(username string, password string) (*exec.Cmd, *os.File, error) {
var c *Client
var cmd *exec.Cmd
var tty *os.File
var err error
// username and password are not used yet.
c = pty.C
if c.pty_shell == "" {
return nil, nil, fmt.Errorf("blank pty shell")
}
cmd = exec.Command(c.pty_shell);
if c.pty_user != "" {
var uid int
var gid int
var u *user.User
u, err = user.Lookup(c.pty_user)
if err != nil { return nil, nil, err }
uid, _ = strconv.Atoi(u.Uid)
gid, _ = strconv.Atoi(u.Gid)
cmd.SysProcAttr = &syscall.SysProcAttr{
Credential: &syscall.Credential{
Uid: uint32(uid),
Gid: uint32(gid),
},
Setsid: true,
}
cmd.Dir = u.HomeDir
cmd.Env = append(cmd.Env,
"HOME=" + u.HomeDir,
"LOGNAME=" + u.Username,
"PATH=" + os.Getenv("PATH"),
"SHELL=" + c.pty_shell,
"TERM=xterm",
"USER=" + u.Username,
)
}
tty, err = pts.Start(cmd)
if err != nil {
return nil, nil, err
}
//syscall.SetNonblock(int(tty.Fd()), true);
unix.SetNonblock(int(tty.Fd()), true);
return cmd, tty, nil
}
func (pty *client_pty_ws) ServeWebsocket(ws *websocket.Conn) (int, error) {
var c *Client
var req *http.Request
var username string
var password string
//var username string
//var password string
var in *os.File
var out *os.File
var tty *os.File
@ -124,7 +58,7 @@ func (pty *client_pty_ws) ServeWebsocket(ws *websocket.Conn) (int, error) {
conn_ready = <-conn_ready_chan
if conn_ready { // connected
var poll_fds []unix.PollFd;
var poll_fds []unix.PollFd
var buf []byte
var n int
var err error
@ -149,7 +83,7 @@ func (pty *client_pty_ws) ServeWebsocket(ws *websocket.Conn) (int, error) {
if (poll_fds[0].Revents & (unix.POLLERR | unix.POLLHUP | unix.POLLNVAL)) != 0 {
c.log.Write(pty.Id, LOG_DEBUG, "[%s] EOF detected on pty stdout", req.RemoteAddr)
break;
break
}
if (poll_fds[0].Revents & unix.POLLIN) != 0 {
@ -161,7 +95,7 @@ func (pty *client_pty_ws) ServeWebsocket(ws *websocket.Conn) (int, error) {
break
}
if n > 0 {
err = pty.send_ws_data(ws, "iov", string(buf[:n]))
err = send_ws_data_for_xterm(ws, "iov", string(buf[:n]))
if err != nil {
c.log.Write(pty.Id, LOG_ERROR, "[%s] Failed to send to websocket - %s", req.RemoteAddr, err.Error())
break
@ -186,21 +120,21 @@ ws_recv_loop:
switch ev.Type {
case "open":
if tty == nil && len(ev.Data) == 2 {
username = string(ev.Data[0])
password = string(ev.Data[1])
//username = string(ev.Data[0])
//password = string(ev.Data[1])
wg.Add(1)
go func() {
var err error
defer wg.Done()
cmd, tty, err = pty.connect_pty(username, password)
cmd, tty, err = connect_pty(c.pty_shell, c.pty_user)
if err != nil {
c.log.Write(pty.Id, LOG_ERROR, "[%s] Failed to connect pty - %s", req.RemoteAddr, err.Error())
pty.send_ws_data(ws, "error", err.Error())
send_ws_data_for_xterm(ws, "error", err.Error())
ws.Close() // dirty way to flag out the error
} else {
err = pty.send_ws_data(ws, "status", "opened")
err = send_ws_data_for_xterm(ws, "status", "opened")
if err != nil {
c.log.Write(pty.Id, LOG_ERROR, "[%s] Failed to write opened event to websocket - %s", req.RemoteAddr, err.Error())
ws.Close() // dirty way to flag out the error
@ -245,7 +179,7 @@ ws_recv_loop:
}
if tty != nil {
err = pty.send_ws_data(ws, "status", "closed")
err = send_ws_data_for_xterm(ws, "status", "closed")
if err != nil { goto done }
}

355
client.go
View File

@ -5,9 +5,12 @@ import "context"
import "crypto/tls"
import "errors"
import "fmt"
import "io"
import "log"
import "net"
import "net/http"
import "os"
import "os/exec"
import "slices"
import "strconv"
import "strings"
@ -17,12 +20,16 @@ import "time"
import "unsafe"
import "golang.org/x/net/websocket"
import "golang.org/x/sys/unix"
import "google.golang.org/grpc"
import "google.golang.org/grpc/codes"
import "google.golang.org/grpc/credentials"
import "google.golang.org/grpc/credentials/insecure"
import "google.golang.org/grpc/peer"
import "google.golang.org/grpc/status"
import pts "github.com/creack/pty"
import "github.com/prometheus/client_golang/prometheus"
import "github.com/prometheus/client_golang/prometheus/promhttp"
@ -32,6 +39,7 @@ type ClientConnMap map[ConnId]*ClientConn
type ClientRouteMap map[RouteId]*ClientRoute
type ClientPeerConnMap map[PeerId]*ClientPeerConn
type ClientPeerCancelFuncMap map[PeerId]context.CancelFunc
type ClientRptyMap map[uint64]*ClientRpty
// --------------------------------------------------------------------
type ClientRouteConfig struct {
@ -162,6 +170,12 @@ const (
CLIENT_CONN_DISCONNECTED
)
type ClientRpty struct {
id uint64
cmd *exec.Cmd
tty *os.File
}
// client connection to server
type ClientConn struct {
C *Client
@ -192,6 +206,9 @@ type ClientConn struct {
ptc_mtx sync.Mutex
ptc_list *list.List
rpty_mtx sync.Mutex
rpty_map ClientRptyMap
stop_req atomic.Bool
stop_chan chan bool
@ -527,10 +544,6 @@ func (r *ClientRoute) ConnectToPeer(pts_id PeerId, route_option RouteOption, pts
var tmout time.Duration
var ok bool
// TODO: handle TTY
// if route_option & RouteOption(ROUTE_OPTION_TTY) it must create a pseudo-tty insteaad of connecting to tcp address
//
defer wg.Done()
tmout = time.Duration(r.cts.C.ptc_tmout)
@ -648,7 +661,7 @@ func (r *ClientRoute) ReportPacket(pts_id PeerId, packet_type PACKET_KIND, event
r.cts.C.log.Write(r.cts.Sid, LOG_INFO,
"Ingested route_started(%d,%s,%s) for route(%d,%s,%v,%s,%s)",
rd.RouteId, rd.TargetAddrStr, rd.ServiceNetStr,
r.Id, r.PeerAddr, r.ServerPeerOption, r.ReqServerPeerSvcAddr, r.ReqServerPeerSvcNet);
r.Id, r.PeerAddr, r.ServerPeerOption, r.ReqServerPeerSvcAddr, r.ReqServerPeerSvcNet)
}
}
@ -667,7 +680,7 @@ func (r *ClientRoute) ReportPacket(pts_id PeerId, packet_type PACKET_KIND, event
r.cts.C.log.Write(r.cts.Sid, LOG_INFO,
"Ingested route_stopped(%d,%s,%s) for route(%d,%s,%v,%s,%s)",
rd.RouteId, rd.TargetAddrStr, rd.ServiceNetStr,
r.Id, r.PeerAddr, r.ServerPeerOption, r.ReqServerPeerSvcAddr, r.ReqServerPeerSvcNet);
r.Id, r.PeerAddr, r.ServerPeerOption, r.ReqServerPeerSvcAddr, r.ReqServerPeerSvcNet)
}
r.ReqStop()
@ -790,6 +803,7 @@ func (r *ClientRoute) ReportPacket(pts_id PeerId, packet_type PACKET_KIND, event
}
}
default:
// ignore all others
}
@ -810,6 +824,7 @@ func NewClientConn(c *Client, cfg *ClientConnConfig) *ClientConn {
cts.stop_req.Store(false)
cts.stop_chan = make(chan bool, 8)
cts.ptc_list = list.New()
cts.rpty_map = make(ClientRptyMap)
for i, _ = range cts.cfg.Routes {
// override it to static regardless of the value passed in
@ -1016,6 +1031,7 @@ func (cts *ClientConn) add_client_routes(routes []ClientRouteConfig) error {
func (cts *ClientConn) disconnect_from_server(logmsg bool) {
if cts.conn != nil {
var r *ClientRoute
var crp *ClientRpty
cts.discon_mtx.Lock()
@ -1027,6 +1043,17 @@ func (cts *ClientConn) disconnect_from_server(logmsg bool) {
for _, r = range cts.route_map { r.ReqStop() }
cts.route_mtx.Unlock()
// arrange to clean up all rpty objects
cts.rpty_mtx.Lock()
for _, crp = range cts.rpty_map {
crp.tty.Close()
crp.cmd.Process.Kill()
// the loop in ReadRptyLoop() is supposed to be broken.
// let's not inform the server of this connection.
// the server should clean up itself upon connection error
}
cts.rpty_mtx.Unlock()
// don't care about double closes when this function is called from both RunTask() and ReqStop()
cts.conn.Close()
@ -1200,6 +1227,8 @@ start_over:
switch pkt.Kind {
case PACKET_KIND_ROUTE_STARTED:
fallthrough
case PACKET_KIND_ROUTE_STOPPED:
// the server side managed to set up the route the client requested
var x *Packet_Route
var ok bool
@ -1208,96 +1237,41 @@ start_over:
err = cts.ReportPacket(RouteId(x.Route.RouteId), 0, pkt.Kind, x.Route)
if err != nil {
cts.C.log.Write(cts.Sid, LOG_ERROR,
"Failed to handle route_started event(%d,%s) from %s - %s",
x.Route.RouteId, x.Route.TargetAddrStr, cts.remote_addr_p, err.Error())
"Failed to handle %s event(%d,%s) from %s - %s",
pkt.Kind.String(), x.Route.RouteId, x.Route.TargetAddrStr, cts.remote_addr_p, err.Error())
} else {
cts.C.log.Write(cts.Sid, LOG_DEBUG,
"Handled route_started event(%d,%s) from %s",
x.Route.RouteId, x.Route.TargetAddrStr, cts.remote_addr_p)
"Handled %s event(%d,%s) from %s",
pkt.Kind.String(), x.Route.RouteId, x.Route.TargetAddrStr, cts.remote_addr_p)
}
} else {
cts.C.log.Write(cts.Sid, LOG_ERROR, "Invalid route_started event from %s", cts.remote_addr_p)
}
case PACKET_KIND_ROUTE_STOPPED:
var x *Packet_Route
var ok bool
x, ok = pkt.U.(*Packet_Route)
if ok {
err = cts.ReportPacket(RouteId(x.Route.RouteId), 0, pkt.Kind, x.Route)
if err != nil {
cts.C.log.Write(cts.Sid, LOG_ERROR,
"Failed to handle route_stopped event(%d,%s) from %s - %s",
x.Route.RouteId, x.Route.TargetAddrStr, cts.remote_addr_p, err.Error())
} else {
cts.C.log.Write(cts.Sid, LOG_DEBUG,
"Handled route_stopped event(%d,%s) from %s",
x.Route.RouteId, x.Route.TargetAddrStr, cts.remote_addr_p)
}
} else {
cts.C.log.Write(cts.Sid, LOG_ERROR, "Invalid route_stopped event from %s", cts.remote_addr_p)
}
case PACKET_KIND_PEER_STARTED:
// the connection from the client to a peer has been established
var x *Packet_Peer
var ok bool
x, ok = pkt.U.(*Packet_Peer)
if ok {
err = cts.ReportPacket(RouteId(x.Peer.RouteId), PeerId(x.Peer.PeerId), PACKET_KIND_PEER_STARTED, x.Peer)
if err != nil {
cts.C.log.Write(cts.Sid, LOG_ERROR,
"Failed to handle peer_started event from %s for peer(%d,%d,%s,%s) - %s",
cts.remote_addr_p, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr, err.Error())
} else {
cts.C.log.Write(cts.Sid, LOG_DEBUG,
"Handled peer_started event from %s for peer(%d,%d,%s,%s)",
cts.remote_addr_p, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr)
}
} else {
cts.C.log.Write(cts.Sid, LOG_ERROR, "Invalid peer_started event from %s", cts.remote_addr_p)
cts.C.log.Write(cts.Sid, LOG_ERROR, "Invalid %s event from %s", pkt.Kind.String(), cts.remote_addr_p)
}
// PACKET_KIND_PEER_ABORTED is never sent by server to client.
// the code here doesn't handle the event.
case PACKET_KIND_PEER_STARTED:
fallthrough
case PACKET_KIND_PEER_STOPPED:
fallthrough
case PACKET_KIND_PEER_EOF:
// the connection from the client to a peer has been established
var x *Packet_Peer
var ok bool
x, ok = pkt.U.(*Packet_Peer)
if ok {
err = cts.ReportPacket(RouteId(x.Peer.RouteId), PeerId(x.Peer.PeerId), PACKET_KIND_PEER_STOPPED, x.Peer)
err = cts.ReportPacket(RouteId(x.Peer.RouteId), PeerId(x.Peer.PeerId), pkt.Kind, x.Peer)
if err != nil {
cts.C.log.Write(cts.Sid, LOG_ERROR,
"Failed to handle peer_stopped event from %s for peer(%d,%d,%s,%s) - %s",
cts.remote_addr_p, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr, err.Error())
"Failed to handle %s event from %s for peer(%d,%d,%s,%s) - %s",
pkt.Kind.String(), cts.remote_addr_p, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr, err.Error())
} else {
cts.C.log.Write(cts.Sid, LOG_DEBUG,
"Handled peer_stopped event from %s for peer(%d,%d,%s,%s)",
cts.remote_addr_p, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr)
"Handled %s event from %s for peer(%d,%d,%s,%s)",
pkt.Kind.String(), cts.remote_addr_p, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr)
}
} else {
cts.C.log.Write(cts.Sid, LOG_ERROR, "Invalid peer_stopped event from %s", cts.remote_addr_p)
}
case PACKET_KIND_PEER_EOF:
var x *Packet_Peer
var ok bool
x, ok = pkt.U.(*Packet_Peer)
if ok {
err = cts.ReportPacket(RouteId(x.Peer.RouteId), PeerId(x.Peer.PeerId), PACKET_KIND_PEER_EOF, x.Peer)
if err != nil {
cts.C.log.Write(cts.Sid, LOG_ERROR,
"Failed to handle peer_eof event from %s for peer(%d,%d,%s,%s) - %s",
cts.remote_addr_p, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr, err.Error())
} else {
cts.C.log.Write(cts.Sid, LOG_DEBUG,
"Handled peer_eof event from %s for peer(%d,%d,%s,%s)",
cts.remote_addr_p, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr)
}
} else {
cts.C.log.Write(cts.Sid, LOG_ERROR, "Invalid peer_eof event from %s", cts.remote_addr_p)
cts.C.log.Write(cts.Sid, LOG_ERROR, "Invalid %s event from %s", pkt.Kind.String(), cts.remote_addr_p)
}
case PACKET_KIND_PEER_DATA:
@ -1306,18 +1280,18 @@ start_over:
var ok bool
x, ok = pkt.U.(*Packet_Data)
if ok {
err = cts.ReportPacket(RouteId(x.Data.RouteId), PeerId(x.Data.PeerId), PACKET_KIND_PEER_DATA, x.Data.Data)
err = cts.ReportPacket(RouteId(x.Data.RouteId), PeerId(x.Data.PeerId), pkt.Kind, x.Data.Data)
if err != nil {
cts.C.log.Write(cts.Sid, LOG_ERROR,
"Failed to handle peer_data event from %s for peer(%d,%d) - %s",
cts.remote_addr_p, x.Data.RouteId, x.Data.PeerId, err.Error())
"Failed to handle %s event from %s for peer(%d,%d) - %s",
pkt.Kind.String(), cts.remote_addr_p, x.Data.RouteId, x.Data.PeerId, err.Error())
} else {
cts.C.log.Write(cts.Sid, LOG_DEBUG,
"Handled peer_data event from %s for peer(%d,%d)",
cts.remote_addr_p, x.Data.RouteId, x.Data.PeerId)
"Handled %s event from %s for peer(%d,%d)",
pkt.Kind.String(), cts.remote_addr_p, x.Data.RouteId, x.Data.PeerId)
}
} else {
cts.C.log.Write(cts.Sid, LOG_ERROR, "Invalid peer_data event from %s", cts.remote_addr_p)
cts.C.log.Write(cts.Sid, LOG_ERROR, "Invalid %s event from %s", pkt.Kind.String(), cts.remote_addr_p)
}
case PACKET_KIND_CONN_ERROR:
@ -1325,10 +1299,10 @@ start_over:
var ok bool
x, ok = pkt.U.(*Packet_ConnErr)
if ok {
cts.C.log.Write(cts.Sid, LOG_ERROR, "Received conn_error(%d, %s) event from %s", x.ConnErr.ErrorId, x.ConnErr.Text, cts.remote_addr_p)
cts.C.log.Write(cts.Sid, LOG_ERROR, "Received %s(%d, %s) event from %s", pkt.Kind.String(), x.ConnErr.ErrorId, x.ConnErr.Text, cts.remote_addr_p)
if cts.cfg.CloseOnConnErrorEvent { goto done }
} else {
cts.C.log.Write(cts.Sid, LOG_ERROR, "Invalid conn_error event from %s", cts.remote_addr_p)
cts.C.log.Write(cts.Sid, LOG_ERROR, "Invalid %sevent from %s", pkt.Kind.String(), cts.remote_addr_p)
}
case PACKET_KIND_CONN_NOTICE:
@ -1337,7 +1311,7 @@ start_over:
var ok bool
x, ok = pkt.U.(*Packet_ConnNoti)
if ok {
cts.C.log.Write(cts.Sid, LOG_DEBUG, "conn_notice message '%s' received from %s", x.ConnNoti.Text, cts.remote_addr_p)
cts.C.log.Write(cts.Sid, LOG_DEBUG, "%s message '%s' received from %s", pkt.Kind.String(), x.ConnNoti.Text, cts.remote_addr_p)
if cts.C.conn_notice_handlers != nil {
var handler ClientConnNoticeHandler
for _, handler = range cts.C.conn_notice_handlers {
@ -1345,18 +1319,34 @@ start_over:
}
}
} else {
cts.C.log.Write(cts.Sid, LOG_ERROR, "Invalid conn_notice packet from %s", cts.remote_addr_p)
cts.C.log.Write(cts.Sid, LOG_ERROR, "Invalid %s packet from %s", pkt.Kind.String(), cts.remote_addr_p)
}
case PACKET_KIND_RPTY_START:
// TODO:
fallthrough
case PACKET_KIND_RPTY_STOP:
// TODO:
fallthrough
case PACKET_KIND_RPTY_DATA:
// TODO:
case PACKET_KIND_RPTY_EOF:
// TODO:
fallthrough
case PACKET_KIND_RPTY_SIZE:
var x *Packet_RptyEvt
var ok bool
x, ok = pkt.U.(*Packet_RptyEvt)
if ok {
err = cts.HandleRptyEvent(pkt.Kind, x.RptyEvt)
if err != nil {
cts.C.log.Write(cts.Sid, LOG_ERROR,
"Failed to handle %s event for rpty(%d) from %s - %s",
pkt.Kind.String(), x.RptyEvt.Id, cts.remote_addr_p, err.Error())
} else {
cts.C.log.Write(cts.Sid, LOG_DEBUG,
"Handled %s event for rpty(%d) from %s",
pkt.Kind.String(), x.RptyEvt.Id, cts.remote_addr_p)
}
} else {
cts.C.log.Write(cts.Sid, LOG_ERROR, "Invalid %s event from %s", pkt.Kind.String(), cts.remote_addr_p)
}
default:
// do nothing. ignore the rest
@ -1430,6 +1420,175 @@ func (cts *ClientConn) ReportPacket(route_id RouteId, pts_id PeerId, packet_type
return r.ReportPacket(pts_id, packet_type, event_data)
}
func (cts *ClientConn) ReadRptyLoop(crp *ClientRpty, wg *sync.WaitGroup) {
var poll_fds []unix.PollFd
var buf []byte
var n int
var err error
defer wg.Done()
poll_fds = []unix.PollFd{
unix.PollFd{Fd: int32(crp.tty.Fd()), Events: unix.POLLIN},
}
buf = make([]byte, 2048)
for {
n, err = unix.Poll(poll_fds, -1) // -1 means wait indefinitely
if err != nil {
if errors.Is(err, unix.EINTR) { continue }
cts.C.log.Write(cts.Sid, LOG_ERROR, "Failed to poll rpty(%d) stdout - %s", crp.id, err.Error())
break
}
if n == 0 { // timed out
continue
}
if (poll_fds[0].Revents & (unix.POLLERR | unix.POLLHUP | unix.POLLNVAL)) != 0 {
cts.C.log.Write(cts.Sid, LOG_DEBUG, "EOF detected on rpty(%d) stdout", crp.id)
break
}
if (poll_fds[0].Revents & unix.POLLIN) != 0 {
n, err = crp.tty.Read(buf)
if err != nil {
if !errors.Is(err, io.EOF) {
cts.C.log.Write(cts.Sid, LOG_DEBUG, "Failed to read rpty(%d) stdout - %s", crp.id, err.Error())
}
break
}
if n > 0 {
err = cts.psc.Send(MakeRptyDataPacket(crp.id, buf[:n]))
if err != nil {
cts.C.log.Write(cts.Sid, LOG_DEBUG, "Failed to send rpty(%d) stdout to server - %s", crp.id, err.Error())
break
}
}
}
}
cts.psc.Send(MakeRptyStopPacket(crp.id, ""))
cts.C.log.Write(cts.Sid, LOG_INFO, "Ending rpty(%d) read loop", crp.id)
crp.tty.Close() // don't care about multiple closes
crp.cmd.Process.Kill()
crp.cmd.Wait()
cts.rpty_mtx.Lock()
delete(cts.rpty_map, crp.id)
cts.rpty_mtx.Unlock()
}
func (cts *ClientConn) StartRpty(id uint64, wg *sync.WaitGroup) error {
var crp *ClientRpty
var ok bool
var err error
cts.rpty_mtx.Lock()
_, ok = cts.rpty_map[id]
if ok {
cts.rpty_mtx.Unlock()
return fmt.Errorf("multiple start on rpty id %d", id)
}
crp = &ClientRpty{ id: id }
crp.cmd, crp.tty, err = connect_pty(cts.C.pty_shell, cts.C.pty_user)
if err != nil {
cts.rpty_mtx.Unlock()
cts.psc.Send(MakeRptyStopPacket(id, err.Error()))
return fmt.Errorf("unable to start rpty(%d) for %s(%s) - %s", id, cts.C.pty_shell, cts.C.pty_user, err.Error())
}
cts.rpty_map[id] = crp
wg.Add(1)
go cts.ReadRptyLoop(crp, wg)
cts.rpty_mtx.Unlock()
cts.C.log.Write(cts.Sid, LOG_INFO, "Started rpty(%d) for %s(%s)", id, cts.C.pty_shell, cts.C.pty_user)
return nil
}
func (cts *ClientConn) StopRpty(id uint64) error {
var crp *ClientRpty
var ok bool
cts.rpty_mtx.Lock()
crp, ok = cts.rpty_map[id]
if !ok {
cts.rpty_mtx.Unlock()
return fmt.Errorf("unknown rpty id %d", id)
}
crp.tty.Close() // to break ReadRptyLoop()
crp.cmd.Process.Kill() // to process wait to be done by ReadRptyLoop()
cts.rpty_mtx.Unlock()
return nil
}
func (cts *ClientConn) WriteRpty(id uint64, data []byte) error {
var crp *ClientRpty
var ok bool
cts.rpty_mtx.Lock()
crp, ok = cts.rpty_map[id]
if !ok {
cts.rpty_mtx.Unlock()
return fmt.Errorf("unknown rpty id %d", id)
}
crp.tty.Write(data)
cts.rpty_mtx.Unlock()
return nil
}
func (cts *ClientConn) WriteRptySize(id uint64, data []byte) error {
var crp *ClientRpty
var ok bool
var flds []string
cts.rpty_mtx.Lock()
crp, ok = cts.rpty_map[id]
if !ok {
cts.rpty_mtx.Unlock()
return fmt.Errorf("unknown rpty id %d", id)
}
flds = strings.Split(string(data), " ")
if len(flds) == 2 {
var rows int
var cols int
rows, _ = strconv.Atoi(flds[0])
cols, _ = strconv.Atoi(flds[1])
pts.Setsize(crp.tty, &pts.Winsize{Rows: uint16(rows), Cols: uint16(cols)})
}
cts.rpty_mtx.Unlock()
return nil
}
func (cts *ClientConn) HandleRptyEvent(packet_type PACKET_KIND, evt *RptyEvent) error {
switch packet_type {
case PACKET_KIND_RPTY_START:
return cts.StartRpty(evt.Id, &cts.C.wg)
case PACKET_KIND_RPTY_STOP:
return cts.StopRpty(evt.Id)
case PACKET_KIND_RPTY_DATA:
return cts.WriteRpty(evt.Id, evt.Data)
case PACKET_KIND_RPTY_SIZE:
return cts.WriteRptySize(evt.Id, evt.Data)
}
// ignore other packet types
return nil
}
// --------------------------------------------------------------------
func (m ClientPeerConnMap) get_sorted_keys() []PeerId {
@ -1541,9 +1700,9 @@ func (c *Client) WrapHttpHandler(handler ClientHttpHandler) http.Handler {
if status_code > 0 {
if err != nil {
c.log.Write(handler.Identity(), LOG_INFO, "[%s] %s %s %d %.9f - %s", req.RemoteAddr, req.Method, req.URL.String(), status_code, time_taken.Seconds(), err.Error())
c.log.Write(handler.Identity(), LOG_INFO, "[%s] %s %s %d %.9f - %s", req.RemoteAddr, req.Method, get_raw_url_path(req), status_code, time_taken.Seconds(), err.Error())
} else {
c.log.Write(handler.Identity(), LOG_INFO, "[%s] %s %s %d %.9f", req.RemoteAddr, req.Method, req.URL.String(), status_code, time_taken.Seconds())
c.log.Write(handler.Identity(), LOG_INFO, "[%s] %s %s %d %.9f", req.RemoteAddr, req.Method, get_raw_url_path(req), status_code, time_taken.Seconds())
}
}
})
@ -1555,7 +1714,7 @@ func (c *Client) SafeWrapWebsocketHandler(handler websocket.Handler) http.Handle
!strings.Contains(strings.ToLower(req.Header.Get("Connection")), "upgrade") {
var status_code int
status_code = WriteEmptyRespHeader(w, http.StatusBadRequest)
c.log.Write("", LOG_INFO, "[%s] %s %s %d[non-websocket]", req.RemoteAddr, req.Method, req.URL.String(), status_code)
c.log.Write("", LOG_INFO, "[%s] %s %s %d[non-websocket]", req.RemoteAddr, req.Method, get_raw_url_path(req), status_code)
return
}
handler.ServeHTTP(w, req)
@ -1569,9 +1728,11 @@ func (c *Client) WrapWebsocketHandler(handler ClientWebsocketHandler) websocket.
var start_time time.Time
var time_taken time.Duration
var req *http.Request
var raw_url_path string
req = ws.Request()
c.log.Write(handler.Identity(), LOG_INFO, "[%s] %s %s [ws]", req.RemoteAddr, req.Method, req.URL.String())
raw_url_path = get_raw_url_path(req)
c.log.Write(handler.Identity(), LOG_INFO, "[%s] %s %s [ws]", req.RemoteAddr, req.Method, raw_url_path)
start_time = time.Now()
status_code, err = handler.ServeWebsocket(ws)
@ -1579,9 +1740,9 @@ func (c *Client) WrapWebsocketHandler(handler ClientWebsocketHandler) websocket.
if status_code > 0 {
if err != nil {
c.log.Write(handler.Identity(), LOG_INFO, "[%s] %s %s [ws] %d %.9f - %s", req.RemoteAddr, req.Method, req.URL.String(), status_code, time_taken.Seconds(), err.Error())
c.log.Write(handler.Identity(), LOG_INFO, "[%s] %s %s [ws] %d %.9f - %s", req.RemoteAddr, req.Method, raw_url_path, status_code, time_taken.Seconds(), err.Error())
} else {
c.log.Write(handler.Identity(), LOG_INFO, "[%s] %s %s [ws] %d %.9f", req.RemoteAddr, req.Method, req.URL.String(), status_code, time_taken.Seconds())
c.log.Write(handler.Identity(), LOG_INFO, "[%s] %s %s [ws] %d %.9f", req.RemoteAddr, req.Method, raw_url_path, status_code, time_taken.Seconds())
}
}
})

View File

@ -69,6 +69,10 @@ type CTLServiceConfig struct {
Auth HttpAuthConfig `yaml:"auth"`
}
type RPXServiceConfig struct {
Addrs []string `yaml:"addresses"`
}
type PXYServiceConfig struct {
Addrs []string `yaml:"addresses"`
}
@ -122,6 +126,11 @@ type ServerConfig struct {
TLS ServerTLSConfig `yaml:"tls"`
} `yaml:"ctl"`
RPX struct {
Service RPXServiceConfig `yaml:"service"`
TLS ServerTLSConfig `yaml:"tls"`
} `yaml:"rpx"`
PXY struct {
Service PXYServiceConfig `yaml:"service"`
TLS ServerTLSConfig `yaml:"tls"`

View File

@ -90,7 +90,7 @@ func (sh *signal_handler) WriteLog(id string, level hodu.LogLevel, fmt string, a
// --------------------------------------------------------------------
func server_main(ctl_addrs []string, rpc_addrs []string, pxy_addrs []string, wpx_addrs []string, logfile string, cfg *ServerConfig) error {
func server_main(ctl_addrs []string, rpc_addrs []string, rpx_addrs[] string, pxy_addrs []string, wpx_addrs []string, logfile string, cfg *ServerConfig) error {
var s *hodu.Server
var config *hodu.ServerConfig
var logger *AppLogger
@ -108,6 +108,7 @@ func server_main(ctl_addrs []string, rpc_addrs []string, pxy_addrs []string, wpx
config = &hodu.ServerConfig{
CtlAddrs: ctl_addrs,
RpcAddrs: rpc_addrs,
RpxAddrs: rpx_addrs,
PxyAddrs: pxy_addrs,
WpxAddrs: wpx_addrs,
}
@ -117,6 +118,8 @@ func server_main(ctl_addrs []string, rpc_addrs []string, pxy_addrs []string, wpx
if err != nil { return err }
config.RpcTls, err = make_tls_server_config(&cfg.RPC.TLS)
if err != nil { return err }
config.RpxTls, err = make_tls_server_config(&cfg.RPX.TLS)
if err != nil { return err }
config.PxyTls, err = make_tls_server_config(&cfg.PXY.TLS)
if err != nil { return err }
config.WpxTls, err = make_tls_server_config(&cfg.WPX.TLS)
@ -124,6 +127,7 @@ func server_main(ctl_addrs []string, rpc_addrs []string, pxy_addrs []string, wpx
if len(config.CtlAddrs) <= 0 { config.CtlAddrs = cfg.CTL.Service.Addrs }
if len(config.RpcAddrs) <= 0 { config.RpcAddrs = cfg.RPC.Service.Addrs }
if len(config.RpxAddrs) <= 0 { config.RpxAddrs = cfg.RPX.Service.Addrs }
if len(config.PxyAddrs) <= 0 { config.PxyAddrs = cfg.PXY.Service.Addrs }
if len(config.WpxAddrs) <= 0 { config.WpxAddrs = cfg.WPX.Service.Addrs }
@ -178,6 +182,7 @@ func server_main(ctl_addrs []string, rpc_addrs []string, pxy_addrs []string, wpx
s.StartService(nil)
s.StartCtlService()
s.StartRpxService()
s.StartPxyService()
s.StartWpxService()
s.StartExtService(&signal_handler{svc:s}, nil)
@ -364,6 +369,7 @@ func main() {
if strings.EqualFold(os.Args[1], "server") {
var rpc_addrs []string
var ctl_addrs []string
var rpx_addrs []string
var pxy_addrs []string
var wpx_addrs []string
var cfgfile string
@ -373,6 +379,7 @@ func main() {
ctl_addrs = make([]string, 0)
rpc_addrs = make([]string, 0)
rpx_addrs = make([]string, 0)
pxy_addrs = make([]string, 0)
wpx_addrs = make([]string, 0)
@ -385,6 +392,10 @@ func main() {
rpc_addrs = append(rpc_addrs, v)
return nil
})
flgs.Func("rpx-on", "specify a rpx listening address", func(v string) error {
rpx_addrs = append(rpx_addrs, v)
return nil
})
flgs.Func("pxy-on", "specify a proxy listening address", func(v string) error {
pxy_addrs = append(pxy_addrs, v)
return nil
@ -440,7 +451,7 @@ func main() {
}
}
err = server_main(ctl_addrs, rpc_addrs, pxy_addrs, wpx_addrs, logfile, &cfg)
err = server_main(ctl_addrs, rpc_addrs, rpx_addrs, pxy_addrs, wpx_addrs, logfile, &cfg)
if err != nil {
fmt.Fprintf(os.Stderr, "ERROR: server error - %s\n", err.Error())
goto oops
@ -525,7 +536,7 @@ func main() {
os.Exit(0)
wrong_usage:
fmt.Fprintf(os.Stderr, "USAGE: %s server --rpc-on=addr:port --ctl-on=addr:port --pxy-on=addr:port --wpx-on=addr:port [--config-file=file] [--config-file-pattern=pattern]\n", os.Args[0])
fmt.Fprintf(os.Stderr, "USAGE: %s server --rpc-on=addr:port --ctl-on=addr:port --rpx-on=addr:port --pxy-on=addr:port --wpx-on=addr:port [--config-file=file] [--config-file-pattern=pattern]\n", os.Args[0])
fmt.Fprintf(os.Stderr, " %s client --rpc-to=addr:port --ctl-on=addr:port [--config-file=file] [--config-file-pattern=pattern] [peer-addr:peer-port ...]\n", os.Args[0])
fmt.Fprintf(os.Stderr, " %s version\n", os.Args[0])
os.Exit(1)

15
hodu.go
View File

@ -184,8 +184,6 @@ func word_to_route_option(word string) RouteOption {
return RouteOption(ROUTE_OPTION_TCP6)
case "tcp":
return RouteOption(ROUTE_OPTION_TCP)
case "tty":
return RouteOption(ROUTE_OPTION_TTY)
case "http":
return RouteOption(ROUTE_OPTION_HTTP)
case "https":
@ -217,7 +215,6 @@ func (option RouteOption) String() string {
if option & RouteOption(ROUTE_OPTION_TCP6) != 0 { str += " tcp6" }
if option & RouteOption(ROUTE_OPTION_TCP4) != 0 { str += " tcp4" }
if option & RouteOption(ROUTE_OPTION_TCP) != 0 { str += " tcp" }
if option & RouteOption(ROUTE_OPTION_TTY) != 0 { str += " tty" }
if option & RouteOption(ROUTE_OPTION_HTTP) != 0 { str += " http" }
if option & RouteOption(ROUTE_OPTION_HTTPS) != 0 { str += " https" }
if option & RouteOption(ROUTE_OPTION_SSH) != 0 { str += " ssh" }
@ -228,7 +225,7 @@ func (option RouteOption) String() string {
func dump_call_frame_and_exit(log Logger, req *http.Request, err interface{}) {
var buf []byte
buf = make([]byte, 65536); buf = buf[:min(65536, runtime.Stack(buf, false))]
log.Write("", LOG_ERROR, "[%s] %s %s - %v\n%s", req.RemoteAddr, req.Method, req.URL.String(), err, string(buf))
log.Write("", LOG_ERROR, "[%s] %s %s - %v\n%s", req.RemoteAddr, req.Method, get_raw_url_path(req), err, string(buf))
log.Close()
os.Exit(99) // fatal error. treat panic() as a fatal runtime error
}
@ -469,3 +466,13 @@ func (auth *HttpAuthConfig) Authenticate(req *http.Request) (int, string) {
return http.StatusOK, ""
}
// ------------------------------------
func get_raw_url_path(req *http.Request) string {
var path string
path = req.URL.Path
if req.URL.RawQuery != "" { path += "?" + req.URL.RawQuery }
return path
}

View File

@ -28,10 +28,9 @@ const (
ROUTE_OPTION_TCP ROUTE_OPTION = 1
ROUTE_OPTION_TCP4 ROUTE_OPTION = 2
ROUTE_OPTION_TCP6 ROUTE_OPTION = 4
ROUTE_OPTION_TTY ROUTE_OPTION = 8
ROUTE_OPTION_HTTP ROUTE_OPTION = 16
ROUTE_OPTION_HTTPS ROUTE_OPTION = 32
ROUTE_OPTION_SSH ROUTE_OPTION = 64
ROUTE_OPTION_HTTP ROUTE_OPTION = 8
ROUTE_OPTION_HTTPS ROUTE_OPTION = 16
ROUTE_OPTION_SSH ROUTE_OPTION = 32
)
// Enum value maps for ROUTE_OPTION.
@ -41,20 +40,18 @@ var (
1: "TCP",
2: "TCP4",
4: "TCP6",
8: "TTY",
16: "HTTP",
32: "HTTPS",
64: "SSH",
8: "HTTP",
16: "HTTPS",
32: "SSH",
}
ROUTE_OPTION_value = map[string]int32{
"UNSPEC": 0,
"TCP": 1,
"TCP4": 2,
"TCP6": 4,
"TTY": 8,
"HTTP": 16,
"HTTPS": 32,
"SSH": 64,
"HTTP": 8,
"HTTPS": 16,
"SSH": 32,
}
)
@ -103,11 +100,8 @@ const (
PACKET_KIND_CONN_NOTICE PACKET_KIND = 13
PACKET_KIND_RPTY_START PACKET_KIND = 14
PACKET_KIND_RPTY_STOP PACKET_KIND = 15
PACKET_KIND_RPTY_STARTED PACKET_KIND = 16
PACKET_KIND_RPTY_STOPPED PACKET_KIND = 17
PACKET_KIND_RPTY_ABORTED PACKET_KIND = 18
PACKET_KIND_RPTY_EOF PACKET_KIND = 19
PACKET_KIND_RPTY_DATA PACKET_KIND = 20
PACKET_KIND_RPTY_DATA PACKET_KIND = 16
PACKET_KIND_RPTY_SIZE PACKET_KIND = 17
)
// Enum value maps for PACKET_KIND.
@ -128,11 +122,8 @@ var (
13: "CONN_NOTICE",
14: "RPTY_START",
15: "RPTY_STOP",
16: "RPTY_STARTED",
17: "RPTY_STOPPED",
18: "RPTY_ABORTED",
19: "RPTY_EOF",
20: "RPTY_DATA",
16: "RPTY_DATA",
17: "RPTY_SIZE",
}
PACKET_KIND_value = map[string]int32{
"RESERVED": 0,
@ -150,11 +141,8 @@ var (
"CONN_NOTICE": 13,
"RPTY_START": 14,
"RPTY_STOP": 15,
"RPTY_STARTED": 16,
"RPTY_STOPPED": 17,
"RPTY_ABORTED": 18,
"RPTY_EOF": 19,
"RPTY_DATA": 20,
"RPTY_DATA": 16,
"RPTY_SIZE": 17,
}
)
@ -605,7 +593,7 @@ func (x *ConnNotice) GetText() string {
type RptyEvent struct {
state protoimpl.MessageState `protogen:"open.v1"`
Token string `protobuf:"bytes,1,opt,name=Token,proto3" json:"Token,omitempty"`
Id uint64 `protobuf:"varint,1,opt,name=Id,proto3" json:"Id,omitempty"`
Data []byte `protobuf:"bytes,2,opt,name=Data,proto3" json:"Data,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
@ -641,11 +629,11 @@ func (*RptyEvent) Descriptor() ([]byte, []int) {
return file_hodu_proto_rawDescGZIP(), []int{7}
}
func (x *RptyEvent) GetToken() string {
func (x *RptyEvent) GetId() uint64 {
if x != nil {
return x.Token
return x.Id
}
return ""
return 0
}
func (x *RptyEvent) GetData() []byte {
@ -666,7 +654,7 @@ type Packet struct {
// *Packet_Conn
// *Packet_ConnErr
// *Packet_ConnNoti
// *Packet_Rpty
// *Packet_RptyEvt
U isPacket_U `protobuf_oneof:"U"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
@ -770,10 +758,10 @@ func (x *Packet) GetConnNoti() *ConnNotice {
return nil
}
func (x *Packet) GetRpty() *RptyEvent {
func (x *Packet) GetRptyEvt() *RptyEvent {
if x != nil {
if x, ok := x.U.(*Packet_Rpty); ok {
return x.Rpty
if x, ok := x.U.(*Packet_RptyEvt); ok {
return x.RptyEvt
}
}
return nil
@ -807,8 +795,8 @@ type Packet_ConnNoti struct {
ConnNoti *ConnNotice `protobuf:"bytes,7,opt,name=ConnNoti,proto3,oneof"`
}
type Packet_Rpty struct {
Rpty *RptyEvent `protobuf:"bytes,8,opt,name=Rpty,proto3,oneof"`
type Packet_RptyEvt struct {
RptyEvt *RptyEvent `protobuf:"bytes,8,opt,name=RptyEvt,proto3,oneof"`
}
func (*Packet_Route) isPacket_U() {}
@ -823,7 +811,7 @@ func (*Packet_ConnErr) isPacket_U() {}
func (*Packet_ConnNoti) isPacket_U() {}
func (*Packet_Rpty) isPacket_U() {}
func (*Packet_RptyEvt) isPacket_U() {}
var File_hodu_proto protoreflect.FileDescriptor
@ -859,10 +847,10 @@ const file_hodu_proto_rawDesc = "" +
"\x04Text\x18\x02 \x01(\tR\x04Text\" \n" +
"\n" +
"ConnNotice\x12\x12\n" +
"\x04Text\x18\x01 \x01(\tR\x04Text\"5\n" +
"\tRptyEvent\x12\x14\n" +
"\x05Token\x18\x01 \x01(\tR\x05Token\x12\x12\n" +
"\x04Data\x18\x02 \x01(\fR\x04Data\"\xab\x02\n" +
"\x04Text\x18\x01 \x01(\tR\x04Text\"/\n" +
"\tRptyEvent\x12\x0e\n" +
"\x02Id\x18\x01 \x01(\x04R\x02Id\x12\x12\n" +
"\x04Data\x18\x02 \x01(\fR\x04Data\"\xb1\x02\n" +
"\x06Packet\x12 \n" +
"\x04Kind\x18\x01 \x01(\x0e2\f.PACKET_KINDR\x04Kind\x12\"\n" +
"\x05Route\x18\x02 \x01(\v2\n" +
@ -872,20 +860,19 @@ const file_hodu_proto_rawDesc = "" +
"\x04Conn\x18\x05 \x01(\v2\t.ConnDescH\x00R\x04Conn\x12&\n" +
"\aConnErr\x18\x06 \x01(\v2\n" +
".ConnErrorH\x00R\aConnErr\x12)\n" +
"\bConnNoti\x18\a \x01(\v2\v.ConnNoticeH\x00R\bConnNoti\x12 \n" +
"\x04Rpty\x18\b \x01(\v2\n" +
".RptyEventH\x00R\x04RptyB\x03\n" +
"\x01U*^\n" +
"\bConnNoti\x18\a \x01(\v2\v.ConnNoticeH\x00R\bConnNoti\x12&\n" +
"\aRptyEvt\x18\b \x01(\v2\n" +
".RptyEventH\x00R\aRptyEvtB\x03\n" +
"\x01U*U\n" +
"\fROUTE_OPTION\x12\n" +
"\n" +
"\x06UNSPEC\x10\x00\x12\a\n" +
"\x03TCP\x10\x01\x12\b\n" +
"\x04TCP4\x10\x02\x12\b\n" +
"\x04TCP6\x10\x04\x12\a\n" +
"\x03TTY\x10\b\x12\b\n" +
"\x04HTTP\x10\x10\x12\t\n" +
"\x05HTTPS\x10 \x12\a\n" +
"\x03SSH\x10@*\xd7\x02\n" +
"\x04TCP6\x10\x04\x12\b\n" +
"\x04HTTP\x10\b\x12\t\n" +
"\x05HTTPS\x10\x10\x12\a\n" +
"\x03SSH\x10 *\xa2\x02\n" +
"\vPACKET_KIND\x12\f\n" +
"\bRESERVED\x10\x00\x12\x0f\n" +
"\vROUTE_START\x10\x01\x12\x0e\n" +
@ -904,12 +891,9 @@ const file_hodu_proto_rawDesc = "" +
"\vCONN_NOTICE\x10\r\x12\x0e\n" +
"\n" +
"RPTY_START\x10\x0e\x12\r\n" +
"\tRPTY_STOP\x10\x0f\x12\x10\n" +
"\fRPTY_STARTED\x10\x10\x12\x10\n" +
"\fRPTY_STOPPED\x10\x11\x12\x10\n" +
"\fRPTY_ABORTED\x10\x12\x12\f\n" +
"\bRPTY_EOF\x10\x13\x12\r\n" +
"\tRPTY_DATA\x10\x142I\n" +
"\tRPTY_STOP\x10\x0f\x12\r\n" +
"\tRPTY_DATA\x10\x10\x12\r\n" +
"\tRPTY_SIZE\x10\x112I\n" +
"\x04Hodu\x12\x19\n" +
"\aGetSeed\x12\x05.Seed\x1a\x05.Seed\"\x00\x12&\n" +
"\fPacketStream\x12\a.Packet\x1a\a.Packet\"\x00(\x010\x01B\bZ\x06./hodub\x06proto3"
@ -949,7 +933,7 @@ var file_hodu_proto_depIdxs = []int32{
6, // 4: Packet.Conn:type_name -> ConnDesc
7, // 5: Packet.ConnErr:type_name -> ConnError
8, // 6: Packet.ConnNoti:type_name -> ConnNotice
9, // 7: Packet.Rpty:type_name -> RptyEvent
9, // 7: Packet.RptyEvt:type_name -> RptyEvent
2, // 8: Hodu.GetSeed:input_type -> Seed
10, // 9: Hodu.PacketStream:input_type -> Packet
2, // 10: Hodu.GetSeed:output_type -> Seed
@ -973,7 +957,7 @@ func file_hodu_proto_init() {
(*Packet_Conn)(nil),
(*Packet_ConnErr)(nil),
(*Packet_ConnNoti)(nil),
(*Packet_Rpty)(nil),
(*Packet_RptyEvt)(nil),
}
type x struct{}
out := protoimpl.TypeBuilder{

View File

@ -23,10 +23,9 @@ enum ROUTE_OPTION {
TCP = 1;
TCP4 = 2;
TCP6 = 4;
TTY = 8;
HTTP = 16;
HTTPS = 32;
SSH = 64;
HTTP = 8;
HTTPS = 16;
SSH = 32;
};
message RouteDesc {
@ -82,7 +81,7 @@ message ConnNotice {
};
message RptyEvent {
string Token = 1;
uint64 Id = 1;
bytes Data = 2;
};
@ -103,11 +102,8 @@ enum PACKET_KIND {
RPTY_START = 14;
RPTY_STOP = 15;
RPTY_STARTED = 16;
RPTY_STOPPED = 17;
RPTY_ABORTED = 18;
RPTY_EOF = 19;
RPTY_DATA = 20;
RPTY_DATA = 16;
RPTY_SIZE = 17;
};
message Packet {
@ -120,6 +116,6 @@ message Packet {
ConnDesc Conn = 5;
ConnError ConnErr = 6;
ConnNotice ConnNoti = 7;
RptyEvent Rpty = 8;
RptyEvent RptyEvt = 8;
};
}

6
jwt.go
View File

@ -23,7 +23,7 @@ func Sign(data []byte, privkey *rsa.PrivateKey) ([]byte, error) {
func Verify(data []byte, pubkey *rsa.PublicKey, sig []byte) error {
var h hash.Hash
h = crypto.SHA512.New()
h.Write(data)
@ -41,7 +41,7 @@ func SignHS512(data []byte, key string) ([]byte, error) {
func VerifyHS512(data []byte, key string, sig []byte) error {
var h hash.Hash
h = crypto.SHA512.New()
h.Write(data)
@ -78,7 +78,7 @@ func (j *JWT[T]) SignRS512() (string, error) {
h.Algo = "RS512"
h.Type = "JWT"
hb, err = json.Marshal(h)
hb, err = json.Marshal(h)
if err != nil { return "", err }
cb, err = json.Marshal(j.claims)

View File

@ -8,16 +8,16 @@ import "testing"
func TestJwt(t *testing.T) {
var tok string
var err error
type JWTClaim struct {
Abc string `json:"abc"`
Donkey string `json:"donkey"`
IssuedAt int `json:"iat"`
}
}
var jc JWTClaim
jc.Abc = "def"
jc.Donkey = "kong"
jc.Donkey = "kong"
jc.IssuedAt = 111
var key *rsa.PrivateKey

View File

@ -75,14 +75,18 @@ func MakeConnNoticePacket(msg string) *Packet {
return &Packet{Kind: PACKET_KIND_CONN_NOTICE, U: &Packet_ConnNoti{ConnNoti: &ConnNotice{Text: msg}}}
}
func MakeRptyStartPacket(token string) *Packet {
return &Packet{Kind: PACKET_KIND_RPTY_START, U: &Packet_Rpty{Rpty: &RptyEvent{Token: token}}}
func MakeRptyStartPacket(id uint64) *Packet {
return &Packet{Kind: PACKET_KIND_RPTY_START, U: &Packet_RptyEvt{RptyEvt: &RptyEvent{Id: id}}}
}
func MakeRptyStopPacket(token string) *Packet {
return &Packet{Kind: PACKET_KIND_RPTY_START, U: &Packet_Rpty{Rpty: &RptyEvent{Token: token}}}
func MakeRptyStopPacket(id uint64, msg string) *Packet {
return &Packet{Kind: PACKET_KIND_RPTY_STOP, U: &Packet_RptyEvt{RptyEvt: &RptyEvent{Id: id, Data: []byte(msg)}}}
}
func MakeRptyDataPacket(token string, data []byte) *Packet {
return &Packet{Kind: PACKET_KIND_RPTY_START, U: &Packet_Rpty{Rpty: &RptyEvent{Token: token, Data: data}}}
func MakeRptyDataPacket(id uint64, data []byte) *Packet {
return &Packet{Kind: PACKET_KIND_RPTY_DATA, U: &Packet_RptyEvt{RptyEvt: &RptyEvent{Id: id, Data: data}}}
}
func MakeRptySizePacket(id uint64, data []byte) *Packet {
return &Packet{Kind: PACKET_KIND_RPTY_SIZE, U: &Packet_RptyEvt{RptyEvt: &RptyEvent{Id: id, Data: data}}}
}

71
pty.go Normal file
View File

@ -0,0 +1,71 @@
package hodu
import "encoding/json"
import "fmt"
import "os"
import "os/exec"
import "os/user"
import "strconv"
import "syscall"
import pts "github.com/creack/pty"
import "golang.org/x/net/websocket"
import "golang.org/x/sys/unix"
func connect_pty(pty_shell string, pty_user string) (*exec.Cmd, *os.File, error) {
var cmd *exec.Cmd
var tty *os.File
var err error
if pty_shell == "" {
return nil, nil, fmt.Errorf("blank pty shell")
}
cmd = exec.Command(pty_shell)
if pty_user != "" {
var uid int
var gid int
var u *user.User
u, err = user.Lookup(pty_user)
if err != nil { return nil, nil, err }
uid, _ = strconv.Atoi(u.Uid)
gid, _ = strconv.Atoi(u.Gid)
cmd.SysProcAttr = &syscall.SysProcAttr{
Credential: &syscall.Credential{
Uid: uint32(uid),
Gid: uint32(gid),
},
Setsid: true,
}
cmd.Dir = u.HomeDir
cmd.Env = append(cmd.Env,
"HOME=" + u.HomeDir,
"LOGNAME=" + u.Username,
"PATH=" + os.Getenv("PATH"),
"SHELL=" + pty_shell,
"TERM=xterm",
"USER=" + u.Username,
)
}
tty, err = pts.Start(cmd)
if err != nil {
return nil, nil, err
}
//syscall.SetNonblock(int(tty.Fd()), true)
unix.SetNonblock(int(tty.Fd()), true)
return cmd, tty, nil
}
func send_ws_data_for_xterm(ws *websocket.Conn, type_val string, data string) error {
var msg []byte
var err error
msg, err = json.Marshal(json_xterm_ws_event{Type: type_val, Data: []string{ data } })
if err == nil { err = websocket.Message.Send(ws, msg) }
return err
}

View File

@ -7,11 +7,9 @@ import "io"
import "net/http"
import "os"
import "os/exec"
import "os/user"
import "strconv"
import "strings"
import "sync"
import "syscall"
import "text/template"
import pts "github.com/creack/pty"
@ -33,6 +31,7 @@ type server_rpty_ws struct {
type server_pty_xterm_file struct {
ServerCtl
file string
mode string
}
// ------------------------------------------------------
@ -41,74 +40,11 @@ func (pty *server_pty_ws) Identity() string {
return pty.Id
}
func (pty *server_pty_ws) send_ws_data(ws *websocket.Conn, type_val string, data string) error {
var msg []byte
var err error
msg, err = json.Marshal(json_xterm_ws_event{Type: type_val, Data: []string{ data } })
if err == nil { err = websocket.Message.Send(ws, msg) }
return err
}
func (pty *server_pty_ws) connect_pty(username string, password string) (*exec.Cmd, *os.File, error) {
var s *Server
var cmd *exec.Cmd
var tty *os.File
var err error
// username and password are not used yet.
s = pty.S
if s.pty_shell == "" {
return nil, nil, fmt.Errorf("blank pty shell")
}
cmd = exec.Command(s.pty_shell);
if s.pty_user != "" {
var uid int
var gid int
var u *user.User
u, err = user.Lookup(s.pty_user)
if err != nil { return nil, nil, err }
uid, _ = strconv.Atoi(u.Uid)
gid, _ = strconv.Atoi(u.Gid)
cmd.SysProcAttr = &syscall.SysProcAttr{
Credential: &syscall.Credential{
Uid: uint32(uid),
Gid: uint32(gid),
},
Setsid: true,
}
cmd.Dir = u.HomeDir
cmd.Env = append(cmd.Env,
"HOME=" + u.HomeDir,
"LOGNAME=" + u.Username,
"PATH=" + os.Getenv("PATH"),
"SHELL=" + s.pty_shell,
"TERM=xterm",
"USER=" + u.Username,
)
}
tty, err = pts.Start(cmd)
if err != nil {
return nil, nil, err
}
//syscall.SetNonblock(int(tty.Fd()), true);
unix.SetNonblock(int(tty.Fd()), true);
return cmd, tty, nil
}
func (pty *server_pty_ws) ServeWebsocket(ws *websocket.Conn) (int, error) {
var s *Server
var req *http.Request
var username string
var password string
//var username string
//var password string
var in *os.File
var out *os.File
var tty *os.File
@ -130,7 +66,7 @@ func (pty *server_pty_ws) ServeWebsocket(ws *websocket.Conn) (int, error) {
conn_ready = <-conn_ready_chan
if conn_ready { // connected
var poll_fds []unix.PollFd;
var poll_fds []unix.PollFd
var buf []byte
var n int
var err error
@ -154,7 +90,7 @@ func (pty *server_pty_ws) ServeWebsocket(ws *websocket.Conn) (int, error) {
if (poll_fds[0].Revents & (unix.POLLERR | unix.POLLHUP | unix.POLLNVAL)) != 0 {
s.log.Write(pty.Id, LOG_DEBUG, "[%s] EOF detected on pty stdout", req.RemoteAddr)
break;
break
}
if (poll_fds[0].Revents & unix.POLLIN) != 0 {
@ -166,7 +102,7 @@ func (pty *server_pty_ws) ServeWebsocket(ws *websocket.Conn) (int, error) {
break
}
if n > 0 {
err = pty.send_ws_data(ws, "iov", string(buf[:n]))
err = send_ws_data_for_xterm(ws, "iov", string(buf[:n]))
if err != nil {
s.log.Write(pty.Id, LOG_ERROR, "[%s] Failed to send to websocket - %s", req.RemoteAddr, err.Error())
break
@ -191,23 +127,24 @@ ws_recv_loop:
switch ev.Type {
case "open":
if tty == nil && len(ev.Data) == 2 {
username = string(ev.Data[0])
password = string(ev.Data[1])
// not using username and password for now...
//username = string(ev.Data[0])
//password = string(ev.Data[1])
wg.Add(1)
go func() {
var err error
defer wg.Done()
cmd, tty, err = pty.connect_pty(username, password)
cmd, tty, err = connect_pty(s.pty_shell, s.pty_user)
if err != nil {
s.log.Write(pty.Id, LOG_ERROR, "[%s] Failed to connect pty - %s", req.RemoteAddr, err.Error())
pty.send_ws_data(ws, "error", err.Error())
send_ws_data_for_xterm(ws, "error", err.Error())
ws.Close() // dirty way to flag out the error - this will make websocket.MessageReceive to fail
} else {
err = pty.send_ws_data(ws, "status", "opened")
err = send_ws_data_for_xterm(ws, "status", "opened")
if err != nil {
s.log.Write(pty.Id, LOG_ERROR, "[%s] Failed to write opened event to websocket - %s", req.RemoteAddr, err.Error())
s.log.Write(pty.Id, LOG_ERROR, "[%s] Failed to write 'opened' event to websocket - %s", req.RemoteAddr, err.Error())
ws.Close() // dirty way to flag out the error
} else {
s.log.Write(pty.Id, LOG_DEBUG, "[%s] Opened pty session", req.RemoteAddr)
@ -250,7 +187,7 @@ ws_recv_loop:
}
if tty != nil {
err = pty.send_ws_data(ws, "status", "closed")
err = send_ws_data_for_xterm(ws, "status", "closed")
if err != nil { goto done }
}
@ -270,6 +207,101 @@ done:
return http.StatusOK, err
}
// ------------------------------------------------------
func (rpty *server_rpty_ws) Identity() string {
return rpty.Id
}
func (rpty *server_rpty_ws) ServeWebsocket(ws *websocket.Conn) (int, error) {
var s *Server
var req *http.Request
var token string
var cts *ServerConn
//var username string
//var password string
var rp *ServerRpty
var wg sync.WaitGroup
var err error
s = rpty.S
req = ws.Request()
token = req.FormValue("client-token")
if token == "" {
ws.Close()
return http.StatusBadRequest, fmt.Errorf("no client token specified")
}
cts = s.FindServerConnByClientToken(token)
if cts == nil {
ws.Close()
return http.StatusBadRequest, fmt.Errorf("invalid client token - %s", token)
}
ws_recv_loop:
for {
var msg []byte
err = websocket.Message.Receive(ws, &msg)
if err != nil { goto done }
if len(msg) > 0 {
var ev json_xterm_ws_event
err = json.Unmarshal(msg, &ev)
if err == nil {
switch ev.Type {
case "open":
if rp == nil && len(ev.Data) == 2 {
//username = string(ev.Data[0])
//password = string(ev.Data[1])
rp, err = cts.StartRpty(ws)
if err != nil {
s.log.Write(rpty.Id, LOG_ERROR, "[%s] Failed to connect pty - %s", req.RemoteAddr, err.Error())
send_ws_data_for_xterm(ws, "error", err.Error())
ws.Close() // dirty way to flag out the error by making websocket.Message.Receive() fail
} else {
err = send_ws_data_for_xterm(ws, "status", "opened")
if err != nil {
s.log.Write(rpty.Id, LOG_ERROR, "[%s] Failed to write 'opened' event to websocket - %s", req.RemoteAddr, err.Error())
ws.Close() // dirty way to flag out the error
} else {
s.log.Write(rpty.Id, LOG_DEBUG, "[%s] Opened pty session", req.RemoteAddr)
}
}
}
case "close":
// just break out of the loop and let the remainder to close resources
break ws_recv_loop
case "iov":
var i int
for i, _ = range ev.Data {
cts.WriteRpty(ws, []byte(ev.Data[i]))
// ignore error for now
}
case "size":
if len(ev.Data) == 2 {
cts.WriteRptySize(ws, []byte(fmt.Sprintf("%s %s", ev.Data[0], ev.Data[1])))
s.log.Write(rpty.Id, LOG_DEBUG, "[%s] Requested to resize rpty terminal to %s,%s", req.RemoteAddr, ev.Data[0], ev.Data[1])
// ignore error
}
}
}
}
}
done:
cts.StopRpty(ws)
ws.Close() // don't care about multiple closes
wg.Wait()
s.log.Write(rpty.Id, LOG_DEBUG, "[%s] Ended rpty session for %s", req.RemoteAddr, token)
return http.StatusOK, err
}
// ------------------------------------------------------
func (pty *server_pty_xterm_file) ServeHTTP(w http.ResponseWriter, req *http.Request) (int, error) {
@ -305,7 +337,7 @@ func (pty *server_pty_xterm_file) ServeHTTP(w http.ResponseWriter, req *http.Req
status_code = WriteHtmlRespHeader(w, http.StatusOK)
tmpl.Execute(w,
&xterm_session_info{
Mode: "pty",
Mode: pty.mode,
ConnId: "-1",
RouteId: "-1",
})

View File

@ -375,7 +375,7 @@ func (pxy *server_pxy_http_main) ServeHTTP(w http.ResponseWriter, req *http.Requ
}
proxy_url = pxy.req_to_proxy_url(req, pi)
s.log.Write(pxy.Id, LOG_INFO, "[%s] %s %s -> %+v", req.RemoteAddr, req.Method, req.URL.String(), proxy_url)
s.log.Write(pxy.Id, LOG_INFO, "[%s] %s %s -> %+v", req.RemoteAddr, req.Method, get_raw_url_path(req), proxy_url)
proxy_req, err = http.NewRequestWithContext(s.Ctx, req.Method, proxy_url.String(), req.Body)
if err != nil {
@ -401,7 +401,7 @@ func (pxy *server_pxy_http_main) ServeHTTP(w http.ResponseWriter, req *http.Requ
} else {
status_code = resp.StatusCode
if upgrade_required && resp.StatusCode == http.StatusSwitchingProtocols {
s.log.Write(pxy.Id, LOG_INFO, "[%s] %s %s %d", req.RemoteAddr, req.Method, req.URL.String(), status_code)
s.log.Write(pxy.Id, LOG_INFO, "[%s] %s %s %d", req.RemoteAddr, req.Method, get_raw_url_path(req), status_code)
err = pxy.serve_upgraded(w, req, resp)
if err != nil { goto oops }
return 0, nil// print the log mesage before calling serve_upgraded() and exit here
@ -426,7 +426,7 @@ func (pxy *server_pxy_http_main) ServeHTTP(w http.ResponseWriter, req *http.Requ
_, err = io.Copy(w, resp_body)
if err != nil {
s.log.Write(pxy.Id, LOG_WARN, "[%s] %s %s %s", req.RemoteAddr, req.Method, req.URL.String(), err.Error())
s.log.Write(pxy.Id, LOG_WARN, "[%s] %s %s %s", req.RemoteAddr, req.Method, get_raw_url_path(req), err.Error())
}
// TODO: handle trailers
@ -562,15 +562,6 @@ func (pxy *server_pxy_ssh_ws) Identity() string {
// TODO: put this task to sync group.
// TODO: put the above proxy task to sync group too.
func (pxy *server_pxy_ssh_ws) send_ws_data(ws *websocket.Conn, type_val string, data string) error {
var msg []byte
var err error
msg, err = json.Marshal(json_xterm_ws_event{Type: type_val, Data: []string{ data } })
if err == nil { err = websocket.Message.Send(ws, msg) }
return err
}
func (pxy *server_pxy_ssh_ws) connect_ssh (ctx context.Context, username string, password string, r *ServerRoute) (*ssh.Client, *ssh.Session, io.Writer, io.Reader, error) {
var cc *ssh.ClientConfig
var addr *net.TCPAddr
@ -673,7 +664,7 @@ func (pxy *server_pxy_ssh_ws) ServeWebsocket(ws *websocket.Conn) (int, error) {
var pi *ServerRouteProxyInfo
pi, err = s.wpx_foreign_port_proxy_maker("ssh", conn_id)
if err != nil {
pxy.send_ws_data(ws, "error", err.Error())
send_ws_data_for_xterm(ws, "error", err.Error())
goto done
}
@ -685,7 +676,7 @@ func (pxy *server_pxy_ssh_ws) ServeWebsocket(ws *websocket.Conn) (int, error) {
r = proxy_info_to_server_route(pi)
}
if err != nil {
pxy.send_ws_data(ws, "error", err.Error())
send_ws_data_for_xterm(ws, "error", err.Error())
goto done
}
@ -713,7 +704,7 @@ func (pxy *server_pxy_ssh_ws) ServeWebsocket(ws *websocket.Conn) (int, error) {
break
}
if n > 0 {
err = pxy.send_ws_data(ws, "iov", string(buf[:n]))
err = send_ws_data_for_xterm(ws, "iov", string(buf[:n]))
if err != nil {
s.log.Write(pxy.Id, LOG_ERROR, "[%s] Failed to send to websocket - %s", req.RemoteAddr, err.Error())
break
@ -753,10 +744,10 @@ ws_recv_loop:
c, sess, in, out, err = pxy.connect_ssh(connect_ssh_ctx, username, password, r)
if err != nil {
s.log.Write(pxy.Id, LOG_ERROR, "[%s] Failed to connect ssh - %s", req.RemoteAddr, err.Error())
pxy.send_ws_data(ws, "error", err.Error())
send_ws_data_for_xterm(ws, "error", err.Error())
ws.Close() // dirty way to flag out the error
} else {
err = pxy.send_ws_data(ws, "status", "opened")
err = send_ws_data_for_xterm(ws, "status", "opened")
if err != nil {
s.log.Write(pxy.Id, LOG_ERROR, "[%s] Failed to write opened event to websocket - %s", req.RemoteAddr, err.Error())
ws.Close() // dirty way to flag out the error
@ -800,7 +791,7 @@ ws_recv_loop:
}
if sess != nil {
err = pxy.send_ws_data(ws, "status", "closed")
err = send_ws_data_for_xterm(ws, "status", "closed")
if err != nil { goto done }
}

35
server-rpx.go Normal file
View File

@ -0,0 +1,35 @@
package hodu
import "net/http"
type server_rpx struct {
S *Server
Id string
}
// ------------------------------------
func (pxy *server_rpx) Identity() string {
return pxy.Id
}
func (pxy *server_rpx) Cors(req *http.Request) bool {
return false
}
func (pxy *server_rpx) Authenticate(req *http.Request) (int, string) {
return http.StatusOK, ""
}
func (pxy *server_rpx) ServeHTTP(w http.ResponseWriter, req *http.Request) (int, error) {
var status_code int
// var err error
status_code = http.StatusOK
//done:
return status_code, nil
//oops:
// return status_code, err
}

453
server.go
View File

@ -32,8 +32,9 @@ const CTS_LIMIT int = 16384
type PortId uint16
const PORT_ID_MARKER string = "_"
const HS_ID_CTL string = "ctl"
const HS_ID_RPX string = "pxy"
const HS_ID_PXY string = "rpx"
const HS_ID_WPX string = "wpx"
const HS_ID_PXY string = "pxy"
type ServerConnMapByAddr map[net.Addr]*ServerConn
type ServerConnMapByClientToken map[string]*ServerConn
@ -42,6 +43,9 @@ type ServerRouteMap map[RouteId]*ServerRoute
type ServerPeerConnMap map[PeerId]*ServerPeerConn
type ServerSvcPortMap map[PortId]ConnRouteId
type ServerRptyMap map[uint64]*ServerRpty
type ServerRptyMapByWs map[*websocket.Conn]*ServerRpty
type ServerWpxResponseTransformer func(r *ServerRouteProxyInfo, resp *http.Response) io.Reader
type ServerWpxForeignPortProxyMaker func(wpx_type string, port_id string) (*ServerRouteProxyInfo, error)
@ -61,6 +65,9 @@ type ServerConfig struct {
CtlAuth *HttpAuthConfig
CtlCors bool
RpxAddrs []string
RpxTls *tls.Config
PxyAddrs []string
PxyTls *tls.Config
@ -105,6 +112,11 @@ type Server struct {
ext_svcs []Service
ext_closed bool
rpx_mux *http.ServeMux
rpx []*http.Server // proxy server
rpx_addrs_mtx sync.Mutex
rpx_addrs *list.List // of net.Addr
pxy_mux *http.ServeMux
pxy []*http.Server // proxy server
pxy_addrs_mtx sync.Mutex
@ -185,9 +197,14 @@ type ServerConn struct {
pts_mtx sync.Mutex
pts_list *list.List
wg sync.WaitGroup
stop_req atomic.Bool
stop_chan chan bool
rpty_next_id uint64
rpty_mtx sync.Mutex
rpty_map ServerRptyMap
rpty_map_by_ws ServerRptyMapByWs
wg sync.WaitGroup
stop_req atomic.Bool
stop_chan chan bool
}
type ServerRoute struct {
@ -214,6 +231,11 @@ type ServerRoute struct {
stop_req atomic.Bool
}
type ServerRpty struct {
id uint64
ws *websocket.Conn
}
type GuardedPacketStreamServer struct {
mtx sync.Mutex
//pss Hodu_PacketStreamServer
@ -449,6 +471,7 @@ func (r *ServerRoute) ReportPacket(pts_id PeerId, packet_type PACKET_KIND, event
return spc.ReportPacket(packet_type, event_data)
}
// ------------------------------------
func (cts *ServerConn) make_route_listener(id RouteId, option RouteOption, svc_requested_addr string) (*net.TCPListener, *net.TCPAddr, error) {
@ -635,10 +658,172 @@ func (cts *ServerConn) ReqStopAllServerRoutes() {
cts.route_mtx.Unlock()
}
func (cts *ServerConn) StartRpty(ws *websocket.Conn) (*ServerRpty, error) {
var ok bool
var start_id uint64
var assigned_id uint64
var rpty *ServerRpty
var err error
func (cts *ServerConn) StartRpts() {
cts.rpty_mtx.Lock()
start_id = cts.rpty_next_id
for {
_, ok = cts.rpty_map[cts.rpty_next_id]
if !ok {
assigned_id = cts.rpty_next_id
cts.rpty_next_id++
if cts.rpty_next_id == 0 { cts.rpty_next_id++ }
break
}
cts.rpty_next_id++
if cts.rpty_next_id == 0 { cts.rpty_next_id++ }
if cts.rpty_next_id == start_id {
cts.rpty_mtx.Unlock()
return nil, fmt.Errorf("unable to assign id")
}
}
_, ok = cts.rpty_map_by_ws[ws]
if ok {
cts.rpty_mtx.Unlock()
return nil, fmt.Errorf("connection already associated with rpty. possibly internal error")
}
rpty = &ServerRpty{
id: assigned_id,
ws: ws,
}
cts.rpty_map[assigned_id] = rpty
cts.rpty_map_by_ws[ws] = rpty
cts.rpty_mtx.Unlock()
err = cts.pss.Send(MakeRptyStartPacket(assigned_id))
if err != nil {
cts.rpty_mtx.Lock()
delete(cts.rpty_map, assigned_id)
delete(cts.rpty_map_by_ws, ws)
cts.rpty_mtx.Unlock()
return nil , err
}
return rpty, nil
}
func (cts *ServerConn) StopRpty(ws *websocket.Conn) error {
// called by the websocket handler.
var rpty *ServerRpty
var id uint64
var ok bool
var err error
cts.rpty_mtx.Lock()
rpty, ok = cts.rpty_map_by_ws[ws]
if !ok {
return fmt.Errorf("unknown ws connection for rpty - %v", ws.RemoteAddr())
}
id = rpty.id
cts.rpty_mtx.Unlock()
// send the stop request to the client side
err = cts.pss.Send(MakeRptyStopPacket(id, ""))
if err != nil {
return fmt.Errorf("unable to send stop rpty request to client - %s", err.Error())
}
return nil
}
func (cts *ServerConn) StopRptyWsById(id uint64, msg string) error {
// called this when the stop requested comes from the client
// abort the websocket side.
var rpty *ServerRpty
var ok bool
cts.rpty_mtx.Lock()
rpty, ok = cts.rpty_map[id]
if !ok {
return fmt.Errorf("unknown rpty id %d", id)
}
rpty.ws.Close()
cts.rpty_mtx.Unlock()
cts.S.log.Write(cts.Sid, LOG_INFO, "Stopped rpty(%d) for %s - %s", id, cts.RemoteAddr, msg)
return nil
}
func (cts *ServerConn) WriteRpty(ws *websocket.Conn, data []byte) error {
var rpty *ServerRpty
var id uint64
var ok bool
var err error
cts.rpty_mtx.Lock()
rpty, ok = cts.rpty_map_by_ws[ws]
if !ok {
return fmt.Errorf("unknown ws connection for rpty - %v", ws.RemoteAddr())
}
id = rpty.id
cts.rpty_mtx.Unlock()
err = cts.pss.Send(MakeRptyDataPacket(id, data))
if err != nil {
return fmt.Errorf("unable to send rpty data to client - %s", err.Error())
}
return nil
}
func (cts *ServerConn) WriteRptySize(ws *websocket.Conn, data []byte) error {
var rpty *ServerRpty
var id uint64
var ok bool
var err error
cts.rpty_mtx.Lock()
rpty, ok = cts.rpty_map_by_ws[ws]
if !ok {
return fmt.Errorf("unknown ws connection for rpty size - %v", ws.RemoteAddr())
}
id = rpty.id
cts.rpty_mtx.Unlock()
err = cts.pss.Send(MakeRptySizePacket(id, data))
if err != nil {
return fmt.Errorf("unable to send rpty size to client - %s", err.Error())
}
return nil
}
func (cts *ServerConn) ReadRptyAndWriteWs(id uint64, data []byte) error {
var ok bool
var rpty *ServerRpty
var err error
cts.rpty_mtx.Lock()
rpty, ok = cts.rpty_map[id]
if !ok {
cts.rpty_mtx.Unlock()
return fmt.Errorf("unknown rpty id - %d", id)
}
err = send_ws_data_for_xterm(rpty.ws, "iov", string(data))
if err != nil {
cts.rpty_mtx.Unlock()
return fmt.Errorf("failed to write rpty data(%d) to ws - %s", id, err.Error())
}
cts.rpty_mtx.Unlock()
return nil
}
func (cts *ServerConn) ReportPacket(route_id RouteId, pts_id PeerId, packet_type PACKET_KIND, event_data interface{}) error {
var r *ServerRoute
var ok bool
@ -654,6 +839,20 @@ func (cts *ServerConn) ReportPacket(route_id RouteId, pts_id PeerId, packet_type
return r.ReportPacket(pts_id, packet_type, event_data)
}
func (cts *ServerConn) HandleRptyEvent(packet_type PACKET_KIND, evt *RptyEvent) error {
switch packet_type {
case PACKET_KIND_RPTY_STOP:
// stop requested from the server
return cts.StopRptyWsById(evt.Id, string(evt.Data))
case PACKET_KIND_RPTY_DATA:
return cts.ReadRptyAndWriteWs(evt.Id, evt.Data)
}
// ignore other packet types
return nil
}
func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) {
var pkt *Packet
var err error
@ -688,12 +887,12 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) {
err = cts.pss.Send(MakeRouteStoppedPacket(RouteId(x.Route.RouteId), RouteOption(x.Route.ServiceOption), x.Route.TargetAddrStr, x.Route.TargetName, x.Route.ServiceAddrStr, x.Route.ServiceNetStr))
if err != nil {
cts.S.log.Write(cts.Sid, LOG_ERROR,
"Failed to send route_stopped event(%d,%s,%v,%s) to client %s - %s",
"Failed to send ROUTE_STOPPED event(%d,%s,%v,%s) to client %s - %s",
x.Route.RouteId, x.Route.TargetAddrStr, x.Route.ServiceOption, x.Route.ServiceNetStr, cts.RemoteAddr, err.Error())
goto done
} else {
cts.S.log.Write(cts.Sid, LOG_DEBUG,
"Sent route_stopped event(%d,%s,%v,%s) to client %s",
"Sent ROUTE_STOPPED event(%d,%s,%v,%s) to client %s",
x.Route.RouteId, x.Route.TargetAddrStr, x.Route.ServiceOption, x.Route.ServiceNetStr, cts.RemoteAddr)
}
@ -705,7 +904,7 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) {
if err != nil {
r.ReqStop()
cts.S.log.Write(cts.Sid, LOG_ERROR,
"Failed to send route_started event(%d,%s,%s,%s%v,%v) to client %s - %s",
"Failed to send ROUTE_STARTED event(%d,%s,%s,%s%v,%v) to client %s - %s",
r.Id, r.PtcAddr, r.SvcAddr.String(), r.SvcOption, r.SvcPermNet, cts.RemoteAddr, err.Error())
goto done
}
@ -735,7 +934,7 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) {
if err != nil {
r.ReqStop()
cts.S.log.Write(cts.Sid, LOG_ERROR,
"Failed to send route_stopped event(%d,%s,%s,%v.%v) to client %s - %s",
"Failed to send ROUTE_STOPPED event(%d,%s,%s,%v.%v) to client %s - %s",
r.Id, r.PtcAddr, r.SvcAddr.String(), r.SvcOption, r.SvcPermNet.String(), cts.RemoteAddr, err.Error())
goto done
}
@ -745,65 +944,28 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) {
}
case PACKET_KIND_PEER_STARTED:
// the connection from the client to a peer has been established
var x *Packet_Peer
var ok bool
x, ok = pkt.U.(*Packet_Peer)
if ok {
err = cts.ReportPacket(RouteId(x.Peer.RouteId), PeerId(x.Peer.PeerId), PACKET_KIND_PEER_STARTED, x.Peer)
if err != nil {
cts.S.log.Write(cts.Sid, LOG_ERROR,
"Failed to handle peer_started event from %s for peer(%d,%d,%s,%s) - %s",
cts.RemoteAddr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr, err.Error())
} else {
cts.S.log.Write(cts.Sid, LOG_DEBUG,
"Handled peer_started event from %s for peer(%d,%d,%s,%s)",
cts.RemoteAddr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr)
}
} else {
// invalid event data
cts.S.log.Write(cts.Sid, LOG_ERROR, "Invalid peer_started event from %s", cts.RemoteAddr)
}
fallthrough
case PACKET_KIND_PEER_ABORTED:
var x *Packet_Peer
var ok bool
x, ok = pkt.U.(*Packet_Peer)
if ok {
err = cts.ReportPacket(RouteId(x.Peer.RouteId), PeerId(x.Peer.PeerId), PACKET_KIND_PEER_ABORTED, x.Peer)
if err != nil {
cts.S.log.Write(cts.Sid, LOG_ERROR,
"Failed to handle peer_aborted event from %s for peer(%d,%d,%s,%s) - %s",
cts.RemoteAddr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr, err.Error())
} else {
cts.S.log.Write(cts.Sid, LOG_DEBUG,
"Handled peer_aborted event from %s for peer(%d,%d,%s,%s)",
cts.RemoteAddr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr)
}
} else {
// invalid event data
cts.S.log.Write(cts.Sid, LOG_ERROR, "Invalid peer_aborted event from %s", cts.RemoteAddr)
}
fallthrough
case PACKET_KIND_PEER_STOPPED:
// the connection from the client to a peer has been established
var x *Packet_Peer
var ok bool
x, ok = pkt.U.(*Packet_Peer)
if ok {
err = cts.ReportPacket(RouteId(x.Peer.RouteId), PeerId(x.Peer.PeerId), PACKET_KIND_PEER_STOPPED, x.Peer)
err = cts.ReportPacket(RouteId(x.Peer.RouteId), PeerId(x.Peer.PeerId), pkt.Kind, x.Peer)
if err != nil {
cts.S.log.Write(cts.Sid, LOG_ERROR,
"Failed to handle peer_stopped event from %s for peer(%d,%d,%s,%s) - %s",
cts.RemoteAddr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr, err.Error())
"Failed to handle %s event from %s for peer(%d,%d,%s,%s) - %s",
pkt.Kind.String(), cts.RemoteAddr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr, err.Error())
} else {
cts.S.log.Write(cts.Sid, LOG_DEBUG,
"Handled peer_stopped event from %s for peer(%d,%d,%s,%s)",
cts.RemoteAddr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr)
"Handled %s event from %s for peer(%d,%d,%s,%s)",
pkt.Kind.String(), cts.RemoteAddr, x.Peer.RouteId, x.Peer.PeerId, x.Peer.LocalAddrStr, x.Peer.RemoteAddrStr)
}
} else {
// invalid event data
cts.S.log.Write(cts.Sid, LOG_ERROR, "Invalid peer_stopped event from %s", cts.RemoteAddr)
cts.S.log.Write(cts.Sid, LOG_ERROR, "Invalid %s event from %s", pkt.Kind.String(), cts.RemoteAddr)
}
case PACKET_KIND_PEER_DATA:
@ -812,19 +974,19 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) {
var ok bool
x, ok = pkt.U.(*Packet_Data)
if ok {
err = cts.ReportPacket(RouteId(x.Data.RouteId), PeerId(x.Data.PeerId), PACKET_KIND_PEER_DATA, x.Data.Data)
err = cts.ReportPacket(RouteId(x.Data.RouteId), PeerId(x.Data.PeerId), pkt.Kind, x.Data.Data)
if err != nil {
cts.S.log.Write(cts.Sid, LOG_ERROR,
"Failed to handle peer_data event from %s for peer(%d,%d) - %s",
cts.RemoteAddr, x.Data.RouteId, x.Data.PeerId, err.Error())
"Failed to handle %s event from %s for peer(%d,%d) - %s",
pkt.Kind.String(), cts.RemoteAddr, x.Data.RouteId, x.Data.PeerId, err.Error())
} else {
cts.S.log.Write(cts.Sid, LOG_DEBUG,
"Handled peer_data event from %s for peer(%d,%d)",
cts.RemoteAddr, x.Data.RouteId, x.Data.PeerId)
"Handled %s event from %s for peer(%d,%d)",
pkt.Kind.String(), cts.RemoteAddr, x.Data.RouteId, x.Data.PeerId)
}
} else {
// invalid event data
cts.S.log.Write(cts.Sid, LOG_ERROR, "Invalid peer_data event from %s", cts.RemoteAddr)
cts.S.log.Write(cts.Sid, LOG_ERROR, "Invalid %s event from %s", pkt.Kind.String(), cts.RemoteAddr)
}
case PACKET_KIND_CONN_DESC:
@ -833,13 +995,13 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) {
x, ok = pkt.U.(*Packet_Conn)
if ok {
if x.Conn.Token == "" {
cts.S.log.Write(cts.Sid, LOG_ERROR, "Invalid conn_desc packet from %s - blank token", cts.RemoteAddr)
cts.S.log.Write(cts.Sid, LOG_ERROR, "Invalid %s packet from %s - blank token", pkt.Kind.String(), cts.RemoteAddr)
cts.pss.Send(MakeConnErrorPacket(1, "blank token refused"))
cts.ReqStop() // TODO: is this desirable to disconnect?
} else if x.Conn.Token != cts.ClientToken.Get() {
_, err = strconv.ParseUint(x.Conn.Token, 10, int(unsafe.Sizeof(ConnId(0)) * 8))
if err == nil { // this is not != nil. this is to check if the token is numeric
cts.S.log.Write(cts.Sid, LOG_ERROR, "Invalid conn_desc packet from %s - numeric token '%s'", cts.RemoteAddr, x.Conn.Token)
cts.S.log.Write(cts.Sid, LOG_ERROR, "Invalid %s packet from %s - numeric token '%s'", pkt.Kind.String(), cts.RemoteAddr, x.Conn.Token)
cts.pss.Send(MakeConnErrorPacket(1, "numeric token refused"))
cts.ReqStop() // TODO: is this desirable to disconnect?
} else {
@ -848,7 +1010,7 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) {
if ok {
// error
cts.S.cts_mtx.Unlock()
cts.S.log.Write(cts.Sid, LOG_ERROR, "Invalid conn_desc packet from %s - duplicate token '%s'", cts.RemoteAddr, x.Conn.Token)
cts.S.log.Write(cts.Sid, LOG_ERROR, "Invalid %s packet from %s - duplicate token '%s'", pkt.Kind.String(), cts.RemoteAddr, x.Conn.Token)
cts.pss.Send(MakeConnErrorPacket(1, fmt.Sprintf("duplicate token refused - %s", x.Conn.Token)))
cts.ReqStop() // TODO: is this desirable to disconnect?
} else {
@ -863,7 +1025,7 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) {
}
}
} else {
cts.S.log.Write(cts.Sid, LOG_ERROR, "Invalid conn_desc packet from %s", cts.RemoteAddr)
cts.S.log.Write(cts.Sid, LOG_ERROR, "Invalid %s packet from %s", pkt.Kind.String(), cts.RemoteAddr)
}
case PACKET_KIND_CONN_NOTICE:
@ -879,23 +1041,42 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) {
}
}
} else {
cts.S.log.Write(cts.Sid, LOG_ERROR, "Invalid conn_notice packet from %s", cts.RemoteAddr)
cts.S.log.Write(cts.Sid, LOG_ERROR, "Invalid %s packet from %s", pkt.Kind.String(), cts.RemoteAddr)
}
/*case PACKET_KIND_RPTY_START:
case PACKET_KIND_RPTY_STOP:*/
case PACKET_KIND_RPTY_STARTED:
case PACKET_KIND_RPTY_STOPPED:
case PACKET_KIND_RPTY_ABORTED:
case PACKET_KIND_RPTY_EOF:
//case PACKET_KIND_RPTY_START: stop is never sent by the client to the server
case PACKET_KIND_RPTY_STOP:
fallthrough
case PACKET_KIND_RPTY_DATA:
// inspect the token
// find the right websocket handler...
// report it to the right websocket handler
var x *Packet_RptyEvt
var ok bool
x, ok = pkt.U.(*Packet_RptyEvt)
if ok {
err = cts.HandleRptyEvent(pkt.Kind, x.RptyEvt)
if err != nil {
cts.S.log.Write(cts.Sid, LOG_ERROR, "Failed to handle %s event for rpty(%d) from %s - %s", pkt.Kind.String(), x.RptyEvt.Id, cts.RemoteAddr, err.Error())
} else {
cts.S.log.Write(cts.Sid, LOG_ERROR, "Handled %s event for rpty(%d) from %s", pkt.Kind.String(), x.RptyEvt.Id, cts.RemoteAddr)
}
} else {
cts.S.log.Write(cts.Sid, LOG_ERROR, "Invalid %s packet from %s", pkt.Kind.String(), cts.RemoteAddr)
}
}
}
done:
// arrange to break all rpty resources
cts.rpty_mtx.Lock()
if len(cts.rpty_map) > 0 {
var rpty *ServerRpty
for _, rpty = range cts.rpty_map {
rpty.ws.Close()
}
}
cts.rpty_mtx.Unlock()
cts.S.log.Write(cts.Sid, LOG_INFO, "RPC stream receiver ended")
}
@ -1245,9 +1426,9 @@ func (s *Server) WrapHttpHandler(handler ServerHttpHandler) http.Handler {
if status_code > 0 {
if err != nil {
s.log.Write(handler.Identity(), LOG_INFO, "[%s] %s %s %d %.9f - %s", req.RemoteAddr, req.Method, req.URL.String(), status_code, time_taken.Seconds(), err.Error())
s.log.Write(handler.Identity(), LOG_INFO, "[%s] %s %s %d %.9f - %s", req.RemoteAddr, req.Method, get_raw_url_path(req), status_code, time_taken.Seconds(), err.Error())
} else {
s.log.Write(handler.Identity(), LOG_INFO, "[%s] %s %s %d %.9f", req.RemoteAddr, req.Method, req.URL.String(), status_code, time_taken.Seconds())
s.log.Write(handler.Identity(), LOG_INFO, "[%s] %s %s %d %.9f", req.RemoteAddr, req.Method, get_raw_url_path(req), status_code, time_taken.Seconds())
}
}
})
@ -1259,7 +1440,7 @@ func (s *Server) SafeWrapWebsocketHandler(handler websocket.Handler) http.Handle
!strings.Contains(strings.ToLower(req.Header.Get("Connection")), "upgrade") {
var status_code int
status_code = WriteEmptyRespHeader(w, http.StatusBadRequest)
s.log.Write("", LOG_INFO, "[%s] %s %s %d[non-websocket]", req.RemoteAddr, req.Method, req.URL.String(), status_code)
s.log.Write("", LOG_INFO, "[%s] %s %s %d[non-websocket]", req.RemoteAddr, req.Method, get_raw_url_path(req), status_code)
return
}
handler.ServeHTTP(w, req)
@ -1273,9 +1454,11 @@ func (s *Server) WrapWebsocketHandler(handler ServerWebsocketHandler) websocket.
var start_time time.Time
var time_taken time.Duration
var req *http.Request
var raw_url_path string
req = ws.Request()
s.log.Write(handler.Identity(), LOG_INFO, "[%s] %s %s [ws]", req.RemoteAddr, req.Method, req.URL.String())
raw_url_path = get_raw_url_path(req)
s.log.Write(handler.Identity(), LOG_INFO, "[%s] %s %s [ws]", req.RemoteAddr, req.Method, raw_url_path)
start_time = time.Now()
status_code, err = handler.ServeWebsocket(ws)
@ -1283,9 +1466,9 @@ func (s *Server) WrapWebsocketHandler(handler ServerWebsocketHandler) websocket.
if status_code > 0 {
if err != nil {
s.log.Write(handler.Identity(), LOG_INFO, "[%s] %s %s [ws] %d %.9f - %s", req.RemoteAddr, req.Method, req.URL.String(), status_code, time_taken.Seconds(), err.Error())
s.log.Write(handler.Identity(), LOG_INFO, "[%s] %s %s [ws] %d %.9f - %s", req.RemoteAddr, req.Method, raw_url_path, status_code, time_taken.Seconds(), err.Error())
} else {
s.log.Write(handler.Identity(), LOG_INFO, "[%s] %s %s [ws] %d %.9f", req.RemoteAddr, req.Method, req.URL.String(), status_code, time_taken.Seconds())
s.log.Write(handler.Identity(), LOG_INFO, "[%s] %s %s [ws] %d %.9f", req.RemoteAddr, req.Method, raw_url_path, status_code, time_taken.Seconds())
}
}
})
@ -1340,6 +1523,7 @@ func NewServer(ctx context.Context, name string, logger Logger, cfg *ServerConfi
s.bulletin = NewBulletin[*ServerEvent](&s, 1024)
s.ctl_addrs = list.New()
s.rpx_addrs = list.New()
s.pxy_addrs = list.New()
s.wpx_addrs = list.New()
@ -1410,16 +1594,32 @@ func NewServer(ctx context.Context, name string, logger Logger, cfg *ServerConfi
s.ctl_mux.Handle("/_pty/xterm.css/",
s.WrapHttpHandler(&server_pty_xterm_file{ServerCtl: ServerCtl{S: &s, Id: HS_ID_CTL}, file: "_forbidden"}))
s.ctl_mux.Handle("/_pty/xterm.html",
s.WrapHttpHandler(&server_pty_xterm_file{ServerCtl: ServerCtl{S: &s, Id: HS_ID_CTL}, file: "xterm.html"}))
s.WrapHttpHandler(&server_pty_xterm_file{ServerCtl: ServerCtl{S: &s, Id: HS_ID_CTL}, file: "xterm.html", mode: "pty"}))
s.ctl_mux.Handle("/_pty/xterm.html/",
s.WrapHttpHandler(&server_pty_xterm_file{ServerCtl: ServerCtl{S: &s, Id: HS_ID_CTL}, file: "_forbidden"}))
s.ctl_mux.Handle("/_pty/",
s.WrapHttpHandler(&server_pty_xterm_file{ServerCtl: ServerCtl{S: &s, Id: HS_ID_CTL}, file: "_redir:xterm.html"}))
/*
s.ctl_mux.Handle("/_rpts/ws",
s.SafeWrapWebsocketHandler(s.WrapWebsocketHandler(&server_rpts_ws{S: &s, Id: HS_ID_CTL})))
*/
s.ctl_mux.Handle("/_rpty/ws",
s.SafeWrapWebsocketHandler(s.WrapWebsocketHandler(&server_rpty_ws{S: &s, Id: HS_ID_CTL})))
s.ctl_mux.Handle("/_rpty/xterm.js",
s.WrapHttpHandler(&server_pty_xterm_file{ServerCtl: ServerCtl{S: &s, Id: HS_ID_CTL}, file: "xterm.js"}))
s.ctl_mux.Handle("/_rpty/xterm.js/",
s.WrapHttpHandler(&server_pty_xterm_file{ServerCtl: ServerCtl{S: &s, Id: HS_ID_CTL}, file: "_forbidden"}))
s.ctl_mux.Handle("/_rpty/xterm-addon-fit.js",
s.WrapHttpHandler(&server_pty_xterm_file{ServerCtl: ServerCtl{S: &s, Id: HS_ID_CTL}, file: "xterm-addon-fit.js"}))
s.ctl_mux.Handle("/_rpty/xterm-addon-fit.js/",
s.WrapHttpHandler(&server_pty_xterm_file{ServerCtl: ServerCtl{S: &s, Id: HS_ID_CTL}, file: "_forbidden"}))
s.ctl_mux.Handle("/_rpty/xterm.css",
s.WrapHttpHandler(&server_pty_xterm_file{ServerCtl: ServerCtl{S: &s, Id: HS_ID_CTL}, file: "xterm.css"}))
s.ctl_mux.Handle("/_rpty/xterm.css/",
s.WrapHttpHandler(&server_pty_xterm_file{ServerCtl: ServerCtl{S: &s, Id: HS_ID_CTL}, file: "_forbidden"}))
s.ctl_mux.Handle("/_rpty/xterm.html",
s.WrapHttpHandler(&server_pty_xterm_file{ServerCtl: ServerCtl{S: &s, Id: HS_ID_CTL}, file: "xterm.html", mode: "rpty"}))
s.ctl_mux.Handle("/_rpty/xterm.html/",
s.WrapHttpHandler(&server_pty_xterm_file{ServerCtl: ServerCtl{S: &s, Id: HS_ID_CTL}, file: "_forbidden"}))
s.ctl_mux.Handle("/_rpty/",
s.WrapHttpHandler(&server_pty_xterm_file{ServerCtl: ServerCtl{S: &s, Id: HS_ID_CTL}, file: "_redir:xterm.html"}))
s.ctl = make([]*http.Server, len(cfg.CtlAddrs))
for i = 0; i < len(cfg.CtlAddrs); i++ {
@ -1434,6 +1634,23 @@ func NewServer(ctx context.Context, name string, logger Logger, cfg *ServerConfi
// ---------------------------------------------------------
s.rpx_mux = http.NewServeMux() // TODO: make /_init,_ssh,_ssh/ws,_http configurable...
s.rpx_mux.Handle("/", s.WrapHttpHandler(&server_rpx{ S: &s, Id: HS_ID_RPX }))
s.rpx = make([]*http.Server, len(cfg.RpxAddrs))
for i = 0; i < len(cfg.RpxAddrs); i++ {
s.rpx[i] = &http.Server{
Addr: cfg.RpxAddrs[i],
Handler: s.rpx_mux,
TLSConfig: cfg.RpxTls,
ErrorLog: hs_log,
// TODO: more settings
}
}
// ---------------------------------------------------------
s.pxy_mux = http.NewServeMux() // TODO: make /_init,_ssh,_ssh/ws,_http configurable...
s.pxy_mux.Handle("/_ssh/{conn_id}/",
@ -1702,6 +1919,59 @@ func (s *Server) RunCtlTask(wg *sync.WaitGroup) {
l_wg.Wait()
}
func (s *Server) RunRpxTask(wg *sync.WaitGroup) {
var err error
var rpx *http.Server
var idx int
var l_wg sync.WaitGroup
defer wg.Done()
for idx, rpx = range s.rpx {
l_wg.Add(1)
go func(i int, cs *http.Server) {
var l net.Listener
s.log.Write("", LOG_INFO, "RPX channel[%d] started on %s", i, s.Cfg.RpxAddrs[i])
if s.stop_req.Load() == false {
l, err = net.Listen(TcpAddrStrClass(cs.Addr), cs.Addr)
if err == nil {
if s.stop_req.Load() == false {
var node *list.Element
s.rpx_addrs_mtx.Lock()
node = s.rpx_addrs.PushBack(l.Addr().(*net.TCPAddr))
s.rpx_addrs_mtx.Unlock()
if s.Cfg.RpxTls == nil { // TODO: change this
err = cs.Serve(l)
} else {
err = cs.ServeTLS(l, "", "") // s.Cfg.RpxTls must provide a certificate and a key
}
s.rpx_addrs_mtx.Lock()
s.rpx_addrs.Remove(node)
s.rpx_addrs_mtx.Unlock()
} else {
err = fmt.Errorf("stop requested")
}
l.Close()
}
} else {
err = fmt.Errorf("stop requested")
}
if errors.Is(err, http.ErrServerClosed) {
s.log.Write("", LOG_INFO, "RPX channel[%d] ended", i)
} else {
s.log.Write("", LOG_ERROR, "RPX channel[%d] error - %s", i, err.Error())
}
l_wg.Done()
}(idx, rpx)
}
l_wg.Wait()
}
func (s *Server) RunPxyTask(wg *sync.WaitGroup) {
var err error
var pxy *http.Server
@ -1825,6 +2095,10 @@ func (s *Server) ReqStop() {
hs.Shutdown(s.Ctx) // to break s.ctl.Serve()
}
for _, hs = range s.rpx {
hs.Shutdown(s.Ctx) // to break s.rpx.Serve()
}
for _, hs = range s.pxy {
hs.Shutdown(s.Ctx) // to break s.pxy.Serve()
}
@ -1865,6 +2139,9 @@ func (s *Server) AddNewServerConn(remote_addr *net.Addr, local_addr *net.Addr, p
cts.stop_chan = make(chan bool, 8)
cts.pts_list = list.New()
cts.rpty_map = make(ServerRptyMap)
cts.rpty_map_by_ws = make(ServerRptyMapByWs)
s.cts_mtx.Lock()
if s.cts_limit > 0 && len(s.cts_map) >= s.cts_limit {
@ -1892,6 +2169,7 @@ func (s *Server) AddNewServerConn(remote_addr *net.Addr, local_addr *net.Addr, p
}
cts.Id = assigned_id
cts.Sid = fmt.Sprintf("%d", cts.Id) // id in string used for logging
cts.rpty_next_id = 1
_, ok = s.cts_map_by_addr[cts.RemoteAddr]
if ok {
@ -2190,6 +2468,11 @@ func (s *Server) StartCtlService() {
go s.RunCtlTask(&s.wg)
}
func (s *Server) StartRpxService() {
s.wg.Add(1)
go s.RunRpxTask(&s.wg)
}
func (s *Server) StartPxyService() {
s.wg.Add(1)
go s.RunPxyTask(&s.wg)

View File

@ -68,7 +68,7 @@ func (t *Transformer) Transform(dst []byte, src []byte, at_eof bool) (int, int,
err = transform.ErrShortSrc
done:
return ndst, nsrc, err
return ndst, nsrc, err
}
func (t *Transformer) copy_all(dst []byte, src []byte) (int, error) {

View File

@ -109,6 +109,7 @@ window.onload = function(event) {
const login_pty_part = document.getElementById('login-pty-part');
const username_field = document.getElementById('username');
const password_field= document.getElementById('password');
const qparams = new URLSearchParams(window.location.search);
if (xt_mode == 'ssh') {
login_ssh_part.style.display = 'block';
@ -208,6 +209,12 @@ window.onload = function(event) {
pathname = pathname.substring(0, pathname.lastIndexOf('/'));
let url = prefix + window.location.host + pathname + '/ws';
if (xt_mode == 'rpty') {
// when accessing rpty, the server requires a client token
let client_token = qparams.get('client-token');
if (client_token != null && client_token != '') url += '?client-token=' + client_token;
}
const socket = new WebSocket(url);
socket.binaryType = 'arraybuffer';