rpty at least working

This commit is contained in:
2025-08-12 02:50:10 +09:00
parent 05cb0823b4
commit d818acc53d
16 changed files with 568 additions and 403 deletions

236
server.go
View File

@ -42,6 +42,9 @@ type ServerRouteMap map[RouteId]*ServerRoute
type ServerPeerConnMap map[PeerId]*ServerPeerConn
type ServerSvcPortMap map[PortId]ConnRouteId
type ServerRptyMap map[uint64]*ServerRpty
type ServerRptyMapByWs map[*websocket.Conn]*ServerRpty
type ServerWpxResponseTransformer func(r *ServerRouteProxyInfo, resp *http.Response) io.Reader
type ServerWpxForeignPortProxyMaker func(wpx_type string, port_id string) (*ServerRouteProxyInfo, error)
@ -185,14 +188,14 @@ type ServerConn struct {
pts_mtx sync.Mutex
pts_list *list.List
rpty_next_id uint64
rpty_next_id uint64
rpty_mtx sync.Mutex
rpty_map map[uint64]*ServerRpty
rpty_map_by_ws map[*websocket.Conn]*ServerRpty
rpty_map ServerRptyMap
rpty_map_by_ws ServerRptyMapByWs
wg sync.WaitGroup
stop_req atomic.Bool
stop_chan chan bool
wg sync.WaitGroup
stop_req atomic.Bool
stop_chan chan bool
}
type ServerRoute struct {
@ -462,11 +465,6 @@ func (r *ServerRoute) ReportPacket(pts_id PeerId, packet_type PACKET_KIND, event
// ------------------------------------
func (rpty *ServerRpty) Stop() {
}
// ------------------------------------
func (cts *ServerConn) make_route_listener(id RouteId, option RouteOption, svc_requested_addr string) (*net.TCPListener, *net.TCPAddr, error) {
var l *net.TCPListener
var svcaddr *net.TCPAddr
@ -651,7 +649,6 @@ func (cts *ServerConn) ReqStopAllServerRoutes() {
cts.route_mtx.Unlock()
}
func (cts *ServerConn) StartRpty(ws *websocket.Conn) (*ServerRpty, error) {
var ok bool
var start_id uint64
@ -701,10 +698,123 @@ func (cts *ServerConn) StartRpty(ws *websocket.Conn) (*ServerRpty, error) {
return nil , err
}
// TODO: send request...
return rpty, nil
}
func (cts *ServerConn) StopRpty(ws *websocket.Conn) error {
// called by the websocket handler.
var rpty *ServerRpty
var id uint64
var ok bool
var err error
cts.rpty_mtx.Lock()
rpty, ok = cts.rpty_map_by_ws[ws]
if !ok {
return fmt.Errorf("unknown ws connection for rpty - %v", ws.RemoteAddr())
}
id = rpty.id
cts.rpty_mtx.Unlock()
// send the stop request to the client side
err = cts.pss.Send(MakeRptyStopPacket(id, ""))
if err != nil {
return fmt.Errorf("unable to send stop rpty request to client - %s", err.Error())
}
return nil
}
func (cts *ServerConn) StopRptyWsById(id uint64, msg string) error {
// called this when the stop requested comes from the client
// abort the websocket side.
var rpty *ServerRpty
var ok bool
cts.rpty_mtx.Lock()
rpty, ok = cts.rpty_map[id]
if !ok {
return fmt.Errorf("unknown rpty id %d", id)
}
rpty.ws.Close()
cts.rpty_mtx.Unlock()
cts.S.log.Write(cts.Sid, LOG_INFO, "Stopped rpty(%d) for %s - %s", id, cts.RemoteAddr, msg)
return nil
}
func (cts *ServerConn) WriteRpty(ws *websocket.Conn, data []byte) error {
var rpty *ServerRpty
var id uint64
var ok bool
var err error
cts.rpty_mtx.Lock()
rpty, ok = cts.rpty_map_by_ws[ws]
if !ok {
return fmt.Errorf("unknown ws connection for rpty - %v", ws.RemoteAddr())
}
id = rpty.id
cts.rpty_mtx.Unlock()
err = cts.pss.Send(MakeRptyDataPacket(id, data))
if err != nil {
return fmt.Errorf("unable to send rpty data to client - %s", err.Error())
}
return nil
}
func (cts *ServerConn) WriteRptySize(ws *websocket.Conn, data []byte) error {
var rpty *ServerRpty
var id uint64
var ok bool
var err error
cts.rpty_mtx.Lock()
rpty, ok = cts.rpty_map_by_ws[ws]
if !ok {
return fmt.Errorf("unknown ws connection for rpty size - %v", ws.RemoteAddr())
}
id = rpty.id
cts.rpty_mtx.Unlock()
err = cts.pss.Send(MakeRptySizePacket(id, data))
if err != nil {
return fmt.Errorf("unable to send rpty size to client - %s", err.Error())
}
return nil
}
func (cts *ServerConn) ReadRptyAndWriteWs(id uint64, data []byte) error {
var ok bool
var rpty *ServerRpty
var err error
cts.rpty_mtx.Lock()
rpty, ok = cts.rpty_map[id]
if !ok {
cts.rpty_mtx.Unlock()
return fmt.Errorf("unknown rpty id - %d", id)
}
err = send_ws_data_for_xterm(rpty.ws, "iov", string(data))
if err != nil {
cts.rpty_mtx.Unlock()
return fmt.Errorf("failed to write rpty data(%d) to ws - %s", id, err.Error())
}
cts.rpty_mtx.Unlock()
return nil
}
func (cts *ServerConn) ReportPacket(route_id RouteId, pts_id PeerId, packet_type PACKET_KIND, event_data interface{}) error {
var r *ServerRoute
var ok bool
@ -720,6 +830,20 @@ func (cts *ServerConn) ReportPacket(route_id RouteId, pts_id PeerId, packet_type
return r.ReportPacket(pts_id, packet_type, event_data)
}
func (cts *ServerConn) HandleRptyEvent(packet_type PACKET_KIND, evt *RptyEvent) error {
switch packet_type {
case PACKET_KIND_RPTY_STOP:
// stop requested from the server
return cts.StopRptyWsById(evt.Id, string(evt.Data))
case PACKET_KIND_RPTY_DATA:
return cts.ReadRptyAndWriteWs(evt.Id, evt.Data)
}
// ignore other packet types
return nil
}
func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) {
var pkt *Packet
var err error
@ -754,12 +878,12 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) {
err = cts.pss.Send(MakeRouteStoppedPacket(RouteId(x.Route.RouteId), RouteOption(x.Route.ServiceOption), x.Route.TargetAddrStr, x.Route.TargetName, x.Route.ServiceAddrStr, x.Route.ServiceNetStr))
if err != nil {
cts.S.log.Write(cts.Sid, LOG_ERROR,
"Failed to send route_stopped event(%d,%s,%v,%s) to client %s - %s",
"Failed to send ROUTE_STOPPED event(%d,%s,%v,%s) to client %s - %s",
x.Route.RouteId, x.Route.TargetAddrStr, x.Route.ServiceOption, x.Route.ServiceNetStr, cts.RemoteAddr, err.Error())
goto done
} else {
cts.S.log.Write(cts.Sid, LOG_DEBUG,
"Sent route_stopped event(%d,%s,%v,%s) to client %s",
"Sent ROUTE_STOPPED event(%d,%s,%v,%s) to client %s",
x.Route.RouteId, x.Route.TargetAddrStr, x.Route.ServiceOption, x.Route.ServiceNetStr, cts.RemoteAddr)
}
@ -771,7 +895,7 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) {
if err != nil {
r.ReqStop()
cts.S.log.Write(cts.Sid, LOG_ERROR,
"Failed to send route_started event(%d,%s,%s,%s%v,%v) to client %s - %s",
"Failed to send ROUTE_STARTED event(%d,%s,%s,%s%v,%v) to client %s - %s",
r.Id, r.PtcAddr, r.SvcAddr.String(), r.SvcOption, r.SvcPermNet, cts.RemoteAddr, err.Error())
goto done
}
@ -801,7 +925,7 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) {
if err != nil {
r.ReqStop()
cts.S.log.Write(cts.Sid, LOG_ERROR,
"Failed to send route_stopped event(%d,%s,%s,%v.%v) to client %s - %s",
"Failed to send ROUTE_STOPPED event(%d,%s,%s,%v.%v) to client %s - %s",
r.Id, r.PtcAddr, r.SvcAddr.String(), r.SvcOption, r.SvcPermNet.String(), cts.RemoteAddr, err.Error())
goto done
}
@ -862,13 +986,13 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) {
x, ok = pkt.U.(*Packet_Conn)
if ok {
if x.Conn.Token == "" {
cts.S.log.Write(cts.Sid, LOG_ERROR, "Invalid conn_desc packet from %s - blank token", cts.RemoteAddr)
cts.S.log.Write(cts.Sid, LOG_ERROR, "Invalid %s packet from %s - blank token", pkt.Kind.String(), cts.RemoteAddr)
cts.pss.Send(MakeConnErrorPacket(1, "blank token refused"))
cts.ReqStop() // TODO: is this desirable to disconnect?
} else if x.Conn.Token != cts.ClientToken.Get() {
_, err = strconv.ParseUint(x.Conn.Token, 10, int(unsafe.Sizeof(ConnId(0)) * 8))
if err == nil { // this is not != nil. this is to check if the token is numeric
cts.S.log.Write(cts.Sid, LOG_ERROR, "Invalid conn_desc packet from %s - numeric token '%s'", cts.RemoteAddr, x.Conn.Token)
cts.S.log.Write(cts.Sid, LOG_ERROR, "Invalid %s packet from %s - numeric token '%s'", pkt.Kind.String(), cts.RemoteAddr, x.Conn.Token)
cts.pss.Send(MakeConnErrorPacket(1, "numeric token refused"))
cts.ReqStop() // TODO: is this desirable to disconnect?
} else {
@ -877,7 +1001,7 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) {
if ok {
// error
cts.S.cts_mtx.Unlock()
cts.S.log.Write(cts.Sid, LOG_ERROR, "Invalid conn_desc packet from %s - duplicate token '%s'", cts.RemoteAddr, x.Conn.Token)
cts.S.log.Write(cts.Sid, LOG_ERROR, "Invalid %s packet from %s - duplicate token '%s'", pkt.Kind.String(), cts.RemoteAddr, x.Conn.Token)
cts.pss.Send(MakeConnErrorPacket(1, fmt.Sprintf("duplicate token refused - %s", x.Conn.Token)))
cts.ReqStop() // TODO: is this desirable to disconnect?
} else {
@ -892,7 +1016,7 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) {
}
}
} else {
cts.S.log.Write(cts.Sid, LOG_ERROR, "Invalid conn_desc packet from %s", cts.RemoteAddr)
cts.S.log.Write(cts.Sid, LOG_ERROR, "Invalid %s packet from %s", pkt.Kind.String(), cts.RemoteAddr)
}
case PACKET_KIND_CONN_NOTICE:
@ -908,27 +1032,42 @@ func (cts *ServerConn) receive_from_stream(wg *sync.WaitGroup) {
}
}
} else {
cts.S.log.Write(cts.Sid, LOG_ERROR, "Invalid conn_notice packet from %s", cts.RemoteAddr)
cts.S.log.Write(cts.Sid, LOG_ERROR, "Invalid %s packet from %s", pkt.Kind.String(), cts.RemoteAddr)
}
/*case PACKET_KIND_RPTY_START:
case PACKET_KIND_RPTY_STOP:*/
case PACKET_KIND_RPTY_STARTED:
fallthrough
case PACKET_KIND_RPTY_STOPPED:
fallthrough
case PACKET_KIND_RPTY_ABORTED:
fallthrough
case PACKET_KIND_RPTY_EOF:
//case PACKET_KIND_RPTY_START: stop is never sent by the client to the server
case PACKET_KIND_RPTY_STOP:
fallthrough
case PACKET_KIND_RPTY_DATA:
// inspect the token
// find the right websocket handler...
// report it to the right websocket handler
var x *Packet_RptyEvt
var ok bool
x, ok = pkt.U.(*Packet_RptyEvt)
if ok {
err = cts.HandleRptyEvent(pkt.Kind, x.RptyEvt)
if err != nil {
cts.S.log.Write(cts.Sid, LOG_ERROR, "Failed to handle %s event for rpty(%d) from %s - %s", pkt.Kind.String(), x.RptyEvt.Id, cts.RemoteAddr, err.Error())
} else {
cts.S.log.Write(cts.Sid, LOG_ERROR, "Handled %s event for rpty(%d) from %s", pkt.Kind.String(), x.RptyEvt.Id, cts.RemoteAddr)
}
} else {
cts.S.log.Write(cts.Sid, LOG_ERROR, "Invalid %s packet from %s", pkt.Kind.String(), cts.RemoteAddr)
}
}
}
done:
// arrange to break all rpty resources
cts.rpty_mtx.Lock()
if len(cts.rpty_map) > 0 {
var rpty *ServerRpty
for _, rpty = range cts.rpty_map {
rpty.ws.Close()
}
}
cts.rpty_mtx.Unlock()
cts.S.log.Write(cts.Sid, LOG_INFO, "RPC stream receiver ended")
}
@ -1443,16 +1582,32 @@ func NewServer(ctx context.Context, name string, logger Logger, cfg *ServerConfi
s.ctl_mux.Handle("/_pty/xterm.css/",
s.WrapHttpHandler(&server_pty_xterm_file{ServerCtl: ServerCtl{S: &s, Id: HS_ID_CTL}, file: "_forbidden"}))
s.ctl_mux.Handle("/_pty/xterm.html",
s.WrapHttpHandler(&server_pty_xterm_file{ServerCtl: ServerCtl{S: &s, Id: HS_ID_CTL}, file: "xterm.html"}))
s.WrapHttpHandler(&server_pty_xterm_file{ServerCtl: ServerCtl{S: &s, Id: HS_ID_CTL}, file: "xterm.html", mode: "pty"}))
s.ctl_mux.Handle("/_pty/xterm.html/",
s.WrapHttpHandler(&server_pty_xterm_file{ServerCtl: ServerCtl{S: &s, Id: HS_ID_CTL}, file: "_forbidden"}))
s.ctl_mux.Handle("/_pty/",
s.WrapHttpHandler(&server_pty_xterm_file{ServerCtl: ServerCtl{S: &s, Id: HS_ID_CTL}, file: "_redir:xterm.html"}))
/*
s.ctl_mux.Handle("/_rpts/ws",
s.SafeWrapWebsocketHandler(s.WrapWebsocketHandler(&server_rpts_ws{S: &s, Id: HS_ID_CTL})))
*/
s.ctl_mux.Handle("/_rpty/ws",
s.SafeWrapWebsocketHandler(s.WrapWebsocketHandler(&server_rpty_ws{S: &s, Id: HS_ID_CTL})))
s.ctl_mux.Handle("/_rpty/xterm.js",
s.WrapHttpHandler(&server_pty_xterm_file{ServerCtl: ServerCtl{S: &s, Id: HS_ID_CTL}, file: "xterm.js"}))
s.ctl_mux.Handle("/_rpty/xterm.js/",
s.WrapHttpHandler(&server_pty_xterm_file{ServerCtl: ServerCtl{S: &s, Id: HS_ID_CTL}, file: "_forbidden"}))
s.ctl_mux.Handle("/_rpty/xterm-addon-fit.js",
s.WrapHttpHandler(&server_pty_xterm_file{ServerCtl: ServerCtl{S: &s, Id: HS_ID_CTL}, file: "xterm-addon-fit.js"}))
s.ctl_mux.Handle("/_rpty/xterm-addon-fit.js/",
s.WrapHttpHandler(&server_pty_xterm_file{ServerCtl: ServerCtl{S: &s, Id: HS_ID_CTL}, file: "_forbidden"}))
s.ctl_mux.Handle("/_rpty/xterm.css",
s.WrapHttpHandler(&server_pty_xterm_file{ServerCtl: ServerCtl{S: &s, Id: HS_ID_CTL}, file: "xterm.css"}))
s.ctl_mux.Handle("/_rpty/xterm.css/",
s.WrapHttpHandler(&server_pty_xterm_file{ServerCtl: ServerCtl{S: &s, Id: HS_ID_CTL}, file: "_forbidden"}))
s.ctl_mux.Handle("/_rpty/xterm.html",
s.WrapHttpHandler(&server_pty_xterm_file{ServerCtl: ServerCtl{S: &s, Id: HS_ID_CTL}, file: "xterm.html", mode: "rpty"}))
s.ctl_mux.Handle("/_rpty/xterm.html/",
s.WrapHttpHandler(&server_pty_xterm_file{ServerCtl: ServerCtl{S: &s, Id: HS_ID_CTL}, file: "_forbidden"}))
s.ctl_mux.Handle("/_rpty/",
s.WrapHttpHandler(&server_pty_xterm_file{ServerCtl: ServerCtl{S: &s, Id: HS_ID_CTL}, file: "_redir:xterm.html"}))
s.ctl = make([]*http.Server, len(cfg.CtlAddrs))
for i = 0; i < len(cfg.CtlAddrs); i++ {
@ -1898,6 +2053,9 @@ func (s *Server) AddNewServerConn(remote_addr *net.Addr, local_addr *net.Addr, p
cts.stop_chan = make(chan bool, 8)
cts.pts_list = list.New()
cts.rpty_map = make(ServerRptyMap)
cts.rpty_map_by_ws = make(ServerRptyMapByWs)
s.cts_mtx.Lock()
if s.cts_limit > 0 && len(s.cts_map) >= s.cts_limit {