package rpm import "compress/gzip" import "context" import "crypto/md5" import "crypto/sha1" import "crypto/sha256" import "crypto/sha512" import "crypto/tls" import "bytes" import "encoding/hex" import "encoding/xml" import "errors" import "hash" import "io" import "io/fs" import "net" import "net/http" import "net/url" import "os" import "path/filepath" import "strconv" import "strings" import "sync" import "time" import "codit/internal/db" import "codit/internal/models" import "codit/internal/util" type MirrorManager struct { store *db.Store logger *util.Logger meta *MetaManager stopCh chan struct{} cancelMu sync.Mutex cancelByKey map[string]context.CancelFunc } type repomdDoc struct { Data []repomdData `xml:"data"` } type repomdData struct { Type string `xml:"type,attr"` Location repomdLocation `xml:"location"` } type repomdLocation struct { Href string `xml:"href,attr"` } type primaryDoc struct { Packages []primaryPackage `xml:"package"` } type primaryPackage struct { Location primaryLocation `xml:"location"` Checksum primaryChecksum `xml:"checksum"` Time primaryTime `xml:"time"` } type primaryLocation struct { Href string `xml:"href,attr"` } type primaryChecksum struct { Type string `xml:"type,attr"` Value string `xml:",chardata"` } type mirrorChecksum struct { Algo string Value string BuildTime int64 FileTime int64 } type primaryTime struct { File string `xml:"file,attr"` Build string `xml:"build,attr"` } type mirrorHTTPConfig struct { BaseURL string ConnectHost string HostHeader string TLSServerName string TLSInsecure bool DefaultHost string DefaultServer string } func NewMirrorManager(store *db.Store, logger *util.Logger, meta *MetaManager) *MirrorManager { var m *MirrorManager m = &MirrorManager{ store: store, logger: logger, meta: meta, stopCh: make(chan struct{}), cancelByKey: make(map[string]context.CancelFunc), } return m } func (m *MirrorManager) CancelTask(repoID string, path string) bool { var key string var cancel context.CancelFunc if m == nil { return false } key = mirrorTaskKey(repoID, path) m.cancelMu.Lock() cancel = m.cancelByKey[key] m.cancelMu.Unlock() if cancel == nil { return false } cancel() return true } func (m *MirrorManager) Start() { var err error var tasks []models.RPMMirrorTask var i int if m == nil { return } err = m.store.ResetRunningRPMMirrorTasks() if err != nil && m.logger != nil { m.logger.Write("rpm-mirror", util.LOG_ERROR, "reset running tasks failed err=%v", err) } tasks, err = m.store.ListRPMMirrorPaths() if err == nil { for i = 0; i < len(tasks); i++ { _ = m.store.CleanupRPMMirrorRunsRetention(tasks[i].RepoID, tasks[i].MirrorPath, 200, 30) } } go m.loop() } func (m *MirrorManager) Stop() { if m == nil { return } close(m.stopCh) } func (m *MirrorManager) loop() { var ticker *time.Ticker ticker = time.NewTicker(10 * time.Second) defer ticker.Stop() m.runDue() for { select { case <-m.stopCh: return case <-ticker.C: m.runDue() } } } func (m *MirrorManager) runDue() { var tasks []models.RPMMirrorTask var started bool var now int64 var i int var err error now = time.Now().UTC().Unix() tasks, err = m.store.ListDueRPMMirrorTasks(now, 8) if err != nil { if m.logger != nil { m.logger.Write("rpm-mirror", util.LOG_ERROR, "list due tasks failed err=%v", err) } return } for i = 0; i < len(tasks); i++ { started, err = m.store.TryStartRPMMirrorTask(tasks[i].RepoID, tasks[i].MirrorPath, now) if err != nil { if m.logger != nil { m.logger.Write("rpm-mirror", util.LOG_ERROR, "try start failed repo=%s path=%s err=%v", tasks[i].RepoID, tasks[i].MirrorPath, err) } continue } if !started { continue } m.syncOne(tasks[i]) } } func (m *MirrorManager) syncOne(task models.RPMMirrorTask) { var localRoot string var cfg mirrorHTTPConfig var client *http.Client var repomdData []byte var revision string var primaryHref string var primaryData []byte var expected map[string]mirrorChecksum var duplicateCount int var runID string var startedAt int64 var total int64 var done int64 var failed int64 var deleted int64 var changed int64 var err error var syncCtx context.Context var syncCancel context.CancelFunc var canceled bool var key string localRoot = filepath.Join(task.RepoPath, filepath.FromSlash(task.MirrorPath)) startedAt = time.Now().UTC().Unix() syncCtx, syncCancel = context.WithCancel(context.Background()) key = mirrorTaskKey(task.RepoID, task.MirrorPath) m.cancelMu.Lock() m.cancelByKey[key] = syncCancel m.cancelMu.Unlock() defer func() { m.cancelMu.Lock() delete(m.cancelByKey, key) m.cancelMu.Unlock() syncCancel() }() runID, err = m.store.CreateRPMMirrorRun(task.RepoID, task.MirrorPath, startedAt) if err != nil { _ = m.store.FinishRPMMirrorTask(task.RepoID, task.MirrorPath, false, "", err.Error()) return } cfg, err = buildMirrorHTTPConfig(task) if err != nil { if m.logger != nil { m.logger.Write("rpm-mirror", util.LOG_ERROR, "sync failed repo=%s path=%s step=start err=%v", task.RepoID, task.MirrorPath, err) } _ = m.store.FinishRPMMirrorTask(task.RepoID, task.MirrorPath, false, "", err.Error()) _ = m.store.FinishRPMMirrorRun(runID, time.Now().UTC().Unix(), "failed", "start", 0, 0, 0, 0, "", err.Error()) return } client = buildMirrorHTTPClient(cfg) if m.logger != nil { m.logger.Write("rpm-mirror", util.LOG_INFO, "sync start repo=%s path=%s remote=%s", task.RepoID, task.MirrorPath, task.RemoteURL) } _ = m.store.UpdateRPMMirrorTaskProgress(task.RepoID, task.MirrorPath, "fetch_repodata", 0, 0, 0, 0) repomdData, err = mirrorFetch(syncCtx, client, cfg, "repodata/repomd.xml") if err != nil { if m.logger != nil { m.logger.Write("rpm-mirror", util.LOG_ERROR, "sync failed repo=%s path=%s step=fetch_repodata err=%v", task.RepoID, task.MirrorPath, err) } _ = m.store.FinishRPMMirrorTask(task.RepoID, task.MirrorPath, false, "", err.Error()) _ = m.store.FinishRPMMirrorRun(runID, time.Now().UTC().Unix(), "failed", "fetch_repodata", 0, 0, 0, 0, "", err.Error()) return } revision = sha256HexBytes(repomdData) if !task.Dirty && task.LastSyncedRevision != "" && task.LastSyncedRevision == revision { if m.meta != nil { ensureRepodata(task, localRoot, m.meta, m.logger) } if m.logger != nil { m.logger.Write("rpm-mirror", util.LOG_INFO, "sync done repo=%s path=%s status=no_change revision=%s", task.RepoID, task.MirrorPath, revision) } _ = m.store.FinishRPMMirrorTask(task.RepoID, task.MirrorPath, true, revision, "") _ = m.store.FinishRPMMirrorRun(runID, time.Now().UTC().Unix(), "success", "no_change", 0, 0, 0, 0, revision, "") return } primaryHref, err = parseRepomdPrimaryHref(repomdData) if err != nil { if m.logger != nil { m.logger.Write("rpm-mirror", util.LOG_ERROR, "sync failed repo=%s path=%s step=parse_repodata err=%v", task.RepoID, task.MirrorPath, err) } _ = m.store.FinishRPMMirrorTask(task.RepoID, task.MirrorPath, false, "", err.Error()) _ = m.store.FinishRPMMirrorRun(runID, time.Now().UTC().Unix(), "failed", "fetch_repodata", 0, 0, 0, 0, "", err.Error()) return } _ = m.store.UpdateRPMMirrorTaskProgress(task.RepoID, task.MirrorPath, "fetch_primary", 0, 0, 0, 0) primaryData, err = mirrorFetch(syncCtx, client, cfg, primaryHref) if err != nil { if m.logger != nil { m.logger.Write("rpm-mirror", util.LOG_ERROR, "sync failed repo=%s path=%s step=fetch_primary err=%v", task.RepoID, task.MirrorPath, err) } _ = m.store.FinishRPMMirrorTask(task.RepoID, task.MirrorPath, false, "", err.Error()) _ = m.store.FinishRPMMirrorRun(runID, time.Now().UTC().Unix(), "failed", "fetch_primary", 0, 0, 0, 0, "", err.Error()) return } if strings.HasSuffix(strings.ToLower(primaryHref), ".gz") { primaryData, err = gunzipBytes(primaryData) if err != nil { if m.logger != nil { m.logger.Write("rpm-mirror", util.LOG_ERROR, "sync failed repo=%s path=%s step=decode_primary err=%v", task.RepoID, task.MirrorPath, err) } _ = m.store.FinishRPMMirrorTask(task.RepoID, task.MirrorPath, false, "", err.Error()) _ = m.store.FinishRPMMirrorRun(runID, time.Now().UTC().Unix(), "failed", "fetch_primary", 0, 0, 0, 0, "", err.Error()) return } } expected, duplicateCount, err = parsePrimaryPackages(primaryData) if err != nil { if m.logger != nil { m.logger.Write("rpm-mirror", util.LOG_ERROR, "sync failed repo=%s path=%s step=parse_primary err=%v", task.RepoID, task.MirrorPath, err) } _ = m.store.FinishRPMMirrorTask(task.RepoID, task.MirrorPath, false, "", err.Error()) _ = m.store.FinishRPMMirrorRun(runID, time.Now().UTC().Unix(), "failed", "fetch_primary", 0, 0, 0, 0, "", err.Error()) return } if m.logger != nil { m.logger.Write("rpm-mirror", util.LOG_INFO, "primary parsed repo=%s path=%s primary_href=%s packages=%d", task.RepoID, task.MirrorPath, primaryHref, len(expected)) if duplicateCount > 0 { m.logger.Write("rpm-mirror", util.LOG_WARN, "primary has duplicate package paths repo=%s path=%s primary_href=%s duplicates=%d", task.RepoID, task.MirrorPath, primaryHref, duplicateCount) } } total, done, failed, deleted, changed, err = m.applyMirror(syncCtx, task, localRoot, client, cfg, expected) if err != nil { canceled = errors.Is(err, context.Canceled) if m.logger != nil { if canceled { m.logger.Write("rpm-mirror", util.LOG_WARN, "sync canceled repo=%s path=%s step=apply total=%d done=%d failed=%d deleted=%d err=%v", task.RepoID, task.MirrorPath, total, done, failed, deleted, err) } else { m.logger.Write("rpm-mirror", util.LOG_ERROR, "sync failed repo=%s path=%s step=apply total=%d done=%d failed=%d deleted=%d err=%v", task.RepoID, task.MirrorPath, total, done, failed, deleted, err) } } if canceled { _ = m.store.FinishRPMMirrorTask(task.RepoID, task.MirrorPath, false, "", "sync canceled by user") _ = m.store.FinishRPMMirrorRun(runID, time.Now().UTC().Unix(), "failed", "canceled", total, done, failed, deleted, "", "sync canceled by user") } else { _ = m.store.FinishRPMMirrorTask(task.RepoID, task.MirrorPath, false, "", err.Error()) _ = m.store.FinishRPMMirrorRun(runID, time.Now().UTC().Unix(), "failed", "apply", total, done, failed, deleted, "", err.Error()) } return } if m.meta != nil && changed > 0 { if m.logger != nil { m.logger.Write("rpm-mirror", util.LOG_INFO, "repodata schedule repo=%s path=%s reason=sync_changed changed=%d", task.RepoID, task.MirrorPath, changed) } m.meta.Schedule(localRoot) } _ = m.store.FinishRPMMirrorTask(task.RepoID, task.MirrorPath, true, revision, "") _ = m.store.FinishRPMMirrorRun(runID, time.Now().UTC().Unix(), "success", "done", total, done, failed, deleted, revision, "") _ = m.store.CleanupRPMMirrorRunsRetention(task.RepoID, task.MirrorPath, 200, 30) if m.logger != nil { m.logger.Write("rpm-mirror", util.LOG_INFO, "sync done repo=%s path=%s status=success total=%d done=%d failed=%d deleted=%d revision=%s", task.RepoID, task.MirrorPath, total, done, failed, deleted, revision) } } func (m *MirrorManager) applyMirror(ctx context.Context, task models.RPMMirrorTask, localRoot string, client *http.Client, cfg mirrorHTTPConfig, expected map[string]mirrorChecksum) (int64, int64, int64, int64, int64, error) { var local map[string]bool var total int64 var done int64 var failed int64 var deleted int64 var changed int64 var path string var checksum mirrorChecksum var fullPath string var localSum string var needDownload bool var err error local, err = listLocalRPMs(localRoot) if err != nil { return 0, 0, 0, 0, 0, err } total = int64(len(expected)) _ = m.store.UpdateRPMMirrorTaskProgress(task.RepoID, task.MirrorPath, "apply", total, 0, 0, 0) for path = range local { select { case <-ctx.Done(): return total, done, failed, deleted, changed, ctx.Err() default: } if expected[path].Value != "" { continue } if m.logger != nil { m.logger.Write("rpm-mirror", util.LOG_DEBUG, "delete local stale repo=%s path=%s file=%s", task.RepoID, task.MirrorPath, path) } err = os.Remove(filepath.Join(localRoot, filepath.FromSlash(path))) if err == nil || os.IsNotExist(err) { deleted = deleted + 1 changed = changed + 1 } else { if m.logger != nil { m.logger.Write("rpm-mirror", util.LOG_WARN, "delete local stale failed repo=%s path=%s file=%s err=%v", task.RepoID, task.MirrorPath, path, err) } } } for path, checksum = range expected { select { case <-ctx.Done(): return total, done, failed, deleted, changed, ctx.Err() default: } fullPath = filepath.Join(localRoot, filepath.FromSlash(path)) needDownload = true _, err = os.Stat(fullPath) if err == nil { localSum, err = fileHexByAlgo(fullPath, checksum.Algo) if err == nil && (checksum.Value == "" || strings.EqualFold(localSum, checksum.Value)) { needDownload = false } } if needDownload { if m.logger != nil { m.logger.Write("rpm-mirror", util.LOG_DEBUG, "download start repo=%s path=%s file=%s checksum_type=%s checksum=%s", task.RepoID, task.MirrorPath, path, checksum.Algo, checksum.Value) } err = mirrorDownload(ctx, client, cfg, path, fullPath, checksum.Algo, checksum.Value) if err != nil { failed = failed + 1 if m.logger != nil { m.logger.Write("rpm-mirror", util.LOG_WARN, "download failed repo=%s path=%s file=%s err=%v", task.RepoID, task.MirrorPath, path, err) } _ = m.store.UpdateRPMMirrorTaskProgress(task.RepoID, task.MirrorPath, "apply", total, done, failed, deleted) continue } changed = changed + 1 if m.logger != nil { m.logger.Write("rpm-mirror", util.LOG_DEBUG, "download done repo=%s path=%s file=%s", task.RepoID, task.MirrorPath, path) } } else { if m.logger != nil { m.logger.Write("rpm-mirror", util.LOG_DEBUG, "download skip repo=%s path=%s file=%s reason=up-to-date", task.RepoID, task.MirrorPath, path) } } done = done + 1 _ = m.store.UpdateRPMMirrorTaskProgress(task.RepoID, task.MirrorPath, "apply", total, done, failed, deleted) } if failed > 0 { return total, done, failed, deleted, changed, errors.New("some mirror files failed to sync") } return total, done, failed, deleted, changed, nil } func buildMirrorHTTPConfig(task models.RPMMirrorTask) (mirrorHTTPConfig, error) { var cfg mirrorHTTPConfig var u *url.URL var err error cfg = mirrorHTTPConfig{ BaseURL: strings.TrimRight(strings.TrimSpace(task.RemoteURL), "/"), ConnectHost: strings.TrimSpace(task.ConnectHost), HostHeader: strings.TrimSpace(task.HostHeader), TLSServerName: strings.TrimSpace(task.TLSServerName), TLSInsecure: task.TLSInsecureSkipVerify, } if cfg.BaseURL == "" { return cfg, errors.New("remote url is empty") } u, err = url.Parse(cfg.BaseURL) if err != nil { return cfg, err } cfg.DefaultHost = u.Host cfg.DefaultServer = u.Hostname() if cfg.DefaultHost == "" { return cfg, errors.New("remote url host is empty") } return cfg, nil } func buildMirrorHTTPClient(cfg mirrorHTTPConfig) *http.Client { var transport *http.Transport transport = &http.Transport{ Proxy: http.ProxyFromEnvironment, TLSClientConfig: &tls.Config{ InsecureSkipVerify: cfg.TLSInsecure, ServerName: effectiveServerName(cfg), }, } if cfg.ConnectHost != "" { transport.DialContext = func(ctx context.Context, network string, addr string) (net.Conn, error) { var d net.Dialer _ = addr return d.DialContext(ctx, network, cfg.ConnectHost) } } return &http.Client{ Transport: transport, Timeout: 60 * time.Second, } } func effectiveHostHeader(cfg mirrorHTTPConfig) string { if strings.TrimSpace(cfg.HostHeader) != "" { return strings.TrimSpace(cfg.HostHeader) } return cfg.DefaultHost } func effectiveServerName(cfg mirrorHTTPConfig) string { var host string if strings.TrimSpace(cfg.TLSServerName) != "" { return strings.TrimSpace(cfg.TLSServerName) } host = strings.TrimSpace(cfg.HostHeader) if host != "" { if strings.Contains(host, ":") { return strings.Split(host, ":")[0] } return host } return cfg.DefaultServer } func mirrorFetch(ctx context.Context, client *http.Client, cfg mirrorHTTPConfig, rel string) ([]byte, error) { var fullURL string var req *http.Request var res *http.Response var body []byte var err error fullURL = joinRemoteURL(cfg.BaseURL, rel) req, err = http.NewRequestWithContext(ctx, http.MethodGet, fullURL, nil) if err != nil { return nil, err } req.Host = effectiveHostHeader(cfg) res, err = client.Do(req) if err != nil { return nil, err } defer res.Body.Close() if res.StatusCode < 200 || res.StatusCode >= 300 { return nil, errors.New("upstream request failed: " + res.Status) } body, err = io.ReadAll(res.Body) if err != nil { return nil, err } return body, nil } func mirrorDownload(ctx context.Context, client *http.Client, cfg mirrorHTTPConfig, rel string, dstPath string, checksumType string, checksum string) error { var fullURL string var req *http.Request var res *http.Response var tempPath string var out *os.File var hash hashWriter var copied int64 var actualSum string var contentType string var finalURL string var err error err = os.MkdirAll(filepath.Dir(dstPath), 0o755) if err != nil { return err } fullURL = joinRemoteURL(cfg.BaseURL, rel) req, err = http.NewRequestWithContext(ctx, http.MethodGet, fullURL, nil) if err != nil { return err } req.Host = effectiveHostHeader(cfg) res, err = client.Do(req) if err != nil { return err } defer res.Body.Close() if res.StatusCode < 200 || res.StatusCode >= 300 { return errors.New("upstream request failed: " + res.Status) } contentType = strings.TrimSpace(res.Header.Get("Content-Type")) finalURL = "" if res.Request != nil && res.Request.URL != nil { finalURL = res.Request.URL.String() } tempPath = dstPath + ".mirror.tmp" out, err = os.Create(tempPath) if err != nil { return err } defer out.Close() hash, err = newHashWriter(checksumType) if err != nil { _ = os.Remove(tempPath) return err } copied, err = io.Copy(io.MultiWriter(out, hash), res.Body) _ = copied if err != nil { _ = os.Remove(tempPath) return err } err = out.Close() if err != nil { _ = os.Remove(tempPath) return err } if strings.TrimSpace(checksum) != "" { actualSum = hash.Sum() if !strings.EqualFold(actualSum, strings.TrimSpace(checksum)) { _ = os.Remove(tempPath) return errors.New( "download checksum mismatch for " + rel + " type=" + normalizeChecksumAlgo(checksumType) + " expected=" + strings.TrimSpace(checksum) + " actual=" + actualSum + " bytes=" + int64ToString(copied) + " content_type=" + contentType + " url=" + finalURL) } } err = os.Rename(tempPath, dstPath) if err != nil { _ = os.Remove(tempPath) return err } return nil } func joinRemoteURL(base string, rel string) string { var baseURL *url.URL var relURL *url.URL var cleanRel string var err error cleanRel = strings.ReplaceAll(rel, "\\", "/") baseURL, err = url.Parse(strings.TrimSpace(base)) if err != nil || baseURL == nil { cleanRel = strings.TrimLeft(cleanRel, "/") return strings.TrimRight(base, "/") + "/" + cleanRel } // Treat base as a directory root for repository-relative href resolution. if !strings.HasSuffix(baseURL.Path, "/") { baseURL.Path = baseURL.Path + "/" } relURL, err = url.Parse(strings.TrimSpace(cleanRel)) if err != nil || relURL == nil { cleanRel = strings.TrimLeft(cleanRel, "/") return strings.TrimRight(base, "/") + "/" + cleanRel } return baseURL.ResolveReference(relURL).String() } func parseRepomdPrimaryHref(data []byte) (string, error) { var doc repomdDoc var i int var href string var err error err = xml.Unmarshal(data, &doc) if err != nil { return "", err } for i = 0; i < len(doc.Data); i++ { if strings.EqualFold(strings.TrimSpace(doc.Data[i].Type), "primary") { href = strings.TrimSpace(doc.Data[i].Location.Href) if href != "" { return href, nil } } } return "", errors.New("primary metadata not found in repomd") } func parsePrimaryPackages(data []byte) (map[string]mirrorChecksum, int, error) { var doc primaryDoc var out map[string]mirrorChecksum var i int var path string var checksum string var checksumType string var fileTime int64 var buildTime int64 var existing mirrorChecksum var ok bool var duplicates int var err error err = xml.Unmarshal(data, &doc) if err != nil { return nil, 0, err } out = make(map[string]mirrorChecksum) for i = 0; i < len(doc.Packages); i++ { path = strings.TrimSpace(doc.Packages[i].Location.Href) if path == "" { continue } if !strings.HasSuffix(strings.ToLower(path), ".rpm") { continue } checksum = strings.TrimSpace(doc.Packages[i].Checksum.Value) checksumType = strings.TrimSpace(doc.Packages[i].Checksum.Type) fileTime = parseTimeAttr(doc.Packages[i].Time.File) buildTime = parseTimeAttr(doc.Packages[i].Time.Build) if existing, ok = out[path]; ok { duplicates = duplicates + 1 if !shouldReplaceDuplicate(existing, buildTime, fileTime, checksum) { continue } } out[path] = mirrorChecksum{ Algo: normalizeChecksumAlgo(checksumType), Value: strings.ToLower(checksum), BuildTime: buildTime, FileTime: fileTime, } } return out, duplicates, nil } func listLocalRPMs(root string) (map[string]bool, error) { var out map[string]bool var err error out = make(map[string]bool) err = filepath.WalkDir(root, func(path string, d fs.DirEntry, walkErr error) error { var rel string if walkErr != nil { return walkErr } if d == nil { return nil } if d.IsDir() { if strings.EqualFold(d.Name(), "repodata") { return filepath.SkipDir } return nil } if !strings.HasSuffix(strings.ToLower(d.Name()), ".rpm") { return nil } rel, err = filepath.Rel(root, path) if err != nil { return err } out[filepath.ToSlash(rel)] = true return nil }) if err != nil { return nil, err } return out, nil } func sha256HexBytes(data []byte) string { var sum [32]byte sum = sha256.Sum256(data) return hex.EncodeToString(sum[:]) } func fileHexByAlgo(path string, algo string) (string, error) { var file *os.File var hash hashWriter var copied int64 var err error file, err = os.Open(path) if err != nil { return "", err } defer file.Close() hash, err = newHashWriter(algo) if err != nil { return "", err } copied, err = io.Copy(hash, file) _ = copied if err != nil { return "", err } return hash.Sum(), nil } func gunzipBytes(data []byte) ([]byte, error) { var reader *gzip.Reader var input *bytes.Reader var out []byte var err error input = bytes.NewReader(data) reader, err = gzip.NewReader(input) if err != nil { return nil, err } defer reader.Close() out, err = io.ReadAll(reader) if err != nil { return nil, err } return out, nil } type hashWriter interface { io.Writer Sum() string } type shaWriter struct { h hash.Hash } func newHashWriter(algo string) (hashWriter, error) { var w *shaWriter var normalized string var h hash.Hash normalized = normalizeChecksumAlgo(algo) switch normalized { case "", "sha256": h = sha256.New() case "sha", "sha1": h = sha1.New() case "sha224": h = sha256.New224() case "sha384": h = sha512.New384() case "sha512": h = sha512.New() case "md5": h = md5.New() default: return nil, errors.New("unsupported checksum type: " + normalized) } w = &shaWriter{h: h} return w, nil } func (w *shaWriter) Write(p []byte) (int, error) { return w.h.Write(p) } func (w *shaWriter) Sum() string { var raw []byte raw = w.h.Sum(nil) return hex.EncodeToString(raw) } func normalizeChecksumAlgo(algo string) string { var out string out = strings.ToLower(strings.TrimSpace(algo)) out = strings.ReplaceAll(out, "-", "") out = strings.ReplaceAll(out, "_", "") if out == "sha1" { return "sha1" } if out == "sha" { return "sha" } if out == "sha224" { return "sha224" } if out == "sha256" { return "sha256" } if out == "sha384" { return "sha384" } if out == "sha512" { return "sha512" } if out == "md5" { return "md5" } return out } func int64ToString(v int64) string { return strconv.FormatInt(v, 10) } func mirrorTaskKey(repoID string, path string) string { return repoID + "\x00" + path } func ensureRepodata(task models.RPMMirrorTask, localRoot string, meta *MetaManager, logger *util.Logger) { var repomdPath string var statErr error repomdPath = filepath.Join(localRoot, "repodata", "repomd.xml") _, statErr = os.Stat(repomdPath) if statErr == nil { return } if logger != nil { logger.Write("rpm-mirror", util.LOG_INFO, "repodata schedule repo=%s path=%s reason=missing repomd=%s", task.RepoID, task.MirrorPath, repomdPath) } meta.Schedule(localRoot) } func parseTimeAttr(value string) int64 { var trimmed string var parsed int64 var err error trimmed = strings.TrimSpace(value) if trimmed == "" { return 0 } parsed, err = strconv.ParseInt(trimmed, 10, 64) if err != nil { return 0 } return parsed } func shouldReplaceDuplicate(existing mirrorChecksum, newBuildTime int64, newFileTime int64, newChecksum string) bool { var existingChecksum string if newBuildTime > existing.BuildTime { return true } if newBuildTime < existing.BuildTime { return false } if newFileTime > existing.FileTime { return true } if newFileTime < existing.FileTime { return false } existingChecksum = strings.TrimSpace(existing.Value) if existingChecksum == "" && strings.TrimSpace(newChecksum) != "" { return true } return false }