Compare commits
2 Commits
bf80ad9d61
...
32730fc2a7
| Author | SHA1 | Date | |
|---|---|---|---|
| 32730fc2a7 | |||
| c4e48b6dc3 |
@@ -267,7 +267,9 @@ func main() {
|
||||
rpmBase = filepath.Join(cfg.DataDir, "rpm")
|
||||
dockerBase = filepath.Join(cfg.DataDir, "docker")
|
||||
var rpmMeta *rpm.MetaManager
|
||||
var rpmMirror *rpm.MirrorManager
|
||||
rpmMeta = rpm.NewMetaManager()
|
||||
rpmMirror = rpm.NewMirrorManager(store, logger, rpmMeta)
|
||||
var uploadStore storage.FileStore
|
||||
uploadStore = storage.FileStore{BaseDir: filepath.Join(cfg.DataDir, "uploads")}
|
||||
err = os.MkdirAll(rpmBase, 0o755)
|
||||
@@ -290,6 +292,7 @@ func main() {
|
||||
Uploads: uploadStore,
|
||||
Logger: logger,
|
||||
}
|
||||
rpmMirror.Start()
|
||||
|
||||
var graphqlHandler http.Handler
|
||||
graphqlHandler, err = handlers.NewGraphQL(store)
|
||||
@@ -444,7 +447,13 @@ func main() {
|
||||
router.Handle("GET", "/api/repos/:id/rpm/packages", api.RepoRPMPackages)
|
||||
router.Handle("GET", "/api/repos/:id/rpm/package", api.RepoRPMPackage)
|
||||
router.Handle("POST", "/api/repos/:id/rpm/subdirs", api.RepoRPMCreateSubdir)
|
||||
router.Handle("GET", "/api/repos/:id/rpm/subdir", api.RepoRPMGetSubdir)
|
||||
router.Handle("POST", "/api/repos/:id/rpm/subdir/rename", api.RepoRPMRenameSubdir)
|
||||
router.Handle("POST", "/api/repos/:id/rpm/subdir/sync", api.RepoRPMSyncSubdir)
|
||||
router.Handle("POST", "/api/repos/:id/rpm/subdir/suspend", api.RepoRPMSuspendSubdir)
|
||||
router.Handle("POST", "/api/repos/:id/rpm/subdir/resume", api.RepoRPMResumeSubdir)
|
||||
router.Handle("GET", "/api/repos/:id/rpm/subdir/runs", api.RepoRPMMirrorRuns)
|
||||
router.Handle("DELETE", "/api/repos/:id/rpm/subdir/runs", api.RepoRPMClearMirrorRuns)
|
||||
router.Handle("DELETE", "/api/repos/:id/rpm/subdir", api.RepoRPMDeleteSubdir)
|
||||
router.Handle("DELETE", "/api/repos/:id/rpm/file", api.RepoRPMDeleteFile)
|
||||
router.Handle("GET", "/api/repos/:id/rpm/file", api.RepoRPMFile)
|
||||
@@ -508,7 +517,8 @@ func main() {
|
||||
|
||||
err = extraListenerManager.Start()
|
||||
if err != nil {
|
||||
log.Fatalf("additional listener manager error: %v", err)
|
||||
logger.Write("", util.LOG_ERROR, "additional listener manager error: %v", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
mainEndpoints, err = buildListenerEndpoints("main", tlsSettingsFromConfig(cfg), defaultListenerPolicy(), store)
|
||||
if err != nil {
|
||||
@@ -628,7 +638,17 @@ func (m *additionalListenerManager) ListenerEndpointCounts() map[string]int {
|
||||
|
||||
func (m *additionalListenerManager) Start() error {
|
||||
var err error
|
||||
var i int
|
||||
for i = 0; i < 30; i++ {
|
||||
err = m.reconcile()
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
if !isSQLiteBusyError(err) {
|
||||
return err
|
||||
}
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -644,6 +664,15 @@ func (m *additionalListenerManager) Start() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func isSQLiteBusyError(err error) bool {
|
||||
var msg string
|
||||
if err == nil {
|
||||
return false
|
||||
}
|
||||
msg = strings.ToLower(err.Error())
|
||||
return strings.Contains(msg, "database is locked") || strings.Contains(msg, "sqlite_busy")
|
||||
}
|
||||
|
||||
func (m *additionalListenerManager) reconcile() error {
|
||||
var listeners []models.TLSListener
|
||||
var desired map[string]listenerEndpoint
|
||||
@@ -692,13 +721,14 @@ func (m *additionalListenerManager) reconcile() error {
|
||||
desired[key] = more[j]
|
||||
}
|
||||
}
|
||||
|
||||
m.mu.Lock()
|
||||
|
||||
// among the running listeners, stop all disabled/invalid listeners.
|
||||
for key, running = range m.Running {
|
||||
var exists bool
|
||||
_, exists = desired[key]
|
||||
if exists {
|
||||
continue
|
||||
}
|
||||
if exists { continue } // found in configuration
|
||||
shutdownCtx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
|
||||
_ = running.Server.Shutdown(shutdownCtx)
|
||||
cancel()
|
||||
@@ -707,16 +737,17 @@ func (m *additionalListenerManager) reconcile() error {
|
||||
m.Logger.Write("", util.LOG_INFO, "additional listener stopped name=%s addr=%s", running.Endpoint.Name, running.Endpoint.Addr)
|
||||
}
|
||||
}
|
||||
|
||||
// start the listeners if they are not already running
|
||||
for key, endpoint = range desired {
|
||||
var exists bool
|
||||
_, exists = m.Running[key]
|
||||
if exists {
|
||||
continue
|
||||
}
|
||||
if exists { continue } // already running
|
||||
running = m.startEndpoint(endpoint)
|
||||
m.Running[key] = running
|
||||
}
|
||||
m.mu.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -14,8 +14,10 @@ type Store struct {
|
||||
|
||||
func Open(driver, dsn string) (*Store, error) {
|
||||
var db *sql.DB
|
||||
var drv string
|
||||
var err error
|
||||
db, err = sql.Open(driverName(driver), dsn)
|
||||
drv = driverName(driver)
|
||||
db, err = sql.Open(drv, dsn)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -23,6 +25,13 @@ func Open(driver, dsn string) (*Store, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if drv == "sqlite" {
|
||||
_, err = db.Exec(`PRAGMA busy_timeout = 10000`)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
_, _ = db.Exec(`PRAGMA journal_mode = WAL`)
|
||||
}
|
||||
return &Store{DB: db}, nil
|
||||
}
|
||||
|
||||
|
||||
366
backend/internal/db/rpm_dirs.go
Normal file
366
backend/internal/db/rpm_dirs.go
Normal file
@@ -0,0 +1,366 @@
|
||||
package db
|
||||
|
||||
import "database/sql"
|
||||
import "strings"
|
||||
import "time"
|
||||
|
||||
import "codit/internal/models"
|
||||
import "codit/internal/util"
|
||||
|
||||
func (s *Store) ListRPMRepoDirs(repoID string) ([]models.RPMRepoDir, error) {
|
||||
var rows *sql.Rows
|
||||
var items []models.RPMRepoDir
|
||||
var item models.RPMRepoDir
|
||||
var err error
|
||||
rows, err = s.DB.Query(`SELECT repo_id, path, mode, remote_url, connect_host, host_header, tls_server_name, tls_insecure_skip_verify, sync_interval_sec, sync_enabled, dirty, next_sync_at, sync_running, sync_status, sync_error, sync_step, sync_total, sync_done, sync_failed, sync_deleted, last_sync_started_at, last_sync_finished_at, last_sync_success_at, last_synced_revision, created_at, updated_at FROM rpm_repo_dirs WHERE repo_id = ? ORDER BY LENGTH(path), path`, repoID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
for rows.Next() {
|
||||
err = rows.Scan(&item.RepoID, &item.Path, &item.Mode, &item.RemoteURL, &item.ConnectHost, &item.HostHeader, &item.TLSServerName, &item.TLSInsecureSkipVerify, &item.SyncIntervalSec, &item.SyncEnabled, &item.Dirty, &item.NextSyncAt, &item.SyncRunning, &item.SyncStatus, &item.SyncError, &item.SyncStep, &item.SyncTotal, &item.SyncDone, &item.SyncFailed, &item.SyncDeleted, &item.LastSyncStartedAt, &item.LastSyncFinishedAt, &item.LastSyncSuccessAt, &item.LastSyncedRevision, &item.CreatedAt, &item.UpdatedAt)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
items = append(items, item)
|
||||
}
|
||||
err = rows.Err()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return items, nil
|
||||
}
|
||||
|
||||
func (s *Store) UpsertRPMRepoDir(item models.RPMRepoDir) error {
|
||||
var now int64
|
||||
var err error
|
||||
now = time.Now().UTC().Unix()
|
||||
if item.SyncIntervalSec <= 0 {
|
||||
item.SyncIntervalSec = 300
|
||||
}
|
||||
_, err = s.DB.Exec(`
|
||||
INSERT INTO rpm_repo_dirs (repo_id, path, mode, remote_url, connect_host, host_header, tls_server_name, tls_insecure_skip_verify, sync_interval_sec, sync_enabled, dirty, next_sync_at, created_at, updated_at)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
ON CONFLICT(repo_id, path) DO UPDATE SET
|
||||
mode = excluded.mode,
|
||||
remote_url = excluded.remote_url,
|
||||
connect_host = excluded.connect_host,
|
||||
host_header = excluded.host_header,
|
||||
tls_server_name = excluded.tls_server_name,
|
||||
tls_insecure_skip_verify = excluded.tls_insecure_skip_verify,
|
||||
sync_interval_sec = excluded.sync_interval_sec,
|
||||
sync_enabled = CASE WHEN excluded.mode = 'mirror' THEN excluded.sync_enabled ELSE 1 END,
|
||||
dirty = CASE WHEN excluded.mode = 'mirror' THEN 1 ELSE rpm_repo_dirs.dirty END,
|
||||
next_sync_at = CASE WHEN excluded.mode = 'mirror' THEN 0 ELSE rpm_repo_dirs.next_sync_at END,
|
||||
updated_at = excluded.updated_at
|
||||
`,
|
||||
item.RepoID,
|
||||
item.Path,
|
||||
item.Mode,
|
||||
item.RemoteURL,
|
||||
item.ConnectHost,
|
||||
item.HostHeader,
|
||||
item.TLSServerName,
|
||||
item.TLSInsecureSkipVerify,
|
||||
item.SyncIntervalSec,
|
||||
item.SyncEnabled,
|
||||
normalizeRPMRepoMode(item.Mode) == "mirror",
|
||||
int64(0),
|
||||
now,
|
||||
now)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *Store) GetRPMRepoDir(repoID string, path string) (models.RPMRepoDir, error) {
|
||||
var row *sql.Row
|
||||
var item models.RPMRepoDir
|
||||
var err error
|
||||
row = s.DB.QueryRow(`SELECT repo_id, path, mode, remote_url, connect_host, host_header, tls_server_name, tls_insecure_skip_verify, sync_interval_sec, sync_enabled, dirty, next_sync_at, sync_running, sync_status, sync_error, sync_step, sync_total, sync_done, sync_failed, sync_deleted, last_sync_started_at, last_sync_finished_at, last_sync_success_at, last_synced_revision, created_at, updated_at FROM rpm_repo_dirs WHERE repo_id = ? AND path = ?`, repoID, path)
|
||||
err = row.Scan(&item.RepoID, &item.Path, &item.Mode, &item.RemoteURL, &item.ConnectHost, &item.HostHeader, &item.TLSServerName, &item.TLSInsecureSkipVerify, &item.SyncIntervalSec, &item.SyncEnabled, &item.Dirty, &item.NextSyncAt, &item.SyncRunning, &item.SyncStatus, &item.SyncError, &item.SyncStep, &item.SyncTotal, &item.SyncDone, &item.SyncFailed, &item.SyncDeleted, &item.LastSyncStartedAt, &item.LastSyncFinishedAt, &item.LastSyncSuccessAt, &item.LastSyncedRevision, &item.CreatedAt, &item.UpdatedAt)
|
||||
if err != nil {
|
||||
return item, err
|
||||
}
|
||||
return item, nil
|
||||
}
|
||||
|
||||
func (s *Store) ListDueRPMMirrorTasks(now int64, limit int) ([]models.RPMMirrorTask, error) {
|
||||
var rows *sql.Rows
|
||||
var out []models.RPMMirrorTask
|
||||
var item models.RPMMirrorTask
|
||||
var err error
|
||||
if limit <= 0 {
|
||||
limit = 10
|
||||
}
|
||||
rows, err = s.DB.Query(`
|
||||
SELECT d.repo_id, r.path, d.path, d.remote_url, d.connect_host, d.host_header, d.tls_server_name, d.tls_insecure_skip_verify, d.sync_interval_sec, d.dirty, d.last_synced_revision
|
||||
FROM rpm_repo_dirs d
|
||||
JOIN repos r ON r.id = d.repo_id
|
||||
WHERE d.mode = 'mirror' AND d.sync_enabled = 1 AND d.sync_running = 0 AND (d.dirty = 1 OR d.next_sync_at <= ? OR d.next_sync_at = 0)
|
||||
ORDER BY d.next_sync_at, d.updated_at
|
||||
LIMIT ?`, now, limit)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
for rows.Next() {
|
||||
err = rows.Scan(&item.RepoID, &item.RepoPath, &item.MirrorPath, &item.RemoteURL, &item.ConnectHost, &item.HostHeader, &item.TLSServerName, &item.TLSInsecureSkipVerify, &item.SyncIntervalSec, &item.Dirty, &item.LastSyncedRevision)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
out = append(out, item)
|
||||
}
|
||||
err = rows.Err()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (s *Store) TryStartRPMMirrorTask(repoID string, path string, now int64) (bool, error) {
|
||||
var res sql.Result
|
||||
var rows int64
|
||||
var err error
|
||||
res, err = s.DB.Exec(`UPDATE rpm_repo_dirs SET sync_running = 1, sync_status = 'running', sync_error = '', sync_step = 'start', sync_total = 0, sync_done = 0, sync_failed = 0, sync_deleted = 0, last_sync_started_at = ?, updated_at = ? WHERE repo_id = ? AND path = ? AND mode = 'mirror' AND sync_running = 0`, now, now, repoID, path)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
rows, err = res.RowsAffected()
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return rows > 0, nil
|
||||
}
|
||||
|
||||
func (s *Store) UpdateRPMMirrorTaskProgress(repoID string, path string, step string, total int64, done int64, failed int64, deleted int64) error {
|
||||
var now int64
|
||||
var err error
|
||||
now = time.Now().UTC().Unix()
|
||||
_, err = s.DB.Exec(`UPDATE rpm_repo_dirs SET sync_step = ?, sync_total = ?, sync_done = ?, sync_failed = ?, sync_deleted = ?, updated_at = ? WHERE repo_id = ? AND path = ?`, step, total, done, failed, deleted, now, repoID, path)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *Store) FinishRPMMirrorTask(repoID string, path string, success bool, revision string, errMsg string) error {
|
||||
var now int64
|
||||
var status string
|
||||
var nextSync int64
|
||||
var interval int64
|
||||
var row *sql.Row
|
||||
var err error
|
||||
row = s.DB.QueryRow(`SELECT sync_interval_sec FROM rpm_repo_dirs WHERE repo_id = ? AND path = ?`, repoID, path)
|
||||
err = row.Scan(&interval)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if interval <= 0 {
|
||||
interval = 300
|
||||
}
|
||||
now = time.Now().UTC().Unix()
|
||||
nextSync = now + interval
|
||||
if success {
|
||||
status = "success"
|
||||
_, err = s.DB.Exec(`UPDATE rpm_repo_dirs SET sync_running = 0, dirty = 0, next_sync_at = ?, sync_status = ?, sync_error = '', sync_step = 'idle', last_sync_finished_at = ?, last_sync_success_at = ?, last_synced_revision = ?, updated_at = ? WHERE repo_id = ? AND path = ?`, nextSync, status, now, now, revision, now, repoID, path)
|
||||
return err
|
||||
}
|
||||
status = "failed"
|
||||
if errMsg == "" {
|
||||
errMsg = "mirror sync failed"
|
||||
}
|
||||
_, err = s.DB.Exec(`UPDATE rpm_repo_dirs SET sync_running = 0, dirty = 1, next_sync_at = ?, sync_status = ?, sync_error = ?, sync_step = 'idle', last_sync_finished_at = ?, updated_at = ? WHERE repo_id = ? AND path = ?`, now+30, status, errMsg, now, now, repoID, path)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *Store) MarkRPMMirrorTaskDirty(repoID string, path string) error {
|
||||
var now int64
|
||||
var err error
|
||||
now = time.Now().UTC().Unix()
|
||||
_, err = s.DB.Exec(`UPDATE rpm_repo_dirs SET dirty = 1, next_sync_at = ?, updated_at = ? WHERE repo_id = ? AND path = ?`, now, now, repoID, path)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *Store) SetRPMMirrorSyncEnabled(repoID string, path string, enabled bool) error {
|
||||
var now int64
|
||||
var err error
|
||||
now = time.Now().UTC().Unix()
|
||||
_, err = s.DB.Exec(`UPDATE rpm_repo_dirs SET sync_enabled = ?, dirty = CASE WHEN ? THEN 1 ELSE dirty END, next_sync_at = CASE WHEN ? THEN ? ELSE next_sync_at END, updated_at = ? WHERE repo_id = ? AND path = ?`,
|
||||
enabled, enabled, enabled, now, now, repoID, path)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *Store) ResetRunningRPMMirrorTasks() error {
|
||||
var now int64
|
||||
var err error
|
||||
now = time.Now().UTC().Unix()
|
||||
_, err = s.DB.Exec(`UPDATE rpm_repo_dirs SET sync_running = 0, dirty = 1, next_sync_at = ?, sync_status = 'failed', sync_error = 'aborted by restart', sync_step = 'idle', last_sync_finished_at = ?, updated_at = ? WHERE mode = 'mirror' AND sync_running = 1`, now+5, now, now)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *Store) ListRPMMirrorPaths() ([]models.RPMMirrorTask, error) {
|
||||
var rows *sql.Rows
|
||||
var out []models.RPMMirrorTask
|
||||
var item models.RPMMirrorTask
|
||||
var err error
|
||||
rows, err = s.DB.Query(`
|
||||
SELECT d.repo_id, r.path, d.path
|
||||
FROM rpm_repo_dirs d
|
||||
JOIN repos r ON r.id = d.repo_id
|
||||
WHERE d.mode = 'mirror'`)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
for rows.Next() {
|
||||
err = rows.Scan(&item.RepoID, &item.RepoPath, &item.MirrorPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
out = append(out, item)
|
||||
}
|
||||
err = rows.Err()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (s *Store) CreateRPMMirrorRun(repoID string, path string, startedAt int64) (string, error) {
|
||||
var id string
|
||||
var err error
|
||||
id, err = util.NewID()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
_, err = s.DB.Exec(`INSERT INTO rpm_mirror_runs (id, repo_id, path, started_at, status) VALUES (?, ?, ?, ?, 'running')`, id, repoID, path, startedAt)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return id, nil
|
||||
}
|
||||
|
||||
func (s *Store) FinishRPMMirrorRun(id string, finishedAt int64, status string, step string, total int64, done int64, failed int64, deleted int64, revision string, errMsg string) error {
|
||||
var err error
|
||||
_, err = s.DB.Exec(`UPDATE rpm_mirror_runs SET finished_at = ?, status = ?, step = ?, total = ?, done = ?, failed = ?, deleted = ?, revision = ?, error = ? WHERE id = ?`,
|
||||
finishedAt, status, step, total, done, failed, deleted, revision, errMsg, id)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *Store) ListRPMMirrorRuns(repoID string, path string, limit int) ([]models.RPMMirrorRun, error) {
|
||||
var rows *sql.Rows
|
||||
var out []models.RPMMirrorRun
|
||||
var item models.RPMMirrorRun
|
||||
var err error
|
||||
if limit <= 0 {
|
||||
limit = 20
|
||||
}
|
||||
rows, err = s.DB.Query(`SELECT id, repo_id, path, started_at, finished_at, status, step, total, done, failed, deleted, revision, error FROM rpm_mirror_runs WHERE repo_id = ? AND path = ? ORDER BY started_at DESC LIMIT ?`, repoID, path, limit)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer rows.Close()
|
||||
for rows.Next() {
|
||||
err = rows.Scan(&item.ID, &item.RepoID, &item.Path, &item.StartedAt, &item.FinishedAt, &item.Status, &item.Step, &item.Total, &item.Done, &item.Failed, &item.Deleted, &item.Revision, &item.Error)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
out = append(out, item)
|
||||
}
|
||||
err = rows.Err()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (s *Store) DeleteRPMMirrorRuns(repoID string, path string) (int64, error) {
|
||||
var res sql.Result
|
||||
var count int64
|
||||
var err error
|
||||
res, err = s.DB.Exec(`DELETE FROM rpm_mirror_runs WHERE repo_id = ? AND path = ?`, repoID, path)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
count, err = res.RowsAffected()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return count, nil
|
||||
}
|
||||
|
||||
func (s *Store) CleanupRPMMirrorRunsRetention(repoID string, path string, keepCount int, keepDays int) error {
|
||||
var cutoff int64
|
||||
var now int64
|
||||
var err error
|
||||
if keepCount <= 0 {
|
||||
keepCount = 200
|
||||
}
|
||||
if keepDays <= 0 {
|
||||
keepDays = 30
|
||||
}
|
||||
now = time.Now().UTC().Unix()
|
||||
cutoff = now - int64(keepDays*24*60*60)
|
||||
_, err = s.DB.Exec(`
|
||||
DELETE FROM rpm_mirror_runs
|
||||
WHERE repo_id = ?
|
||||
AND path = ?
|
||||
AND started_at < ?
|
||||
AND id NOT IN (
|
||||
SELECT id FROM rpm_mirror_runs
|
||||
WHERE repo_id = ? AND path = ?
|
||||
ORDER BY started_at DESC
|
||||
LIMIT ?
|
||||
)
|
||||
`, repoID, path, cutoff, repoID, path, keepCount)
|
||||
return err
|
||||
}
|
||||
|
||||
func normalizeRPMRepoMode(mode string) string {
|
||||
var v string
|
||||
v = strings.ToLower(strings.TrimSpace(mode))
|
||||
if v == "mirror" {
|
||||
return "mirror"
|
||||
}
|
||||
return "local"
|
||||
}
|
||||
|
||||
func (s *Store) DeleteRPMRepoDir(repoID string, path string) error {
|
||||
var err error
|
||||
_, err = s.DB.Exec(`DELETE FROM rpm_repo_dirs WHERE repo_id = ? AND path = ?`, repoID, path)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *Store) DeleteRPMRepoDirSubtree(repoID string, path string) error {
|
||||
var prefix string
|
||||
var err error
|
||||
prefix = path + "/"
|
||||
_, err = s.DB.Exec(`DELETE FROM rpm_repo_dirs WHERE repo_id = ? AND (path = ? OR path LIKE (? || '%'))`, repoID, path, prefix)
|
||||
return err
|
||||
}
|
||||
|
||||
func (s *Store) MoveRPMRepoDir(repoID string, oldPath string, newPath string) error {
|
||||
var tx *sql.Tx
|
||||
var now int64
|
||||
var oldPrefix string
|
||||
var newPrefix string
|
||||
var err error
|
||||
tx, err = s.DB.Begin()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
now = time.Now().UTC().Unix()
|
||||
_, err = tx.Exec(`UPDATE rpm_repo_dirs SET path = ?, updated_at = ? WHERE repo_id = ? AND path = ?`, newPath, now, repoID, oldPath)
|
||||
if err != nil {
|
||||
_ = tx.Rollback()
|
||||
return err
|
||||
}
|
||||
oldPrefix = oldPath + "/"
|
||||
newPrefix = newPath + "/"
|
||||
_, err = tx.Exec(`UPDATE rpm_repo_dirs SET path = (? || SUBSTR(path, ?)), updated_at = ? WHERE repo_id = ? AND path LIKE (? || '%')`, newPrefix, len(oldPrefix)+1, now, repoID, oldPrefix)
|
||||
if err != nil {
|
||||
_ = tx.Rollback()
|
||||
return err
|
||||
}
|
||||
err = tx.Commit()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -1,6 +1,7 @@
|
||||
package handlers
|
||||
|
||||
import "crypto/rand"
|
||||
import "database/sql"
|
||||
import "encoding/hex"
|
||||
import "errors"
|
||||
import "io"
|
||||
@@ -222,11 +223,25 @@ type repoRPMSubdirRequest struct {
|
||||
Name string `json:"name"`
|
||||
Type string `json:"type"`
|
||||
Parent string `json:"parent"`
|
||||
Mode string `json:"mode"`
|
||||
RemoteURL string `json:"remote_url"`
|
||||
ConnectHost string `json:"connect_host"`
|
||||
HostHeader string `json:"host_header"`
|
||||
TLSServerName string `json:"tls_server_name"`
|
||||
TLSInsecureSkipVerify bool `json:"tls_insecure_skip_verify"`
|
||||
SyncIntervalSec int64 `json:"sync_interval_sec"`
|
||||
}
|
||||
|
||||
type repoRPMRenameRequest struct {
|
||||
Path string `json:"path"`
|
||||
Name string `json:"name"`
|
||||
Mode string `json:"mode"`
|
||||
RemoteURL string `json:"remote_url"`
|
||||
ConnectHost string `json:"connect_host"`
|
||||
HostHeader string `json:"host_header"`
|
||||
TLSServerName string `json:"tls_server_name"`
|
||||
TLSInsecureSkipVerify bool `json:"tls_insecure_skip_verify"`
|
||||
SyncIntervalSec int64 `json:"sync_interval_sec"`
|
||||
}
|
||||
|
||||
type createAPIKeyRequest struct {
|
||||
@@ -2668,10 +2683,14 @@ func (api *API) RepoTypes(w http.ResponseWriter, r *http.Request, _ map[string]s
|
||||
|
||||
func (api *API) RepoRPMCreateSubdir(w http.ResponseWriter, r *http.Request, params map[string]string) {
|
||||
var repo models.Repo
|
||||
var dirConfig models.RPMRepoDir
|
||||
var writeBlocked bool
|
||||
var writeBlockedPath string
|
||||
var err error
|
||||
var req repoRPMSubdirRequest
|
||||
var name string
|
||||
var dirType string
|
||||
var mode string
|
||||
var parent string
|
||||
var parentPath string
|
||||
var fullPath string
|
||||
@@ -2726,6 +2745,19 @@ func (api *API) RepoRPMCreateSubdir(w http.ResponseWriter, r *http.Request, para
|
||||
}
|
||||
parentPath = filepath.FromSlash(parent)
|
||||
}
|
||||
parent = filepath.ToSlash(parentPath)
|
||||
if parent == "." {
|
||||
parent = ""
|
||||
}
|
||||
writeBlocked, writeBlockedPath, err = api.isRPMWriteBlocked(repo, parent)
|
||||
if err != nil {
|
||||
WriteJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
if writeBlocked {
|
||||
WriteJSON(w, http.StatusForbidden, map[string]string{"error": "writes are disabled for mirror repo subtree", "mirror_root": writeBlockedPath})
|
||||
return
|
||||
}
|
||||
fullRel = filepath.ToSlash(filepath.Join(parentPath, name))
|
||||
fullRelLower = strings.ToLower(fullRel)
|
||||
if fullRelLower == "repodata" || strings.HasPrefix(fullRelLower, "repodata/") || strings.Contains(fullRelLower, "/repodata/") {
|
||||
@@ -2733,6 +2765,7 @@ func (api *API) RepoRPMCreateSubdir(w http.ResponseWriter, r *http.Request, para
|
||||
return
|
||||
}
|
||||
if dirType == "repo" {
|
||||
mode = normalizeRPMRepoDirMode(req.Mode)
|
||||
absParent = filepath.Join(repo.Path, parentPath)
|
||||
hasRepoAncestor, err = hasRepodataAncestor(repo.Path, absParent)
|
||||
if err != nil {
|
||||
@@ -2743,6 +2776,11 @@ func (api *API) RepoRPMCreateSubdir(w http.ResponseWriter, r *http.Request, para
|
||||
WriteJSON(w, http.StatusBadRequest, map[string]string{"error": "repo directories cannot be created under another repo directory"})
|
||||
return
|
||||
}
|
||||
err = validateRPMMirrorConfig(mode, strings.TrimSpace(req.RemoteURL), strings.TrimSpace(req.ConnectHost), strings.TrimSpace(req.HostHeader), strings.TrimSpace(req.TLSServerName))
|
||||
if err != nil {
|
||||
WriteJSON(w, http.StatusBadRequest, map[string]string{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
}
|
||||
fullPath = filepath.Join(repo.Path, parentPath, name)
|
||||
err = os.MkdirAll(fullPath, 0o755)
|
||||
@@ -2757,16 +2795,266 @@ func (api *API) RepoRPMCreateSubdir(w http.ResponseWriter, r *http.Request, para
|
||||
WriteJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
dirConfig = models.RPMRepoDir{
|
||||
RepoID: repo.ID,
|
||||
Path: fullRel,
|
||||
Mode: mode,
|
||||
RemoteURL: strings.TrimSpace(req.RemoteURL),
|
||||
ConnectHost: strings.TrimSpace(req.ConnectHost),
|
||||
HostHeader: strings.TrimSpace(req.HostHeader),
|
||||
TLSServerName: strings.TrimSpace(req.TLSServerName),
|
||||
TLSInsecureSkipVerify: req.TLSInsecureSkipVerify,
|
||||
SyncIntervalSec: normalizeRPMMirrorIntervalSec(req.SyncIntervalSec),
|
||||
SyncEnabled: true,
|
||||
}
|
||||
err = api.Store.UpsertRPMRepoDir(dirConfig)
|
||||
if err != nil {
|
||||
WriteJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
}
|
||||
WriteJSON(w, http.StatusOK, map[string]string{"status": "ok"})
|
||||
}
|
||||
|
||||
func (api *API) RepoRPMGetSubdir(w http.ResponseWriter, r *http.Request, params map[string]string) {
|
||||
var repo models.Repo
|
||||
var relPath string
|
||||
var normalizedPath string
|
||||
var config models.RPMRepoDir
|
||||
var err error
|
||||
repo, err = api.Store.GetRepo(params["id"])
|
||||
if err != nil {
|
||||
WriteJSON(w, http.StatusNotFound, map[string]string{"error": "repo not found"})
|
||||
return
|
||||
}
|
||||
if !api.requireRepoRole(w, r, repo.ID, "viewer") {
|
||||
return
|
||||
}
|
||||
if repo.Type != "rpm" {
|
||||
WriteJSON(w, http.StatusBadRequest, map[string]string{"error": "repo is not rpm"})
|
||||
return
|
||||
}
|
||||
relPath = strings.TrimSpace(r.URL.Query().Get("path"))
|
||||
if relPath == "" {
|
||||
WriteJSON(w, http.StatusBadRequest, map[string]string{"error": "path required"})
|
||||
return
|
||||
}
|
||||
if !isSafeSubdirPath(relPath) {
|
||||
WriteJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid path"})
|
||||
return
|
||||
}
|
||||
normalizedPath = normalizeRPMPath(relPath)
|
||||
config, err = api.Store.GetRPMRepoDir(repo.ID, normalizedPath)
|
||||
if err == nil {
|
||||
WriteJSON(w, http.StatusOK, config)
|
||||
return
|
||||
}
|
||||
if !errors.Is(err, sql.ErrNoRows) {
|
||||
WriteJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
WriteJSON(w, http.StatusOK, models.RPMRepoDir{RepoID: repo.ID, Path: normalizedPath, Mode: "local"})
|
||||
}
|
||||
|
||||
func (api *API) RepoRPMSyncSubdir(w http.ResponseWriter, r *http.Request, params map[string]string) {
|
||||
var repo models.Repo
|
||||
var relPath string
|
||||
var normalizedPath string
|
||||
var config models.RPMRepoDir
|
||||
var err error
|
||||
repo, err = api.Store.GetRepo(params["id"])
|
||||
if err != nil {
|
||||
WriteJSON(w, http.StatusNotFound, map[string]string{"error": "repo not found"})
|
||||
return
|
||||
}
|
||||
if !api.requireRepoRole(w, r, repo.ID, "writer") {
|
||||
return
|
||||
}
|
||||
if repo.Type != "rpm" {
|
||||
WriteJSON(w, http.StatusBadRequest, map[string]string{"error": "repo is not rpm"})
|
||||
return
|
||||
}
|
||||
relPath = strings.TrimSpace(r.URL.Query().Get("path"))
|
||||
if relPath == "" {
|
||||
WriteJSON(w, http.StatusBadRequest, map[string]string{"error": "path required"})
|
||||
return
|
||||
}
|
||||
if !isSafeSubdirPath(relPath) {
|
||||
WriteJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid path"})
|
||||
return
|
||||
}
|
||||
normalizedPath = normalizeRPMPath(relPath)
|
||||
config, err = api.Store.GetRPMRepoDir(repo.ID, normalizedPath)
|
||||
if err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
WriteJSON(w, http.StatusNotFound, map[string]string{"error": "repo directory config not found"})
|
||||
return
|
||||
}
|
||||
WriteJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
if normalizeRPMRepoDirMode(config.Mode) != "mirror" {
|
||||
WriteJSON(w, http.StatusBadRequest, map[string]string{"error": "sync is only supported for mirror mode"})
|
||||
return
|
||||
}
|
||||
if !config.SyncEnabled {
|
||||
WriteJSON(w, http.StatusBadRequest, map[string]string{"error": "mirror sync is suspended"})
|
||||
return
|
||||
}
|
||||
err = api.Store.MarkRPMMirrorTaskDirty(repo.ID, normalizedPath)
|
||||
if err != nil {
|
||||
WriteJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
WriteJSON(w, http.StatusOK, map[string]string{"status": "scheduled"})
|
||||
}
|
||||
|
||||
func (api *API) RepoRPMSuspendSubdir(w http.ResponseWriter, r *http.Request, params map[string]string) {
|
||||
api.repoRPMSetSyncEnabled(w, r, params, false)
|
||||
}
|
||||
|
||||
func (api *API) RepoRPMResumeSubdir(w http.ResponseWriter, r *http.Request, params map[string]string) {
|
||||
api.repoRPMSetSyncEnabled(w, r, params, true)
|
||||
}
|
||||
|
||||
func (api *API) repoRPMSetSyncEnabled(w http.ResponseWriter, r *http.Request, params map[string]string, enabled bool) {
|
||||
var repo models.Repo
|
||||
var relPath string
|
||||
var normalizedPath string
|
||||
var config models.RPMRepoDir
|
||||
var err error
|
||||
repo, err = api.Store.GetRepo(params["id"])
|
||||
if err != nil {
|
||||
WriteJSON(w, http.StatusNotFound, map[string]string{"error": "repo not found"})
|
||||
return
|
||||
}
|
||||
if !api.requireRepoRole(w, r, repo.ID, "writer") {
|
||||
return
|
||||
}
|
||||
if repo.Type != "rpm" {
|
||||
WriteJSON(w, http.StatusBadRequest, map[string]string{"error": "repo is not rpm"})
|
||||
return
|
||||
}
|
||||
relPath = strings.TrimSpace(r.URL.Query().Get("path"))
|
||||
if relPath == "" {
|
||||
WriteJSON(w, http.StatusBadRequest, map[string]string{"error": "path required"})
|
||||
return
|
||||
}
|
||||
if !isSafeSubdirPath(relPath) {
|
||||
WriteJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid path"})
|
||||
return
|
||||
}
|
||||
normalizedPath = normalizeRPMPath(relPath)
|
||||
config, err = api.Store.GetRPMRepoDir(repo.ID, normalizedPath)
|
||||
if err != nil {
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
WriteJSON(w, http.StatusNotFound, map[string]string{"error": "repo directory config not found"})
|
||||
return
|
||||
}
|
||||
WriteJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
if normalizeRPMRepoDirMode(config.Mode) != "mirror" {
|
||||
WriteJSON(w, http.StatusBadRequest, map[string]string{"error": "sync control is only supported for mirror mode"})
|
||||
return
|
||||
}
|
||||
err = api.Store.SetRPMMirrorSyncEnabled(repo.ID, normalizedPath, enabled)
|
||||
if err != nil {
|
||||
WriteJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
WriteJSON(w, http.StatusOK, map[string]any{"status": "ok", "sync_enabled": enabled})
|
||||
}
|
||||
|
||||
func (api *API) RepoRPMMirrorRuns(w http.ResponseWriter, r *http.Request, params map[string]string) {
|
||||
var repo models.Repo
|
||||
var relPath string
|
||||
var normalizedPath string
|
||||
var limit int
|
||||
var runs []models.RPMMirrorRun
|
||||
var err error
|
||||
repo, err = api.Store.GetRepo(params["id"])
|
||||
if err != nil {
|
||||
WriteJSON(w, http.StatusNotFound, map[string]string{"error": "repo not found"})
|
||||
return
|
||||
}
|
||||
if !api.requireRepoRole(w, r, repo.ID, "viewer") {
|
||||
return
|
||||
}
|
||||
if repo.Type != "rpm" {
|
||||
WriteJSON(w, http.StatusBadRequest, map[string]string{"error": "repo is not rpm"})
|
||||
return
|
||||
}
|
||||
relPath = strings.TrimSpace(r.URL.Query().Get("path"))
|
||||
if relPath == "" {
|
||||
WriteJSON(w, http.StatusBadRequest, map[string]string{"error": "path required"})
|
||||
return
|
||||
}
|
||||
if !isSafeSubdirPath(relPath) {
|
||||
WriteJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid path"})
|
||||
return
|
||||
}
|
||||
normalizedPath = normalizeRPMPath(relPath)
|
||||
limit = 20
|
||||
if strings.TrimSpace(r.URL.Query().Get("limit")) != "" {
|
||||
limit, err = strconv.Atoi(strings.TrimSpace(r.URL.Query().Get("limit")))
|
||||
if err != nil || limit <= 0 {
|
||||
limit = 20
|
||||
}
|
||||
}
|
||||
runs, err = api.Store.ListRPMMirrorRuns(repo.ID, normalizedPath, limit)
|
||||
if err != nil {
|
||||
WriteJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
WriteJSON(w, http.StatusOK, runs)
|
||||
}
|
||||
|
||||
func (api *API) RepoRPMClearMirrorRuns(w http.ResponseWriter, r *http.Request, params map[string]string) {
|
||||
var repo models.Repo
|
||||
var relPath string
|
||||
var normalizedPath string
|
||||
var count int64
|
||||
var err error
|
||||
repo, err = api.Store.GetRepo(params["id"])
|
||||
if err != nil {
|
||||
WriteJSON(w, http.StatusNotFound, map[string]string{"error": "repo not found"})
|
||||
return
|
||||
}
|
||||
if !api.requireRepoRole(w, r, repo.ID, "writer") {
|
||||
return
|
||||
}
|
||||
if repo.Type != "rpm" {
|
||||
WriteJSON(w, http.StatusBadRequest, map[string]string{"error": "repo is not rpm"})
|
||||
return
|
||||
}
|
||||
relPath = strings.TrimSpace(r.URL.Query().Get("path"))
|
||||
if relPath == "" {
|
||||
WriteJSON(w, http.StatusBadRequest, map[string]string{"error": "path required"})
|
||||
return
|
||||
}
|
||||
if !isSafeSubdirPath(relPath) {
|
||||
WriteJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid path"})
|
||||
return
|
||||
}
|
||||
normalizedPath = normalizeRPMPath(relPath)
|
||||
count, err = api.Store.DeleteRPMMirrorRuns(repo.ID, normalizedPath)
|
||||
if err != nil {
|
||||
WriteJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
WriteJSON(w, http.StatusOK, map[string]any{"status": "ok", "deleted_count": count})
|
||||
}
|
||||
|
||||
func (api *API) RepoRPMDeleteSubdir(w http.ResponseWriter, r *http.Request, params map[string]string) {
|
||||
var repo models.Repo
|
||||
var writeBlocked bool
|
||||
var writeBlockedPath string
|
||||
var err error
|
||||
var relPath string
|
||||
var fullPath string
|
||||
var info os.FileInfo
|
||||
var relPathClean string
|
||||
var parentPath string
|
||||
var repodataPath string
|
||||
repo, err = api.Store.GetRepo(params["id"])
|
||||
@@ -2794,7 +3082,18 @@ func (api *API) RepoRPMDeleteSubdir(w http.ResponseWriter, r *http.Request, para
|
||||
WriteJSON(w, http.StatusBadRequest, map[string]string{"error": "repodata cannot be deleted directly"})
|
||||
return
|
||||
}
|
||||
fullPath = filepath.Join(repo.Path, filepath.FromSlash(relPath))
|
||||
relPathClean = filepath.ToSlash(filepath.Clean(filepath.FromSlash(relPath)))
|
||||
relPathClean = strings.TrimPrefix(relPathClean, "./")
|
||||
writeBlocked, writeBlockedPath, err = api.isRPMWriteBlocked(repo, relPathClean)
|
||||
if err != nil {
|
||||
WriteJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
if writeBlocked {
|
||||
WriteJSON(w, http.StatusForbidden, map[string]string{"error": "writes are disabled for mirror repo subtree", "mirror_root": writeBlockedPath})
|
||||
return
|
||||
}
|
||||
fullPath = filepath.Join(repo.Path, filepath.FromSlash(relPathClean))
|
||||
info, err = os.Stat(fullPath)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
@@ -2813,6 +3112,11 @@ func (api *API) RepoRPMDeleteSubdir(w http.ResponseWriter, r *http.Request, para
|
||||
WriteJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
err = api.Store.DeleteRPMRepoDirSubtree(repo.ID, relPathClean)
|
||||
if err != nil {
|
||||
WriteJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
parentPath = filepath.Dir(fullPath)
|
||||
repodataPath = filepath.Join(parentPath, "repodata")
|
||||
_, err = os.Stat(repodataPath)
|
||||
@@ -2824,15 +3128,24 @@ func (api *API) RepoRPMDeleteSubdir(w http.ResponseWriter, r *http.Request, para
|
||||
|
||||
func (api *API) RepoRPMRenameSubdir(w http.ResponseWriter, r *http.Request, params map[string]string) {
|
||||
var repo models.Repo
|
||||
var existingConfig models.RPMRepoDir
|
||||
var dirConfig models.RPMRepoDir
|
||||
var writeBlocked bool
|
||||
var writeBlockedPath string
|
||||
var newMode string
|
||||
var renamed bool
|
||||
var isRepoDir bool
|
||||
var err error
|
||||
var req repoRPMRenameRequest
|
||||
var relPath string
|
||||
var relPathClean string
|
||||
var newName string
|
||||
var fullPath string
|
||||
var info os.FileInfo
|
||||
var parentRel string
|
||||
var parentPath string
|
||||
var newPath string
|
||||
var newRelPath string
|
||||
var repodataPath string
|
||||
var hasAncestor bool
|
||||
var absParent string
|
||||
@@ -2875,7 +3188,18 @@ func (api *API) RepoRPMRenameSubdir(w http.ResponseWriter, r *http.Request, para
|
||||
WriteJSON(w, http.StatusBadRequest, map[string]string{"error": "repodata cannot be renamed"})
|
||||
return
|
||||
}
|
||||
fullPath = filepath.Join(repo.Path, filepath.FromSlash(relPath))
|
||||
relPathClean = filepath.ToSlash(filepath.Clean(filepath.FromSlash(relPath)))
|
||||
relPathClean = strings.TrimPrefix(relPathClean, "./")
|
||||
writeBlocked, writeBlockedPath, err = api.isRPMWriteBlocked(repo, relPathClean)
|
||||
if err != nil {
|
||||
WriteJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
if writeBlocked && writeBlockedPath != relPathClean {
|
||||
WriteJSON(w, http.StatusForbidden, map[string]string{"error": "writes are disabled for mirror repo subtree", "mirror_root": writeBlockedPath})
|
||||
return
|
||||
}
|
||||
fullPath = filepath.Join(repo.Path, filepath.FromSlash(relPathClean))
|
||||
info, err = os.Stat(fullPath)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
@@ -2889,28 +3213,26 @@ func (api *API) RepoRPMRenameSubdir(w http.ResponseWriter, r *http.Request, para
|
||||
WriteJSON(w, http.StatusBadRequest, map[string]string{"error": "path is not a directory"})
|
||||
return
|
||||
}
|
||||
parentRel = filepath.Dir(filepath.FromSlash(relPath))
|
||||
parentRel = filepath.Dir(filepath.FromSlash(relPathClean))
|
||||
if parentRel == "." {
|
||||
parentRel = ""
|
||||
}
|
||||
parentPath = filepath.FromSlash(parentRel)
|
||||
newPath = filepath.Join(repo.Path, parentPath, newName)
|
||||
if newPath == fullPath {
|
||||
WriteJSON(w, http.StatusBadRequest, map[string]string{"error": "name is unchanged"})
|
||||
return
|
||||
}
|
||||
_, err = os.Stat(newPath)
|
||||
if err == nil {
|
||||
WriteJSON(w, http.StatusBadRequest, map[string]string{"error": "target already exists"})
|
||||
return
|
||||
}
|
||||
if err != nil && !os.IsNotExist(err) {
|
||||
WriteJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
newRelPath = filepath.ToSlash(filepath.Join(parentRel, newName))
|
||||
repodataPath = filepath.Join(fullPath, "repodata")
|
||||
_, err = os.Stat(repodataPath)
|
||||
if err == nil {
|
||||
isRepoDir = true
|
||||
newMode = normalizeRPMRepoDirMode(req.Mode)
|
||||
if newMode == "" {
|
||||
newMode = "local"
|
||||
}
|
||||
err = validateRPMMirrorConfig(newMode, strings.TrimSpace(req.RemoteURL), strings.TrimSpace(req.ConnectHost), strings.TrimSpace(req.HostHeader), strings.TrimSpace(req.TLSServerName))
|
||||
if err != nil {
|
||||
WriteJSON(w, http.StatusBadRequest, map[string]string{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
absParent = filepath.Join(repo.Path, parentPath)
|
||||
hasAncestor, err = hasRepodataAncestor(repo.Path, absParent)
|
||||
if err != nil {
|
||||
@@ -2922,11 +3244,55 @@ func (api *API) RepoRPMRenameSubdir(w http.ResponseWriter, r *http.Request, para
|
||||
return
|
||||
}
|
||||
}
|
||||
renamed = newPath != fullPath
|
||||
if renamed {
|
||||
_, err = os.Stat(newPath)
|
||||
if err == nil {
|
||||
WriteJSON(w, http.StatusBadRequest, map[string]string{"error": "target already exists"})
|
||||
return
|
||||
}
|
||||
if err != nil && !os.IsNotExist(err) {
|
||||
WriteJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
err = os.Rename(fullPath, newPath)
|
||||
if err != nil {
|
||||
WriteJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
err = api.Store.MoveRPMRepoDir(repo.ID, relPathClean, newRelPath)
|
||||
if err != nil {
|
||||
WriteJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
}
|
||||
if isRepoDir {
|
||||
dirConfig = models.RPMRepoDir{
|
||||
RepoID: repo.ID,
|
||||
Path: newRelPath,
|
||||
Mode: newMode,
|
||||
RemoteURL: strings.TrimSpace(req.RemoteURL),
|
||||
ConnectHost: strings.TrimSpace(req.ConnectHost),
|
||||
HostHeader: strings.TrimSpace(req.HostHeader),
|
||||
TLSServerName: strings.TrimSpace(req.TLSServerName),
|
||||
TLSInsecureSkipVerify: req.TLSInsecureSkipVerify,
|
||||
SyncIntervalSec: normalizeRPMMirrorIntervalSec(req.SyncIntervalSec),
|
||||
}
|
||||
existingConfig, err = api.Store.GetRPMRepoDir(repo.ID, relPathClean)
|
||||
if err == nil {
|
||||
dirConfig.SyncEnabled = existingConfig.SyncEnabled
|
||||
} else if errors.Is(err, sql.ErrNoRows) {
|
||||
dirConfig.SyncEnabled = true
|
||||
} else {
|
||||
WriteJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
err = api.Store.UpsertRPMRepoDir(dirConfig)
|
||||
if err != nil {
|
||||
WriteJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
}
|
||||
repodataPath = filepath.Join(filepath.Join(repo.Path, parentPath), "repodata")
|
||||
_, err = os.Stat(repodataPath)
|
||||
if err == nil && api.RpmMeta != nil {
|
||||
@@ -2937,8 +3303,11 @@ func (api *API) RepoRPMRenameSubdir(w http.ResponseWriter, r *http.Request, para
|
||||
|
||||
func (api *API) RepoRPMDeleteFile(w http.ResponseWriter, r *http.Request, params map[string]string) {
|
||||
var repo models.Repo
|
||||
var writeBlocked bool
|
||||
var writeBlockedPath string
|
||||
var err error
|
||||
var relPath string
|
||||
var relPathClean string
|
||||
var fullPath string
|
||||
var info os.FileInfo
|
||||
var parentPath string
|
||||
@@ -2969,12 +3338,23 @@ func (api *API) RepoRPMDeleteFile(w http.ResponseWriter, r *http.Request, params
|
||||
WriteJSON(w, http.StatusBadRequest, map[string]string{"error": "repodata files cannot be deleted"})
|
||||
return
|
||||
}
|
||||
lower = strings.ToLower(relPath)
|
||||
relPathClean = filepath.ToSlash(filepath.Clean(filepath.FromSlash(relPath)))
|
||||
relPathClean = strings.TrimPrefix(relPathClean, "./")
|
||||
writeBlocked, writeBlockedPath, err = api.isRPMWriteBlocked(repo, relPathClean)
|
||||
if err != nil {
|
||||
WriteJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
if writeBlocked {
|
||||
WriteJSON(w, http.StatusForbidden, map[string]string{"error": "writes are disabled for mirror repo subtree", "mirror_root": writeBlockedPath})
|
||||
return
|
||||
}
|
||||
lower = strings.ToLower(relPathClean)
|
||||
if !strings.HasSuffix(lower, ".rpm") {
|
||||
WriteJSON(w, http.StatusBadRequest, map[string]string{"error": "only rpm files can be deleted"})
|
||||
return
|
||||
}
|
||||
fullPath = filepath.Join(repo.Path, filepath.FromSlash(relPath))
|
||||
fullPath = filepath.Join(repo.Path, filepath.FromSlash(relPathClean))
|
||||
info, err = os.Stat(fullPath)
|
||||
if err != nil {
|
||||
if os.IsNotExist(err) {
|
||||
@@ -3073,6 +3453,11 @@ func (api *API) RepoRPMTree(w http.ResponseWriter, r *http.Request, params map[s
|
||||
var err error
|
||||
var relPath string
|
||||
var entries []rpm.TreeEntry
|
||||
var repoDirs []models.RPMRepoDir
|
||||
var modeByPath map[string]string
|
||||
var i int
|
||||
var entryPath string
|
||||
var repodataPath string
|
||||
repo, err = api.Store.GetRepo(params["id"])
|
||||
if err != nil {
|
||||
WriteJSON(w, http.StatusNotFound, map[string]string{"error": "repo not found"})
|
||||
@@ -3099,13 +3484,47 @@ func (api *API) RepoRPMTree(w http.ResponseWriter, r *http.Request, params map[s
|
||||
WriteJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
repoDirs, err = api.Store.ListRPMRepoDirs(repo.ID)
|
||||
if err != nil {
|
||||
WriteJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
modeByPath = make(map[string]string)
|
||||
for i = 0; i < len(repoDirs); i++ {
|
||||
modeByPath[normalizeRPMPath(repoDirs[i].Path)] = normalizeRPMRepoDirMode(repoDirs[i].Mode)
|
||||
}
|
||||
for i = 0; i < len(entries); i++ {
|
||||
if entries[i].Type != "dir" {
|
||||
continue
|
||||
}
|
||||
entryPath = normalizeRPMPath(entries[i].Path)
|
||||
if modeByPath[entryPath] != "" {
|
||||
entries[i].IsRepoDir = true
|
||||
entries[i].RepoMode = modeByPath[entryPath]
|
||||
continue
|
||||
}
|
||||
repodataPath = filepath.Join(repo.Path, filepath.FromSlash(entryPath), "repodata")
|
||||
_, err = os.Stat(repodataPath)
|
||||
if err == nil {
|
||||
entries[i].IsRepoDir = true
|
||||
entries[i].RepoMode = "local"
|
||||
continue
|
||||
}
|
||||
if err != nil && !os.IsNotExist(err) {
|
||||
WriteJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
}
|
||||
WriteJSON(w, http.StatusOK, entries)
|
||||
}
|
||||
|
||||
func (api *API) RepoRPMUpload(w http.ResponseWriter, r *http.Request, params map[string]string) {
|
||||
var repo models.Repo
|
||||
var writeBlocked bool
|
||||
var writeBlockedPath string
|
||||
var err error
|
||||
var relPath string
|
||||
var relPathClean string
|
||||
var dirPath string
|
||||
var repodataDir string
|
||||
var file multipart.File
|
||||
@@ -3142,11 +3561,25 @@ func (api *API) RepoRPMUpload(w http.ResponseWriter, r *http.Request, params map
|
||||
WriteJSON(w, http.StatusBadRequest, map[string]string{"error": "invalid path"})
|
||||
return
|
||||
}
|
||||
if isRepodataPath(relPath) {
|
||||
relPathClean = filepath.ToSlash(filepath.Clean(filepath.FromSlash(relPath)))
|
||||
relPathClean = strings.TrimPrefix(relPathClean, "./")
|
||||
if relPath == "" {
|
||||
relPathClean = ""
|
||||
}
|
||||
if isRepodataPath(relPathClean) {
|
||||
WriteJSON(w, http.StatusBadRequest, map[string]string{"error": "uploads are not allowed in repodata"})
|
||||
return
|
||||
}
|
||||
dirPath = filepath.Join(repo.Path, filepath.FromSlash(relPath))
|
||||
writeBlocked, writeBlockedPath, err = api.isRPMWriteBlocked(repo, relPathClean)
|
||||
if err != nil {
|
||||
WriteJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
if writeBlocked {
|
||||
WriteJSON(w, http.StatusForbidden, map[string]string{"error": "writes are disabled for mirror repo subtree", "mirror_root": writeBlockedPath})
|
||||
return
|
||||
}
|
||||
dirPath = filepath.Join(repo.Path, filepath.FromSlash(relPathClean))
|
||||
repodataDir, err = nearestRepodataDir(repo.Path, dirPath)
|
||||
if err != nil {
|
||||
WriteJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
|
||||
@@ -3226,7 +3659,7 @@ func (api *API) RepoRPMUpload(w http.ResponseWriter, r *http.Request, params map
|
||||
WriteJSON(w, http.StatusInternalServerError, map[string]string{"error": err.Error()})
|
||||
return
|
||||
}
|
||||
detail, err = rpm.GetPackageDetail(repo.Path, filepath.ToSlash(filepath.Join(relPath, filename)))
|
||||
detail, err = rpm.GetPackageDetail(repo.Path, filepath.ToSlash(filepath.Join(relPathClean, filename)))
|
||||
if err != nil {
|
||||
_ = os.Remove(fullPath)
|
||||
if oldTemp != "" {
|
||||
@@ -4525,6 +4958,111 @@ func nearestRepodataDir(root string, target string) (string, error) {
|
||||
}
|
||||
}
|
||||
|
||||
func normalizeRPMPath(path string) string {
|
||||
var cleaned string
|
||||
cleaned = filepath.ToSlash(filepath.Clean(filepath.FromSlash(strings.TrimSpace(path))))
|
||||
cleaned = strings.TrimPrefix(cleaned, "./")
|
||||
if cleaned == "." {
|
||||
return ""
|
||||
}
|
||||
return cleaned
|
||||
}
|
||||
|
||||
func normalizeRPMRepoDirMode(mode string) string {
|
||||
var v string
|
||||
v = strings.ToLower(strings.TrimSpace(mode))
|
||||
if v == "mirror" {
|
||||
return "mirror"
|
||||
}
|
||||
return "local"
|
||||
}
|
||||
|
||||
func normalizeRPMMirrorIntervalSec(value int64) int64 {
|
||||
if value <= 0 {
|
||||
return 300
|
||||
}
|
||||
if value < 10 {
|
||||
return 10
|
||||
}
|
||||
return value
|
||||
}
|
||||
|
||||
func validateRPMMirrorConfig(mode string, remoteURL string, connectHost string, hostHeader string, tlsServerName string) error {
|
||||
if normalizeRPMRepoDirMode(mode) != "mirror" {
|
||||
return nil
|
||||
}
|
||||
if strings.TrimSpace(remoteURL) == "" {
|
||||
return errors.New("remote_url is required for mirror mode")
|
||||
}
|
||||
if strings.TrimSpace(connectHost) != "" {
|
||||
if !strings.Contains(strings.TrimSpace(connectHost), ":") {
|
||||
return errors.New("connect_host must include port")
|
||||
}
|
||||
}
|
||||
_ = hostHeader
|
||||
_ = tlsServerName
|
||||
return nil
|
||||
}
|
||||
|
||||
func pathUnderRoot(path string, root string) bool {
|
||||
if root == "" {
|
||||
return true
|
||||
}
|
||||
if path == root {
|
||||
return true
|
||||
}
|
||||
return strings.HasPrefix(path, root+"/")
|
||||
}
|
||||
|
||||
func (api *API) findRPMMirrorRoot(repo models.Repo, relPath string) (string, error) {
|
||||
var dirs []models.RPMRepoDir
|
||||
var normalizedPath string
|
||||
var i int
|
||||
var root string
|
||||
var longest string
|
||||
var found bool
|
||||
var err error
|
||||
dirs, err = api.Store.ListRPMRepoDirs(repo.ID)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
normalizedPath = normalizeRPMPath(relPath)
|
||||
for i = 0; i < len(dirs); i++ {
|
||||
if normalizeRPMRepoDirMode(dirs[i].Mode) != "mirror" {
|
||||
continue
|
||||
}
|
||||
root = normalizeRPMPath(dirs[i].Path)
|
||||
if !pathUnderRoot(normalizedPath, root) {
|
||||
continue
|
||||
}
|
||||
if !found {
|
||||
longest = root
|
||||
found = true
|
||||
continue
|
||||
}
|
||||
if len(root) > len(longest) {
|
||||
longest = root
|
||||
}
|
||||
}
|
||||
if !found {
|
||||
return "", nil
|
||||
}
|
||||
return longest, nil
|
||||
}
|
||||
|
||||
func (api *API) isRPMWriteBlocked(repo models.Repo, relPath string) (bool, string, error) {
|
||||
var root string
|
||||
var err error
|
||||
root, err = api.findRPMMirrorRoot(repo, relPath)
|
||||
if err != nil {
|
||||
return false, "", err
|
||||
}
|
||||
if root == "" {
|
||||
return false, "", nil
|
||||
}
|
||||
return true, root, nil
|
||||
}
|
||||
|
||||
func nameHasWhitespace(name string) bool {
|
||||
return strings.IndexFunc(name, unicode.IsSpace) >= 0
|
||||
}
|
||||
|
||||
@@ -44,6 +44,65 @@ type Repo struct {
|
||||
IsForeign bool `json:"is_foreign"`
|
||||
}
|
||||
|
||||
type RPMRepoDir struct {
|
||||
RepoID string `json:"repo_id"`
|
||||
Path string `json:"path"`
|
||||
Mode string `json:"mode"`
|
||||
RemoteURL string `json:"remote_url"`
|
||||
ConnectHost string `json:"connect_host"`
|
||||
HostHeader string `json:"host_header"`
|
||||
TLSServerName string `json:"tls_server_name"`
|
||||
TLSInsecureSkipVerify bool `json:"tls_insecure_skip_verify"`
|
||||
SyncIntervalSec int64 `json:"sync_interval_sec"`
|
||||
SyncEnabled bool `json:"sync_enabled"`
|
||||
Dirty bool `json:"dirty"`
|
||||
NextSyncAt int64 `json:"next_sync_at"`
|
||||
SyncRunning bool `json:"sync_running"`
|
||||
SyncStatus string `json:"sync_status"`
|
||||
SyncError string `json:"sync_error"`
|
||||
SyncStep string `json:"sync_step"`
|
||||
SyncTotal int64 `json:"sync_total"`
|
||||
SyncDone int64 `json:"sync_done"`
|
||||
SyncFailed int64 `json:"sync_failed"`
|
||||
SyncDeleted int64 `json:"sync_deleted"`
|
||||
LastSyncStartedAt int64 `json:"last_sync_started_at"`
|
||||
LastSyncFinishedAt int64 `json:"last_sync_finished_at"`
|
||||
LastSyncSuccessAt int64 `json:"last_sync_success_at"`
|
||||
LastSyncedRevision string `json:"last_synced_revision"`
|
||||
CreatedAt int64 `json:"created_at"`
|
||||
UpdatedAt int64 `json:"updated_at"`
|
||||
}
|
||||
|
||||
type RPMMirrorTask struct {
|
||||
RepoID string `json:"repo_id"`
|
||||
RepoPath string `json:"repo_path"`
|
||||
MirrorPath string `json:"mirror_path"`
|
||||
RemoteURL string `json:"remote_url"`
|
||||
ConnectHost string `json:"connect_host"`
|
||||
HostHeader string `json:"host_header"`
|
||||
TLSServerName string `json:"tls_server_name"`
|
||||
TLSInsecureSkipVerify bool `json:"tls_insecure_skip_verify"`
|
||||
SyncIntervalSec int64 `json:"sync_interval_sec"`
|
||||
Dirty bool `json:"dirty"`
|
||||
LastSyncedRevision string `json:"last_synced_revision"`
|
||||
}
|
||||
|
||||
type RPMMirrorRun struct {
|
||||
ID string `json:"id"`
|
||||
RepoID string `json:"repo_id"`
|
||||
Path string `json:"path"`
|
||||
StartedAt int64 `json:"started_at"`
|
||||
FinishedAt int64 `json:"finished_at"`
|
||||
Status string `json:"status"`
|
||||
Step string `json:"step"`
|
||||
Total int64 `json:"total"`
|
||||
Done int64 `json:"done"`
|
||||
Failed int64 `json:"failed"`
|
||||
Deleted int64 `json:"deleted"`
|
||||
Revision string `json:"revision"`
|
||||
Error string `json:"error"`
|
||||
}
|
||||
|
||||
type Issue struct {
|
||||
ID string `json:"id"`
|
||||
ProjectID string `json:"project_id"`
|
||||
|
||||
632
backend/internal/rpm/mirror.go
Normal file
632
backend/internal/rpm/mirror.go
Normal file
@@ -0,0 +1,632 @@
|
||||
package rpm
|
||||
|
||||
import "compress/gzip"
|
||||
import "context"
|
||||
import "crypto/sha256"
|
||||
import "crypto/tls"
|
||||
import "bytes"
|
||||
import "encoding/hex"
|
||||
import "encoding/xml"
|
||||
import "errors"
|
||||
import "hash"
|
||||
import "io"
|
||||
import "io/fs"
|
||||
import "net"
|
||||
import "net/http"
|
||||
import "net/url"
|
||||
import "os"
|
||||
import "path/filepath"
|
||||
import "strings"
|
||||
import "time"
|
||||
|
||||
import "codit/internal/db"
|
||||
import "codit/internal/models"
|
||||
import "codit/internal/util"
|
||||
|
||||
type MirrorManager struct {
|
||||
store *db.Store
|
||||
logger *util.Logger
|
||||
meta *MetaManager
|
||||
stopCh chan struct{}
|
||||
}
|
||||
|
||||
type repomdDoc struct {
|
||||
Data []repomdData `xml:"data"`
|
||||
}
|
||||
|
||||
type repomdData struct {
|
||||
Type string `xml:"type,attr"`
|
||||
Location repomdLocation `xml:"location"`
|
||||
}
|
||||
|
||||
type repomdLocation struct {
|
||||
Href string `xml:"href,attr"`
|
||||
}
|
||||
|
||||
type primaryDoc struct {
|
||||
Packages []primaryPackage `xml:"package"`
|
||||
}
|
||||
|
||||
type primaryPackage struct {
|
||||
Location primaryLocation `xml:"location"`
|
||||
Checksum primaryChecksum `xml:"checksum"`
|
||||
}
|
||||
|
||||
type primaryLocation struct {
|
||||
Href string `xml:"href,attr"`
|
||||
}
|
||||
|
||||
type primaryChecksum struct {
|
||||
Type string `xml:"type,attr"`
|
||||
Value string `xml:",chardata"`
|
||||
}
|
||||
|
||||
type mirrorHTTPConfig struct {
|
||||
BaseURL string
|
||||
ConnectHost string
|
||||
HostHeader string
|
||||
TLSServerName string
|
||||
TLSInsecure bool
|
||||
DefaultHost string
|
||||
DefaultServer string
|
||||
}
|
||||
|
||||
func NewMirrorManager(store *db.Store, logger *util.Logger, meta *MetaManager) *MirrorManager {
|
||||
var m *MirrorManager
|
||||
m = &MirrorManager{
|
||||
store: store,
|
||||
logger: logger,
|
||||
meta: meta,
|
||||
stopCh: make(chan struct{}),
|
||||
}
|
||||
return m
|
||||
}
|
||||
|
||||
func (m *MirrorManager) Start() {
|
||||
var err error
|
||||
var tasks []models.RPMMirrorTask
|
||||
var i int
|
||||
if m == nil {
|
||||
return
|
||||
}
|
||||
err = m.store.ResetRunningRPMMirrorTasks()
|
||||
if err != nil && m.logger != nil {
|
||||
m.logger.Write("rpm-mirror", util.LOG_ERROR, "reset running tasks failed err=%v", err)
|
||||
}
|
||||
tasks, err = m.store.ListRPMMirrorPaths()
|
||||
if err == nil {
|
||||
for i = 0; i < len(tasks); i++ {
|
||||
_ = m.store.CleanupRPMMirrorRunsRetention(tasks[i].RepoID, tasks[i].MirrorPath, 200, 30)
|
||||
}
|
||||
}
|
||||
go m.loop()
|
||||
}
|
||||
|
||||
func (m *MirrorManager) Stop() {
|
||||
if m == nil {
|
||||
return
|
||||
}
|
||||
close(m.stopCh)
|
||||
}
|
||||
|
||||
func (m *MirrorManager) loop() {
|
||||
var ticker *time.Ticker
|
||||
ticker = time.NewTicker(10 * time.Second)
|
||||
defer ticker.Stop()
|
||||
m.runDue()
|
||||
for {
|
||||
select {
|
||||
case <-m.stopCh:
|
||||
return
|
||||
case <-ticker.C:
|
||||
m.runDue()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (m *MirrorManager) runDue() {
|
||||
var tasks []models.RPMMirrorTask
|
||||
var started bool
|
||||
var now int64
|
||||
var i int
|
||||
var err error
|
||||
now = time.Now().UTC().Unix()
|
||||
tasks, err = m.store.ListDueRPMMirrorTasks(now, 8)
|
||||
if err != nil {
|
||||
if m.logger != nil {
|
||||
m.logger.Write("rpm-mirror", util.LOG_ERROR, "list due tasks failed err=%v", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
for i = 0; i < len(tasks); i++ {
|
||||
started, err = m.store.TryStartRPMMirrorTask(tasks[i].RepoID, tasks[i].MirrorPath, now)
|
||||
if err != nil {
|
||||
if m.logger != nil {
|
||||
m.logger.Write("rpm-mirror", util.LOG_ERROR, "try start failed repo=%s path=%s err=%v", tasks[i].RepoID, tasks[i].MirrorPath, err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
if !started {
|
||||
continue
|
||||
}
|
||||
m.syncOne(tasks[i])
|
||||
}
|
||||
}
|
||||
|
||||
func (m *MirrorManager) syncOne(task models.RPMMirrorTask) {
|
||||
var localRoot string
|
||||
var cfg mirrorHTTPConfig
|
||||
var client *http.Client
|
||||
var repomdData []byte
|
||||
var revision string
|
||||
var primaryHref string
|
||||
var primaryData []byte
|
||||
var expected map[string]string
|
||||
var runID string
|
||||
var startedAt int64
|
||||
var total int64
|
||||
var done int64
|
||||
var failed int64
|
||||
var deleted int64
|
||||
var err error
|
||||
localRoot = filepath.Join(task.RepoPath, filepath.FromSlash(task.MirrorPath))
|
||||
startedAt = time.Now().UTC().Unix()
|
||||
runID, err = m.store.CreateRPMMirrorRun(task.RepoID, task.MirrorPath, startedAt)
|
||||
if err != nil {
|
||||
_ = m.store.FinishRPMMirrorTask(task.RepoID, task.MirrorPath, false, "", err.Error())
|
||||
return
|
||||
}
|
||||
cfg, err = buildMirrorHTTPConfig(task)
|
||||
if err != nil {
|
||||
if m.logger != nil {
|
||||
m.logger.Write("rpm-mirror", util.LOG_ERROR, "sync failed repo=%s path=%s step=start err=%v", task.RepoID, task.MirrorPath, err)
|
||||
}
|
||||
_ = m.store.FinishRPMMirrorTask(task.RepoID, task.MirrorPath, false, "", err.Error())
|
||||
_ = m.store.FinishRPMMirrorRun(runID, time.Now().UTC().Unix(), "failed", "start", 0, 0, 0, 0, "", err.Error())
|
||||
return
|
||||
}
|
||||
client = buildMirrorHTTPClient(cfg)
|
||||
if m.logger != nil {
|
||||
m.logger.Write("rpm-mirror", util.LOG_INFO, "sync start repo=%s path=%s remote=%s", task.RepoID, task.MirrorPath, task.RemoteURL)
|
||||
}
|
||||
_ = m.store.UpdateRPMMirrorTaskProgress(task.RepoID, task.MirrorPath, "fetch_repodata", 0, 0, 0, 0)
|
||||
repomdData, err = mirrorFetch(client, cfg, "repodata/repomd.xml")
|
||||
if err != nil {
|
||||
if m.logger != nil {
|
||||
m.logger.Write("rpm-mirror", util.LOG_ERROR, "sync failed repo=%s path=%s step=fetch_repodata err=%v", task.RepoID, task.MirrorPath, err)
|
||||
}
|
||||
_ = m.store.FinishRPMMirrorTask(task.RepoID, task.MirrorPath, false, "", err.Error())
|
||||
_ = m.store.FinishRPMMirrorRun(runID, time.Now().UTC().Unix(), "failed", "fetch_repodata", 0, 0, 0, 0, "", err.Error())
|
||||
return
|
||||
}
|
||||
revision = sha256HexBytes(repomdData)
|
||||
if !task.Dirty && task.LastSyncedRevision != "" && task.LastSyncedRevision == revision {
|
||||
if m.logger != nil {
|
||||
m.logger.Write("rpm-mirror", util.LOG_INFO, "sync done repo=%s path=%s status=no_change revision=%s", task.RepoID, task.MirrorPath, revision)
|
||||
}
|
||||
_ = m.store.FinishRPMMirrorTask(task.RepoID, task.MirrorPath, true, revision, "")
|
||||
_ = m.store.FinishRPMMirrorRun(runID, time.Now().UTC().Unix(), "success", "no_change", 0, 0, 0, 0, revision, "")
|
||||
return
|
||||
}
|
||||
primaryHref, err = parseRepomdPrimaryHref(repomdData)
|
||||
if err != nil {
|
||||
if m.logger != nil {
|
||||
m.logger.Write("rpm-mirror", util.LOG_ERROR, "sync failed repo=%s path=%s step=parse_repodata err=%v", task.RepoID, task.MirrorPath, err)
|
||||
}
|
||||
_ = m.store.FinishRPMMirrorTask(task.RepoID, task.MirrorPath, false, "", err.Error())
|
||||
_ = m.store.FinishRPMMirrorRun(runID, time.Now().UTC().Unix(), "failed", "fetch_repodata", 0, 0, 0, 0, "", err.Error())
|
||||
return
|
||||
}
|
||||
_ = m.store.UpdateRPMMirrorTaskProgress(task.RepoID, task.MirrorPath, "fetch_primary", 0, 0, 0, 0)
|
||||
primaryData, err = mirrorFetch(client, cfg, primaryHref)
|
||||
if err != nil {
|
||||
if m.logger != nil {
|
||||
m.logger.Write("rpm-mirror", util.LOG_ERROR, "sync failed repo=%s path=%s step=fetch_primary err=%v", task.RepoID, task.MirrorPath, err)
|
||||
}
|
||||
_ = m.store.FinishRPMMirrorTask(task.RepoID, task.MirrorPath, false, "", err.Error())
|
||||
_ = m.store.FinishRPMMirrorRun(runID, time.Now().UTC().Unix(), "failed", "fetch_primary", 0, 0, 0, 0, "", err.Error())
|
||||
return
|
||||
}
|
||||
if strings.HasSuffix(strings.ToLower(primaryHref), ".gz") {
|
||||
primaryData, err = gunzipBytes(primaryData)
|
||||
if err != nil {
|
||||
if m.logger != nil {
|
||||
m.logger.Write("rpm-mirror", util.LOG_ERROR, "sync failed repo=%s path=%s step=decode_primary err=%v", task.RepoID, task.MirrorPath, err)
|
||||
}
|
||||
_ = m.store.FinishRPMMirrorTask(task.RepoID, task.MirrorPath, false, "", err.Error())
|
||||
_ = m.store.FinishRPMMirrorRun(runID, time.Now().UTC().Unix(), "failed", "fetch_primary", 0, 0, 0, 0, "", err.Error())
|
||||
return
|
||||
}
|
||||
}
|
||||
expected, err = parsePrimaryPackages(primaryData)
|
||||
if err != nil {
|
||||
if m.logger != nil {
|
||||
m.logger.Write("rpm-mirror", util.LOG_ERROR, "sync failed repo=%s path=%s step=parse_primary err=%v", task.RepoID, task.MirrorPath, err)
|
||||
}
|
||||
_ = m.store.FinishRPMMirrorTask(task.RepoID, task.MirrorPath, false, "", err.Error())
|
||||
_ = m.store.FinishRPMMirrorRun(runID, time.Now().UTC().Unix(), "failed", "fetch_primary", 0, 0, 0, 0, "", err.Error())
|
||||
return
|
||||
}
|
||||
total, done, failed, deleted, err = m.applyMirror(task, localRoot, client, cfg, expected)
|
||||
if err != nil {
|
||||
if m.logger != nil {
|
||||
m.logger.Write("rpm-mirror", util.LOG_ERROR, "sync failed repo=%s path=%s step=apply total=%d done=%d failed=%d deleted=%d err=%v", task.RepoID, task.MirrorPath, total, done, failed, deleted, err)
|
||||
}
|
||||
_ = m.store.FinishRPMMirrorTask(task.RepoID, task.MirrorPath, false, "", err.Error())
|
||||
_ = m.store.FinishRPMMirrorRun(runID, time.Now().UTC().Unix(), "failed", "apply", total, done, failed, deleted, "", err.Error())
|
||||
return
|
||||
}
|
||||
if m.meta != nil {
|
||||
m.meta.Schedule(localRoot)
|
||||
}
|
||||
_ = m.store.FinishRPMMirrorTask(task.RepoID, task.MirrorPath, true, revision, "")
|
||||
_ = m.store.FinishRPMMirrorRun(runID, time.Now().UTC().Unix(), "success", "done", total, done, failed, deleted, revision, "")
|
||||
_ = m.store.CleanupRPMMirrorRunsRetention(task.RepoID, task.MirrorPath, 200, 30)
|
||||
if m.logger != nil {
|
||||
m.logger.Write("rpm-mirror", util.LOG_INFO, "sync done repo=%s path=%s status=success total=%d done=%d failed=%d deleted=%d revision=%s", task.RepoID, task.MirrorPath, total, done, failed, deleted, revision)
|
||||
}
|
||||
}
|
||||
|
||||
func (m *MirrorManager) applyMirror(task models.RPMMirrorTask, localRoot string, client *http.Client, cfg mirrorHTTPConfig, expected map[string]string) (int64, int64, int64, int64, error) {
|
||||
var local map[string]bool
|
||||
var total int64
|
||||
var done int64
|
||||
var failed int64
|
||||
var deleted int64
|
||||
var path string
|
||||
var checksum string
|
||||
var fullPath string
|
||||
var localSum string
|
||||
var needDownload bool
|
||||
var err error
|
||||
local, err = listLocalRPMs(localRoot)
|
||||
if err != nil {
|
||||
return 0, 0, 0, 0, err
|
||||
}
|
||||
total = int64(len(expected))
|
||||
_ = m.store.UpdateRPMMirrorTaskProgress(task.RepoID, task.MirrorPath, "apply", total, 0, 0, 0)
|
||||
for path = range local {
|
||||
if expected[path] != "" {
|
||||
continue
|
||||
}
|
||||
err = os.Remove(filepath.Join(localRoot, filepath.FromSlash(path)))
|
||||
if err == nil || os.IsNotExist(err) {
|
||||
deleted = deleted + 1
|
||||
}
|
||||
}
|
||||
for path, checksum = range expected {
|
||||
fullPath = filepath.Join(localRoot, filepath.FromSlash(path))
|
||||
needDownload = true
|
||||
_, err = os.Stat(fullPath)
|
||||
if err == nil {
|
||||
localSum, err = sha256HexFile(fullPath)
|
||||
if err == nil && (checksum == "" || strings.EqualFold(localSum, checksum)) {
|
||||
needDownload = false
|
||||
}
|
||||
}
|
||||
if needDownload {
|
||||
err = mirrorDownload(client, cfg, path, fullPath, checksum)
|
||||
if err != nil {
|
||||
failed = failed + 1
|
||||
_ = m.store.UpdateRPMMirrorTaskProgress(task.RepoID, task.MirrorPath, "apply", total, done, failed, deleted)
|
||||
continue
|
||||
}
|
||||
}
|
||||
done = done + 1
|
||||
_ = m.store.UpdateRPMMirrorTaskProgress(task.RepoID, task.MirrorPath, "apply", total, done, failed, deleted)
|
||||
}
|
||||
if failed > 0 {
|
||||
return total, done, failed, deleted, errors.New("some mirror files failed to sync")
|
||||
}
|
||||
return total, done, failed, deleted, nil
|
||||
}
|
||||
|
||||
func buildMirrorHTTPConfig(task models.RPMMirrorTask) (mirrorHTTPConfig, error) {
|
||||
var cfg mirrorHTTPConfig
|
||||
var u *url.URL
|
||||
var err error
|
||||
cfg = mirrorHTTPConfig{
|
||||
BaseURL: strings.TrimRight(strings.TrimSpace(task.RemoteURL), "/"),
|
||||
ConnectHost: strings.TrimSpace(task.ConnectHost),
|
||||
HostHeader: strings.TrimSpace(task.HostHeader),
|
||||
TLSServerName: strings.TrimSpace(task.TLSServerName),
|
||||
TLSInsecure: task.TLSInsecureSkipVerify,
|
||||
}
|
||||
if cfg.BaseURL == "" {
|
||||
return cfg, errors.New("remote url is empty")
|
||||
}
|
||||
u, err = url.Parse(cfg.BaseURL)
|
||||
if err != nil {
|
||||
return cfg, err
|
||||
}
|
||||
cfg.DefaultHost = u.Host
|
||||
cfg.DefaultServer = u.Hostname()
|
||||
if cfg.DefaultHost == "" {
|
||||
return cfg, errors.New("remote url host is empty")
|
||||
}
|
||||
return cfg, nil
|
||||
}
|
||||
|
||||
func buildMirrorHTTPClient(cfg mirrorHTTPConfig) *http.Client {
|
||||
var transport *http.Transport
|
||||
transport = &http.Transport{
|
||||
Proxy: http.ProxyFromEnvironment,
|
||||
TLSClientConfig: &tls.Config{
|
||||
InsecureSkipVerify: cfg.TLSInsecure,
|
||||
ServerName: effectiveServerName(cfg),
|
||||
},
|
||||
}
|
||||
if cfg.ConnectHost != "" {
|
||||
transport.DialContext = func(ctx context.Context, network string, addr string) (net.Conn, error) {
|
||||
var d net.Dialer
|
||||
_ = addr
|
||||
return d.DialContext(ctx, network, cfg.ConnectHost)
|
||||
}
|
||||
}
|
||||
return &http.Client{
|
||||
Transport: transport,
|
||||
Timeout: 60 * time.Second,
|
||||
}
|
||||
}
|
||||
|
||||
func effectiveHostHeader(cfg mirrorHTTPConfig) string {
|
||||
if strings.TrimSpace(cfg.HostHeader) != "" {
|
||||
return strings.TrimSpace(cfg.HostHeader)
|
||||
}
|
||||
return cfg.DefaultHost
|
||||
}
|
||||
|
||||
func effectiveServerName(cfg mirrorHTTPConfig) string {
|
||||
var host string
|
||||
if strings.TrimSpace(cfg.TLSServerName) != "" {
|
||||
return strings.TrimSpace(cfg.TLSServerName)
|
||||
}
|
||||
host = strings.TrimSpace(cfg.HostHeader)
|
||||
if host != "" {
|
||||
if strings.Contains(host, ":") {
|
||||
return strings.Split(host, ":")[0]
|
||||
}
|
||||
return host
|
||||
}
|
||||
return cfg.DefaultServer
|
||||
}
|
||||
|
||||
func mirrorFetch(client *http.Client, cfg mirrorHTTPConfig, rel string) ([]byte, error) {
|
||||
var fullURL string
|
||||
var req *http.Request
|
||||
var res *http.Response
|
||||
var body []byte
|
||||
var err error
|
||||
fullURL = joinRemoteURL(cfg.BaseURL, rel)
|
||||
req, err = http.NewRequest(http.MethodGet, fullURL, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
req.Host = effectiveHostHeader(cfg)
|
||||
res, err = client.Do(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer res.Body.Close()
|
||||
if res.StatusCode < 200 || res.StatusCode >= 300 {
|
||||
return nil, errors.New("upstream request failed: " + res.Status)
|
||||
}
|
||||
body, err = io.ReadAll(res.Body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return body, nil
|
||||
}
|
||||
|
||||
func mirrorDownload(client *http.Client, cfg mirrorHTTPConfig, rel string, dstPath string, checksum string) error {
|
||||
var fullURL string
|
||||
var req *http.Request
|
||||
var res *http.Response
|
||||
var tempPath string
|
||||
var out *os.File
|
||||
var hash hashWriter
|
||||
var copied int64
|
||||
var err error
|
||||
err = os.MkdirAll(filepath.Dir(dstPath), 0o755)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
fullURL = joinRemoteURL(cfg.BaseURL, rel)
|
||||
req, err = http.NewRequest(http.MethodGet, fullURL, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
req.Host = effectiveHostHeader(cfg)
|
||||
res, err = client.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer res.Body.Close()
|
||||
if res.StatusCode < 200 || res.StatusCode >= 300 {
|
||||
return errors.New("upstream request failed: " + res.Status)
|
||||
}
|
||||
tempPath = dstPath + ".mirror.tmp"
|
||||
out, err = os.Create(tempPath)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer out.Close()
|
||||
hash = newHashWriter()
|
||||
copied, err = io.Copy(io.MultiWriter(out, hash), res.Body)
|
||||
_ = copied
|
||||
if err != nil {
|
||||
_ = os.Remove(tempPath)
|
||||
return err
|
||||
}
|
||||
err = out.Close()
|
||||
if err != nil {
|
||||
_ = os.Remove(tempPath)
|
||||
return err
|
||||
}
|
||||
if strings.TrimSpace(checksum) != "" {
|
||||
if !strings.EqualFold(hash.Sum(), strings.TrimSpace(checksum)) {
|
||||
_ = os.Remove(tempPath)
|
||||
return errors.New("download checksum mismatch for " + rel)
|
||||
}
|
||||
}
|
||||
err = os.Rename(tempPath, dstPath)
|
||||
if err != nil {
|
||||
_ = os.Remove(tempPath)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func joinRemoteURL(base string, rel string) string {
|
||||
var cleanRel string
|
||||
cleanRel = strings.TrimLeft(strings.ReplaceAll(rel, "\\", "/"), "/")
|
||||
return strings.TrimRight(base, "/") + "/" + cleanRel
|
||||
}
|
||||
|
||||
func parseRepomdPrimaryHref(data []byte) (string, error) {
|
||||
var doc repomdDoc
|
||||
var i int
|
||||
var href string
|
||||
var err error
|
||||
err = xml.Unmarshal(data, &doc)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
for i = 0; i < len(doc.Data); i++ {
|
||||
if strings.EqualFold(strings.TrimSpace(doc.Data[i].Type), "primary") {
|
||||
href = strings.TrimSpace(doc.Data[i].Location.Href)
|
||||
if href != "" {
|
||||
return href, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
return "", errors.New("primary metadata not found in repomd")
|
||||
}
|
||||
|
||||
func parsePrimaryPackages(data []byte) (map[string]string, error) {
|
||||
var doc primaryDoc
|
||||
var out map[string]string
|
||||
var i int
|
||||
var path string
|
||||
var checksum string
|
||||
var err error
|
||||
err = xml.Unmarshal(data, &doc)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
out = make(map[string]string)
|
||||
for i = 0; i < len(doc.Packages); i++ {
|
||||
path = strings.TrimSpace(doc.Packages[i].Location.Href)
|
||||
if path == "" {
|
||||
continue
|
||||
}
|
||||
if !strings.HasSuffix(strings.ToLower(path), ".rpm") {
|
||||
continue
|
||||
}
|
||||
checksum = strings.TrimSpace(doc.Packages[i].Checksum.Value)
|
||||
out[path] = strings.ToLower(checksum)
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func listLocalRPMs(root string) (map[string]bool, error) {
|
||||
var out map[string]bool
|
||||
var err error
|
||||
out = make(map[string]bool)
|
||||
err = filepath.WalkDir(root, func(path string, d fs.DirEntry, walkErr error) error {
|
||||
var rel string
|
||||
if walkErr != nil {
|
||||
return walkErr
|
||||
}
|
||||
if d == nil {
|
||||
return nil
|
||||
}
|
||||
if d.IsDir() {
|
||||
if strings.EqualFold(d.Name(), "repodata") {
|
||||
return filepath.SkipDir
|
||||
}
|
||||
return nil
|
||||
}
|
||||
if !strings.HasSuffix(strings.ToLower(d.Name()), ".rpm") {
|
||||
return nil
|
||||
}
|
||||
rel, err = filepath.Rel(root, path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
out[filepath.ToSlash(rel)] = true
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func sha256HexBytes(data []byte) string {
|
||||
var sum [32]byte
|
||||
sum = sha256.Sum256(data)
|
||||
return hex.EncodeToString(sum[:])
|
||||
}
|
||||
|
||||
func sha256HexFile(path string) (string, error) {
|
||||
var file *os.File
|
||||
var hash hashWriter
|
||||
var copied int64
|
||||
var err error
|
||||
file, err = os.Open(path)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
defer file.Close()
|
||||
hash = newHashWriter()
|
||||
copied, err = io.Copy(hash, file)
|
||||
_ = copied
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return hash.Sum(), nil
|
||||
}
|
||||
|
||||
func gunzipBytes(data []byte) ([]byte, error) {
|
||||
var reader *gzip.Reader
|
||||
var input *bytes.Reader
|
||||
var out []byte
|
||||
var err error
|
||||
input = bytes.NewReader(data)
|
||||
reader, err = gzip.NewReader(input)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer reader.Close()
|
||||
out, err = io.ReadAll(reader)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
type hashWriter interface {
|
||||
io.Writer
|
||||
Sum() string
|
||||
}
|
||||
|
||||
type shaWriter struct {
|
||||
h hash.Hash
|
||||
}
|
||||
|
||||
func newHashWriter() hashWriter {
|
||||
var w *shaWriter
|
||||
w = &shaWriter{h: sha256.New()}
|
||||
return w
|
||||
}
|
||||
|
||||
func (w *shaWriter) Write(p []byte) (int, error) {
|
||||
return w.h.Write(p)
|
||||
}
|
||||
|
||||
func (w *shaWriter) Sum() string {
|
||||
var raw []byte
|
||||
raw = w.h.Sum(nil)
|
||||
return hex.EncodeToString(raw)
|
||||
}
|
||||
@@ -12,6 +12,8 @@ type TreeEntry struct {
|
||||
Path string `json:"path"`
|
||||
Type string `json:"type"`
|
||||
Size int64 `json:"size"`
|
||||
IsRepoDir bool `json:"is_repo_dir"`
|
||||
RepoMode string `json:"repo_mode"`
|
||||
}
|
||||
|
||||
var ErrPathNotFound = errors.New("path not found")
|
||||
|
||||
14
backend/migrations/017_rpm_repo_dirs.sql
Normal file
14
backend/migrations/017_rpm_repo_dirs.sql
Normal file
@@ -0,0 +1,14 @@
|
||||
CREATE TABLE IF NOT EXISTS rpm_repo_dirs (
|
||||
repo_id TEXT NOT NULL,
|
||||
path TEXT NOT NULL,
|
||||
mode TEXT NOT NULL DEFAULT 'local',
|
||||
remote_url TEXT NOT NULL DEFAULT '',
|
||||
connect_host TEXT NOT NULL DEFAULT '',
|
||||
host_header TEXT NOT NULL DEFAULT '',
|
||||
tls_server_name TEXT NOT NULL DEFAULT '',
|
||||
tls_insecure_skip_verify INTEGER NOT NULL DEFAULT 0,
|
||||
created_at INTEGER NOT NULL,
|
||||
updated_at INTEGER NOT NULL,
|
||||
PRIMARY KEY (repo_id, path),
|
||||
FOREIGN KEY(repo_id) REFERENCES repos(id) ON DELETE CASCADE
|
||||
);
|
||||
19
backend/migrations/018_rpm_repo_dirs_sync_state.sql
Normal file
19
backend/migrations/018_rpm_repo_dirs_sync_state.sql
Normal file
@@ -0,0 +1,19 @@
|
||||
ALTER TABLE rpm_repo_dirs ADD COLUMN sync_interval_sec INTEGER NOT NULL DEFAULT 300;
|
||||
ALTER TABLE rpm_repo_dirs ADD COLUMN dirty INTEGER NOT NULL DEFAULT 1;
|
||||
ALTER TABLE rpm_repo_dirs ADD COLUMN next_sync_at INTEGER NOT NULL DEFAULT 0;
|
||||
ALTER TABLE rpm_repo_dirs ADD COLUMN sync_running INTEGER NOT NULL DEFAULT 0;
|
||||
ALTER TABLE rpm_repo_dirs ADD COLUMN sync_status TEXT NOT NULL DEFAULT 'idle';
|
||||
ALTER TABLE rpm_repo_dirs ADD COLUMN sync_error TEXT NOT NULL DEFAULT '';
|
||||
ALTER TABLE rpm_repo_dirs ADD COLUMN sync_step TEXT NOT NULL DEFAULT '';
|
||||
ALTER TABLE rpm_repo_dirs ADD COLUMN sync_total INTEGER NOT NULL DEFAULT 0;
|
||||
ALTER TABLE rpm_repo_dirs ADD COLUMN sync_done INTEGER NOT NULL DEFAULT 0;
|
||||
ALTER TABLE rpm_repo_dirs ADD COLUMN sync_failed INTEGER NOT NULL DEFAULT 0;
|
||||
ALTER TABLE rpm_repo_dirs ADD COLUMN sync_deleted INTEGER NOT NULL DEFAULT 0;
|
||||
ALTER TABLE rpm_repo_dirs ADD COLUMN last_sync_started_at INTEGER NOT NULL DEFAULT 0;
|
||||
ALTER TABLE rpm_repo_dirs ADD COLUMN last_sync_finished_at INTEGER NOT NULL DEFAULT 0;
|
||||
ALTER TABLE rpm_repo_dirs ADD COLUMN last_sync_success_at INTEGER NOT NULL DEFAULT 0;
|
||||
ALTER TABLE rpm_repo_dirs ADD COLUMN last_synced_revision TEXT NOT NULL DEFAULT '';
|
||||
|
||||
UPDATE rpm_repo_dirs
|
||||
SET dirty = 1, next_sync_at = 0
|
||||
WHERE mode = 'mirror';
|
||||
19
backend/migrations/019_rpm_mirror_runs.sql
Normal file
19
backend/migrations/019_rpm_mirror_runs.sql
Normal file
@@ -0,0 +1,19 @@
|
||||
CREATE TABLE IF NOT EXISTS rpm_mirror_runs (
|
||||
id TEXT PRIMARY KEY,
|
||||
repo_id TEXT NOT NULL,
|
||||
path TEXT NOT NULL,
|
||||
started_at INTEGER NOT NULL,
|
||||
finished_at INTEGER NOT NULL DEFAULT 0,
|
||||
status TEXT NOT NULL DEFAULT 'running',
|
||||
step TEXT NOT NULL DEFAULT '',
|
||||
total INTEGER NOT NULL DEFAULT 0,
|
||||
done INTEGER NOT NULL DEFAULT 0,
|
||||
failed INTEGER NOT NULL DEFAULT 0,
|
||||
deleted INTEGER NOT NULL DEFAULT 0,
|
||||
revision TEXT NOT NULL DEFAULT '',
|
||||
error TEXT NOT NULL DEFAULT '',
|
||||
FOREIGN KEY(repo_id, path) REFERENCES rpm_repo_dirs(repo_id, path) ON DELETE CASCADE
|
||||
);
|
||||
|
||||
CREATE INDEX IF NOT EXISTS idx_rpm_mirror_runs_repo_path_started
|
||||
ON rpm_mirror_runs (repo_id, path, started_at DESC);
|
||||
5
backend/migrations/020_rpm_mirror_suspend.sql
Normal file
5
backend/migrations/020_rpm_mirror_suspend.sql
Normal file
@@ -0,0 +1,5 @@
|
||||
ALTER TABLE rpm_repo_dirs ADD COLUMN sync_enabled INTEGER NOT NULL DEFAULT 1;
|
||||
|
||||
UPDATE rpm_repo_dirs
|
||||
SET sync_enabled = 1
|
||||
WHERE mode = 'mirror';
|
||||
@@ -96,6 +96,53 @@ export interface RpmTreeEntry {
|
||||
path: string
|
||||
type: 'file' | 'dir'
|
||||
size: number
|
||||
is_repo_dir?: boolean
|
||||
repo_mode?: 'local' | 'mirror' | ''
|
||||
}
|
||||
|
||||
export interface RpmRepoDirConfig {
|
||||
repo_id: string
|
||||
path: string
|
||||
mode: 'local' | 'mirror' | ''
|
||||
remote_url: string
|
||||
connect_host: string
|
||||
host_header: string
|
||||
tls_server_name: string
|
||||
tls_insecure_skip_verify: boolean
|
||||
sync_interval_sec: number
|
||||
sync_enabled: boolean
|
||||
dirty: boolean
|
||||
next_sync_at: number
|
||||
sync_running: boolean
|
||||
sync_status: string
|
||||
sync_error: string
|
||||
sync_step: string
|
||||
sync_total: number
|
||||
sync_done: number
|
||||
sync_failed: number
|
||||
sync_deleted: number
|
||||
last_sync_started_at: number
|
||||
last_sync_finished_at: number
|
||||
last_sync_success_at: number
|
||||
last_synced_revision: string
|
||||
created_at: number
|
||||
updated_at: number
|
||||
}
|
||||
|
||||
export interface RpmMirrorRun {
|
||||
id: string
|
||||
repo_id: string
|
||||
path: string
|
||||
started_at: number
|
||||
finished_at: number
|
||||
status: string
|
||||
step: string
|
||||
total: number
|
||||
done: number
|
||||
failed: number
|
||||
deleted: number
|
||||
revision: string
|
||||
error: string
|
||||
}
|
||||
|
||||
export interface RepoTypeItem {
|
||||
@@ -612,16 +659,78 @@ export const api = {
|
||||
body: form
|
||||
})
|
||||
},
|
||||
createRpmSubdir: (repoId: string, name: string, type: string, parent?: string) =>
|
||||
createRpmSubdir: (
|
||||
repoId: string,
|
||||
name: string,
|
||||
type: string,
|
||||
parent?: string,
|
||||
mode?: 'local' | 'mirror',
|
||||
remote_url?: string,
|
||||
connect_host?: string,
|
||||
host_header?: string,
|
||||
tls_server_name?: string,
|
||||
tls_insecure_skip_verify?: boolean,
|
||||
sync_interval_sec?: number
|
||||
) =>
|
||||
request<{ status: string }>(`/api/repos/${repoId}/rpm/subdirs`, {
|
||||
method: 'POST',
|
||||
body: JSON.stringify({ name, type, parent })
|
||||
body: JSON.stringify({ name, type, parent, mode, remote_url, connect_host, host_header, tls_server_name, tls_insecure_skip_verify, sync_interval_sec })
|
||||
}),
|
||||
renameRpmSubdir: (repoId: string, path: string, name: string) =>
|
||||
getRpmSubdir: (repoId: string, path: string) => {
|
||||
const params = new URLSearchParams()
|
||||
params.set('path', path)
|
||||
return request<RpmRepoDirConfig>(`/api/repos/${repoId}/rpm/subdir?${params.toString()}`)
|
||||
},
|
||||
renameRpmSubdir: (
|
||||
repoId: string,
|
||||
path: string,
|
||||
name: string,
|
||||
mode?: 'local' | 'mirror',
|
||||
remote_url?: string,
|
||||
connect_host?: string,
|
||||
host_header?: string,
|
||||
tls_server_name?: string,
|
||||
tls_insecure_skip_verify?: boolean,
|
||||
sync_interval_sec?: number
|
||||
) =>
|
||||
request<{ status: string }>(`/api/repos/${repoId}/rpm/subdir/rename`, {
|
||||
method: 'POST',
|
||||
body: JSON.stringify({ path, name })
|
||||
body: JSON.stringify({ path, name, mode, remote_url, connect_host, host_header, tls_server_name, tls_insecure_skip_verify, sync_interval_sec })
|
||||
}),
|
||||
syncRpmSubdir: (repoId: string, path: string) => {
|
||||
const params = new URLSearchParams()
|
||||
params.set('path', path)
|
||||
return request<{ status: string }>(`/api/repos/${repoId}/rpm/subdir/sync?${params.toString()}`, {
|
||||
method: 'POST'
|
||||
})
|
||||
},
|
||||
suspendRpmSubdir: (repoId: string, path: string) => {
|
||||
const params = new URLSearchParams()
|
||||
params.set('path', path)
|
||||
return request<{ status: string; sync_enabled: boolean }>(`/api/repos/${repoId}/rpm/subdir/suspend?${params.toString()}`, {
|
||||
method: 'POST'
|
||||
})
|
||||
},
|
||||
resumeRpmSubdir: (repoId: string, path: string) => {
|
||||
const params = new URLSearchParams()
|
||||
params.set('path', path)
|
||||
return request<{ status: string; sync_enabled: boolean }>(`/api/repos/${repoId}/rpm/subdir/resume?${params.toString()}`, {
|
||||
method: 'POST'
|
||||
})
|
||||
},
|
||||
listRpmMirrorRuns: (repoId: string, path: string, limit?: number) => {
|
||||
const params = new URLSearchParams()
|
||||
params.set('path', path)
|
||||
if (limit && limit > 0) params.set('limit', String(limit))
|
||||
return request<RpmMirrorRun[]>(`/api/repos/${repoId}/rpm/subdir/runs?${params.toString()}`)
|
||||
},
|
||||
clearRpmMirrorRuns: (repoId: string, path: string) => {
|
||||
const params = new URLSearchParams()
|
||||
params.set('path', path)
|
||||
return request<{ status: string; deleted_count: number }>(`/api/repos/${repoId}/rpm/subdir/runs?${params.toString()}`, {
|
||||
method: 'DELETE'
|
||||
})
|
||||
},
|
||||
deleteRpmSubdir: (repoId: string, path: string) => {
|
||||
const params = new URLSearchParams()
|
||||
params.set('path', path)
|
||||
|
||||
@@ -3,6 +3,7 @@ import {
|
||||
Box,
|
||||
Button,
|
||||
Checkbox,
|
||||
Chip,
|
||||
Dialog,
|
||||
DialogActions,
|
||||
DialogContent,
|
||||
@@ -23,7 +24,7 @@ import {
|
||||
} from '@mui/material'
|
||||
import { useEffect, useRef, useState } from 'react'
|
||||
import { Link, useParams } from 'react-router-dom'
|
||||
import { api, Project, Repo, RpmPackageDetail, RpmPackageSummary, RpmTreeEntry } from '../api'
|
||||
import { api, Project, Repo, RpmMirrorRun, RpmPackageDetail, RpmPackageSummary, RpmTreeEntry } from '../api'
|
||||
import ChevronLeftIcon from '@mui/icons-material/ChevronLeft'
|
||||
import ChevronRightIcon from '@mui/icons-material/ChevronRight'
|
||||
import DeleteOutlineIcon from '@mui/icons-material/DeleteOutline'
|
||||
@@ -31,6 +32,7 @@ import DriveFileRenameOutlineIcon from '@mui/icons-material/DriveFileRenameOutli
|
||||
import FolderIcon from '@mui/icons-material/Folder'
|
||||
import InsertDriveFileIcon from '@mui/icons-material/InsertDriveFile'
|
||||
import HomeOutlinedIcon from '@mui/icons-material/HomeOutlined'
|
||||
import MonitorHeartOutlinedIcon from '@mui/icons-material/MonitorHeartOutlined'
|
||||
import ProjectNavBar from '../components/ProjectNavBar'
|
||||
import RepoSubNav from '../components/RepoSubNav'
|
||||
import CodeBlock from '../components/CodeBlock'
|
||||
@@ -57,6 +59,13 @@ export default function RepoRpmDetailPage(props: RepoRpmDetailPageProps) {
|
||||
const [subdirOpen, setSubdirOpen] = useState(false)
|
||||
const [subdirName, setSubdirName] = useState('')
|
||||
const [subdirType, setSubdirType] = useState<'container' | 'repo'>('container')
|
||||
const [subdirMode, setSubdirMode] = useState<'local' | 'mirror'>('local')
|
||||
const [subdirSyncIntervalSec, setSubdirSyncIntervalSec] = useState('300')
|
||||
const [subdirRemoteURL, setSubdirRemoteURL] = useState('')
|
||||
const [subdirConnectHost, setSubdirConnectHost] = useState('')
|
||||
const [subdirHostHeader, setSubdirHostHeader] = useState('')
|
||||
const [subdirTLSServerName, setSubdirTLSServerName] = useState('')
|
||||
const [subdirTLSInsecureSkipVerify, setSubdirTLSInsecureSkipVerify] = useState(false)
|
||||
const [subdirError, setSubdirError] = useState<string | null>(null)
|
||||
const [subdirSaving, setSubdirSaving] = useState(false)
|
||||
const [uploadOpen, setUploadOpen] = useState(false)
|
||||
@@ -75,8 +84,32 @@ export default function RepoRpmDetailPage(props: RepoRpmDetailPageProps) {
|
||||
const [renamePath, setRenamePath] = useState('')
|
||||
const [renameName, setRenameName] = useState('')
|
||||
const [renameNewName, setRenameNewName] = useState('')
|
||||
const [renameIsRepoDir, setRenameIsRepoDir] = useState(false)
|
||||
const [renameMode, setRenameMode] = useState<'local' | 'mirror'>('local')
|
||||
const [renameSyncIntervalSec, setRenameSyncIntervalSec] = useState('300')
|
||||
const [renameRemoteURL, setRenameRemoteURL] = useState('')
|
||||
const [renameConnectHost, setRenameConnectHost] = useState('')
|
||||
const [renameHostHeader, setRenameHostHeader] = useState('')
|
||||
const [renameTLSServerName, setRenameTLSServerName] = useState('')
|
||||
const [renameTLSInsecureSkipVerify, setRenameTLSInsecureSkipVerify] = useState(false)
|
||||
const [renameError, setRenameError] = useState<string | null>(null)
|
||||
const [renaming, setRenaming] = useState(false)
|
||||
const [statusOpen, setStatusOpen] = useState(false)
|
||||
const [statusPath, setStatusPath] = useState('')
|
||||
const [statusName, setStatusName] = useState('')
|
||||
const [statusMode, setStatusMode] = useState<'local' | 'mirror'>('local')
|
||||
const [statusSyncStatus, setStatusSyncStatus] = useState('')
|
||||
const [statusSyncStep, setStatusSyncStep] = useState('')
|
||||
const [statusSyncError, setStatusSyncError] = useState('')
|
||||
const [statusSyncTotal, setStatusSyncTotal] = useState(0)
|
||||
const [statusSyncDone, setStatusSyncDone] = useState(0)
|
||||
const [statusSyncFailed, setStatusSyncFailed] = useState(0)
|
||||
const [statusSyncDeleted, setStatusSyncDeleted] = useState(0)
|
||||
const [statusSyncEnabled, setStatusSyncEnabled] = useState(true)
|
||||
const [statusSyncBusy, setStatusSyncBusy] = useState(false)
|
||||
const [statusRuns, setStatusRuns] = useState<RpmMirrorRun[]>([])
|
||||
const [statusError, setStatusError] = useState<string | null>(null)
|
||||
const [clearRunsConfirmOpen, setClearRunsConfirmOpen] = useState(false)
|
||||
const [rpmPath, setRpmPath] = useState('')
|
||||
const [rpmPathSegments, setRpmPathSegments] = useState<string[]>([])
|
||||
const [rpmTree, setRpmTree] = useState<RpmTreeEntry[]>([])
|
||||
@@ -222,6 +255,7 @@ export default function RepoRpmDetailPage(props: RepoRpmDetailPageProps) {
|
||||
}
|
||||
|
||||
const handleCreateSubdir = async () => {
|
||||
let syncIntervalSec: number
|
||||
if (!repoId) return
|
||||
if (!subdirName.trim()) {
|
||||
setSubdirError('Name is required.')
|
||||
@@ -230,8 +264,24 @@ export default function RepoRpmDetailPage(props: RepoRpmDetailPageProps) {
|
||||
setSubdirError(null)
|
||||
setSubdirSaving(true)
|
||||
try {
|
||||
syncIntervalSec = Number(subdirSyncIntervalSec)
|
||||
if (!Number.isFinite(syncIntervalSec) || syncIntervalSec <= 0) {
|
||||
syncIntervalSec = 300
|
||||
}
|
||||
const parent = rpmPath
|
||||
await api.createRpmSubdir(repoId, subdirName.trim(), subdirType, parent)
|
||||
await api.createRpmSubdir(
|
||||
repoId,
|
||||
subdirName.trim(),
|
||||
subdirType,
|
||||
parent,
|
||||
subdirMode,
|
||||
subdirRemoteURL.trim(),
|
||||
subdirConnectHost.trim(),
|
||||
subdirHostHeader.trim(),
|
||||
subdirTLSServerName.trim(),
|
||||
subdirTLSInsecureSkipVerify,
|
||||
syncIntervalSec
|
||||
)
|
||||
api.listRpmTree(repoId, rpmPath)
|
||||
.then((list) => {
|
||||
setRpmTree(Array.isArray(list) ? list : [])
|
||||
@@ -244,6 +294,13 @@ export default function RepoRpmDetailPage(props: RepoRpmDetailPageProps) {
|
||||
setSubdirOpen(false)
|
||||
setSubdirName('')
|
||||
setSubdirType('container')
|
||||
setSubdirMode('local')
|
||||
setSubdirSyncIntervalSec('300')
|
||||
setSubdirRemoteURL('')
|
||||
setSubdirConnectHost('')
|
||||
setSubdirHostHeader('')
|
||||
setSubdirTLSServerName('')
|
||||
setSubdirTLSInsecureSkipVerify(false)
|
||||
} catch (err) {
|
||||
const message = err instanceof Error ? err.message : 'Failed to create subdirectory'
|
||||
setSubdirError(message)
|
||||
@@ -289,19 +346,47 @@ export default function RepoRpmDetailPage(props: RepoRpmDetailPageProps) {
|
||||
}
|
||||
|
||||
const handleRenameSubdir = async () => {
|
||||
let syncIntervalSec: number
|
||||
if (!repoId || !renamePath) return
|
||||
if (!renameNewName.trim()) {
|
||||
setRenameError('New name is required.')
|
||||
return
|
||||
}
|
||||
if (renameIsRepoDir && renameMode === 'mirror' && !renameRemoteURL.trim()) {
|
||||
setRenameError('Remote URL is required for mirror mode.')
|
||||
return
|
||||
}
|
||||
setRenameError(null)
|
||||
setRenaming(true)
|
||||
try {
|
||||
await api.renameRpmSubdir(repoId, renamePath, renameNewName.trim())
|
||||
syncIntervalSec = Number(renameSyncIntervalSec)
|
||||
if (!Number.isFinite(syncIntervalSec) || syncIntervalSec <= 0) {
|
||||
syncIntervalSec = 300
|
||||
}
|
||||
await api.renameRpmSubdir(
|
||||
repoId,
|
||||
renamePath,
|
||||
renameNewName.trim(),
|
||||
renameIsRepoDir ? renameMode : undefined,
|
||||
renameIsRepoDir ? renameRemoteURL.trim() : undefined,
|
||||
renameIsRepoDir ? renameConnectHost.trim() : undefined,
|
||||
renameIsRepoDir ? renameHostHeader.trim() : undefined,
|
||||
renameIsRepoDir ? renameTLSServerName.trim() : undefined,
|
||||
renameIsRepoDir ? renameTLSInsecureSkipVerify : undefined,
|
||||
renameIsRepoDir ? syncIntervalSec : undefined
|
||||
)
|
||||
setRenameOpen(false)
|
||||
setRenamePath('')
|
||||
setRenameName('')
|
||||
setRenameNewName('')
|
||||
setRenameIsRepoDir(false)
|
||||
setRenameMode('local')
|
||||
setRenameSyncIntervalSec('300')
|
||||
setRenameRemoteURL('')
|
||||
setRenameConnectHost('')
|
||||
setRenameHostHeader('')
|
||||
setRenameTLSServerName('')
|
||||
setRenameTLSInsecureSkipVerify(false)
|
||||
api.listRpmTree(repoId, rpmPath)
|
||||
.then((list) => {
|
||||
setRpmTree(Array.isArray(list) ? list : [])
|
||||
@@ -319,6 +404,115 @@ export default function RepoRpmDetailPage(props: RepoRpmDetailPageProps) {
|
||||
}
|
||||
}
|
||||
|
||||
const loadStatus = async (path: string) => {
|
||||
let cfg: Awaited<ReturnType<typeof api.getRpmSubdir>>
|
||||
if (!repoId || !path) return
|
||||
setStatusError(null)
|
||||
cfg = await api.getRpmSubdir(repoId, path)
|
||||
setStatusMode(cfg.mode === 'mirror' ? 'mirror' : 'local')
|
||||
setStatusSyncStatus(cfg.sync_status || '')
|
||||
setStatusSyncStep(cfg.sync_step || '')
|
||||
setStatusSyncError(cfg.sync_error || '')
|
||||
setStatusSyncEnabled(Boolean(cfg.sync_enabled))
|
||||
setStatusSyncTotal(Number(cfg.sync_total || 0))
|
||||
setStatusSyncDone(Number(cfg.sync_done || 0))
|
||||
setStatusSyncFailed(Number(cfg.sync_failed || 0))
|
||||
setStatusSyncDeleted(Number(cfg.sync_deleted || 0))
|
||||
if ((cfg.mode || '') === 'mirror') {
|
||||
const runs = await api.listRpmMirrorRuns(repoId, path, 10)
|
||||
setStatusRuns(Array.isArray(runs) ? runs : [])
|
||||
} else {
|
||||
setStatusRuns([])
|
||||
}
|
||||
}
|
||||
|
||||
const openStatusDialog = async (entry: RpmTreeEntry) => {
|
||||
setStatusPath(entry.path)
|
||||
setStatusName(entry.name)
|
||||
setStatusMode(entry.repo_mode === 'mirror' ? 'mirror' : 'local')
|
||||
setStatusSyncStatus('')
|
||||
setStatusSyncStep('')
|
||||
setStatusSyncError('')
|
||||
setStatusSyncEnabled(true)
|
||||
setStatusSyncTotal(0)
|
||||
setStatusSyncDone(0)
|
||||
setStatusSyncFailed(0)
|
||||
setStatusSyncDeleted(0)
|
||||
setStatusRuns([])
|
||||
setStatusError(null)
|
||||
setStatusOpen(true)
|
||||
if (!repoId || !entry.is_repo_dir) return
|
||||
try {
|
||||
await loadStatus(entry.path)
|
||||
} catch (err) {
|
||||
const message = err instanceof Error ? err.message : 'Failed to load mirror status'
|
||||
setStatusError(message)
|
||||
}
|
||||
}
|
||||
|
||||
const handleStatusSyncNow = async () => {
|
||||
if (!repoId || !statusPath || statusMode !== 'mirror') return
|
||||
setStatusSyncBusy(true)
|
||||
setStatusError(null)
|
||||
try {
|
||||
await api.syncRpmSubdir(repoId, statusPath)
|
||||
setStatusSyncEnabled(true)
|
||||
setStatusSyncStatus('scheduled')
|
||||
setStatusSyncStep('queued')
|
||||
setStatusSyncError('')
|
||||
const runs = await api.listRpmMirrorRuns(repoId, statusPath, 10)
|
||||
setStatusRuns(Array.isArray(runs) ? runs : [])
|
||||
} catch (err) {
|
||||
const message = err instanceof Error ? err.message : 'Failed to schedule sync'
|
||||
setStatusError(message)
|
||||
} finally {
|
||||
setStatusSyncBusy(false)
|
||||
}
|
||||
}
|
||||
|
||||
const handleStatusToggleSyncEnabled = async () => {
|
||||
if (!repoId || !statusPath || statusMode !== 'mirror') return
|
||||
setStatusSyncBusy(true)
|
||||
setStatusError(null)
|
||||
try {
|
||||
if (statusSyncEnabled) {
|
||||
await api.suspendRpmSubdir(repoId, statusPath)
|
||||
setStatusSyncEnabled(false)
|
||||
setStatusSyncStatus('suspended')
|
||||
} else {
|
||||
await api.resumeRpmSubdir(repoId, statusPath)
|
||||
setStatusSyncEnabled(true)
|
||||
setStatusSyncStatus('scheduled')
|
||||
}
|
||||
} catch (err) {
|
||||
const message = err instanceof Error ? err.message : 'Failed to change mirror sync state'
|
||||
setStatusError(message)
|
||||
} finally {
|
||||
setStatusSyncBusy(false)
|
||||
}
|
||||
}
|
||||
|
||||
const handleStatusClearRuns = async () => {
|
||||
let runs: RpmMirrorRun[]
|
||||
let result: { status: string; deleted_count: number }
|
||||
if (!repoId || !statusPath || statusMode !== 'mirror') return
|
||||
setStatusSyncBusy(true)
|
||||
setStatusError(null)
|
||||
try {
|
||||
result = await api.clearRpmMirrorRuns(repoId, statusPath)
|
||||
runs = await api.listRpmMirrorRuns(repoId, statusPath, 10)
|
||||
setStatusRuns(Array.isArray(runs) ? runs : [])
|
||||
if ((result.deleted_count || 0) <= 0 && Array.isArray(runs) && runs.length > 0) {
|
||||
setStatusError('No rows were deleted. New runs may still be added by active mirror sync.')
|
||||
}
|
||||
} catch (err) {
|
||||
const message = err instanceof Error ? err.message : 'Failed to clear mirror runs'
|
||||
setStatusError(message)
|
||||
} finally {
|
||||
setStatusSyncBusy(false)
|
||||
}
|
||||
}
|
||||
|
||||
const handleRpmBack = () => {
|
||||
if (!rpmPath) return
|
||||
const nextSegments = rpmPathSegments.slice(0, -1)
|
||||
@@ -493,7 +687,22 @@ export default function RepoRpmDetailPage(props: RepoRpmDetailPageProps) {
|
||||
<Box sx={{ display: 'flex', alignItems: 'center', gap: 0.5 }}>
|
||||
{canWrite ? (
|
||||
<>
|
||||
<Button size="small" onClick={() => setSubdirOpen(true)}>
|
||||
<Button
|
||||
size="small"
|
||||
onClick={() => {
|
||||
setSubdirError(null)
|
||||
setSubdirName('')
|
||||
setSubdirType('container')
|
||||
setSubdirMode('local')
|
||||
setSubdirSyncIntervalSec('300')
|
||||
setSubdirRemoteURL('')
|
||||
setSubdirConnectHost('')
|
||||
setSubdirHostHeader('')
|
||||
setSubdirTLSServerName('')
|
||||
setSubdirTLSInsecureSkipVerify(false)
|
||||
setSubdirOpen(true)
|
||||
}}
|
||||
>
|
||||
New Folder...
|
||||
</Button>
|
||||
<Button
|
||||
@@ -568,21 +777,64 @@ export default function RepoRpmDetailPage(props: RepoRpmDetailPageProps) {
|
||||
<InsertDriveFileIcon fontSize="small" color="info" />
|
||||
)}
|
||||
<Typography variant="body2">{entry.name}</Typography>
|
||||
{entry.type === 'dir' && entry.is_repo_dir ? (
|
||||
<Chip
|
||||
size="small"
|
||||
color={entry.repo_mode === 'mirror' ? 'warning' : 'default'}
|
||||
label={entry.repo_mode === 'mirror' ? 'mirror' : 'local'}
|
||||
sx={{ height: 18 }}
|
||||
/>
|
||||
) : null}
|
||||
</Box>
|
||||
}
|
||||
/>
|
||||
</ListItemButton>
|
||||
{canWrite ? (
|
||||
<Box sx={{ display: 'flex', alignItems: 'center', gap: 0.5, pr: 0.5 }}>
|
||||
{entry.type === 'dir' && entry.is_repo_dir ? (
|
||||
<IconButton
|
||||
size="small"
|
||||
onClick={async (event) => {
|
||||
event.stopPropagation()
|
||||
await openStatusDialog(entry)
|
||||
}}
|
||||
aria-label={`View status for ${entry.name}`}
|
||||
>
|
||||
<MonitorHeartOutlinedIcon fontSize="small" />
|
||||
</IconButton>
|
||||
) : null}
|
||||
{entry.type === 'dir' && entry.name.toLowerCase() !== 'repodata' ? (
|
||||
<IconButton
|
||||
size="small"
|
||||
onClick={(event) => {
|
||||
onClick={async (event) => {
|
||||
event.stopPropagation()
|
||||
setRenameError(null)
|
||||
setRenamePath(entry.path)
|
||||
setRenameName(entry.name)
|
||||
setRenameNewName(entry.name)
|
||||
setRenameIsRepoDir(Boolean(entry.is_repo_dir))
|
||||
setRenameMode(entry.repo_mode === 'mirror' ? 'mirror' : 'local')
|
||||
setRenameSyncIntervalSec('300')
|
||||
setRenameRemoteURL('')
|
||||
setRenameConnectHost('')
|
||||
setRenameHostHeader('')
|
||||
setRenameTLSServerName('')
|
||||
setRenameTLSInsecureSkipVerify(false)
|
||||
if (repoId && entry.is_repo_dir) {
|
||||
try {
|
||||
const cfg = await api.getRpmSubdir(repoId, entry.path)
|
||||
setRenameMode(cfg.mode === 'mirror' ? 'mirror' : 'local')
|
||||
setRenameSyncIntervalSec(String(cfg.sync_interval_sec || 300))
|
||||
setRenameRemoteURL(cfg.remote_url || '')
|
||||
setRenameConnectHost(cfg.connect_host || '')
|
||||
setRenameHostHeader(cfg.host_header || '')
|
||||
setRenameTLSServerName(cfg.tls_server_name || '')
|
||||
setRenameTLSInsecureSkipVerify(Boolean(cfg.tls_insecure_skip_verify))
|
||||
} catch (err) {
|
||||
const message = err instanceof Error ? err.message : 'Failed to load repo directory settings'
|
||||
setRenameError(message)
|
||||
}
|
||||
}
|
||||
setRenameOpen(true)
|
||||
}}
|
||||
aria-label={`Rename folder ${entry.name}`}
|
||||
@@ -793,6 +1045,69 @@ export default function RepoRpmDetailPage(props: RepoRpmDetailPageProps) {
|
||||
<MenuItem value="container">Container</MenuItem>
|
||||
<MenuItem value="repo">RPM Repo</MenuItem>
|
||||
</TextField>
|
||||
{subdirType === 'repo' ? (
|
||||
<TextField
|
||||
select
|
||||
label="Repo Mode"
|
||||
value={subdirMode}
|
||||
onChange={(event) => setSubdirMode(event.target.value as 'local' | 'mirror')}
|
||||
helperText={subdirMode === 'mirror' ? 'Mirror repos are read-only from UI writes.' : 'Local repos allow upload/delete from UI.'}
|
||||
fullWidth
|
||||
>
|
||||
<MenuItem value="local">local</MenuItem>
|
||||
<MenuItem value="mirror">mirror</MenuItem>
|
||||
</TextField>
|
||||
) : null}
|
||||
{subdirType === 'repo' ? (
|
||||
<TextField
|
||||
label="Sync Interval (seconds)"
|
||||
value={subdirSyncIntervalSec}
|
||||
onChange={(event) => setSubdirSyncIntervalSec(event.target.value)}
|
||||
helperText="Used for mirror mode periodic pull. Minimum 10."
|
||||
fullWidth
|
||||
/>
|
||||
) : null}
|
||||
{subdirType === 'repo' && subdirMode === 'mirror' ? (
|
||||
<>
|
||||
<TextField
|
||||
label="Remote URL"
|
||||
value={subdirRemoteURL}
|
||||
onChange={(event) => setSubdirRemoteURL(event.target.value)}
|
||||
helperText="Example: https://rpm.repo.com/base/path"
|
||||
fullWidth
|
||||
/>
|
||||
<TextField
|
||||
label="Connect Host (optional)"
|
||||
value={subdirConnectHost}
|
||||
onChange={(event) => setSubdirConnectHost(event.target.value)}
|
||||
helperText="Optional host:port override, e.g. 127.0.0.1:443"
|
||||
fullWidth
|
||||
/>
|
||||
<TextField
|
||||
label="Host Header (optional)"
|
||||
value={subdirHostHeader}
|
||||
onChange={(event) => setSubdirHostHeader(event.target.value)}
|
||||
helperText="Optional Host header override, e.g. rpm.repo.com"
|
||||
fullWidth
|
||||
/>
|
||||
<TextField
|
||||
label="TLS Server Name (optional)"
|
||||
value={subdirTLSServerName}
|
||||
onChange={(event) => setSubdirTLSServerName(event.target.value)}
|
||||
helperText="Optional SNI/verify server name override"
|
||||
fullWidth
|
||||
/>
|
||||
<FormControlLabel
|
||||
control={
|
||||
<Checkbox
|
||||
checked={subdirTLSInsecureSkipVerify}
|
||||
onChange={(event) => setSubdirTLSInsecureSkipVerify(event.target.checked)}
|
||||
/>
|
||||
}
|
||||
label="Skip TLS certificate verification"
|
||||
/>
|
||||
</>
|
||||
) : null}
|
||||
</Box>
|
||||
</DialogContent>
|
||||
<DialogActions>
|
||||
@@ -858,11 +1173,119 @@ export default function RepoRpmDetailPage(props: RepoRpmDetailPageProps) {
|
||||
</Button>
|
||||
</DialogActions>
|
||||
</Dialog>
|
||||
<Dialog open={renameOpen} onClose={() => setRenameOpen(false)} maxWidth="xs" fullWidth>
|
||||
<DialogTitle>Rename folder</DialogTitle>
|
||||
<Dialog
|
||||
open={statusOpen}
|
||||
onClose={() => {
|
||||
setStatusOpen(false)
|
||||
setStatusError(null)
|
||||
setClearRunsConfirmOpen(false)
|
||||
}}
|
||||
maxWidth="sm"
|
||||
fullWidth
|
||||
>
|
||||
<DialogTitle>Repo directory status</DialogTitle>
|
||||
<DialogContent>
|
||||
{statusError ? <Alert severity="error">{statusError}</Alert> : null}
|
||||
<Box sx={{ display: 'grid', gap: 0.5, mt: 1 }}>
|
||||
<Typography variant="body2">Directory: {statusName}</Typography>
|
||||
<Typography variant="body2">Path: {statusPath || '.'}</Typography>
|
||||
<Typography variant="body2">Mode: {statusMode}</Typography>
|
||||
{statusMode === 'mirror' ? (
|
||||
<>
|
||||
<Typography variant="body2">
|
||||
Sync: {statusSyncEnabled ? 'enabled' : 'suspended'} · status: {statusSyncStatus || '-'} {statusSyncStep ? `(${statusSyncStep})` : ''}
|
||||
</Typography>
|
||||
<Typography variant="body2">
|
||||
Progress: {statusSyncDone}/{statusSyncTotal} · failed: {statusSyncFailed} · deleted: {statusSyncDeleted}
|
||||
</Typography>
|
||||
{statusSyncError ? (
|
||||
<Typography variant="body2" color="error">
|
||||
Last error: {statusSyncError}
|
||||
</Typography>
|
||||
) : null}
|
||||
<Box sx={{ pt: 1 }}>
|
||||
<Button size="small" variant="outlined" onClick={handleStatusSyncNow} disabled={statusSyncBusy}>
|
||||
{statusSyncBusy ? 'Scheduling...' : 'Sync now'}
|
||||
</Button>
|
||||
<Button size="small" variant="outlined" onClick={handleStatusToggleSyncEnabled} disabled={statusSyncBusy} sx={{ ml: 1 }}>
|
||||
{statusSyncEnabled ? 'Suspend' : 'Resume'}
|
||||
</Button>
|
||||
<Button
|
||||
size="small"
|
||||
variant="outlined"
|
||||
color="warning"
|
||||
onClick={() => setClearRunsConfirmOpen(true)}
|
||||
disabled={statusSyncBusy}
|
||||
sx={{ ml: 1 }}
|
||||
>
|
||||
Clear runs
|
||||
</Button>
|
||||
<Button
|
||||
size="small"
|
||||
variant="outlined"
|
||||
onClick={async () => {
|
||||
if (!statusPath) return
|
||||
try {
|
||||
await loadStatus(statusPath)
|
||||
} catch (err) {
|
||||
const message = err instanceof Error ? err.message : 'Failed to refresh mirror status'
|
||||
setStatusError(message)
|
||||
}
|
||||
}}
|
||||
disabled={statusSyncBusy}
|
||||
sx={{ ml: 1 }}
|
||||
>
|
||||
Refresh
|
||||
</Button>
|
||||
</Box>
|
||||
<Box sx={{ display: 'grid', gap: 0.25, pt: 1 }}>
|
||||
<Typography variant="caption" color="text.secondary">Recent runs</Typography>
|
||||
{statusRuns.length ? (
|
||||
statusRuns.map((run) => (
|
||||
<Typography key={run.id} variant="caption" color={run.status === 'failed' ? 'error' : 'text.secondary'}>
|
||||
{new Date((run.started_at || 0) * 1000).toLocaleString()} · {run.status} · {run.done}/{run.total} · fail {run.failed} · del {run.deleted}
|
||||
</Typography>
|
||||
))
|
||||
) : (
|
||||
<Typography variant="caption" color="text.secondary">
|
||||
No runs yet.
|
||||
</Typography>
|
||||
)}
|
||||
</Box>
|
||||
</>
|
||||
) : (
|
||||
<Typography variant="body2" color="text.secondary">
|
||||
This directory is local and does not run mirror sync.
|
||||
</Typography>
|
||||
)}
|
||||
</Box>
|
||||
</DialogContent>
|
||||
<DialogActions>
|
||||
<Button
|
||||
onClick={() => {
|
||||
setStatusOpen(false)
|
||||
setStatusError(null)
|
||||
setClearRunsConfirmOpen(false)
|
||||
}}
|
||||
>
|
||||
Close
|
||||
</Button>
|
||||
</DialogActions>
|
||||
</Dialog>
|
||||
<Dialog
|
||||
open={renameOpen}
|
||||
onClose={() => {
|
||||
setRenameOpen(false)
|
||||
setRenameError(null)
|
||||
}}
|
||||
maxWidth="sm"
|
||||
fullWidth
|
||||
>
|
||||
<DialogTitle>{renameIsRepoDir ? 'Edit repo directory' : 'Rename folder'}</DialogTitle>
|
||||
<DialogContent>
|
||||
{renameError ? <Alert severity="error">{renameError}</Alert> : null}
|
||||
<Typography variant="body2" sx={{ mt: 1 }}>
|
||||
<Box sx={{ display: 'grid', gap: 1, mt: 1 }}>
|
||||
<Typography variant="body2">
|
||||
Current name: {renameName}
|
||||
</Typography>
|
||||
<TextField
|
||||
@@ -870,13 +1293,101 @@ export default function RepoRpmDetailPage(props: RepoRpmDetailPageProps) {
|
||||
value={renameNewName}
|
||||
onChange={(event) => setRenameNewName(event.target.value)}
|
||||
fullWidth
|
||||
sx={{ mt: 1 }}
|
||||
/>
|
||||
{renameIsRepoDir ? (
|
||||
<TextField
|
||||
select
|
||||
label="Repo Mode"
|
||||
value={renameMode}
|
||||
onChange={(event) => setRenameMode(event.target.value as 'local' | 'mirror')}
|
||||
fullWidth
|
||||
>
|
||||
<MenuItem value="local">local</MenuItem>
|
||||
<MenuItem value="mirror">mirror</MenuItem>
|
||||
</TextField>
|
||||
) : null}
|
||||
{renameIsRepoDir ? (
|
||||
<TextField
|
||||
label="Sync Interval (seconds)"
|
||||
value={renameSyncIntervalSec}
|
||||
onChange={(event) => setRenameSyncIntervalSec(event.target.value)}
|
||||
helperText="Used for periodic pull. Minimum 10."
|
||||
fullWidth
|
||||
/>
|
||||
) : null}
|
||||
{renameIsRepoDir && renameMode === 'mirror' ? (
|
||||
<>
|
||||
<TextField
|
||||
label="Remote URL"
|
||||
value={renameRemoteURL}
|
||||
onChange={(event) => setRenameRemoteURL(event.target.value)}
|
||||
helperText="Example: https://rpm.repo.com/base/path"
|
||||
fullWidth
|
||||
/>
|
||||
<TextField
|
||||
label="Connect Host (optional)"
|
||||
value={renameConnectHost}
|
||||
onChange={(event) => setRenameConnectHost(event.target.value)}
|
||||
helperText="Optional host:port override, e.g. 127.0.0.1:443"
|
||||
fullWidth
|
||||
/>
|
||||
<TextField
|
||||
label="Host Header (optional)"
|
||||
value={renameHostHeader}
|
||||
onChange={(event) => setRenameHostHeader(event.target.value)}
|
||||
fullWidth
|
||||
/>
|
||||
<TextField
|
||||
label="TLS Server Name (optional)"
|
||||
value={renameTLSServerName}
|
||||
onChange={(event) => setRenameTLSServerName(event.target.value)}
|
||||
fullWidth
|
||||
/>
|
||||
<FormControlLabel
|
||||
control={<Checkbox checked={renameTLSInsecureSkipVerify} onChange={(event) => setRenameTLSInsecureSkipVerify(event.target.checked)} />}
|
||||
label="Skip TLS certificate verification"
|
||||
/>
|
||||
</>
|
||||
) : null}
|
||||
</Box>
|
||||
</DialogContent>
|
||||
<DialogActions>
|
||||
<Button onClick={() => setRenameOpen(false)}>Cancel</Button>
|
||||
<Button
|
||||
onClick={() => {
|
||||
setRenameOpen(false)
|
||||
setRenameError(null)
|
||||
}}
|
||||
>
|
||||
Cancel
|
||||
</Button>
|
||||
<Button onClick={handleRenameSubdir} variant="contained" disabled={renaming || !renameNewName.trim()}>
|
||||
{renaming ? 'Renaming...' : 'Rename'}
|
||||
{renaming ? (renameIsRepoDir ? 'Saving...' : 'Renaming...') : (renameIsRepoDir ? 'Save' : 'Rename')}
|
||||
</Button>
|
||||
</DialogActions>
|
||||
</Dialog>
|
||||
<Dialog
|
||||
open={clearRunsConfirmOpen}
|
||||
onClose={() => setClearRunsConfirmOpen(false)}
|
||||
maxWidth="xs"
|
||||
fullWidth
|
||||
>
|
||||
<DialogTitle>Clear runs</DialogTitle>
|
||||
<DialogContent>
|
||||
<Typography variant="body2" sx={{ mt: 1 }}>
|
||||
Clear recent mirror runs for "{statusName}"?
|
||||
</Typography>
|
||||
</DialogContent>
|
||||
<DialogActions>
|
||||
<Button onClick={() => setClearRunsConfirmOpen(false)}>Cancel</Button>
|
||||
<Button
|
||||
color="warning"
|
||||
variant="contained"
|
||||
onClick={async () => {
|
||||
setClearRunsConfirmOpen(false)
|
||||
await handleStatusClearRuns()
|
||||
}}
|
||||
>
|
||||
Clear
|
||||
</Button>
|
||||
</DialogActions>
|
||||
</Dialog>
|
||||
|
||||
Reference in New Issue
Block a user