Files
hodu/server-rxc.go

593 lines
15 KiB
Go

package hodu
import "encoding/base64"
import "encoding/json"
import "fmt"
import "net/http"
import "strconv"
import "sync"
import "time"
import "golang.org/x/net/websocket"
// rxc - remote exec
// server_rxc_ws is the websocket handler for an interactive rxc session.
type server_rxc_ws struct {
	// S is the server whose connections and logger the handler uses.
	S *Server

	// Id is returned by Identity() and passed to the logger as the source id.
	Id string

	// ws is not referenced by the methods defined next to this type.
	ws *websocket.Conn
}
// Identity returns the identifier stored in the handler's Id field.
func (rxc *server_rxc_ws) Identity() string {
	return rxc.Id
}
// ServeWebsocket runs one interactive rxc (remote exec) session over ws.
//
// The request must carry a "client-token" form value that resolves to a
// server connection through FindServerConnByClientToken; otherwise the socket
// is closed and http.StatusBadRequest is returned with an error.
//
// After that, every non-empty websocket message is decoded as a
// json_xterm_ws_event and dispatched on its Type:
//
//   - "open":  Data must hold exactly two items, the rxc type and the script.
//     The session is started with StartRxcForWs and a "status"/"opened" event
//     is sent back. The request is honored only while no session has been
//     started; otherwise it is logged and ignored.
//   - "iov":   every Data item is base64-decoded and handed to WriteRxcForWs.
//     Items that fail to decode are logged and skipped.
//   - "close": ends the receive loop.
//
// Any other type is answered with an "error" event and logged. Messages that
// are not valid JSON are logged and skipped.
//
// The loop also ends when receiving from the websocket fails. On the way out
// StopRxcForWs is called and the websocket is closed. The result is
// http.StatusOK together with the receive error that ended the loop, or nil
// when the loop ended on a "close" event.
func (rxc *server_rxc_ws) ServeWebsocket(ws *websocket.Conn) (int, error) {
	var s *Server
	var req *http.Request
	var token string
	var cts *ServerConn
	var rp *ServerRxc
	var wg sync.WaitGroup
	var err error

	s = rxc.S
	req = ws.Request()

	token = req.FormValue("client-token")
	if token == "" {
		ws.Close()
		return http.StatusBadRequest, fmt.Errorf("no client token specified")
	}

	cts = s.FindServerConnByClientToken(token)
	if cts == nil {
		ws.Close()
		return http.StatusBadRequest, fmt.Errorf("invalid client token - %s", token)
	}

ws_recv_loop:
	for {
		var msg []byte
		var ev json_xterm_ws_event

		// err is overwritten by every receive, so the value returned at the
		// end is always the outcome of the last receive/decode pair.
		err = websocket.Message.Receive(ws, &msg)
		if err != nil {
			s.log.Write(rxc.Id, LOG_ERROR, "[%s] websocket receive error - %s", req.RemoteAddr, err.Error())
			break ws_recv_loop
		}
		if len(msg) <= 0 {
			continue
		}

		err = json.Unmarshal(msg, &ev)
		if err != nil {
			// leave a trace instead of dropping the frame silently
			s.log.Write(rxc.Id, LOG_WARN, "[%s] Invalid rxc event received - %s", req.RemoteAddr, err.Error())
			continue
		}

		switch ev.Type {
		case "open":
			if rp != nil || len(ev.Data) != 2 {
				// the request is still ignored, but no longer without a trace
				s.log.Write(rxc.Id, LOG_WARN, "[%s] Ignored rxc open request - session already opened or invalid argument count %d", req.RemoteAddr, len(ev.Data))
				continue
			}

			// Data[0] is the rxc type, Data[1] is the script.
			rp, err = cts.StartRxcForWs(ws, string(ev.Data[0]), string(ev.Data[1]))
			if err != nil {
				s.log.Write(rxc.Id, LOG_ERROR, "[%s] Failed to connect rxc - %s", req.RemoteAddr, err.Error())
				send_ws_data_for_xterm(ws, "error", err.Error())
				// an already-expired read deadline makes the next Receive()
				// fail, which is what terminates the main loop
				ws.SetReadDeadline(time.Now())
				continue
			}

			err = send_ws_data_for_xterm(ws, "status", "opened")
			if err != nil {
				s.log.Write(rxc.Id, LOG_ERROR, "[%s] Failed to write 'opened' event to websocket - %s", req.RemoteAddr, err.Error())
				// same trick as above to break out of the main loop
				ws.SetReadDeadline(time.Now())
				continue
			}

			s.log.Write(rxc.Id, LOG_DEBUG, "[%s] Opened rxc session", req.RemoteAddr)

		case "close":
			// leave the loop and let the code below release resources
			break ws_recv_loop

		case "iov":
			var i int
			for i = range ev.Data {
				var chunk []byte
				chunk, err = base64.StdEncoding.DecodeString(ev.Data[i])
				if err != nil {
					s.log.Write(rxc.Id, LOG_WARN, "[%s] Invalid rxc iov data received - %s", req.RemoteAddr, ev.Data[i])
					continue
				}
				cts.WriteRxcForWs(ws, chunk) // the outcome is deliberately ignored for now
			}

		default:
			send_ws_data_for_xterm(ws, "error", fmt.Sprintf("invalid rxc event type - %s", ev.Type))
			s.log.Write(rxc.Id, LOG_WARN, "[%s] Invalid rxc event type received - %s", req.RemoteAddr, ev.Type)
		}
	}

	cts.StopRxcForWs(ws)
	ws.Close() // don't care about multiple closes
	wg.Wait()  // nothing is added to wg in this function, so this returns at once
	s.log.Write(rxc.Id, LOG_DEBUG, "[%s] Ended rxc session for %s", req.RemoteAddr, token)
	return http.StatusOK, err
}
// HTTP control endpoints for RXC jobs:
// GET /_rxc
// POST /_rxc
// GET /_rxc/{job_id}
// DELETE /_rxc/{job_id}
// POST /_rxc/{job_id}/stop
// GET /_rxc/{job_id}/runs
// GET /_rxc/{job_id}/runs/{conn_id}
// DELETE /_rxc/{job_id}/runs/{conn_id}
// POST /_rxc/{job_id}/runs/{conn_id}/stop
//
// The websocket endpoint /_rxc/ws is kept for interactive single-client RXC.
/*
Example to run a command on all connected clients:
curl -X POST http://127.0.0.1:9999/_rxc \
-H 'Content-Type: application/json' \
-d '{
"type": "bash",
"script": "uname -a"
}'
Example to run on specific clients only:
curl -X POST http://127.0.0.1:9999/_rxc \
-H 'Content-Type: application/json' \
-d '{
"clients": ["1", "client-token-abc"],
"type": "bash",
"script": "hostname; id"
}'
Then inspect the created job:
curl http://127.0.0.1:9999/_rxc/1
curl http://127.0.0.1:9999/_rxc/1/runs
curl http://127.0.0.1:9999/_rxc/1/runs/1
Stop a job:
curl -X POST http://127.0.0.1:9999/_rxc/1/stop
Stop one run only:
curl -X POST http://127.0.0.1:9999/_rxc/1/runs/1/stop
Delete a retained job record:
curl -X DELETE http://127.0.0.1:9999/_rxc/1
Delete a retained run record:
curl -X DELETE http://127.0.0.1:9999/_rxc/1/runs/1
If your control server uses a prefix, prepend it, for example http://127.0.0.1:9999/api/_rxc.
*/
// json_in_server_rxc is the JSON request body decoded by the POST handler
// that starts an rxc job. The three fields are handed to StartRxcJob as-is.
type json_in_server_rxc struct {
	// Clients selects the target clients ("clients" in JSON).
	Clients []string `json:"clients"`

	// Type is the rxc type, e.g. "bash" ("type" in JSON).
	Type string `json:"type"`

	// Script is the script text to execute ("script" in JSON).
	Script string `json:"script"`
}
// json_out_server_rxc_job is the JSON representation of one rxc job as
// produced by server_rxc_job_to_json.
type json_out_server_rxc_job struct {
	JobId  uint64 `json:"job-id"`
	Type   string `json:"type"`
	Script string `json:"script"`

	// Status is one of "starting", "running" or "done".
	Status string `json:"status"`

	// CreatedMilli and DoneMilli are Unix times in milliseconds. DoneMilli
	// stays 0 while the job has no done time.
	CreatedMilli int64 `json:"created-milli"`
	DoneMilli    int64 `json:"done-milli"`

	// TargetCount is the number of runs of the job; the remaining counters
	// break the runs down by state.
	TargetCount   int `json:"target-count"`
	RunningCount  int `json:"running-count"`
	StoppingCount int `json:"stopping-count"`
	StoppedCount  int `json:"stopped-count"`
	FailedCount   int `json:"failed-count"`
}
// json_out_server_rxc_run is the JSON representation of one run of an rxc
// job as produced by server_rxc_run_to_json.
type json_out_server_rxc_run struct {
	JobId       uint64 `json:"job-id"`
	ConnId      ConnId `json:"conn-id"`
	ClientToken string `json:"client-token"`
	RxcId       uint64 `json:"rxc-id"`
	Status      string `json:"status"`

	// ExitCode is derived from the run's stop flags with GetRxcStopExitCode.
	ExitCode int    `json:"exit-code"`
	StopMsg  string `json:"stop-msg"`

	// Unix times in milliseconds. StartedMilli and StoppedMilli stay 0 while
	// the corresponding time of the run is zero.
	CreatedMilli int64 `json:"created-milli"`
	StartedMilli int64 `json:"started-milli"`
	StoppedMilli int64 `json:"stopped-milli"`

	// Captured output. Being []byte, encoding/json emits these as base64
	// strings; because of omitempty they are left out when empty.
	Stdout          []byte `json:"stdout,omitempty"`
	StdoutTruncated bool   `json:"stdout-truncated"`
	Stderr          []byte `json:"stderr,omitempty"`
	StderrTruncated bool   `json:"stderr-truncated"`
}
// HTTP control handlers for rxc jobs. Every type only embeds ServerCtl; they
// differ in the ServeHTTP method attached to each of them.
type (
	// server_ctl_rxc lists the jobs (GET) and starts a new job (POST).
	server_ctl_rxc struct {
		ServerCtl
	}

	// server_ctl_rxc_id shows (GET) or deletes (DELETE) the job named by
	// the "job_id" path value.
	server_ctl_rxc_id struct {
		ServerCtl
	}

	// server_ctl_rxc_id_stop stops (POST) the job named by "job_id".
	server_ctl_rxc_id_stop struct {
		ServerCtl
	}

	// server_ctl_rxc_id_runs lists (GET) the runs of the job named by
	// "job_id".
	server_ctl_rxc_id_runs struct {
		ServerCtl
	}

	// server_ctl_rxc_id_runs_id shows (GET) or deletes (DELETE) the run
	// named by the "job_id" and "conn_id" path values.
	server_ctl_rxc_id_runs_id struct {
		ServerCtl
	}

	// server_ctl_rxc_id_runs_id_stop stops (POST) the run named by "job_id"
	// and "conn_id".
	server_ctl_rxc_id_runs_id_stop struct {
		ServerCtl
	}
)
// server_rxc_job_to_json converts job into its JSON output form.
//
// The run counters are copied while job.run_mtx is held. Status is then
// derived from the copied counters:
//
//   - "done"     when the job has no run at all,
//   - "running"  when at least one run is running or stopping,
//   - "done"     when no run is active and at least one has stopped or failed,
//   - "starting" otherwise.
//
// NOTE(review): "done" is reported as soon as one run has stopped or failed
// and none is active, without comparing against TargetCount - confirm that
// runs cannot still be pending at that point.
func server_rxc_job_to_json(job *ServerRxcJob) json_out_server_rxc_job {
	js := json_out_server_rxc_job{
		JobId:        job.Id,
		Type:         job.Type,
		Script:       job.Script,
		CreatedMilli: job.Created.UnixMilli(),
	}

	// DoneMilli keeps its zero value while the job has no done time
	if done := job.get_done_time(); !done.IsZero() {
		js.DoneMilli = done.UnixMilli()
	}

	job.run_mtx.Lock()
	js.TargetCount = len(job.run_map)
	js.RunningCount = job.running_run_count
	js.StoppingCount = job.stopping_run_count
	js.StoppedCount = job.stopped_run_count
	js.FailedCount = job.failed_run_count
	job.run_mtx.Unlock()

	active := js.RunningCount > 0 || js.StoppingCount > 0
	finished := js.StoppedCount > 0 || js.FailedCount > 0

	js.Status = "starting"
	if js.TargetCount <= 0 {
		js.Status = "done"
	} else if active {
		js.Status = "running"
	} else if finished {
		js.Status = "done"
	}

	return js
}
// server_rxc_run_to_json converts run into its JSON output form while holding
// run.mtx.
//
// The captured output (run.Output[0] for stdout, run.Output[1] for stderr) is
// copied into the result only when with_output is true; the truncation flags
// are always copied. StartedMilli and StoppedMilli are left at 0 while the
// corresponding time is zero.
func server_rxc_run_to_json(run *ServerRxcJobRun, with_output bool) json_out_server_rxc_run {
	run.mtx.Lock()

	js := json_out_server_rxc_run{
		JobId:        run.Job.Id,
		ConnId:       run.ConnId,
		ClientToken:  run.ClientToken,
		RxcId:        run.RxcId,
		Status:       run.Status,
		ExitCode:     GetRxcStopExitCode(run.StopFlags),
		StopMsg:      run.StopMsg,
		CreatedMilli: run.Created.UnixMilli(),

		// Non-nil empty slices rather than nil. Note that the fields are
		// tagged omitempty, so an empty slice is omitted from the encoded
		// JSON either way.
		Stdout: []byte{},
		Stderr: []byte{},

		StdoutTruncated: run.OutputTruncated[0],
		StderrTruncated: run.OutputTruncated[1],
	}

	if !run.Started.IsZero() {
		js.StartedMilli = run.Started.UnixMilli()
	}
	if !run.Stopped.IsZero() {
		js.StoppedMilli = run.Stopped.UnixMilli()
	}

	if with_output {
		// append copies the data, so the result does not alias run.Output
		js.Stdout = append(js.Stdout, run.Output[0]...)
		js.Stderr = append(js.Stderr, run.Output[1]...)
	}

	run.mtx.Unlock()
	return js
}
// ServeHTTP serves the rxc job collection.
//
//   - GET answers 200 with a JSON array holding one entry per job returned by
//     snapshot_server_rxc_jobs(). The array is empty, not null, when there is
//     no job.
//   - POST decodes a json_in_server_rxc body and starts a job with
//     StartRxcJob, answering 202 with the job description. A body that fails
//     to decode yields an empty 400; a StartRxcJob failure yields a 400 with a
//     JSON error message.
//   - Any other method yields an empty 405.
//
// The int result is the value returned by WriteJsonRespHeader or
// WriteEmptyRespHeader for the response that was sent. The error is non-nil
// when decoding the body, starting the job or encoding the response failed.
func (ctl *server_ctl_rxc) ServeHTTP(w http.ResponseWriter, req *http.Request) (int, error) {
	s := ctl.S
	enc := json.NewEncoder(w)

	if req.Method == http.MethodGet {
		list := make([]json_out_server_rxc_job, 0)
		for _, job := range s.snapshot_server_rxc_jobs() {
			list = append(list, server_rxc_job_to_json(job))
		}

		code := WriteJsonRespHeader(w, http.StatusOK)
		return code, enc.Encode(list)
	}

	if req.Method == http.MethodPost {
		var in json_in_server_rxc

		if err := json.NewDecoder(req.Body).Decode(&in); err != nil {
			code := WriteEmptyRespHeader(w, http.StatusBadRequest)
			return code, err
		}

		job, err := s.StartRxcJob(in.Clients, in.Type, in.Script)
		if err != nil {
			code := WriteJsonRespHeader(w, http.StatusBadRequest)
			enc.Encode(JsonErrmsg{Text: err.Error()}) // best effort; the start error is what gets reported
			return code, err
		}

		code := WriteJsonRespHeader(w, http.StatusAccepted)
		return code, enc.Encode(server_rxc_job_to_json(job))
	}

	return WriteEmptyRespHeader(w, http.StatusMethodNotAllowed), nil
}
// ServeHTTP serves a single rxc job selected by the "job_id" path value.
//
// The job is looked up before the method is examined, so an unknown job
// yields a 404 with a JSON error message whatever the method is.
//
//   - GET answers 200 with the job description.
//   - DELETE removes the job with DeleteRxcJob and answers an empty 204; when
//     DeleteRxcJob fails the answer is a 409 with a JSON error message.
//   - Any other method yields an empty 405.
//
// The int result is the value returned by WriteJsonRespHeader or
// WriteEmptyRespHeader for the response that was sent.
func (ctl *server_ctl_rxc_id) ServeHTTP(w http.ResponseWriter, req *http.Request) (int, error) {
	s := ctl.S
	enc := json.NewEncoder(w)

	job, err := s.FindServerRxcJobByIdStr(req.PathValue("job_id"))
	if err != nil {
		code := WriteJsonRespHeader(w, http.StatusNotFound)
		enc.Encode(JsonErrmsg{Text: err.Error()}) // best effort; the lookup error is what gets reported
		return code, err
	}

	switch req.Method {
	case http.MethodGet:
		code := WriteJsonRespHeader(w, http.StatusOK)
		return code, enc.Encode(server_rxc_job_to_json(job))

	case http.MethodDelete:
		if err = s.DeleteRxcJob(job); err != nil {
			code := WriteJsonRespHeader(w, http.StatusConflict)
			enc.Encode(JsonErrmsg{Text: err.Error()})
			return code, err
		}
		return WriteEmptyRespHeader(w, http.StatusNoContent), nil
	}

	return WriteEmptyRespHeader(w, http.StatusMethodNotAllowed), nil
}
// ServeHTTP stops the rxc job selected by the "job_id" path value.
//
// The job is looked up first; an unknown job yields a 404 with a JSON error
// message whatever the method is. Only POST is accepted, any other method
// yields an empty 405.
//
// POST calls StopRxcJob and answers an empty 202 when it reports a positive
// count, or an empty 204 otherwise.
//
// The int result is the value returned by WriteJsonRespHeader or
// WriteEmptyRespHeader for the response that was sent. The error is non-nil
// only when the job lookup failed.
func (ctl *server_ctl_rxc_id_stop) ServeHTTP(w http.ResponseWriter, req *http.Request) (int, error) {
	s := ctl.S
	enc := json.NewEncoder(w)

	job, err := s.FindServerRxcJobByIdStr(req.PathValue("job_id"))
	if err != nil {
		code := WriteJsonRespHeader(w, http.StatusNotFound)
		enc.Encode(JsonErrmsg{Text: err.Error()}) // best effort; the lookup error is what gets reported
		return code, err
	}

	if req.Method != http.MethodPost {
		return WriteEmptyRespHeader(w, http.StatusMethodNotAllowed), nil
	}

	if s.StopRxcJob(job) > 0 {
		return WriteEmptyRespHeader(w, http.StatusAccepted), nil
	}
	return WriteEmptyRespHeader(w, http.StatusNoContent), nil
}
// ServeHTTP lists the runs of the rxc job selected by the "job_id" path value.
//
// The job is looked up first; an unknown job yields a 404 with a JSON error
// message whatever the method is. Only GET is accepted, any other method
// yields an empty 405.
//
// GET honors an optional "output" query parameter parsed with
// strconv.ParseBool. When it is true, each run carries its captured output;
// when it is absent, empty or false, the output is left out. A value that
// does not parse yields a 400 with a JSON error message. The answer is a 200
// with a JSON array that is empty, not null, when the job has no run.
//
// The int result is the value returned by WriteJsonRespHeader or
// WriteEmptyRespHeader for the response that was sent.
func (ctl *server_ctl_rxc_id_runs) ServeHTTP(w http.ResponseWriter, req *http.Request) (int, error) {
	s := ctl.S
	enc := json.NewEncoder(w)

	job, err := s.FindServerRxcJobByIdStr(req.PathValue("job_id"))
	if err != nil {
		code := WriteJsonRespHeader(w, http.StatusNotFound)
		enc.Encode(JsonErrmsg{Text: err.Error()}) // best effort; the lookup error is what gets reported
		return code, err
	}

	if req.Method != http.MethodGet {
		return WriteEmptyRespHeader(w, http.StatusMethodNotAllowed), nil
	}

	with_output := false
	if raw := req.URL.Query().Get("output"); raw != "" {
		with_output, err = strconv.ParseBool(raw)
		if err != nil {
			code := WriteJsonRespHeader(w, http.StatusBadRequest)
			enc.Encode(JsonErrmsg{Text: fmt.Sprintf("invalid output parameter - %s", raw)})
			return code, err
		}
	}

	list := make([]json_out_server_rxc_run, 0)
	for _, run := range job.snapshot_runs() {
		list = append(list, server_rxc_run_to_json(run, with_output))
	}

	code := WriteJsonRespHeader(w, http.StatusOK)
	return code, enc.Encode(list)
}
// ServeHTTP serves a single run of an rxc job, selected by the "job_id" and
// "conn_id" path values.
//
// The job and then the run are looked up before the method is examined; a
// failure of either lookup yields a 404 with a JSON error message.
//
//   - GET answers 200 with the run description including its captured output.
//   - DELETE removes the run with DeleteRxcJobRun and answers an empty 204;
//     when DeleteRxcJobRun fails the answer is a 409 with a JSON error message.
//   - Any other method yields an empty 405.
//
// The int result is the value returned by WriteJsonRespHeader or
// WriteEmptyRespHeader for the response that was sent.
func (ctl *server_ctl_rxc_id_runs_id) ServeHTTP(w http.ResponseWriter, req *http.Request) (int, error) {
	s := ctl.S
	enc := json.NewEncoder(w)

	job, err := s.FindServerRxcJobByIdStr(req.PathValue("job_id"))
	if err != nil {
		code := WriteJsonRespHeader(w, http.StatusNotFound)
		enc.Encode(JsonErrmsg{Text: err.Error()}) // best effort; the lookup error is what gets reported
		return code, err
	}

	run, err := job.FindRunByConnIdStr(req.PathValue("conn_id"))
	if err != nil {
		code := WriteJsonRespHeader(w, http.StatusNotFound)
		enc.Encode(JsonErrmsg{Text: err.Error()})
		return code, err
	}

	switch req.Method {
	case http.MethodGet:
		code := WriteJsonRespHeader(w, http.StatusOK)
		return code, enc.Encode(server_rxc_run_to_json(run, true))

	case http.MethodDelete:
		if err = s.DeleteRxcJobRun(run); err != nil {
			code := WriteJsonRespHeader(w, http.StatusConflict)
			enc.Encode(JsonErrmsg{Text: err.Error()})
			return code, err
		}
		return WriteEmptyRespHeader(w, http.StatusNoContent), nil
	}

	return WriteEmptyRespHeader(w, http.StatusMethodNotAllowed), nil
}
// ServeHTTP stops a single run of an rxc job, selected by the "job_id" and
// "conn_id" path values.
//
// The job and then the run are looked up before the method is examined; a
// failure of either lookup yields a 404 with a JSON error message. Only POST
// is accepted, any other method yields an empty 405.
//
// POST calls StopRxcJobRun. A failure yields a 500 with a JSON error message.
// Otherwise the answer is an empty 202 when StopRxcJobRun reports true, or an
// empty 204 when it reports false.
//
// The int result is the value returned by WriteJsonRespHeader or
// WriteEmptyRespHeader for the response that was sent.
func (ctl *server_ctl_rxc_id_runs_id_stop) ServeHTTP(w http.ResponseWriter, req *http.Request) (int, error) {
	s := ctl.S
	enc := json.NewEncoder(w)

	job, err := s.FindServerRxcJobByIdStr(req.PathValue("job_id"))
	if err != nil {
		code := WriteJsonRespHeader(w, http.StatusNotFound)
		enc.Encode(JsonErrmsg{Text: err.Error()}) // best effort; the lookup error is what gets reported
		return code, err
	}

	run, err := job.FindRunByConnIdStr(req.PathValue("conn_id"))
	if err != nil {
		code := WriteJsonRespHeader(w, http.StatusNotFound)
		enc.Encode(JsonErrmsg{Text: err.Error()})
		return code, err
	}

	if req.Method != http.MethodPost {
		return WriteEmptyRespHeader(w, http.StatusMethodNotAllowed), nil
	}

	stopped, err := s.StopRxcJobRun(run)
	if err != nil {
		code := WriteJsonRespHeader(w, http.StatusInternalServerError)
		enc.Encode(JsonErrmsg{Text: err.Error()})
		return code, err
	}

	if stopped {
		return WriteEmptyRespHeader(w, http.StatusAccepted), nil
	}
	return WriteEmptyRespHeader(w, http.StatusNoContent), nil
}