226 lines
5.6 KiB
Go
226 lines
5.6 KiB
Go
package hodu
|
|
|
|
// Rxc - remote exec (server-side triggered, client-side executed)
|
|
|
|
import "fmt"
|
|
|
|
import "golang.org/x/net/websocket"
|
|
|
|
func (cts *ServerConn) add_new_rxc(sink ServerRxcSink, ws *websocket.Conn, kind string, script string) (*ServerRxc, error) {
|
|
var ok bool
|
|
var start_id uint64
|
|
var assigned_id uint64
|
|
var rxc *ServerRxc
|
|
var pkt *Packet
|
|
var err error
|
|
|
|
cts.rxc_mtx.Lock()
|
|
start_id = cts.rxc_next_id
|
|
for {
|
|
_, ok = cts.rxc_map[cts.rxc_next_id]
|
|
if !ok {
|
|
assigned_id = cts.rxc_next_id
|
|
cts.rxc_next_id++
|
|
if cts.rxc_next_id == 0 { cts.rxc_next_id++ }
|
|
break
|
|
}
|
|
cts.rxc_next_id++
|
|
if cts.rxc_next_id == 0 { cts.rxc_next_id++ }
|
|
if cts.rxc_next_id == start_id {
|
|
cts.rxc_mtx.Unlock()
|
|
return nil, fmt.Errorf("unable to assign id")
|
|
}
|
|
}
|
|
|
|
if ws != nil {
|
|
_, ok = cts.rxc_map_by_ws[ws]
|
|
if ok {
|
|
cts.rxc_mtx.Unlock()
|
|
return nil, fmt.Errorf("connection already associated with rxc. possibly internal error")
|
|
}
|
|
}
|
|
|
|
rxc = &ServerRxc{
|
|
id: assigned_id,
|
|
ws: ws,
|
|
sink: sink,
|
|
}
|
|
|
|
cts.rxc_map[assigned_id] = rxc
|
|
if ws != nil { cts.rxc_map_by_ws[ws] = rxc }
|
|
cts.rxc_mtx.Unlock()
|
|
|
|
pkt, err = MakeRxcStartPacket(assigned_id, kind, script)
|
|
if err != nil {
|
|
cts.rxc_mtx.Lock()
|
|
delete(cts.rxc_map, assigned_id)
|
|
if ws != nil { delete(cts.rxc_map_by_ws, ws) }
|
|
cts.rxc_mtx.Unlock()
|
|
return nil, fmt.Errorf("failed to make rxc start packet - %s", err.Error())
|
|
}
|
|
|
|
err = cts.pss.Send(pkt)
|
|
if err != nil {
|
|
cts.rxc_mtx.Lock()
|
|
delete(cts.rxc_map, assigned_id)
|
|
if ws != nil { delete(cts.rxc_map_by_ws, ws) }
|
|
cts.rxc_mtx.Unlock()
|
|
return nil, fmt.Errorf("failed to send rxc start packet - %s", err.Error())
|
|
}
|
|
|
|
cts.S.stats.rxc_sessions.Add(1)
|
|
return rxc, nil
|
|
}
|
|
|
|
func (cts *ServerConn) StartRxcForWs(ws *websocket.Conn, kind string, script string) (*ServerRxc, error) {
|
|
// start a single task over rxc for a websocket
|
|
return cts.add_new_rxc(&ServerRxcWebsocketSink{ws: ws}, ws, kind, script)
|
|
}
|
|
|
|
func (cts *ServerConn) RunRxcJob(run *ServerRxcJobRun, kind string, script string) error {
|
|
var rxc *ServerRxc
|
|
var err error
|
|
|
|
rxc, err = cts.add_new_rxc(run, nil, kind, script)
|
|
if err != nil { return err }
|
|
run.mark_started(rxc.id)
|
|
return nil
|
|
}
|
|
|
|
func (cts *ServerConn) find_rxc_by_ws(ws *websocket.Conn) (*ServerRxc, bool) {
|
|
var rxc *ServerRxc
|
|
var ok bool
|
|
|
|
cts.rxc_mtx.Lock()
|
|
rxc, ok = cts.rxc_map_by_ws[ws]
|
|
cts.rxc_mtx.Unlock()
|
|
return rxc, ok
|
|
}
|
|
|
|
func (cts *ServerConn) find_rxc_by_id(id uint64) (*ServerRxc, bool) {
|
|
var rxc *ServerRxc
|
|
var ok bool
|
|
|
|
cts.rxc_mtx.Lock()
|
|
rxc, ok = cts.rxc_map[id]
|
|
cts.rxc_mtx.Unlock()
|
|
return rxc, ok
|
|
}
|
|
|
|
func (cts *ServerConn) remove_rxc_by_id(id uint64) (*ServerRxc, bool) {
|
|
var rxc *ServerRxc
|
|
var ok bool
|
|
|
|
cts.rxc_mtx.Lock()
|
|
rxc, ok = cts.rxc_map[id]
|
|
if ok {
|
|
delete(cts.rxc_map, id)
|
|
if rxc.ws != nil {
|
|
delete(cts.rxc_map_by_ws, rxc.ws)
|
|
}
|
|
}
|
|
cts.rxc_mtx.Unlock()
|
|
return rxc, ok
|
|
}
|
|
|
|
func (cts *ServerConn) StopRxcForWs(ws *websocket.Conn) error {
|
|
// called by the websocket handler.
|
|
var rxc *ServerRxc
|
|
var ok bool
|
|
var err error
|
|
|
|
rxc, ok = cts.find_rxc_by_ws(ws)
|
|
if !ok {
|
|
cts.S.log.Write(cts.Sid, LOG_ERROR, "Unknown websocket connection for rxc - websocket %v", ws.RemoteAddr())
|
|
return fmt.Errorf("unknown websocket connection for rxc - %v", ws.RemoteAddr())
|
|
}
|
|
|
|
err = cts.pss.Send(MakeRxcStopPacket(rxc.id, 0, ""))
|
|
if err != nil {
|
|
cts.S.log.Write(cts.Sid, LOG_ERROR, "Failed to send %s(%d) for server %s websocket %v - %s", PACKET_KIND_RXC_STOP.String(), rxc.id, cts.RemoteAddr, ws.RemoteAddr(), err.Error())
|
|
}
|
|
|
|
_, ok = cts.remove_rxc_by_id(rxc.id)
|
|
if ok {
|
|
cts.S.stats.rxc_sessions.Add(-1)
|
|
}
|
|
|
|
rxc.ReqStop()
|
|
cts.S.log.Write(cts.Sid, LOG_INFO, "Stopped rxc(%d) for server %s websocket %v", rxc.id, cts.RemoteAddr, ws.RemoteAddr())
|
|
return nil
|
|
}
|
|
|
|
func (cts *ServerConn) SendStopRxcById(id uint64, flags uint64) error {
|
|
var ok bool
|
|
var err error
|
|
|
|
_, ok = cts.find_rxc_by_id(id)
|
|
if !ok {
|
|
return fmt.Errorf("unknown rxc id %d", id)
|
|
}
|
|
|
|
err = cts.pss.Send(MakeRxcStopPacket(id, flags, ""))
|
|
if err != nil {
|
|
return fmt.Errorf("failed to send %s(%d) to client - %s", PACKET_KIND_RXC_STOP.String(), id, err.Error())
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (cts *ServerConn) StopRxcSinkById(id uint64, flags uint64, msg string) error {
|
|
var rxc *ServerRxc
|
|
var ok bool
|
|
var err error
|
|
|
|
rxc, ok = cts.remove_rxc_by_id(id)
|
|
if !ok {
|
|
return fmt.Errorf("unknown rxc id %d", id)
|
|
}
|
|
cts.S.stats.rxc_sessions.Add(-1)
|
|
|
|
if rxc.sink != nil {
|
|
err = rxc.sink.Stop(flags, msg)
|
|
if err != nil { return err }
|
|
}
|
|
|
|
cts.S.log.Write(cts.Sid, LOG_INFO, "Stopped rxc job(%d) run for client(%s) from %s", id, cts.ClientToken.Get(), cts.RemoteAddr)
|
|
return nil
|
|
}
|
|
|
|
func (cts *ServerConn) WriteRxcForWs(ws *websocket.Conn, data []byte) error {
|
|
var rxc *ServerRxc
|
|
var ok bool
|
|
var err error
|
|
|
|
rxc, ok = cts.find_rxc_by_ws(ws)
|
|
if !ok { return fmt.Errorf("unknown ws connection for rxc - %v", ws.RemoteAddr()) }
|
|
|
|
err = cts.pss.Send(MakeRxcDataPacket(rxc.id, RXC_DATA_FLAG_NONE, data))
|
|
if err != nil { return fmt.Errorf("unable to send rxc data to client - %s", err.Error()) }
|
|
|
|
return nil
|
|
}
|
|
|
|
func (cts *ServerConn) ReadRxcAndWriteSinkById(id uint64, flags uint64, data []byte) error {
|
|
var rxc *ServerRxc
|
|
var ok bool
|
|
|
|
rxc, ok = cts.find_rxc_by_id(id)
|
|
if !ok { return fmt.Errorf("unknown rxc id - %d", id) }
|
|
if rxc.sink == nil { return fmt.Errorf("missing rxc sink for id %d", id) }
|
|
|
|
return rxc.sink.Write(flags, data)
|
|
}
|
|
|
|
func (cts *ServerConn) HandleRxcEvent(packet_type PACKET_KIND, evt *RxcEvent) error {
|
|
switch packet_type {
|
|
case PACKET_KIND_RXC_STOP:
|
|
return cts.StopRxcSinkById(evt.Id, evt.Flags, string(evt.Data))
|
|
|
|
case PACKET_KIND_RXC_DATA:
|
|
return cts.ReadRxcAndWriteSinkById(evt.Id, evt.Flags, evt.Data)
|
|
}
|
|
|
|
return nil
|
|
}
|