package db

import (
	"database/sql"
	"strings"
	"time"

	"codit/internal/models"
	"codit/internal/util"
)

// ListRPMRepoDirs returns all rpm_repo_dirs rows that belong to the repo with
// the given public ID, ordered by path length and then by path.
// The returned slice is nil when no row matches.
func (s *Store) ListRPMRepoDirs(repoID string) ([]models.RPMRepoDir, error) {
	var rows *sql.Rows
	var items []models.RPMRepoDir
	var item models.RPMRepoDir
	var err error
	rows, err = s.DB.Query(`SELECT r.public_id, d.path, d.mode, d.allow_delete,
		d.remote_url, d.connect_host, d.host_header, d.tls_server_name,
		d.tls_insecure_skip_verify, d.sync_interval_sec, d.sync_enabled,
		d.dirty, d.next_sync_at, d.sync_running, d.sync_status, d.sync_error,
		d.sync_step, d.sync_total, d.sync_done, d.sync_failed, d.sync_deleted,
		d.last_sync_started_at, d.last_sync_finished_at, d.last_sync_success_at,
		d.last_synced_revision, d.created_at, d.updated_at
		FROM rpm_repo_dirs d
		JOIN repos r ON r.id = d.repo_id
		WHERE r.public_id = ?
		ORDER BY LENGTH(d.path), d.path`, repoID)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	for rows.Next() {
		err = rows.Scan(&item.RepoID, &item.Path, &item.Mode, &item.AllowDelete,
			&item.RemoteURL, &item.ConnectHost, &item.HostHeader, &item.TLSServerName,
			&item.TLSInsecureSkipVerify, &item.SyncIntervalSec, &item.SyncEnabled,
			&item.Dirty, &item.NextSyncAt, &item.SyncRunning, &item.SyncStatus, &item.SyncError,
			&item.SyncStep, &item.SyncTotal, &item.SyncDone, &item.SyncFailed, &item.SyncDeleted,
			&item.LastSyncStartedAt, &item.LastSyncFinishedAt, &item.LastSyncSuccessAt,
			&item.LastSyncedRevision, &item.CreatedAt, &item.UpdatedAt)
		if err != nil {
			return nil, err
		}
		items = append(items, item)
	}
	// rows.Next returning false may hide an iteration error; surface it.
	err = rows.Err()
	if err != nil {
		return nil, err
	}
	return items, nil
}

// UpsertRPMRepoDir inserts the directory configuration for (repo, path) or,
// when a row with the same repo_id and path already exists, updates its
// configuration columns.
//
// A non-positive item.SyncIntervalSec is replaced by 300 seconds. On insert the
// row is marked dirty when the normalized mode is "mirror" and next_sync_at is
// set to 0. On conflict, a mirror row is marked dirty with next_sync_at = 0,
// while any other mode keeps its dirty/next_sync_at values and has
// sync_enabled forced to 1. created_at is only written on insert.
//
// NOTE(review): the mode column receives item.Mode as given, while the insert's
// dirty flag is derived from normalizeRPMRepoMode(item.Mode); the two disagree
// for inputs such as "Mirror" — confirm that callers pass a normalized mode.
func (s *Store) UpsertRPMRepoDir(item models.RPMRepoDir) error {
	var now int64
	var err error
	now = time.Now().UTC().Unix()
	if item.SyncIntervalSec <= 0 {
		item.SyncIntervalSec = 300
	}
	_, err = s.DB.Exec(`
		INSERT INTO rpm_repo_dirs (repo_id, path, mode, allow_delete, remote_url,
			connect_host, host_header, tls_server_name, tls_insecure_skip_verify,
			sync_interval_sec, sync_enabled, dirty, next_sync_at, created_at, updated_at)
		VALUES ((SELECT id FROM repos WHERE public_id = ?), ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
		ON CONFLICT(repo_id, path) DO UPDATE SET
			mode = excluded.mode,
			allow_delete = excluded.allow_delete,
			remote_url = excluded.remote_url,
			connect_host = excluded.connect_host,
			host_header = excluded.host_header,
			tls_server_name = excluded.tls_server_name,
			tls_insecure_skip_verify = excluded.tls_insecure_skip_verify,
			sync_interval_sec = excluded.sync_interval_sec,
			sync_enabled = CASE WHEN excluded.mode = 'mirror' THEN excluded.sync_enabled ELSE 1 END,
			dirty = CASE WHEN excluded.mode = 'mirror' THEN 1 ELSE rpm_repo_dirs.dirty END,
			next_sync_at = CASE WHEN excluded.mode = 'mirror' THEN 0 ELSE rpm_repo_dirs.next_sync_at END,
			updated_at = excluded.updated_at
	`, item.RepoID, item.Path, item.Mode, item.AllowDelete, item.RemoteURL,
		item.ConnectHost, item.HostHeader, item.TLSServerName, item.TLSInsecureSkipVerify,
		item.SyncIntervalSec, item.SyncEnabled,
		normalizeRPMRepoMode(item.Mode) == "mirror", int64(0), now, now)
	return err
}

// GetRPMRepoDir loads the rpm_repo_dirs row for the repo with the given public
// ID and the exact path. When no row matches, the error returned by
// sql.Row.Scan (sql.ErrNoRows) is passed through unchanged.
func (s *Store) GetRPMRepoDir(repoID string, path string) (models.RPMRepoDir, error) {
	var row *sql.Row
	var item models.RPMRepoDir
	var err error
	row = s.DB.QueryRow(`SELECT r.public_id, d.path, d.mode, d.allow_delete,
		d.remote_url, d.connect_host, d.host_header, d.tls_server_name,
		d.tls_insecure_skip_verify, d.sync_interval_sec, d.sync_enabled,
		d.dirty, d.next_sync_at, d.sync_running, d.sync_status, d.sync_error,
		d.sync_step, d.sync_total, d.sync_done, d.sync_failed, d.sync_deleted,
		d.last_sync_started_at, d.last_sync_finished_at, d.last_sync_success_at,
		d.last_synced_revision, d.created_at, d.updated_at
		FROM rpm_repo_dirs d
		JOIN repos r ON r.id = d.repo_id
		WHERE r.public_id = ? AND d.path = ?`, repoID, path)
	err = row.Scan(&item.RepoID, &item.Path, &item.Mode, &item.AllowDelete,
		&item.RemoteURL, &item.ConnectHost, &item.HostHeader, &item.TLSServerName,
		&item.TLSInsecureSkipVerify, &item.SyncIntervalSec, &item.SyncEnabled,
		&item.Dirty, &item.NextSyncAt, &item.SyncRunning, &item.SyncStatus, &item.SyncError,
		&item.SyncStep, &item.SyncTotal, &item.SyncDone, &item.SyncFailed, &item.SyncDeleted,
		&item.LastSyncStartedAt, &item.LastSyncFinishedAt, &item.LastSyncSuccessAt,
		&item.LastSyncedRevision, &item.CreatedAt, &item.UpdatedAt)
	if err != nil {
		return item, err
	}
	return item, nil
}

// ListDueRPMMirrorTasks returns up to limit mirror directories that are enabled,
// not currently running, and either dirty, never scheduled (next_sync_at = 0)
// or due (next_sync_at <= now). Rows are ordered by next_sync_at, then
// updated_at. A non-positive limit is replaced by 10.
func (s *Store) ListDueRPMMirrorTasks(now int64, limit int) ([]models.RPMMirrorTask, error) {
	var rows *sql.Rows
	var out []models.RPMMirrorTask
	var item models.RPMMirrorTask
	var err error
	if limit <= 0 {
		limit = 10
	}
	rows, err = s.DB.Query(`
		SELECT r.public_id, r.path, d.path, d.remote_url, d.connect_host,
			d.host_header, d.tls_server_name, d.tls_insecure_skip_verify,
			d.sync_interval_sec, d.dirty, d.last_synced_revision
		FROM rpm_repo_dirs d
		JOIN repos r ON r.id = d.repo_id
		WHERE d.mode = 'mirror' AND d.sync_enabled = 1 AND d.sync_running = 0
			AND (d.dirty = 1 OR d.next_sync_at <= ? OR d.next_sync_at = 0)
		ORDER BY d.next_sync_at, d.updated_at
		LIMIT ?`, now, limit)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	for rows.Next() {
		err = rows.Scan(&item.RepoID, &item.RepoPath, &item.MirrorPath, &item.RemoteURL,
			&item.ConnectHost, &item.HostHeader, &item.TLSServerName,
			&item.TLSInsecureSkipVerify, &item.SyncIntervalSec, &item.Dirty,
			&item.LastSyncedRevision)
		if err != nil {
			return nil, err
		}
		out = append(out, item)
	}
	err = rows.Err()
	if err != nil {
		return nil, err
	}
	return out, nil
}

// TryStartRPMMirrorTask marks the mirror directory as running and resets its
// progress counters, using now for last_sync_started_at and updated_at.
// The UPDATE only matches an enabled mirror row whose sync_running is 0, so the
// result is true only when this call changed a row and false when nothing
// matched (for example because the task is already running).
func (s *Store) TryStartRPMMirrorTask(repoID string, path string, now int64) (bool, error) {
	var res sql.Result
	var rows int64
	var err error
	res, err = s.DB.Exec(`UPDATE rpm_repo_dirs
		SET sync_running = 1, sync_status = 'running', sync_error = '',
			sync_step = 'start', sync_total = 0, sync_done = 0, sync_failed = 0,
			sync_deleted = 0, last_sync_started_at = ?, updated_at = ?
		WHERE repo_id = (SELECT id FROM repos WHERE public_id = ?)
			AND path = ? AND mode = 'mirror' AND sync_enabled = 1 AND sync_running = 0`,
		now, now, repoID, path)
	if err != nil {
		return false, err
	}
	rows, err = res.RowsAffected()
	if err != nil {
		return false, err
	}
	return rows > 0, nil
}

// UpdateRPMMirrorTaskProgress stores the current step name and the
// total/done/failed/deleted counters for the given directory and bumps
// updated_at to the current UTC time.
func (s *Store) UpdateRPMMirrorTaskProgress(repoID string, path string, step string, total int64, done int64, failed int64, deleted int64) error {
	var now int64
	var err error
	now = time.Now().UTC().Unix()
	_, err = s.DB.Exec(`UPDATE rpm_repo_dirs
		SET sync_step = ?, sync_total = ?, sync_done = ?, sync_failed = ?,
			sync_deleted = ?, updated_at = ?
		WHERE repo_id = (SELECT id FROM repos WHERE public_id = ?) AND path = ?`,
		step, total, done, failed, deleted, now, repoID, path)
	return err
}

// FinishRPMMirrorTask records the outcome of a sync for the given directory and
// clears sync_running.
//
// On success the row is marked clean, revision is stored as
// last_synced_revision and the next sync is scheduled sync_interval_sec
// seconds from now (300 when the stored interval is non-positive). On failure
// the row stays dirty, errMsg (or "mirror sync failed" when empty) is stored
// and a retry is scheduled 30 seconds from now.
//
// The interval lookup fails with the sql.Row.Scan error when the row does not
// exist.
func (s *Store) FinishRPMMirrorTask(repoID string, path string, success bool, revision string, errMsg string) error {
	var now int64
	var status string
	var nextSync int64
	var interval int64
	var row *sql.Row
	var err error
	row = s.DB.QueryRow(`SELECT sync_interval_sec FROM rpm_repo_dirs
		WHERE repo_id = (SELECT id FROM repos WHERE public_id = ?) AND path = ?`,
		repoID, path)
	err = row.Scan(&interval)
	if err != nil {
		return err
	}
	if interval <= 0 {
		interval = 300
	}
	now = time.Now().UTC().Unix()
	nextSync = now + interval
	if success {
		status = "success"
		_, err = s.DB.Exec(`UPDATE rpm_repo_dirs
			SET sync_running = 0, dirty = 0, next_sync_at = ?, sync_status = ?,
				sync_error = '', sync_step = 'idle', last_sync_finished_at = ?,
				last_sync_success_at = ?, last_synced_revision = ?, updated_at = ?
			WHERE repo_id = (SELECT id FROM repos WHERE public_id = ?) AND path = ?`,
			nextSync, status, now, now, revision, now, repoID, path)
		return err
	}
	status = "failed"
	if errMsg == "" {
		errMsg = "mirror sync failed"
	}
	// Failed syncs are retried after a short fixed delay instead of the
	// configured interval.
	_, err = s.DB.Exec(`UPDATE rpm_repo_dirs
		SET sync_running = 0, dirty = 1, next_sync_at = ?, sync_status = ?,
			sync_error = ?, sync_step = 'idle', last_sync_finished_at = ?, updated_at = ?
		WHERE repo_id = (SELECT id FROM repos WHERE public_id = ?) AND path = ?`,
		now+30, status, errMsg, now, now, repoID, path)
	return err
}

// MarkRPMMirrorTaskDirty flags the directory as dirty and sets next_sync_at to
// the current UTC time.
func (s *Store) MarkRPMMirrorTaskDirty(repoID string, path string) error {
	var now int64
	var err error
	now = time.Now().UTC().Unix()
	_, err = s.DB.Exec(`UPDATE rpm_repo_dirs
		SET dirty = 1, next_sync_at = ?, updated_at = ?
		WHERE repo_id = (SELECT id FROM repos WHERE public_id = ?) AND path = ?`,
		now, now, repoID, path)
	return err
}

// SetRPMMirrorSyncEnabled stores the sync_enabled flag for the directory.
// When enabled is true the row is additionally marked dirty and next_sync_at
// is set to the current UTC time; when false both columns keep their values.
func (s *Store) SetRPMMirrorSyncEnabled(repoID string, path string, enabled bool) error {
	var now int64
	var err error
	now = time.Now().UTC().Unix()
	_, err = s.DB.Exec(`UPDATE rpm_repo_dirs
		SET sync_enabled = ?,
			dirty = CASE WHEN ? THEN 1 ELSE dirty END,
			next_sync_at = CASE WHEN ? THEN ? ELSE next_sync_at END,
			updated_at = ?
		WHERE repo_id = (SELECT id FROM repos WHERE public_id = ?) AND path = ?`,
		enabled, enabled, enabled, now, now, repoID, path)
	return err
}

// ResetRunningRPMMirrorTasks marks every mirror row that still has
// sync_running = 1 as failed with the error 'aborted by restart', flags it
// dirty and schedules it 5 seconds from now.
func (s *Store) ResetRunningRPMMirrorTasks() error {
	var now int64
	var err error
	now = time.Now().UTC().Unix()
	_, err = s.DB.Exec(`UPDATE rpm_repo_dirs
		SET sync_running = 0, dirty = 1, next_sync_at = ?, sync_status = 'failed',
			sync_error = 'aborted by restart', sync_step = 'idle',
			last_sync_finished_at = ?, updated_at = ?
		WHERE mode = 'mirror' AND sync_running = 1`, now+5, now, now)
	return err
}

// ListRPMMirrorPaths returns the repo public ID, repo path and directory path
// of every rpm_repo_dirs row whose mode is 'mirror'. Only RepoID, RepoPath and
// MirrorPath are populated on the returned tasks.
func (s *Store) ListRPMMirrorPaths() ([]models.RPMMirrorTask, error) {
	var rows *sql.Rows
	var out []models.RPMMirrorTask
	var item models.RPMMirrorTask
	var err error
	rows, err = s.DB.Query(`
		SELECT r.public_id, r.path, d.path
		FROM rpm_repo_dirs d
		JOIN repos r ON r.id = d.repo_id
		WHERE d.mode = 'mirror'`)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	for rows.Next() {
		err = rows.Scan(&item.RepoID, &item.RepoPath, &item.MirrorPath)
		if err != nil {
			return nil, err
		}
		out = append(out, item)
	}
	err = rows.Err()
	if err != nil {
		return nil, err
	}
	return out, nil
}

// HasRunningRPMMirrorTask reports whether the repo with the given public ID has
// at least one mirror directory with sync_running = 1.
func (s *Store) HasRunningRPMMirrorTask(repoID string) (bool, error) {
	var row *sql.Row
	var count int64
	var err error
	row = s.DB.QueryRow(`SELECT COUNT(1) FROM rpm_repo_dirs
		WHERE repo_id = (SELECT id FROM repos WHERE public_id = ?)
			AND mode = 'mirror' AND sync_running = 1`, repoID)
	err = row.Scan(&count)
	if err != nil {
		return false, err
	}
	return count > 0, nil
}

// CreateRPMMirrorRun inserts a new rpm_mirror_runs row with status 'running'
// for the given repo and path and returns its freshly generated public ID.
func (s *Store) CreateRPMMirrorRun(repoID string, path string, startedAt int64) (string, error) {
	var id string
	var err error
	id, err = util.NewID()
	if err != nil {
		return "", err
	}
	_, err = s.DB.Exec(`INSERT INTO rpm_mirror_runs (public_id, repo_id, path, started_at, status)
		VALUES (?, (SELECT id FROM repos WHERE public_id = ?), ?, ?, 'running')`,
		id, repoID, path, startedAt)
	if err != nil {
		return "", err
	}
	return id, nil
}

// FinishRPMMirrorRun stores the final status, step, counters, revision and
// error message on the run identified by its public ID.
func (s *Store) FinishRPMMirrorRun(id string, finishedAt int64, status string, step string, total int64, done int64, failed int64, deleted int64, revision string, errMsg string) error {
	var err error
	_, err = s.DB.Exec(`UPDATE rpm_mirror_runs
		SET finished_at = ?, status = ?, step = ?, total = ?, done = ?,
			failed = ?, deleted = ?, revision = ?, error = ?
		WHERE public_id = ?`,
		finishedAt, status, step, total, done, failed, deleted, revision, errMsg, id)
	return err
}

// ListRPMMirrorRuns returns up to limit runs recorded for the given repo and
// exact path, newest first by started_at. A non-positive limit is replaced
// by 20.
func (s *Store) ListRPMMirrorRuns(repoID string, path string, limit int) ([]models.RPMMirrorRun, error) {
	var rows *sql.Rows
	var out []models.RPMMirrorRun
	var item models.RPMMirrorRun
	var err error
	if limit <= 0 {
		limit = 20
	}
	rows, err = s.DB.Query(`SELECT m.public_id, r.public_id, m.path, m.started_at,
		m.finished_at, m.status, m.step, m.total, m.done, m.failed, m.deleted,
		m.revision, m.error
		FROM rpm_mirror_runs m
		JOIN repos r ON r.id = m.repo_id
		WHERE r.public_id = ? AND m.path = ?
		ORDER BY m.started_at DESC
		LIMIT ?`, repoID, path, limit)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	for rows.Next() {
		err = rows.Scan(&item.ID, &item.RepoID, &item.Path, &item.StartedAt,
			&item.FinishedAt, &item.Status, &item.Step, &item.Total, &item.Done,
			&item.Failed, &item.Deleted, &item.Revision, &item.Error)
		if err != nil {
			return nil, err
		}
		out = append(out, item)
	}
	err = rows.Err()
	if err != nil {
		return nil, err
	}
	return out, nil
}

// DeleteRPMMirrorRuns removes every run recorded for the given repo and exact
// path and returns the number of deleted rows.
func (s *Store) DeleteRPMMirrorRuns(repoID string, path string) (int64, error) {
	var res sql.Result
	var count int64
	var err error
	res, err = s.DB.Exec(`DELETE FROM rpm_mirror_runs
		WHERE repo_id = (SELECT id FROM repos WHERE public_id = ?) AND path = ?`,
		repoID, path)
	if err != nil {
		return 0, err
	}
	count, err = res.RowsAffected()
	if err != nil {
		return 0, err
	}
	return count, nil
}

// CleanupRPMMirrorRunsRetention deletes runs of the given repo and path that
// are both older than keepDays days and not among the keepCount most recent
// runs. Non-positive keepCount and keepDays are replaced by 200 and 30.
func (s *Store) CleanupRPMMirrorRunsRetention(repoID string, path string, keepCount int, keepDays int) error {
	var cutoff int64
	var now int64
	var err error
	if keepCount <= 0 {
		keepCount = 200
	}
	if keepDays <= 0 {
		keepDays = 30
	}
	now = time.Now().UTC().Unix()
	// Widen before multiplying so the product cannot overflow a 32-bit int.
	cutoff = now - int64(keepDays)*24*60*60
	_, err = s.DB.Exec(`
		DELETE FROM rpm_mirror_runs
		WHERE repo_id = (SELECT id FROM repos WHERE public_id = ?)
			AND path = ?
			AND started_at < ?
			AND id NOT IN (
				SELECT id FROM rpm_mirror_runs
				WHERE repo_id = (SELECT id FROM repos WHERE public_id = ?)
					AND path = ?
				ORDER BY started_at DESC
				LIMIT ?
			)
	`, repoID, path, cutoff, repoID, path, keepCount)
	return err
}

// normalizeRPMRepoMode returns "mirror" when mode, trimmed of surrounding
// whitespace and lower-cased, equals "mirror"; every other value maps to
// "local".
func normalizeRPMRepoMode(mode string) string {
	var v string
	v = strings.ToLower(strings.TrimSpace(mode))
	if v == "mirror" {
		return "mirror"
	}
	return "local"
}

// DeleteRPMRepoDir removes the rpm_repo_dirs row for the given repo and exact
// path.
func (s *Store) DeleteRPMRepoDir(repoID string, path string) error {
	var err error
	_, err = s.DB.Exec(`DELETE FROM rpm_repo_dirs
		WHERE repo_id = (SELECT id FROM repos WHERE public_id = ?) AND path = ?`,
		repoID, path)
	return err
}

// DeleteRPMRepoDirSubtree removes the rpm_repo_dirs row for path and every row
// whose path starts with path + "/".
func (s *Store) DeleteRPMRepoDirSubtree(repoID string, path string) error {
	var prefix string
	var err error
	prefix = path + "/"
	// The prefix is compared literally with SUBSTR rather than LIKE: LIKE would
	// treat '_' and '%' inside the directory name as wildcards and ignore ASCII
	// case, which could delete unrelated sibling directories.
	_, err = s.DB.Exec(`DELETE FROM rpm_repo_dirs
		WHERE repo_id = (SELECT id FROM repos WHERE public_id = ?)
			AND (path = ? OR SUBSTR(path, 1, LENGTH(?)) = ?)`,
		repoID, path, prefix, prefix)
	return err
}

// MoveRPMRepoDir renames the directory oldPath to newPath for the given repo
// inside a single transaction:
//
//  1. mirror runs recorded for oldPath and anything below it are deleted,
//  2. the row with path oldPath is renamed to newPath,
//  3. every row below oldPath + "/" has that prefix replaced by newPath + "/".
//
// The transaction is rolled back when any statement fails.
func (s *Store) MoveRPMRepoDir(repoID string, oldPath string, newPath string) error {
	var tx *sql.Tx
	var now int64
	var oldPrefix string
	var newPrefix string
	var err error
	tx, err = s.DB.Begin()
	if err != nil {
		return err
	}
	now = time.Now().UTC().Unix()
	oldPrefix = oldPath + "/"
	newPrefix = newPath + "/"
	// Prefixes are matched literally with SUBSTR instead of LIKE so that '_'
	// and '%' in directory names are not interpreted as wildcards.
	_, err = tx.Exec(`DELETE FROM rpm_mirror_runs
		WHERE repo_id = (SELECT id FROM repos WHERE public_id = ?)
			AND (path = ? OR SUBSTR(path, 1, LENGTH(?)) = ?)`,
		repoID, oldPath, oldPrefix, oldPrefix)
	if err != nil {
		_ = tx.Rollback() // the statement error is the one worth reporting
		return err
	}
	_, err = tx.Exec(`UPDATE rpm_repo_dirs SET path = ?, updated_at = ?
		WHERE repo_id = (SELECT id FROM repos WHERE public_id = ?) AND path = ?`,
		newPath, now, repoID, oldPath)
	if err != nil {
		_ = tx.Rollback() // the statement error is the one worth reporting
		return err
	}
	// The cut position is computed by the database with LENGTH(?), which counts
	// characters like SUBSTR does; a Go len() would count bytes and truncate
	// child paths when the old prefix contains non-ASCII characters.
	_, err = tx.Exec(`UPDATE rpm_repo_dirs
		SET path = (? || SUBSTR(path, LENGTH(?) + 1)), updated_at = ?
		WHERE repo_id = (SELECT id FROM repos WHERE public_id = ?)
			AND SUBSTR(path, 1, LENGTH(?)) = ?`,
		newPrefix, oldPrefix, now, repoID, oldPrefix, oldPrefix)
	if err != nil {
		_ = tx.Rollback() // the statement error is the one worth reporting
		return err
	}
	err = tx.Commit()
	if err != nil {
		return err
	}
	return nil
}