added a event pipe to break unix.Poll better for local pty

This commit is contained in:
2025-08-21 21:10:06 +09:00
parent 7ec1387132
commit 9f8d3e2696
2 changed files with 89 additions and 23 deletions

View File

@ -41,6 +41,7 @@ func (pty *client_pty_ws) ServeWebsocket(ws *websocket.Conn) (int, error) {
var out *os.File
var tty *os.File
var cmd *exec.Cmd
var pfd [2]int = [2]int{ -1, -1 }
var wg sync.WaitGroup
var conn_ready_chan chan bool
var err error
@ -63,9 +64,9 @@ func (pty *client_pty_ws) ServeWebsocket(ws *websocket.Conn) (int, error) {
var n int
var err error
poll_fds = []unix.PollFd{
unix.PollFd{Fd: int32(out.Fd()), Events: unix.POLLIN},
unix.PollFd{Fd: int32(pfd[0]), Events: unix.POLLIN},
}
c.stats.pty_sessions.Add(1)
@ -84,6 +85,10 @@ func (pty *client_pty_ws) ServeWebsocket(ws *websocket.Conn) (int, error) {
c.log.Write(pty.Id, LOG_DEBUG, "[%s] EOF detected on pty stdout", req.RemoteAddr)
break
}
if (poll_fds[1].Revents & (unix.POLLERR | unix.POLLHUP | unix.POLLNVAL)) != 0 {
c.log.Write(pty.Id, LOG_DEBUG, "[%s] EOF detected on pty event pipe", req.RemoteAddr)
break
}
if (poll_fds[0].Revents & unix.POLLIN) != 0 {
n, err = out.Read(buf[:])
@ -102,6 +107,10 @@ func (pty *client_pty_ws) ServeWebsocket(ws *websocket.Conn) (int, error) {
break
}
}
if (poll_fds[1].Revents & unix.POLLIN) != 0 {
c.log.Write(pty.Id, LOG_DEBUG, "[%s] Stop request noticed on pty event pipe", req.RemoteAddr)
break
}
}
c.stats.pty_sessions.Add(-1)
}
@ -120,6 +129,7 @@ ws_recv_loop:
switch ev.Type {
case "open":
if tty == nil && len(ev.Data) == 2 {
// not using username and password for now...
//username = string(ev.Data[0])
//password = string(ev.Data[1])
@ -128,23 +138,38 @@ ws_recv_loop:
var err error
defer wg.Done()
err = unix.Pipe(pfd[:])
if err != nil {
c.log.Write(pty.Id, LOG_ERROR, "[%s] Failed to create event pipe for pty - %s", req.RemoteAddr, err.Error())
send_ws_data_for_xterm(ws, "error", err.Error())
ws.Close() // dirty way to flag out the error
return
}
cmd, tty, err = connect_pty(c.pty_shell, c.pty_user)
if err != nil {
c.log.Write(pty.Id, LOG_ERROR, "[%s] Failed to connect pty - %s", req.RemoteAddr, err.Error())
send_ws_data_for_xterm(ws, "error", err.Error())
ws.Close() // dirty way to flag out the error
} else {
err = send_ws_data_for_xterm(ws, "status", "opened")
if err != nil {
c.log.Write(pty.Id, LOG_ERROR, "[%s] Failed to write opened event to websocket - %s", req.RemoteAddr, err.Error())
ws.Close() // dirty way to flag out the error
} else {
c.log.Write(pty.Id, LOG_DEBUG, "[%s] Opened pty session", req.RemoteAddr)
out = tty
in = tty
conn_ready_chan <- true
}
unix.Close(pfd[0]); pfd[0] = -1
unix.Close(pfd[1]); pfd[1] = -1
return
}
err = send_ws_data_for_xterm(ws, "status", "opened")
if err != nil {
c.log.Write(pty.Id, LOG_ERROR, "[%s] Failed to write opened event to websocket - %s", req.RemoteAddr, err.Error())
ws.Close() // dirty way to flag out the error
unix.Close(pfd[0]); pfd[0] = -1
unix.Close(pfd[1]); pfd[1] = -1
return
}
c.log.Write(pty.Id, LOG_DEBUG, "[%s] Opened pty session", req.RemoteAddr)
out = tty
in = tty
conn_ready_chan <- true
}()
}
@ -153,6 +178,9 @@ ws_recv_loop:
tty.Close()
tty = nil
}
if pfd[1] >= 0 {
unix.Write(pfd[1], []byte{0})
}
break ws_recv_loop
case "iov":
@ -194,6 +222,11 @@ done:
if tty != nil { tty.Close() }
if cmd != nil { cmd.Wait() }
wg.Wait()
// close the event pipe after all goroutines are over
if pfd[0] >= 0 { unix.Close(pfd[0]) }
if pfd[1] >= 0 { unix.Close(pfd[1]) }
c.log.Write(pty.Id, LOG_DEBUG, "[%s] Ended pty session", req.RemoteAddr)
return http.StatusOK, err

View File

@ -49,6 +49,7 @@ func (pty *server_pty_ws) ServeWebsocket(ws *websocket.Conn) (int, error) {
var out *os.File
var tty *os.File
var cmd *exec.Cmd
var pfd [2]int = [2]int{ -1, -1 }
var wg sync.WaitGroup
var conn_ready_chan chan bool
var err error
@ -73,6 +74,7 @@ func (pty *server_pty_ws) ServeWebsocket(ws *websocket.Conn) (int, error) {
poll_fds = []unix.PollFd{
unix.PollFd{Fd: int32(out.Fd()), Events: unix.POLLIN},
unix.PollFd{Fd: int32(pfd[0]), Events: unix.POLLIN},
}
s.stats.pty_sessions.Add(1)
@ -91,6 +93,10 @@ func (pty *server_pty_ws) ServeWebsocket(ws *websocket.Conn) (int, error) {
s.log.Write(pty.Id, LOG_DEBUG, "[%s] EOF detected on pty stdout", req.RemoteAddr)
break
}
if (poll_fds[1].Revents & (unix.POLLERR | unix.POLLHUP | unix.POLLNVAL)) != 0 {
s.log.Write(pty.Id, LOG_DEBUG, "[%s] EOF detected on pty event pipe", req.RemoteAddr)
break
}
if (poll_fds[0].Revents & unix.POLLIN) != 0 {
n, err = out.Read(buf[:])
@ -109,6 +115,10 @@ func (pty *server_pty_ws) ServeWebsocket(ws *websocket.Conn) (int, error) {
break
}
}
if (poll_fds[1].Revents & unix.POLLIN) != 0 {
s.log.Write(pty.Id, LOG_DEBUG, "[%s] Stop request noticed on pty event pipe", req.RemoteAddr)
break
}
}
s.stats.pty_sessions.Add(-1)
}
@ -136,23 +146,38 @@ ws_recv_loop:
var err error
defer wg.Done()
err = unix.Pipe(pfd[:])
if err != nil {
s.log.Write(pty.Id, LOG_ERROR, "[%s] Failed to create event pipe for pty - %s", req.RemoteAddr, err.Error())
send_ws_data_for_xterm(ws, "error", err.Error())
ws.Close() // dirty way to flag out the error
return
}
cmd, tty, err = connect_pty(s.pty_shell, s.pty_user)
if err != nil {
s.log.Write(pty.Id, LOG_ERROR, "[%s] Failed to connect pty - %s", req.RemoteAddr, err.Error())
send_ws_data_for_xterm(ws, "error", err.Error())
ws.Close() // dirty way to flag out the error - this will make websocket.MessageReceive to fail
} else {
err = send_ws_data_for_xterm(ws, "status", "opened")
if err != nil {
s.log.Write(pty.Id, LOG_ERROR, "[%s] Failed to write 'opened' event to websocket - %s", req.RemoteAddr, err.Error())
ws.Close() // dirty way to flag out the error
} else {
s.log.Write(pty.Id, LOG_DEBUG, "[%s] Opened pty session", req.RemoteAddr)
out = tty
in = tty
conn_ready_chan <- true
}
unix.Close(pfd[0]); pfd[0] = -1
unix.Close(pfd[1]); pfd[1] = -1
return
}
err = send_ws_data_for_xterm(ws, "status", "opened")
if err != nil {
s.log.Write(pty.Id, LOG_ERROR, "[%s] Failed to write 'opened' event to websocket - %s", req.RemoteAddr, err.Error())
ws.Close() // dirty way to flag out the error
unix.Close(pfd[0]); pfd[0] = -1
unix.Close(pfd[1]); pfd[1] = -1
return
}
s.log.Write(pty.Id, LOG_DEBUG, "[%s] Opened pty session", req.RemoteAddr)
out = tty
in = tty
conn_ready_chan <- true
}()
}
@ -161,6 +186,9 @@ ws_recv_loop:
tty.Close()
tty = nil
}
if pfd[1] >= 0 {
unix.Write(pfd[1], []byte{0})
}
break ws_recv_loop
case "iov":
@ -202,6 +230,11 @@ done:
if tty != nil { tty.Close() }
if cmd != nil { cmd.Wait() }
wg.Wait()
// close the event pipe after all goroutines are over
if pfd[0] >= 0 { unix.Close(pfd[0]) }
if pfd[1] >= 0 { unix.Close(pfd[1]) }
s.log.Write(pty.Id, LOG_DEBUG, "[%s] Ended pty session", req.RemoteAddr)
return http.StatusOK, err