// (removed: non-code viewer residue — "844 lines / 20 KiB / Go")
package hodu
|
|
|
|
import "container/heap"
|
|
import "encoding/base64"
|
|
import "fmt"
|
|
import "sort"
|
|
import "strconv"
|
|
import "strings"
|
|
import "sync"
|
|
import "time"
|
|
import "unsafe"
|
|
|
|
import "golang.org/x/net/websocket"
|
|
|
|
const SERVER_RXC_RUN_OUTPUT_MAX int = 1024
|
|
const SERVER_RXC_DONE_JOB_RETENTION time.Duration = 60 * time.Second
|
|
|
|
const SERVER_RXC_RUN_STATUS_STARTING string = "starting"
|
|
const SERVER_RXC_RUN_STATUS_RUNNING string = "running"
|
|
const SERVER_RXC_RUN_STATUS_STOPPING string = "stopping"
|
|
const SERVER_RXC_RUN_STATUS_STOPPED string = "stopped"
|
|
const SERVER_RXC_RUN_STATUS_FAILED string = "failed"
|
|
|
|
// ServerRxcJob represents one remote-execution (rxc) request fanned out to
// one or more client connections. Each targeted connection gets its own
// ServerRxcJobRun entry in run_map.
type ServerRxcJob struct {
	S       *Server   // owning server
	Id      uint64    // job id, unique within the server
	Type    string    // script interpreter/type requested by the caller
	Script  string    // script body to execute on the clients
	Created time.Time // when the job was created
	Done    time.Time // when the last run finished; zero while still active (guarded by S.rxc_job_mtx)

	expire_at  time.Time // purge deadline once done (guarded by S.rxc_job_mtx)
	heap_index int       // position in S.rxc_job_heap; -1 when not on the heap

	run_mtx sync.Mutex                  // guards run_map and all counters below
	run_map map[ConnId]*ServerRxcJobRun // per-connection runs keyed by connection id

	// active_run_count is decremented once per finished run by
	// maybe_mark_rxc_job_done(); the job is "done" when it reaches 0.
	active_run_count int

	// per-status tallies; kept in sync with run status transitions via
	// increment_run_status_count_no_lock/decrement_run_status_count_no_lock.
	starting_run_count int
	running_run_count  int
	stopping_run_count int
	stopped_run_count  int
	failed_run_count   int
}
|
|
|
|
// ServerRxcJobRun is the per-connection execution of a ServerRxcJob on one
// client. Mutable fields are guarded by mtx.
type ServerRxcJobRun struct {
	Job         *ServerRxcJob // owning job
	Cts         *ServerConn   // the client connection this run targets
	ConnId      ConnId        // Cts.Id captured at creation
	ClientToken string        // client token captured at creation
	Created     time.Time     // when the run record was created
	Started     time.Time     // first start confirmation; zero until mark_started()
	Stopped     time.Time     // when the run reached stopped/failed
	RxcId       uint64        // client-side rxc id reported at start; 0 until then
	Status      string        // one of the SERVER_RXC_RUN_STATUS_* values
	StopFlags   uint64        // flags passed with the final stop
	StopMsg     string        // human-readable stop/failure reason

	// captured output per stream: index 0 for stdout, 1 for stderr.
	Output          [2][]byte
	OutputTruncated [2]bool // set when the corresponding stream was clipped

	mtx sync.Mutex // guards all mutable fields above
}
|
|
|
|
// ServerRxcJobMap maps a job id to its job record.
type ServerRxcJobMap map[uint64]*ServerRxcJob

// ServerRxcJobExpiryHeap is a min-heap of done jobs ordered by expire_at
// (ties broken by Id); it drives the automatic purge scheduling.
type ServerRxcJobExpiryHeap []*ServerRxcJob
|
|
|
|
// ServerRxcSink receives output and termination notifications for an rxc run.
type ServerRxcSink interface {
	// ReqStop asks the sink to release any blocked consumer.
	ReqStop()
	// Write delivers a chunk of run output; flags select stdout/stderr.
	Write(flags uint64, data []byte) error
	// Stop signals that the run has terminated with the given flags/message.
	Stop(flags uint64, msg string) error
}
|
|
|
|
// ServerRxcWebsocketSink forwards rxc run output over a websocket connection.
// It implements ServerRxcSink.
type ServerRxcWebsocketSink struct {
	ws *websocket.Conn // may be nil; methods tolerate a nil connection
}
|
|
|
|
// ServerRxcWebsocketSink - implements ServerRxcSink
|
|
func (sink *ServerRxcWebsocketSink) ReqStop() {
|
|
if sink.ws != nil {
|
|
sink.ws.SetReadDeadline(time.Now())
|
|
}
|
|
}
|
|
|
|
func (sink *ServerRxcWebsocketSink) Write(flags uint64, data []byte) error {
|
|
// TODO: how to distinguish stdout and stderr? when sending iov, encode the info?
|
|
var data_type string
|
|
if flags & RXC_DATA_FLAG_STDERR != 0 {
|
|
data_type = "ioe" // stderr
|
|
} else {
|
|
data_type = "iov" // stdout
|
|
}
|
|
return send_ws_data_for_xterm(sink.ws, data_type, base64.StdEncoding.EncodeToString(data))
|
|
}
|
|
|
|
func (sink *ServerRxcWebsocketSink) Stop(flags uint64, msg string) error {
|
|
sink.ReqStop()
|
|
return nil
|
|
}
|
|
|
|
/* implement heap.Interface for ServerRxcJobExpiryMap */
|
|
func (heap ServerRxcJobExpiryHeap) Len() int {
|
|
return len(heap)
|
|
}
|
|
|
|
func (heap ServerRxcJobExpiryHeap) Less(i int, j int) bool {
|
|
if heap[i].expire_at.Before(heap[j].expire_at) { return true }
|
|
if heap[j].expire_at.Before(heap[i].expire_at) { return false }
|
|
return heap[i].Id < heap[j].Id
|
|
}
|
|
|
|
func (heap ServerRxcJobExpiryHeap) Swap(i int, j int) {
|
|
var x *ServerRxcJob
|
|
var y *ServerRxcJob
|
|
|
|
x = heap[i]
|
|
y = heap[j]
|
|
heap[i] = y
|
|
heap[j] = x
|
|
if heap[i] != nil { heap[i].heap_index = i }
|
|
if heap[j] != nil { heap[j].heap_index = j }
|
|
}
|
|
|
|
func (heap *ServerRxcJobExpiryHeap) Push(x interface{}) {
|
|
var job *ServerRxcJob
|
|
|
|
job = x.(*ServerRxcJob)
|
|
job.heap_index = len(*heap)
|
|
*heap = append(*heap, job)
|
|
}
|
|
|
|
func (heap *ServerRxcJobExpiryHeap) Pop() interface{} {
|
|
var old ServerRxcJobExpiryHeap
|
|
var n int
|
|
var job *ServerRxcJob
|
|
|
|
old = *heap
|
|
n = len(old)
|
|
job = old[n - 1]
|
|
old[n - 1] = nil
|
|
job.heap_index = -1
|
|
*heap = old[:n - 1]
|
|
return job
|
|
}
|
|
|
|
// ServerRxcJobRun
|
|
|
|
func new_server_rxc_job_run(job *ServerRxcJob, cts *ServerConn) *ServerRxcJobRun {
|
|
return &ServerRxcJobRun{
|
|
Job: job,
|
|
Cts: cts,
|
|
ConnId: cts.Id,
|
|
ClientToken: cts.ClientToken.Get(),
|
|
Created: time.Now(),
|
|
Status: SERVER_RXC_RUN_STATUS_STARTING,
|
|
}
|
|
}
|
|
|
|
func (job *ServerRxcJob) increment_run_status_count_no_lock(status string) {
|
|
switch status {
|
|
case SERVER_RXC_RUN_STATUS_STARTING:
|
|
job.starting_run_count++
|
|
|
|
case SERVER_RXC_RUN_STATUS_RUNNING:
|
|
job.running_run_count++
|
|
|
|
case SERVER_RXC_RUN_STATUS_STOPPING:
|
|
job.stopping_run_count++
|
|
|
|
case SERVER_RXC_RUN_STATUS_STOPPED:
|
|
job.stopped_run_count++
|
|
|
|
case SERVER_RXC_RUN_STATUS_FAILED:
|
|
job.failed_run_count++
|
|
}
|
|
}
|
|
|
|
func (job *ServerRxcJob) decrement_run_status_count_no_lock(status string) {
|
|
switch status {
|
|
case SERVER_RXC_RUN_STATUS_STARTING:
|
|
if job.starting_run_count > 0 { job.starting_run_count-- }
|
|
|
|
case SERVER_RXC_RUN_STATUS_RUNNING:
|
|
if job.running_run_count > 0 { job.running_run_count-- }
|
|
|
|
case SERVER_RXC_RUN_STATUS_STOPPING:
|
|
if job.stopping_run_count > 0 { job.stopping_run_count-- }
|
|
|
|
case SERVER_RXC_RUN_STATUS_STOPPED:
|
|
if job.stopped_run_count > 0 { job.stopped_run_count-- }
|
|
|
|
case SERVER_RXC_RUN_STATUS_FAILED:
|
|
if job.failed_run_count > 0 { job.failed_run_count-- }
|
|
}
|
|
}
|
|
|
|
// mark_started records the client-assigned rxc id and transitions the run
// from "starting" to "running", keeping the job's per-status counters in sync.
// Lock order is run.mtx first, then run.Job.run_mtx (consistent with the
// other status-transition methods in this file).
func (run *ServerRxcJobRun) mark_started(rxc_id uint64) {
	run.mtx.Lock()
	run.RxcId = rxc_id
	if run.Started.IsZero() {
		// only the first start notification sets the timestamp
		run.Started = time.Now()
	}
	if run.Status == SERVER_RXC_RUN_STATUS_STARTING {
		run.Job.run_mtx.Lock()
		run.Job.decrement_run_status_count_no_lock(run.Status)
		run.Job.increment_run_status_count_no_lock(SERVER_RXC_RUN_STATUS_RUNNING)
		run.Job.run_mtx.Unlock()
		run.Status = SERVER_RXC_RUN_STATUS_RUNNING
	}
	run.mtx.Unlock()
}
|
|
|
|
// mark_start_failure moves the run to the "failed" terminal state with the
// given reason, unless it already reached a terminal state. On an actual
// transition it fires the run-done event and, if this was the job's last
// active run, wakes the purger.
func (run *ServerRxcJobRun) mark_start_failure(msg string) {
	var transitioned bool

	run.mtx.Lock()
	if run.Status != SERVER_RXC_RUN_STATUS_STOPPED && run.Status != SERVER_RXC_RUN_STATUS_FAILED {
		// keep the job's per-status counters in step with the transition
		run.Job.run_mtx.Lock()
		run.Job.decrement_run_status_count_no_lock(run.Status)
		run.Job.increment_run_status_count_no_lock(SERVER_RXC_RUN_STATUS_FAILED)
		run.Job.run_mtx.Unlock()
		run.Stopped = time.Now()
		run.Status = SERVER_RXC_RUN_STATUS_FAILED
		run.StopMsg = msg
		transitioned = true
	}
	run.mtx.Unlock()

	// event/purge notification happens outside run.mtx to avoid holding the
	// lock across callbacks
	if transitioned {
		run.Cts.S.FireRxcJobRunEvent(SERVER_EVENT_RXC_JOB_RUN_DONE, run)
		if run.Cts.S.maybe_mark_rxc_job_done(run.Job) {
			run.Cts.S.notify_rxc_job_purge()
		}
	}
}
|
|
|
|
// request_stop transitions a "running" run to "stopping" and returns the
// connection and client-side rxc id needed to deliver the stop request.
// It returns (nil, 0, false) when the run was not in the running state.
//
// NOTE(review): if the run is "running" but RxcId is still 0, the status is
// flipped to "stopping" yet false is returned, leaving the run in "stopping"
// with no stop request sent — confirm whether RxcId can be 0 while running.
func (run *ServerRxcJobRun) request_stop() (*ServerConn, uint64, bool) {
	var cts *ServerConn
	var rxc_id uint64

	run.mtx.Lock()
	if run.Status == SERVER_RXC_RUN_STATUS_RUNNING {
		// keep the job's per-status counters in step with the transition
		run.Job.run_mtx.Lock()
		run.Job.decrement_run_status_count_no_lock(run.Status)
		run.Job.increment_run_status_count_no_lock(SERVER_RXC_RUN_STATUS_STOPPING)
		run.Job.run_mtx.Unlock()
		run.Status = SERVER_RXC_RUN_STATUS_STOPPING
		cts = run.Cts
		rxc_id = run.RxcId
	}
	run.mtx.Unlock()

	if cts == nil || rxc_id == 0 {
		// it wasn't originally at the running state
		return nil, 0, false
	}

	return cts, rxc_id, true
}
|
|
|
|
func (run *ServerRxcJobRun) is_done() bool {
|
|
var status string
|
|
|
|
run.mtx.Lock()
|
|
status = run.Status
|
|
run.mtx.Unlock()
|
|
|
|
return status == SERVER_RXC_RUN_STATUS_STOPPED || status == SERVER_RXC_RUN_STATUS_FAILED
|
|
}
|
|
|
|
// append_output records captured run output for the given stream
// (outidx 0 = stdout, 1 = stderr), bounded by the configured per-stream
// maximum (Cfg.RxcRunOutputMax; <= 0 disables capture entirely).
//
// With capture_tail true the NEWEST bytes are kept — older bytes are shifted
// out to make room. With capture_tail false the OLDEST bytes are kept — once
// the buffer is full, later data is discarded. Either way OutputTruncated is
// set when anything is dropped.
func (run *ServerRxcJobRun) append_output(data []byte, outidx int, capture_tail bool) {
	var fit_len int
	var keep_len int
	var old_off int
	var output_max int

	output_max = run.Job.S.Cfg.RxcRunOutputMax
	if output_max <= 0 { return } // don't capture

	run.mtx.Lock()
	if capture_tail {
		if len(data) >= output_max {
			// the new chunk alone fills or exceeds the budget; whatever was
			// stored before is dropped entirely.
			if len(data) > output_max || len(run.Output[outidx]) > 0 {
				run.OutputTruncated[outidx] = true
			}
			// keep only the trailing output_max bytes of the new chunk
			run.Output[outidx] = append(run.Output[outidx][:0], data[len(data) - output_max:]...)
		} else {
			// defensive shrink in case the stored buffer somehow exceeds the
			// limit (e.g. after a config change)
			if len(run.Output[outidx]) > output_max {
				run.Output[outidx] = append([]byte(nil), run.Output[outidx][len(run.Output[outidx]) - output_max:]...)
				run.OutputTruncated[outidx] = true
			}

			// drop just enough of the oldest stored bytes for the new chunk to fit
			fit_len = output_max - len(data)
			if fit_len < 0 { fit_len = 0 }
			if len(run.Output[outidx]) > fit_len {
				keep_len = fit_len
				old_off = len(run.Output[outidx]) - keep_len
				// shift the retained tail to the front in place
				copy(run.Output[outidx], run.Output[outidx][old_off:])
				run.Output[outidx] = run.Output[outidx][:keep_len]
				run.OutputTruncated[outidx]= true
			}

			run.Output[outidx] = append(run.Output[outidx], data...)
		}
	} else {
		//if len(run.Output[outidx]) > output_max {
		//	run.Output[outidx] = append([]byte(nil), run.Output[outidx][:output_max]...)
		//	run.OutputTruncate[outidx]d = true
		//}

		// head capture: append only while there is budget left; once the
		// truncation flag is set nothing more is ever stored.
		if !run.OutputTruncated[outidx] {
			fit_len = output_max - len(run.Output[outidx])
			if fit_len <= 0 {
				run.OutputTruncated[outidx] = true
			} else {
				if len(data) > fit_len {
					data = data[:fit_len] // clip the chunk to the remaining budget
					run.OutputTruncated[outidx] = true
				}
				run.Output[outidx] = append(run.Output[outidx], data...)
			}
		}
	}
	run.mtx.Unlock()
}
|
|
|
|
// stop moves the run to the "stopped" terminal state with the given flags and
// message, unless it already reached a terminal state. On an actual
// transition it fires the run-done event and, if this was the job's last
// active run, wakes the purger.
func (run *ServerRxcJobRun) stop(flags uint64, msg string) {
	var transitioned bool

	run.mtx.Lock()
	if run.Status != SERVER_RXC_RUN_STATUS_STOPPED && run.Status != SERVER_RXC_RUN_STATUS_FAILED {
		// keep the job's per-status counters in step with the transition
		run.Job.run_mtx.Lock()
		run.Job.decrement_run_status_count_no_lock(run.Status)
		run.Job.increment_run_status_count_no_lock(SERVER_RXC_RUN_STATUS_STOPPED)
		run.Job.run_mtx.Unlock()
		run.Status = SERVER_RXC_RUN_STATUS_STOPPED
		run.Stopped = time.Now()
		run.StopFlags = flags
		run.StopMsg = msg
		transitioned = true
	}
	run.mtx.Unlock()

	// event/purge notification happens outside run.mtx to avoid holding the
	// lock across callbacks
	if transitioned {
		run.Cts.S.FireRxcJobRunEvent(SERVER_EVENT_RXC_JOB_RUN_DONE, run)
		if run.Cts.S.maybe_mark_rxc_job_done(run.Job) {
			run.Cts.S.notify_rxc_job_purge()
		}
	}
}
|
|
|
|
func (run *ServerRxcJobRun) ReqStop() {
|
|
// this is called from run.Cts.receive_from_stream() or run.Cts.ReqStop()
|
|
// I can assume that the cause for this stop request is "connection closed".
|
|
// If you want to specify your own reason, you must call Stop() instead.
|
|
run.stop(0, "connection closed")
|
|
}
|
|
|
|
func (run *ServerRxcJobRun) Write(flags uint64, data []byte) error {
|
|
var outidx int
|
|
|
|
outidx = 0
|
|
if flags & RXC_DATA_FLAG_STDERR != 0 { outidx = 1 }
|
|
|
|
run.append_output(data, outidx, false)
|
|
return nil
|
|
}
|
|
|
|
func (run *ServerRxcJobRun) Stop(flags uint64, msg string) error {
|
|
run.stop(flags, msg)
|
|
return nil
|
|
}
|
|
|
|
// ServerRxcJob
|
|
|
|
func (job *ServerRxcJob) snapshot_runs() []*ServerRxcJobRun {
|
|
var runs []*ServerRxcJobRun
|
|
var run *ServerRxcJobRun
|
|
|
|
job.run_mtx.Lock()
|
|
runs = make([]*ServerRxcJobRun, 0, len(job.run_map))
|
|
for _, run = range job.run_map {
|
|
runs = append(runs, run)
|
|
}
|
|
job.run_mtx.Unlock()
|
|
|
|
sort.Slice(runs, func(i int, j int) bool { return runs[i].ConnId < runs[j].ConnId })
|
|
return runs
|
|
}
|
|
|
|
func (job *ServerRxcJob) FindRunByConnIdStr(conn_id string) (*ServerRxcJobRun, error) {
|
|
var conn_nid uint64
|
|
var err error
|
|
var run *ServerRxcJobRun
|
|
|
|
conn_nid, err = strconv.ParseUint(conn_id, 10, int(unsafe.Sizeof(ConnId(0)) * 8))
|
|
if err == nil {
|
|
job.run_mtx.Lock()
|
|
run = job.run_map[ConnId(conn_nid)]
|
|
job.run_mtx.Unlock()
|
|
if run == nil {
|
|
return nil, fmt.Errorf("non-existent rxc run conn id %d", conn_nid)
|
|
}
|
|
return run, nil
|
|
}
|
|
|
|
for _, run = range job.snapshot_runs() {
|
|
if run.ClientToken == conn_id {
|
|
return run, nil
|
|
}
|
|
}
|
|
|
|
return nil, fmt.Errorf("non-existent rxc run client %s", conn_id)
|
|
}
|
|
|
|
func (job *ServerRxcJob) is_done() bool {
|
|
var done bool
|
|
|
|
job.run_mtx.Lock()
|
|
done = (job.active_run_count <= 0)
|
|
job.run_mtx.Unlock()
|
|
return done
|
|
}
|
|
|
|
func (job *ServerRxcJob) get_done_time() time.Time {
|
|
var done time.Time
|
|
|
|
job.S.rxc_job_mtx.Lock()
|
|
done = job.Done
|
|
job.S.rxc_job_mtx.Unlock()
|
|
return done
|
|
}
|
|
|
|
// maybe_mark_rxc_job_done accounts for one finished run of the job and, when
// it was the last active run, marks the job done, schedules it on the expiry
// heap (if retention is enabled) and fires the job-done event. It returns
// true only on that final transition, in which case the caller should wake
// the purger via notify_rxc_job_purge().
//
// Each caller must invoke this at most once per finished run (the callers
// guard with their own "transitioned" flag), since active_run_count is
// decremented unconditionally here.
func (s *Server) maybe_mark_rxc_job_done(job *ServerRxcJob) bool {
	var existing *ServerRxcJob
	var ok bool

	job.run_mtx.Lock()
	if job.active_run_count > 0 {
		job.active_run_count--
		if job.active_run_count > 0 {
			// other runs are still active; nothing more to do
			job.run_mtx.Unlock()
			return false
		}
	}
	job.run_mtx.Unlock()

	s.rxc_job_mtx.Lock()
	// the job may have been deleted (or replaced) concurrently; only mark a
	// job that is still the one registered under its id
	existing, ok = s.rxc_job_map[job.Id]
	if !ok || existing != job {
		s.rxc_job_mtx.Unlock()
		return false
	}
	if !job.Done.IsZero() {
		// already marked done by another path
		s.rxc_job_mtx.Unlock()
		return false
	}
	job.Done = time.Now()
	if s.Cfg.RxcDoneJobRetention > 0 {
		// schedule automatic purging after the retention period
		job.expire_at = job.Done.Add(s.Cfg.RxcDoneJobRetention)
		heap.Push(&s.rxc_job_heap, job)
	}
	s.rxc_job_mtx.Unlock()
	s.FireRxcJobEvent(SERVER_EVENT_RXC_JOB_DONE, job)

	return true
}
|
|
|
|
// delete_run removes the given run from the job's run map and lowers the
// per-status counter for the status the run had at the time of the call.
// It returns false when the run is not (or no longer) the one registered
// under its connection id.
//
// NOTE(review): the status is read under run.mtx and the counter is adjusted
// later under job.run_mtx; a concurrent status transition in between would
// decrement the wrong counter — confirm callers only delete terminal runs.
func (job *ServerRxcJob) delete_run(run *ServerRxcJobRun) bool {
	var existing *ServerRxcJobRun
	var ok bool
	var status string

	run.mtx.Lock()
	status = run.Status
	run.mtx.Unlock()

	job.run_mtx.Lock()
	existing, ok = job.run_map[run.ConnId]
	if ok && existing == run {
		delete(job.run_map, run.ConnId)
		job.decrement_run_status_count_no_lock(status)
	}
	job.run_mtx.Unlock()

	return ok && existing == run
}
|
|
|
|
// Server
|
|
func (s *Server) snapshot_server_rxc_jobs() []*ServerRxcJob {
|
|
var jobs []*ServerRxcJob
|
|
var job *ServerRxcJob
|
|
|
|
s.rxc_job_mtx.Lock()
|
|
jobs = make([]*ServerRxcJob, 0, len(s.rxc_job_map))
|
|
for _, job = range s.rxc_job_map {
|
|
jobs = append(jobs, job)
|
|
}
|
|
s.rxc_job_mtx.Unlock()
|
|
|
|
sort.Slice(jobs, func(i int, j int) bool { return jobs[i].Id < jobs[j].Id })
|
|
return jobs
|
|
}
|
|
|
|
func (s *Server) FindServerRxcJobById(id uint64) *ServerRxcJob {
|
|
var job *ServerRxcJob
|
|
var ok bool
|
|
|
|
s.rxc_job_mtx.Lock()
|
|
job, ok = s.rxc_job_map[id]
|
|
s.rxc_job_mtx.Unlock()
|
|
if !ok { return nil }
|
|
return job
|
|
}
|
|
|
|
func (s *Server) FindServerRxcJobByIdStr(job_id string) (*ServerRxcJob, error) {
|
|
var job_nid uint64
|
|
var err error
|
|
var job *ServerRxcJob
|
|
|
|
job_nid, err = strconv.ParseUint(job_id, 10, 64)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("invalid exec job id %s - %s", job_id, err.Error())
|
|
}
|
|
|
|
job = s.FindServerRxcJobById(job_nid)
|
|
if job == nil {
|
|
return nil, fmt.Errorf("non-existent exec job id %d", job_nid)
|
|
}
|
|
|
|
return job, nil
|
|
}
|
|
|
|
// delete_server_rxc_job_from_heap_no_lock removes the job from the expiry
// heap if it is scheduled there, and clears its expiry time. The caller must
// hold s.rxc_job_mtx. A job with heap_index < 0 is not on the heap.
func (s *Server) delete_server_rxc_job_from_heap_no_lock(job *ServerRxcJob) {
	if job.heap_index >= 0 {
		heap.Remove(&s.rxc_job_heap, job.heap_index)
		job.expire_at = time.Time{}
		//job.heap_index = -1 // skip this as it's set in s.rxc_job_heap.Pop() called by heap.Remove()
	}
}
|
|
|
|
func (s *Server) delete_server_rxc_job_no_lock(job *ServerRxcJob) bool {
|
|
var existing *ServerRxcJob
|
|
var ok bool
|
|
|
|
existing, ok = s.rxc_job_map[job.Id]
|
|
if ok && existing == job {
|
|
delete(s.rxc_job_map, job.Id)
|
|
s.delete_server_rxc_job_from_heap_no_lock(job)
|
|
}
|
|
|
|
return ok && existing == job
|
|
}
|
|
|
|
func (s *Server) delete_server_rxc_job(job *ServerRxcJob) bool {
|
|
var deleted bool
|
|
|
|
s.rxc_job_mtx.Lock()
|
|
deleted = s.delete_server_rxc_job_no_lock(job)
|
|
s.rxc_job_mtx.Unlock()
|
|
|
|
return deleted
|
|
}
|
|
|
|
func (s *Server) resolve_server_rxc_targets(clients []string) ([]*ServerConn, error) {
|
|
var conns []*ServerConn
|
|
var client string
|
|
var cts *ServerConn
|
|
var err error
|
|
var dedupe map[ConnId]*ServerConn
|
|
|
|
if len(clients) <= 0 {
|
|
conns = s.snapshot_server_conns()
|
|
if len(conns) <= 0 {
|
|
return nil, fmt.Errorf("no connected clients")
|
|
}
|
|
return conns, nil
|
|
}
|
|
|
|
dedupe = make(map[ConnId]*ServerConn)
|
|
for _, client = range clients {
|
|
client = strings.TrimSpace(client)
|
|
if client == "" { continue }
|
|
cts, err = s.FindServerConnByIdStr(client)
|
|
if err != nil { return nil, err }
|
|
dedupe[cts.Id] = cts
|
|
}
|
|
|
|
if len(dedupe) <= 0 {
|
|
return nil, fmt.Errorf("no connected clients")
|
|
}
|
|
|
|
conns = make([]*ServerConn, 0, len(dedupe))
|
|
for _, cts = range dedupe {
|
|
conns = append(conns, cts)
|
|
}
|
|
sort.Slice(conns, func(i int, j int) bool { return conns[i].Id < conns[j].Id })
|
|
return conns, nil
|
|
}
|
|
|
|
func (s *Server) StartRxcJob(clients []string, type_ string, script string) (*ServerRxcJob, error) {
|
|
var conns []*ServerConn
|
|
var cts *ServerConn
|
|
var job *ServerRxcJob
|
|
var run *ServerRxcJobRun
|
|
var err error
|
|
var ok bool
|
|
|
|
if strings.TrimSpace(type_) == "" {
|
|
return nil, fmt.Errorf("blank rxc type")
|
|
}
|
|
if strings.TrimSpace(script) == "" {
|
|
return nil, fmt.Errorf("blank rxc script")
|
|
}
|
|
|
|
conns, err = s.resolve_server_rxc_targets(clients)
|
|
if err != nil { return nil, err }
|
|
|
|
job = &ServerRxcJob{
|
|
S: s,
|
|
Type: type_,
|
|
Script: script,
|
|
Created: time.Now(),
|
|
heap_index: -1,
|
|
run_map: make(map[ConnId]*ServerRxcJobRun),
|
|
active_run_count: len(conns),
|
|
starting_run_count: len(conns),
|
|
}
|
|
|
|
s.rxc_job_mtx.Lock()
|
|
job.Id = s.rxc_job_next_id
|
|
s.rxc_job_next_id++
|
|
if s.rxc_job_next_id == 0 { s.rxc_job_next_id++ }
|
|
s.rxc_job_map[job.Id] = job
|
|
s.rxc_job_mtx.Unlock()
|
|
|
|
job.run_mtx.Lock()
|
|
for _, cts = range conns {
|
|
job.run_map[cts.Id] = new_server_rxc_job_run(job, cts)
|
|
}
|
|
job.run_mtx.Unlock()
|
|
|
|
for _, cts = range conns {
|
|
job.run_mtx.Lock()
|
|
run, ok = job.run_map[cts.Id]
|
|
job.run_mtx.Unlock()
|
|
if !ok { continue }
|
|
|
|
err = cts.RunRxcJob(run, type_, script)
|
|
if err != nil {
|
|
run.mark_start_failure(err.Error())
|
|
s.log.Write(cts.Sid, LOG_DEBUG, "Failed to run rxc job(%d) on client(%s) from %s(%s)", job.Id, cts.ClientToken.Get(), cts.RemoteAddr)
|
|
} else {
|
|
s.log.Write(cts.Sid, LOG_DEBUG, "Ran rxc job(%d) on client(%s) from %s(%s)", job.Id, cts.ClientToken.Get(), cts.RemoteAddr)
|
|
}
|
|
}
|
|
|
|
s.log.Write("", LOG_DEBUG, "Started rxc job(%d) %s(%s)", job.Id, type_, script)
|
|
return job, nil
|
|
}
|
|
|
|
func (s *Server) StopRxcJobRun(run *ServerRxcJobRun) (bool, error) {
|
|
var cts *ServerConn
|
|
var rxc_id uint64
|
|
var ok bool
|
|
var err error
|
|
|
|
cts, rxc_id, ok = run.request_stop()
|
|
if !ok {
|
|
return false, nil
|
|
}
|
|
|
|
// sever sends a stop request to abort the run on the client side
|
|
err = cts.SendStopRxcById(rxc_id, 0)
|
|
if err != nil {
|
|
run.stop(0, err.Error())
|
|
return false, err
|
|
}
|
|
|
|
return true, nil
|
|
}
|
|
|
|
func (s *Server) StopRxcJob(job *ServerRxcJob) int {
|
|
var run *ServerRxcJobRun
|
|
var runs []*ServerRxcJobRun
|
|
var stop_count int
|
|
|
|
job.run_mtx.Lock()
|
|
runs = make([]*ServerRxcJobRun, 0, len(job.run_map))
|
|
for _, run = range job.run_map {
|
|
runs = append(runs, run)
|
|
}
|
|
job.run_mtx.Unlock()
|
|
|
|
for _, run = range runs {
|
|
var err error
|
|
var stopped bool
|
|
|
|
stopped, err = s.StopRxcJobRun(run)
|
|
if stopped { stop_count++ }
|
|
if err != nil { continue }
|
|
}
|
|
|
|
s.log.Write("", LOG_DEBUG, "Stopped rxc job(%d) after stopping %d runs", job.Id, stop_count)
|
|
return stop_count
|
|
}
|
|
|
|
func (s *Server) DeleteRxcJob(job *ServerRxcJob) error {
|
|
if !job.is_done() {
|
|
return fmt.Errorf("active rxc job id %d", job.Id)
|
|
}
|
|
if !s.delete_server_rxc_job(job) {
|
|
return fmt.Errorf("non-existent rxc job id %d", job.Id)
|
|
}
|
|
if s.Cfg.RxcDoneJobRetention > 0 {
|
|
// a job is already over and is actually deleted by request.
|
|
// the purge goroutine needs to re-schedule the next automatic purge.
|
|
s.notify_rxc_job_purge()
|
|
}
|
|
|
|
s.log.Write("", LOG_DEBUG, "Deleted rxc job(%d)", job.Id)
|
|
return nil
|
|
}
|
|
|
|
func (s *Server) DeleteRxcJobRun(run *ServerRxcJobRun) error {
|
|
if !run.is_done() {
|
|
return fmt.Errorf("active rxc run conn id %d", run.ConnId)
|
|
}
|
|
if !run.Job.delete_run(run) {
|
|
return fmt.Errorf("non-existent rxc run conn id %d", run.ConnId)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// purge_stale_rxc_jobs deletes every done job whose expiry time has passed,
// using the expiry min-heap to stop at the first not-yet-expired job. It
// returns the number of jobs purged. A non-positive expiry disables purging.
func (s *Server) purge_stale_rxc_jobs(now time.Time, expiry time.Duration) int {
	var job *ServerRxcJob
	var purge_count int

	if expiry <= 0 { return 0 }

	s.rxc_job_mtx.Lock()
	for {
		if len(s.rxc_job_heap) <= 0 { break }
		job = s.rxc_job_heap[0] // earliest-expiring job
		if now.Before(job.expire_at) { break } // the rest expire even later
		// deleting via the map also removes the job from the heap
		if s.delete_server_rxc_job_no_lock(job) {
			s.log.Write("", LOG_INFO, "Purged stale rxc job(%d)", job.Id)
			purge_count++
		} else {
			// this must not happen. but if it happens, it is an internal error and
			// the job expiry heap and the job map are already out of sync
			s.log.Write("", LOG_WARN, "Failed to purge stale rxc job(%d): heap/map mismatch", job.Id)
			// but still attempt to delete it from the heap to prevent future purge blockage
			s.delete_server_rxc_job_from_heap_no_lock(job)
		}
	}
	s.rxc_job_mtx.Unlock()

	return purge_count
}
|
|
|
|
func (s *Server) notify_rxc_job_purge() {
|
|
select {
|
|
case s.rxc_job_purge_chan <- struct{}{}:
|
|
// attempt to write to the channel
|
|
default:
|
|
// if not writable, do nothing and return immediately
|
|
}
|
|
}
|
|
|
|
func (s *Server) get_next_rxc_job_purge_time(expiry time.Duration) (time.Time, bool) {
|
|
var next time.Time
|
|
|
|
if expiry <= 0 { return time.Time{}, false }
|
|
|
|
s.rxc_job_mtx.Lock()
|
|
if len(s.rxc_job_heap) <= 0 {
|
|
s.rxc_job_mtx.Unlock()
|
|
return time.Time{}, false
|
|
}
|
|
next = s.rxc_job_heap[0].expire_at
|
|
s.rxc_job_mtx.Unlock()
|
|
|
|
return next, true
|
|
}
|
|
|
|
func stop_and_drain_timer(timer *time.Timer) {
|
|
if timer == nil { return }
|
|
if !timer.Stop() {
|
|
select {
|
|
case <-timer.C:
|
|
default:
|
|
}
|
|
}
|
|
}
|
|
|
|
// run_rxc_job_purger is the purge goroutine. It sleeps until the earliest
// job expiry, purges whatever has expired, and re-arms its timer whenever a
// job finishes or is deleted (signaled via rxc_job_purge_chan). It exits when
// the server context is canceled. The timer is created lazily and reused
// across iterations via stop_and_drain_timer + Reset.
func (s *Server) run_rxc_job_purger(wg *sync.WaitGroup, expiry time.Duration) {
	var timer *time.Timer
	var timer_chan <-chan time.Time
	var next time.Time
	var now time.Time
	var delay time.Duration
	var ok bool

	defer wg.Done()

	// it's best if the caller ensures expiry > 0
	if expiry <= 0 { return }

	for {
		next, ok = s.get_next_rxc_job_purge_time(expiry)
		if ok {
			// arm (or re-arm) the timer for the earliest expiry
			delay = time.Until(next)
			if delay < 0 { delay = 0 } // already past due; fire immediately
			if timer == nil {
				timer = time.NewTimer(delay)
			} else {
				// drain before Reset so a stale tick can't fire spuriously
				stop_and_drain_timer(timer)
				timer.Reset(delay)
			}
			timer_chan = timer.C
		} else {
			// nothing awaiting purge; a nil channel blocks forever in select
			stop_and_drain_timer(timer)
			timer_chan = nil
		}

		select {
			case <-s.rxc_job_purge_chan:
				// a job was done and/or deleted.
				// go to the top of the loop to re-calculate when to
				// run the automatic purge next time.
				continue

			case now = <-timer_chan:
				s.purge_stale_rxc_jobs(now, expiry)

			case <-s.Ctx.Done():
				// server shutdown; release the timer resources and leave
				stop_and_drain_timer(timer)
				return
		}
	}
}
|