593 lines
15 KiB
Go
593 lines
15 KiB
Go
package hodu
|
|
|
|
|
|
import "encoding/base64"
|
|
import "encoding/json"
|
|
import "fmt"
|
|
import "net/http"
|
|
import "strconv"
|
|
import "sync"
|
|
import "time"
|
|
|
|
import "golang.org/x/net/websocket"
|
|
|
|
// rxc - remote exec
|
|
type server_rxc_ws struct {
|
|
S *Server
|
|
Id string
|
|
ws *websocket.Conn
|
|
}
|
|
|
|
func (rxc *server_rxc_ws) Identity() string {
|
|
return rxc.Id
|
|
}
|
|
|
|
func (rxc *server_rxc_ws) ServeWebsocket(ws *websocket.Conn) (int, error) {
|
|
var s *Server
|
|
var req *http.Request
|
|
var token string
|
|
var cts *ServerConn
|
|
var rp *ServerRxc
|
|
var wg sync.WaitGroup
|
|
var err error
|
|
|
|
s = rxc.S
|
|
req = ws.Request()
|
|
token = req.FormValue("client-token")
|
|
if token == "" {
|
|
ws.Close()
|
|
return http.StatusBadRequest, fmt.Errorf("no client token specified")
|
|
}
|
|
|
|
cts = s.FindServerConnByClientToken(token)
|
|
if cts == nil {
|
|
ws.Close()
|
|
return http.StatusBadRequest, fmt.Errorf("invalid client token - %s", token)
|
|
}
|
|
|
|
ws_recv_loop:
|
|
for {
|
|
var msg []byte
|
|
err = websocket.Message.Receive(ws, &msg)
|
|
if err != nil {
|
|
s.log.Write(rxc.Id, LOG_ERROR, "[%s] websocket receive error - %s", req.RemoteAddr, err.Error())
|
|
goto done
|
|
}
|
|
|
|
if len(msg) > 0 {
|
|
var ev json_xterm_ws_event
|
|
err = json.Unmarshal(msg, &ev)
|
|
if err == nil {
|
|
switch ev.Type {
|
|
case "open":
|
|
if rp == nil && len(ev.Data) == 2 {
|
|
var type_ string
|
|
var script string
|
|
|
|
type_ = string(ev.Data[0])
|
|
script = string(ev.Data[1])
|
|
//options = string(ev.Data[2])
|
|
|
|
rp, err = cts.StartRxcForWs(ws, type_, script)
|
|
if err != nil {
|
|
s.log.Write(rxc.Id, LOG_ERROR, "[%s] Failed to connect rxc - %s", req.RemoteAddr, err.Error())
|
|
send_ws_data_for_xterm(ws, "error", err.Error())
|
|
//ws.Close() // dirty way to flag out the error by making websocket.Message.Receive() fail
|
|
ws.SetReadDeadline(time.Now()) // slightly cleaner way to break the main loop
|
|
} else {
|
|
err = send_ws_data_for_xterm(ws, "status", "opened")
|
|
if err != nil {
|
|
s.log.Write(rxc.Id, LOG_ERROR, "[%s] Failed to write 'opened' event to websocket - %s", req.RemoteAddr, err.Error())
|
|
//ws.Close() // dirty way to flag out the error
|
|
ws.SetReadDeadline(time.Now()) // slightly cleaner way to break the main loop
|
|
} else {
|
|
s.log.Write(rxc.Id, LOG_DEBUG, "[%s] Opened rxc session", req.RemoteAddr)
|
|
}
|
|
}
|
|
}
|
|
|
|
case "close":
|
|
// just break out of the loop and let the remainder to close resources
|
|
break ws_recv_loop
|
|
|
|
case "iov":
|
|
var i int
|
|
for i, _ = range ev.Data {
|
|
var bytes []byte
|
|
bytes, err = base64.StdEncoding.DecodeString(ev.Data[i])
|
|
if err != nil {
|
|
s.log.Write(rxc.Id, LOG_WARN, "[%s] Invalid rxc iov data received - %s", req.RemoteAddr, ev.Data[i])
|
|
} else {
|
|
cts.WriteRxcForWs(ws, bytes)
|
|
// ignore error for now
|
|
}
|
|
}
|
|
|
|
default:
|
|
send_ws_data_for_xterm(ws, "error", fmt.Sprintf("invalid rxc event type - %s", ev.Type));
|
|
s.log.Write(rxc.Id, LOG_WARN, "[%s] Invalid rxc event type received - %s", req.RemoteAddr, ev.Type)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
done:
|
|
cts.StopRxcForWs(ws)
|
|
ws.Close() // don't care about multiple closes
|
|
|
|
wg.Wait()
|
|
s.log.Write(rxc.Id, LOG_DEBUG, "[%s] Ended rxc session for %s", req.RemoteAddr, token)
|
|
|
|
return http.StatusOK, err
|
|
}
|
|
|
|
// HTTP control endpoints for RXC jobs:
|
|
// GET /_rxc
|
|
// POST /_rxc
|
|
// GET /_rxc/{job_id}
|
|
// DELETE /_rxc/{job_id}
|
|
// POST /_rxc/{job_id}/stop
|
|
// GET /_rxc/{job_id}/runs
|
|
// GET /_rxc/{job_id}/runs/{conn_id}
|
|
// DELETE /_rxc/{job_id}/runs/{conn_id}
|
|
// POST /_rxc/{job_id}/runs/{conn_id}/stop
|
|
//
|
|
// The websocket endpoint /_rxc/ws is kept for interactive single-client RXC.
|
|
|
|
/*
|
|
Example to run a command on all connected clients:
|
|
|
|
curl -X POST http://127.0.0.1:9999/_rxc \
|
|
-H 'Content-Type: application/json' \
|
|
-d '{
|
|
"type": "bash",
|
|
"script": "uname -a"
|
|
}'
|
|
|
|
Example to run on specific clients only:
|
|
|
|
curl -X POST http://127.0.0.1:9999/_rxc \
|
|
-H 'Content-Type: application/json' \
|
|
-d '{
|
|
"clients": ["1", "client-token-abc"],
|
|
"type": "bash",
|
|
"script": "hostname; id"
|
|
}'
|
|
|
|
Then inspect the created job:
|
|
|
|
curl http://127.0.0.1:9999/_rxc/1
|
|
curl http://127.0.0.1:9999/_rxc/1/runs
|
|
curl http://127.0.0.1:9999/_rxc/1/runs/1
|
|
|
|
Stop a job:
|
|
|
|
curl -X POST http://127.0.0.1:9999/_rxc/1/stop
|
|
|
|
Stop one run only:
|
|
|
|
curl -X POST http://127.0.0.1:9999/_rxc/1/runs/1/stop
|
|
|
|
Delete a retained job record:
|
|
|
|
curl -X DELETE http://127.0.0.1:9999/_rxc/1
|
|
|
|
Delete a retained run record:
|
|
|
|
curl -X DELETE http://127.0.0.1:9999/_rxc/1/runs/1
|
|
|
|
If your control server uses a prefix, prepend it, for example http://127.0.0.1:9999/api/_rxc.
|
|
*/
|
|
|
|
// json_in_server_rxc is the JSON request body decoded by the POST handler
// of server_ctl_rxc. Its three fields are passed to Server.StartRxcJob.
type json_in_server_rxc struct {
	// Clients selects the target clients. The usage examples in this file
	// omit it to run on all connected clients.
	// NOTE(review): entries appear to be connection ids or client tokens -
	// confirm against StartRxcJob.
	Clients []string `json:"clients"`

	// Type is the rxc type, e.g. "bash" in the usage examples.
	Type string `json:"type"`

	// Script is the script text to run.
	Script string `json:"script"`
}
|
|
|
|
// json_out_server_rxc_job is the JSON representation of an rxc job as
// produced by server_rxc_job_to_json.
type json_out_server_rxc_job struct {
	JobId  uint64 `json:"job-id"`
	Type   string `json:"type"`
	Script string `json:"script"`

	// Status is one of "starting", "running" or "done".
	Status string `json:"status"`

	// Timestamps in Unix milliseconds. DoneMilli stays 0 while the job has
	// no done time.
	CreatedMilli int64 `json:"created-milli"`
	DoneMilli    int64 `json:"done-milli"`

	// TargetCount is the number of runs in the job; the remaining counters
	// are copied from the job's per-state run counters.
	TargetCount   int `json:"target-count"`
	RunningCount  int `json:"running-count"`
	StoppingCount int `json:"stopping-count"`
	StoppedCount  int `json:"stopped-count"`
	FailedCount   int `json:"failed-count"`
}
|
|
|
|
type json_out_server_rxc_run struct {
|
|
JobId uint64 `json:"job-id"`
|
|
ConnId ConnId `json:"conn-id"`
|
|
ClientToken string `json:"client-token"`
|
|
RxcId uint64 `json:"rxc-id"`
|
|
Status string `json:"status"`
|
|
ExitCode int `json:"exit-code"`
|
|
StopMsg string `json:"stop-msg"`
|
|
CreatedMilli int64 `json:"created-milli"`
|
|
StartedMilli int64 `json:"started-milli"`
|
|
StoppedMilli int64 `json:"stopped-milli"`
|
|
Stdout []byte `json:"stdout,omitempty"`
|
|
StdoutTruncated bool `json:"stdout-truncated"`
|
|
Stderr []byte `json:"stderr,omitempty"`
|
|
StderrTruncated bool `json:"stderr-truncated"`
|
|
}
|
|
|
|
type server_ctl_rxc struct {
|
|
ServerCtl
|
|
}
|
|
|
|
type server_ctl_rxc_id struct {
|
|
ServerCtl
|
|
}
|
|
|
|
type server_ctl_rxc_id_stop struct {
|
|
ServerCtl
|
|
}
|
|
|
|
type server_ctl_rxc_id_runs struct {
|
|
ServerCtl
|
|
}
|
|
|
|
type server_ctl_rxc_id_runs_id struct {
|
|
ServerCtl
|
|
}
|
|
|
|
type server_ctl_rxc_id_runs_id_stop struct {
|
|
ServerCtl
|
|
}
|
|
|
|
func server_rxc_job_to_json(job *ServerRxcJob) json_out_server_rxc_job {
|
|
var js json_out_server_rxc_job
|
|
var done time.Time
|
|
|
|
js.JobId = job.Id
|
|
js.Type = job.Type
|
|
js.Script = job.Script
|
|
js.CreatedMilli = job.Created.UnixMilli()
|
|
done = job.get_done_time()
|
|
if !done.IsZero() { js.DoneMilli = done.UnixMilli() }
|
|
|
|
job.run_mtx.Lock()
|
|
js.TargetCount = len(job.run_map)
|
|
js.RunningCount = job.running_run_count
|
|
js.StoppingCount = job.stopping_run_count
|
|
js.StoppedCount = job.stopped_run_count
|
|
js.FailedCount = job.failed_run_count
|
|
job.run_mtx.Unlock()
|
|
|
|
switch {
|
|
case js.TargetCount <= 0:
|
|
js.Status = "done"
|
|
case js.RunningCount > 0 || js.StoppingCount > 0:
|
|
js.Status = "running"
|
|
case js.StoppedCount > 0 || js.FailedCount > 0:
|
|
js.Status = "done"
|
|
default:
|
|
js.Status = "starting"
|
|
}
|
|
|
|
return js
|
|
}
|
|
|
|
func server_rxc_run_to_json(run *ServerRxcJobRun, with_output bool) json_out_server_rxc_run {
|
|
var js json_out_server_rxc_run
|
|
|
|
run.mtx.Lock()
|
|
js.JobId = run.Job.Id
|
|
js.ConnId = run.ConnId
|
|
js.ClientToken = run.ClientToken
|
|
js.RxcId = run.RxcId
|
|
js.Status = run.Status
|
|
js.ExitCode = GetRxcStopExitCode(run.StopFlags)
|
|
js.StopMsg = run.StopMsg
|
|
js.CreatedMilli = run.Created.UnixMilli()
|
|
if !run.Started.IsZero() { js.StartedMilli = run.Started.UnixMilli() }
|
|
if !run.Stopped.IsZero() { js.StoppedMilli = run.Stopped.UnixMilli() }
|
|
js.Stdout = make([]byte, 0) // i don't want to show nil when empty
|
|
js.Stderr = make([]byte, 0) // i don't want to show nil when empty
|
|
js.StdoutTruncated = run.OutputTruncated[0]
|
|
if with_output { js.Stdout = append(js.Stdout, run.Output[0]...) }
|
|
js.StderrTruncated = run.OutputTruncated[1]
|
|
if with_output { js.Stderr = append(js.Stderr, run.Output[1]...) }
|
|
run.mtx.Unlock()
|
|
|
|
return js
|
|
}
|
|
|
|
func (ctl *server_ctl_rxc) ServeHTTP(w http.ResponseWriter, req *http.Request) (int, error) {
|
|
var s *Server
|
|
var status_code int
|
|
var je *json.Encoder
|
|
var err error
|
|
|
|
s = ctl.S
|
|
je = json.NewEncoder(w)
|
|
|
|
switch req.Method {
|
|
case http.MethodGet:
|
|
var jobs []json_out_server_rxc_job
|
|
var job *ServerRxcJob
|
|
|
|
jobs = make([]json_out_server_rxc_job, 0)
|
|
for _, job = range s.snapshot_server_rxc_jobs() {
|
|
jobs = append(jobs, server_rxc_job_to_json(job))
|
|
}
|
|
|
|
status_code = WriteJsonRespHeader(w, http.StatusOK)
|
|
if err = je.Encode(jobs); err != nil { goto oops }
|
|
|
|
case http.MethodPost:
|
|
var x json_in_server_rxc
|
|
var job *ServerRxcJob
|
|
|
|
err = json.NewDecoder(req.Body).Decode(&x)
|
|
if err != nil {
|
|
status_code = WriteEmptyRespHeader(w, http.StatusBadRequest)
|
|
goto oops
|
|
}
|
|
|
|
job, err = s.StartRxcJob(x.Clients, x.Type, x.Script)
|
|
if err != nil {
|
|
status_code = WriteJsonRespHeader(w, http.StatusBadRequest)
|
|
je.Encode(JsonErrmsg{Text: err.Error()})
|
|
goto oops
|
|
}
|
|
|
|
status_code = WriteJsonRespHeader(w, http.StatusAccepted)
|
|
if err = je.Encode(server_rxc_job_to_json(job)); err != nil { goto oops }
|
|
|
|
default:
|
|
status_code = WriteEmptyRespHeader(w, http.StatusMethodNotAllowed)
|
|
}
|
|
|
|
return status_code, nil
|
|
|
|
oops:
|
|
return status_code, err
|
|
}
|
|
|
|
func (ctl *server_ctl_rxc_id) ServeHTTP(w http.ResponseWriter, req *http.Request) (int, error) {
|
|
var s *Server
|
|
var status_code int
|
|
var je *json.Encoder
|
|
var job_id string
|
|
var job *ServerRxcJob
|
|
var err error
|
|
|
|
s = ctl.S
|
|
je = json.NewEncoder(w)
|
|
job_id = req.PathValue("job_id")
|
|
|
|
job, err = s.FindServerRxcJobByIdStr(job_id)
|
|
if err != nil {
|
|
status_code = WriteJsonRespHeader(w, http.StatusNotFound)
|
|
je.Encode(JsonErrmsg{Text: err.Error()})
|
|
goto oops
|
|
}
|
|
|
|
switch req.Method {
|
|
case http.MethodGet:
|
|
status_code = WriteJsonRespHeader(w, http.StatusOK)
|
|
if err = je.Encode(server_rxc_job_to_json(job)); err != nil { goto oops }
|
|
|
|
case http.MethodDelete:
|
|
err = s.DeleteRxcJob(job)
|
|
if err != nil {
|
|
status_code = WriteJsonRespHeader(w, http.StatusConflict)
|
|
je.Encode(JsonErrmsg{Text: err.Error()})
|
|
goto oops
|
|
}
|
|
status_code = WriteEmptyRespHeader(w, http.StatusNoContent)
|
|
|
|
default:
|
|
status_code = WriteEmptyRespHeader(w, http.StatusMethodNotAllowed)
|
|
}
|
|
|
|
return status_code, nil
|
|
|
|
oops:
|
|
return status_code, err
|
|
}
|
|
|
|
func (ctl *server_ctl_rxc_id_stop) ServeHTTP(w http.ResponseWriter, req *http.Request) (int, error) {
|
|
var s *Server
|
|
var status_code int
|
|
var je *json.Encoder
|
|
var job_id string
|
|
var job *ServerRxcJob
|
|
var stop_count int
|
|
var err error
|
|
|
|
s = ctl.S
|
|
je = json.NewEncoder(w)
|
|
job_id = req.PathValue("job_id")
|
|
|
|
job, err = s.FindServerRxcJobByIdStr(job_id)
|
|
if err != nil {
|
|
status_code = WriteJsonRespHeader(w, http.StatusNotFound)
|
|
je.Encode(JsonErrmsg{Text: err.Error()})
|
|
goto oops
|
|
}
|
|
|
|
switch req.Method {
|
|
case http.MethodPost:
|
|
stop_count = s.StopRxcJob(job)
|
|
if stop_count > 0 {
|
|
status_code = WriteEmptyRespHeader(w, http.StatusAccepted)
|
|
} else {
|
|
status_code = WriteEmptyRespHeader(w, http.StatusNoContent)
|
|
}
|
|
|
|
default:
|
|
status_code = WriteEmptyRespHeader(w, http.StatusMethodNotAllowed)
|
|
}
|
|
|
|
return status_code, nil
|
|
|
|
oops:
|
|
return status_code, err
|
|
}
|
|
|
|
func (ctl *server_ctl_rxc_id_runs) ServeHTTP(w http.ResponseWriter, req *http.Request) (int, error) {
|
|
var s *Server
|
|
var status_code int
|
|
var je *json.Encoder
|
|
var job_id string
|
|
var job *ServerRxcJob
|
|
var run *ServerRxcJobRun
|
|
var runs []json_out_server_rxc_run
|
|
var err error
|
|
|
|
s = ctl.S
|
|
je = json.NewEncoder(w)
|
|
job_id = req.PathValue("job_id")
|
|
|
|
job, err = s.FindServerRxcJobByIdStr(job_id)
|
|
if err != nil {
|
|
status_code = WriteJsonRespHeader(w, http.StatusNotFound)
|
|
je.Encode(JsonErrmsg{Text: err.Error()})
|
|
goto oops
|
|
}
|
|
|
|
switch req.Method {
|
|
case http.MethodGet:
|
|
var output string
|
|
var with_output bool
|
|
|
|
output = req.URL.Query().Get("output")
|
|
if output != "" {
|
|
with_output, err = strconv.ParseBool(output)
|
|
if err != nil {
|
|
status_code = WriteJsonRespHeader(w, http.StatusBadRequest)
|
|
je.Encode(JsonErrmsg{Text: fmt.Sprintf("invalid output parameter - %s", output)})
|
|
goto oops
|
|
}
|
|
}
|
|
runs = make([]json_out_server_rxc_run, 0)
|
|
for _, run = range job.snapshot_runs() {
|
|
runs = append(runs, server_rxc_run_to_json(run, with_output))
|
|
}
|
|
status_code = WriteJsonRespHeader(w, http.StatusOK)
|
|
if err = je.Encode(runs); err != nil { goto oops }
|
|
|
|
default:
|
|
status_code = WriteEmptyRespHeader(w, http.StatusMethodNotAllowed)
|
|
}
|
|
|
|
return status_code, nil
|
|
|
|
oops:
|
|
return status_code, err
|
|
}
|
|
|
|
func (ctl *server_ctl_rxc_id_runs_id) ServeHTTP(w http.ResponseWriter, req *http.Request) (int, error) {
|
|
var s *Server
|
|
var status_code int
|
|
var je *json.Encoder
|
|
var job_id string
|
|
var conn_id string
|
|
var job *ServerRxcJob
|
|
var run *ServerRxcJobRun
|
|
var err error
|
|
|
|
s = ctl.S
|
|
je = json.NewEncoder(w)
|
|
job_id = req.PathValue("job_id")
|
|
conn_id = req.PathValue("conn_id")
|
|
|
|
job, err = s.FindServerRxcJobByIdStr(job_id)
|
|
if err != nil {
|
|
status_code = WriteJsonRespHeader(w, http.StatusNotFound)
|
|
je.Encode(JsonErrmsg{Text: err.Error()})
|
|
goto oops
|
|
}
|
|
|
|
run, err = job.FindRunByConnIdStr(conn_id)
|
|
if err != nil {
|
|
status_code = WriteJsonRespHeader(w, http.StatusNotFound)
|
|
je.Encode(JsonErrmsg{Text: err.Error()})
|
|
goto oops
|
|
}
|
|
|
|
switch req.Method {
|
|
case http.MethodGet:
|
|
status_code = WriteJsonRespHeader(w, http.StatusOK)
|
|
if err = je.Encode(server_rxc_run_to_json(run, true)); err != nil { goto oops }
|
|
|
|
case http.MethodDelete:
|
|
err = s.DeleteRxcJobRun(run)
|
|
if err != nil {
|
|
status_code = WriteJsonRespHeader(w, http.StatusConflict)
|
|
je.Encode(JsonErrmsg{Text: err.Error()})
|
|
goto oops
|
|
}
|
|
status_code = WriteEmptyRespHeader(w, http.StatusNoContent)
|
|
|
|
default:
|
|
status_code = WriteEmptyRespHeader(w, http.StatusMethodNotAllowed)
|
|
}
|
|
|
|
return status_code, nil
|
|
|
|
oops:
|
|
return status_code, err
|
|
}
|
|
|
|
func (ctl *server_ctl_rxc_id_runs_id_stop) ServeHTTP(w http.ResponseWriter, req *http.Request) (int, error) {
|
|
var s *Server
|
|
var status_code int
|
|
var je *json.Encoder
|
|
var job_id string
|
|
var conn_id string
|
|
var job *ServerRxcJob
|
|
var run *ServerRxcJobRun
|
|
var stopped bool
|
|
var err error
|
|
|
|
s = ctl.S
|
|
je = json.NewEncoder(w)
|
|
job_id = req.PathValue("job_id")
|
|
conn_id = req.PathValue("conn_id")
|
|
|
|
job, err = s.FindServerRxcJobByIdStr(job_id)
|
|
if err != nil {
|
|
status_code = WriteJsonRespHeader(w, http.StatusNotFound)
|
|
je.Encode(JsonErrmsg{Text: err.Error()})
|
|
goto oops
|
|
}
|
|
|
|
run, err = job.FindRunByConnIdStr(conn_id)
|
|
if err != nil {
|
|
status_code = WriteJsonRespHeader(w, http.StatusNotFound)
|
|
je.Encode(JsonErrmsg{Text: err.Error()})
|
|
goto oops
|
|
}
|
|
|
|
switch req.Method {
|
|
case http.MethodPost:
|
|
stopped, err = s.StopRxcJobRun(run)
|
|
if err != nil {
|
|
status_code = WriteJsonRespHeader(w, http.StatusInternalServerError)
|
|
je.Encode(JsonErrmsg{Text: err.Error()})
|
|
goto oops
|
|
}
|
|
if stopped {
|
|
status_code = WriteEmptyRespHeader(w, http.StatusAccepted)
|
|
} else {
|
|
status_code = WriteEmptyRespHeader(w, http.StatusNoContent)
|
|
}
|
|
|
|
default:
|
|
status_code = WriteEmptyRespHeader(w, http.StatusMethodNotAllowed)
|
|
}
|
|
|
|
return status_code, nil
|
|
|
|
oops:
|
|
return status_code, err
|
|
}
|