398 lines
15 KiB
Go
398 lines
15 KiB
Go
package db
|
|
|
|
import "database/sql"
|
|
import "strings"
|
|
import "time"
|
|
|
|
import "codit/internal/models"
|
|
import "codit/internal/util"
|
|
|
|
func (s *Store) ListRPMRepoDirs(repoID string) ([]models.RPMRepoDir, error) {
|
|
var rows *sql.Rows
|
|
var items []models.RPMRepoDir
|
|
var item models.RPMRepoDir
|
|
var err error
|
|
rows, err = s.DB.Query(`SELECT r.public_id, d.path, d.mode, d.allow_delete, d.remote_url, d.connect_host, d.host_header, d.tls_server_name, d.tls_insecure_skip_verify, d.sync_interval_sec, d.sync_enabled, d.dirty, d.next_sync_at, d.sync_running, d.sync_status, d.sync_error, d.sync_step, d.sync_total, d.sync_done, d.sync_failed, d.sync_deleted, d.last_sync_started_at, d.last_sync_finished_at, d.last_sync_success_at, d.last_synced_revision, d.created_at, d.updated_at
|
|
FROM rpm_repo_dirs d
|
|
JOIN repos r ON r.id = d.repo_id
|
|
WHERE r.public_id = ?
|
|
ORDER BY LENGTH(d.path), d.path`, repoID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
for rows.Next() {
|
|
err = rows.Scan(&item.RepoID, &item.Path, &item.Mode, &item.AllowDelete, &item.RemoteURL, &item.ConnectHost, &item.HostHeader, &item.TLSServerName, &item.TLSInsecureSkipVerify, &item.SyncIntervalSec, &item.SyncEnabled, &item.Dirty, &item.NextSyncAt, &item.SyncRunning, &item.SyncStatus, &item.SyncError, &item.SyncStep, &item.SyncTotal, &item.SyncDone, &item.SyncFailed, &item.SyncDeleted, &item.LastSyncStartedAt, &item.LastSyncFinishedAt, &item.LastSyncSuccessAt, &item.LastSyncedRevision, &item.CreatedAt, &item.UpdatedAt)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, item)
|
|
}
|
|
err = rows.Err()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
func (s *Store) UpsertRPMRepoDir(item models.RPMRepoDir) error {
|
|
var now int64
|
|
var err error
|
|
now = time.Now().UTC().Unix()
|
|
if item.SyncIntervalSec <= 0 {
|
|
item.SyncIntervalSec = 300
|
|
}
|
|
_, err = s.DB.Exec(`
|
|
INSERT INTO rpm_repo_dirs (repo_id, path, mode, allow_delete, remote_url, connect_host, host_header, tls_server_name, tls_insecure_skip_verify, sync_interval_sec, sync_enabled, dirty, next_sync_at, created_at, updated_at)
|
|
VALUES ((SELECT id FROM repos WHERE public_id = ?), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
ON CONFLICT(repo_id, path) DO UPDATE SET
|
|
mode = excluded.mode,
|
|
allow_delete = excluded.allow_delete,
|
|
remote_url = excluded.remote_url,
|
|
connect_host = excluded.connect_host,
|
|
host_header = excluded.host_header,
|
|
tls_server_name = excluded.tls_server_name,
|
|
tls_insecure_skip_verify = excluded.tls_insecure_skip_verify,
|
|
sync_interval_sec = excluded.sync_interval_sec,
|
|
sync_enabled = CASE WHEN excluded.mode = 'mirror' THEN excluded.sync_enabled ELSE 1 END,
|
|
dirty = CASE WHEN excluded.mode = 'mirror' THEN 1 ELSE rpm_repo_dirs.dirty END,
|
|
next_sync_at = CASE WHEN excluded.mode = 'mirror' THEN 0 ELSE rpm_repo_dirs.next_sync_at END,
|
|
updated_at = excluded.updated_at
|
|
`,
|
|
item.RepoID,
|
|
item.Path,
|
|
item.Mode,
|
|
item.AllowDelete,
|
|
item.RemoteURL,
|
|
item.ConnectHost,
|
|
item.HostHeader,
|
|
item.TLSServerName,
|
|
item.TLSInsecureSkipVerify,
|
|
item.SyncIntervalSec,
|
|
item.SyncEnabled,
|
|
normalizeRPMRepoMode(item.Mode) == "mirror",
|
|
int64(0),
|
|
now,
|
|
now)
|
|
return err
|
|
}
|
|
|
|
func (s *Store) GetRPMRepoDir(repoID string, path string) (models.RPMRepoDir, error) {
|
|
var row *sql.Row
|
|
var item models.RPMRepoDir
|
|
var err error
|
|
row = s.DB.QueryRow(`SELECT r.public_id, d.path, d.mode, d.allow_delete, d.remote_url, d.connect_host, d.host_header, d.tls_server_name, d.tls_insecure_skip_verify, d.sync_interval_sec, d.sync_enabled, d.dirty, d.next_sync_at, d.sync_running, d.sync_status, d.sync_error, d.sync_step, d.sync_total, d.sync_done, d.sync_failed, d.sync_deleted, d.last_sync_started_at, d.last_sync_finished_at, d.last_sync_success_at, d.last_synced_revision, d.created_at, d.updated_at
|
|
FROM rpm_repo_dirs d
|
|
JOIN repos r ON r.id = d.repo_id
|
|
WHERE r.public_id = ? AND d.path = ?`, repoID, path)
|
|
err = row.Scan(&item.RepoID, &item.Path, &item.Mode, &item.AllowDelete, &item.RemoteURL, &item.ConnectHost, &item.HostHeader, &item.TLSServerName, &item.TLSInsecureSkipVerify, &item.SyncIntervalSec, &item.SyncEnabled, &item.Dirty, &item.NextSyncAt, &item.SyncRunning, &item.SyncStatus, &item.SyncError, &item.SyncStep, &item.SyncTotal, &item.SyncDone, &item.SyncFailed, &item.SyncDeleted, &item.LastSyncStartedAt, &item.LastSyncFinishedAt, &item.LastSyncSuccessAt, &item.LastSyncedRevision, &item.CreatedAt, &item.UpdatedAt)
|
|
if err != nil {
|
|
return item, err
|
|
}
|
|
return item, nil
|
|
}
|
|
|
|
func (s *Store) ListDueRPMMirrorTasks(now int64, limit int) ([]models.RPMMirrorTask, error) {
|
|
var rows *sql.Rows
|
|
var out []models.RPMMirrorTask
|
|
var item models.RPMMirrorTask
|
|
var err error
|
|
if limit <= 0 {
|
|
limit = 10
|
|
}
|
|
rows, err = s.DB.Query(`
|
|
SELECT r.public_id, r.path, d.path, d.remote_url, d.connect_host, d.host_header, d.tls_server_name, d.tls_insecure_skip_verify, d.sync_interval_sec, d.dirty, d.last_synced_revision
|
|
FROM rpm_repo_dirs d
|
|
JOIN repos r ON r.id = d.repo_id
|
|
WHERE d.mode = 'mirror' AND d.sync_enabled = 1 AND d.sync_running = 0 AND (d.dirty = 1 OR d.next_sync_at <= ? OR d.next_sync_at = 0)
|
|
ORDER BY d.next_sync_at, d.updated_at
|
|
LIMIT ?`, now, limit)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
for rows.Next() {
|
|
err = rows.Scan(&item.RepoID, &item.RepoPath, &item.MirrorPath, &item.RemoteURL, &item.ConnectHost, &item.HostHeader, &item.TLSServerName, &item.TLSInsecureSkipVerify, &item.SyncIntervalSec, &item.Dirty, &item.LastSyncedRevision)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
out = append(out, item)
|
|
}
|
|
err = rows.Err()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return out, nil
|
|
}
|
|
|
|
func (s *Store) TryStartRPMMirrorTask(repoID string, path string, now int64) (bool, error) {
|
|
var res sql.Result
|
|
var rows int64
|
|
var err error
|
|
res, err = s.DB.Exec(`UPDATE rpm_repo_dirs SET sync_running = 1, sync_status = 'running', sync_error = '', sync_step = 'start', sync_total = 0, sync_done = 0, sync_failed = 0, sync_deleted = 0, last_sync_started_at = ?, updated_at = ? WHERE repo_id = (SELECT id FROM repos WHERE public_id = ?) AND path = ? AND mode = 'mirror' AND sync_enabled = 1 AND sync_running = 0`, now, now, repoID, path)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
rows, err = res.RowsAffected()
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
return rows > 0, nil
|
|
}
|
|
|
|
func (s *Store) UpdateRPMMirrorTaskProgress(repoID string, path string, step string, total int64, done int64, failed int64, deleted int64) error {
|
|
var now int64
|
|
var err error
|
|
now = time.Now().UTC().Unix()
|
|
_, err = s.DB.Exec(`UPDATE rpm_repo_dirs SET sync_step = ?, sync_total = ?, sync_done = ?, sync_failed = ?, sync_deleted = ?, updated_at = ? WHERE repo_id = (SELECT id FROM repos WHERE public_id = ?) AND path = ?`, step, total, done, failed, deleted, now, repoID, path)
|
|
return err
|
|
}
|
|
|
|
func (s *Store) FinishRPMMirrorTask(repoID string, path string, success bool, revision string, errMsg string) error {
|
|
var now int64
|
|
var status string
|
|
var nextSync int64
|
|
var interval int64
|
|
var row *sql.Row
|
|
var err error
|
|
row = s.DB.QueryRow(`SELECT sync_interval_sec FROM rpm_repo_dirs WHERE repo_id = (SELECT id FROM repos WHERE public_id = ?) AND path = ?`, repoID, path)
|
|
err = row.Scan(&interval)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if interval <= 0 {
|
|
interval = 300
|
|
}
|
|
now = time.Now().UTC().Unix()
|
|
nextSync = now + interval
|
|
if success {
|
|
status = "success"
|
|
_, err = s.DB.Exec(`UPDATE rpm_repo_dirs SET sync_running = 0, dirty = 0, next_sync_at = ?, sync_status = ?, sync_error = '', sync_step = 'idle', last_sync_finished_at = ?, last_sync_success_at = ?, last_synced_revision = ?, updated_at = ? WHERE repo_id = (SELECT id FROM repos WHERE public_id = ?) AND path = ?`, nextSync, status, now, now, revision, now, repoID, path)
|
|
return err
|
|
}
|
|
status = "failed"
|
|
if errMsg == "" {
|
|
errMsg = "mirror sync failed"
|
|
}
|
|
_, err = s.DB.Exec(`UPDATE rpm_repo_dirs SET sync_running = 0, dirty = 1, next_sync_at = ?, sync_status = ?, sync_error = ?, sync_step = 'idle', last_sync_finished_at = ?, updated_at = ? WHERE repo_id = (SELECT id FROM repos WHERE public_id = ?) AND path = ?`, now+30, status, errMsg, now, now, repoID, path)
|
|
return err
|
|
}
|
|
|
|
func (s *Store) MarkRPMMirrorTaskDirty(repoID string, path string) error {
|
|
var now int64
|
|
var err error
|
|
now = time.Now().UTC().Unix()
|
|
_, err = s.DB.Exec(`UPDATE rpm_repo_dirs SET dirty = 1, next_sync_at = ?, updated_at = ? WHERE repo_id = (SELECT id FROM repos WHERE public_id = ?) AND path = ?`, now, now, repoID, path)
|
|
return err
|
|
}
|
|
|
|
func (s *Store) SetRPMMirrorSyncEnabled(repoID string, path string, enabled bool) error {
|
|
var now int64
|
|
var err error
|
|
now = time.Now().UTC().Unix()
|
|
_, err = s.DB.Exec(`UPDATE rpm_repo_dirs SET sync_enabled = ?, dirty = CASE WHEN ? THEN 1 ELSE dirty END, next_sync_at = CASE WHEN ? THEN ? ELSE next_sync_at END, updated_at = ? WHERE repo_id = (SELECT id FROM repos WHERE public_id = ?) AND path = ?`,
|
|
enabled, enabled, enabled, now, now, repoID, path)
|
|
return err
|
|
}
|
|
|
|
func (s *Store) ResetRunningRPMMirrorTasks() error {
|
|
var now int64
|
|
var err error
|
|
now = time.Now().UTC().Unix()
|
|
_, err = s.DB.Exec(`UPDATE rpm_repo_dirs SET sync_running = 0, dirty = 1, next_sync_at = ?, sync_status = 'failed', sync_error = 'aborted by restart', sync_step = 'idle', last_sync_finished_at = ?, updated_at = ? WHERE mode = 'mirror' AND sync_running = 1`, now+5, now, now)
|
|
return err
|
|
}
|
|
|
|
func (s *Store) ListRPMMirrorPaths() ([]models.RPMMirrorTask, error) {
|
|
var rows *sql.Rows
|
|
var out []models.RPMMirrorTask
|
|
var item models.RPMMirrorTask
|
|
var err error
|
|
rows, err = s.DB.Query(`
|
|
SELECT r.public_id, r.path, d.path
|
|
FROM rpm_repo_dirs d
|
|
JOIN repos r ON r.id = d.repo_id
|
|
WHERE d.mode = 'mirror'`)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
for rows.Next() {
|
|
err = rows.Scan(&item.RepoID, &item.RepoPath, &item.MirrorPath)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
out = append(out, item)
|
|
}
|
|
err = rows.Err()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return out, nil
|
|
}
|
|
|
|
func (s *Store) HasRunningRPMMirrorTask(repoID string) (bool, error) {
|
|
var row *sql.Row
|
|
var count int64
|
|
var err error
|
|
row = s.DB.QueryRow(`SELECT COUNT(1) FROM rpm_repo_dirs WHERE repo_id = (SELECT id FROM repos WHERE public_id = ?) AND mode = 'mirror' AND sync_running = 1`, repoID)
|
|
err = row.Scan(&count)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
return count > 0, nil
|
|
}
|
|
|
|
func (s *Store) CreateRPMMirrorRun(repoID string, path string, startedAt int64) (string, error) {
|
|
var id string
|
|
var err error
|
|
id, err = util.NewID()
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
_, err = s.DB.Exec(`INSERT INTO rpm_mirror_runs (public_id, repo_id, path, started_at, status) VALUES (?, (SELECT id FROM repos WHERE public_id = ?), ?, ?, 'running')`, id, repoID, path, startedAt)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
return id, nil
|
|
}
|
|
|
|
func (s *Store) FinishRPMMirrorRun(id string, finishedAt int64, status string, step string, total int64, done int64, failed int64, deleted int64, revision string, errMsg string) error {
|
|
var err error
|
|
_, err = s.DB.Exec(`UPDATE rpm_mirror_runs SET finished_at = ?, status = ?, step = ?, total = ?, done = ?, failed = ?, deleted = ?, revision = ?, error = ? WHERE public_id = ?`,
|
|
finishedAt, status, step, total, done, failed, deleted, revision, errMsg, id)
|
|
return err
|
|
}
|
|
|
|
func (s *Store) ListRPMMirrorRuns(repoID string, path string, limit int) ([]models.RPMMirrorRun, error) {
|
|
var rows *sql.Rows
|
|
var out []models.RPMMirrorRun
|
|
var item models.RPMMirrorRun
|
|
var err error
|
|
if limit <= 0 {
|
|
limit = 20
|
|
}
|
|
rows, err = s.DB.Query(`SELECT m.public_id, r.public_id, m.path, m.started_at, m.finished_at, m.status, m.step, m.total, m.done, m.failed, m.deleted, m.revision, m.error
|
|
FROM rpm_mirror_runs m
|
|
JOIN repos r ON r.id = m.repo_id
|
|
WHERE r.public_id = ? AND m.path = ?
|
|
ORDER BY m.started_at DESC
|
|
LIMIT ?`, repoID, path, limit)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
for rows.Next() {
|
|
err = rows.Scan(&item.ID, &item.RepoID, &item.Path, &item.StartedAt, &item.FinishedAt, &item.Status, &item.Step, &item.Total, &item.Done, &item.Failed, &item.Deleted, &item.Revision, &item.Error)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
out = append(out, item)
|
|
}
|
|
err = rows.Err()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return out, nil
|
|
}
|
|
|
|
func (s *Store) DeleteRPMMirrorRuns(repoID string, path string) (int64, error) {
|
|
var res sql.Result
|
|
var count int64
|
|
var err error
|
|
res, err = s.DB.Exec(`DELETE FROM rpm_mirror_runs WHERE repo_id = (SELECT id FROM repos WHERE public_id = ?) AND path = ?`, repoID, path)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
count, err = res.RowsAffected()
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
return count, nil
|
|
}
|
|
|
|
func (s *Store) CleanupRPMMirrorRunsRetention(repoID string, path string, keepCount int, keepDays int) error {
|
|
var cutoff int64
|
|
var now int64
|
|
var err error
|
|
if keepCount <= 0 {
|
|
keepCount = 200
|
|
}
|
|
if keepDays <= 0 {
|
|
keepDays = 30
|
|
}
|
|
now = time.Now().UTC().Unix()
|
|
cutoff = now - int64(keepDays*24*60*60)
|
|
_, err = s.DB.Exec(`
|
|
DELETE FROM rpm_mirror_runs
|
|
WHERE repo_id = (SELECT id FROM repos WHERE public_id = ?)
|
|
AND path = ?
|
|
AND started_at < ?
|
|
AND id NOT IN (
|
|
SELECT id FROM rpm_mirror_runs
|
|
WHERE repo_id = (SELECT id FROM repos WHERE public_id = ?) AND path = ?
|
|
ORDER BY started_at DESC
|
|
LIMIT ?
|
|
)
|
|
`, repoID, path, cutoff, repoID, path, keepCount)
|
|
return err
|
|
}
|
|
|
|
// normalizeRPMRepoMode maps a mode string onto one of the two canonical
// values. After trimming surrounding whitespace and lower-casing, "mirror"
// yields "mirror"; every other input, including the empty string, yields
// "local".
func normalizeRPMRepoMode(mode string) string {
	switch strings.ToLower(strings.TrimSpace(mode)) {
	case "mirror":
		return "mirror"
	default:
		return "local"
	}
}
|
|
|
|
func (s *Store) DeleteRPMRepoDir(repoID string, path string) error {
|
|
var err error
|
|
_, err = s.DB.Exec(`DELETE FROM rpm_repo_dirs WHERE repo_id = (SELECT id FROM repos WHERE public_id = ?) AND path = ?`, repoID, path)
|
|
return err
|
|
}
|
|
|
|
func (s *Store) DeleteRPMRepoDirSubtree(repoID string, path string) error {
|
|
var prefix string
|
|
var err error
|
|
prefix = path + "/"
|
|
_, err = s.DB.Exec(`DELETE FROM rpm_repo_dirs WHERE repo_id = (SELECT id FROM repos WHERE public_id = ?) AND (path = ? OR path LIKE (? || '%'))`, repoID, path, prefix)
|
|
return err
|
|
}
|
|
|
|
func (s *Store) MoveRPMRepoDir(repoID string, oldPath string, newPath string) error {
|
|
var tx *sql.Tx
|
|
var now int64
|
|
var oldPrefix string
|
|
var newPrefix string
|
|
var err error
|
|
tx, err = s.DB.Begin()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
now = time.Now().UTC().Unix()
|
|
oldPrefix = oldPath + "/"
|
|
newPrefix = newPath + "/"
|
|
_, err = tx.Exec(`DELETE FROM rpm_mirror_runs WHERE repo_id = (SELECT id FROM repos WHERE public_id = ?) AND (path = ? OR path LIKE (? || '%'))`, repoID, oldPath, oldPrefix)
|
|
if err != nil {
|
|
_ = tx.Rollback()
|
|
return err
|
|
}
|
|
_, err = tx.Exec(`UPDATE rpm_repo_dirs SET path = ?, updated_at = ? WHERE repo_id = (SELECT id FROM repos WHERE public_id = ?) AND path = ?`, newPath, now, repoID, oldPath)
|
|
if err != nil {
|
|
_ = tx.Rollback()
|
|
return err
|
|
}
|
|
_, err = tx.Exec(`UPDATE rpm_repo_dirs SET path = (? || SUBSTR(path, ?)), updated_at = ? WHERE repo_id = (SELECT id FROM repos WHERE public_id = ?) AND path LIKE (? || '%')`, newPrefix, len(oldPrefix)+1, now, repoID, oldPrefix)
|
|
if err != nil {
|
|
_ = tx.Rollback()
|
|
return err
|
|
}
|
|
err = tx.Commit()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|