Files
codit/backend/internal/rpm/mirror.go

909 lines
25 KiB
Go

package rpm
import "compress/gzip"
import "context"
import "crypto/md5"
import "crypto/sha1"
import "crypto/sha256"
import "crypto/sha512"
import "crypto/tls"
import "bytes"
import "encoding/hex"
import "encoding/xml"
import "errors"
import "hash"
import "io"
import "io/fs"
import "net"
import "net/http"
import "net/url"
import "os"
import "path/filepath"
import "strconv"
import "strings"
import "sync"
import "time"
import "codit/internal/db"
import "codit/internal/models"
import "codit/internal/util"
type MirrorManager struct {
store *db.Store
logger *util.Logger
meta *MetaManager
stopCh chan struct{}
cancelMu sync.Mutex
cancelByKey map[string]context.CancelFunc
}
type repomdDoc struct {
Data []repomdData `xml:"data"`
}
type repomdData struct {
Type string `xml:"type,attr"`
Location repomdLocation `xml:"location"`
}
type repomdLocation struct {
Href string `xml:"href,attr"`
}
type primaryDoc struct {
Packages []primaryPackage `xml:"package"`
}
type primaryPackage struct {
Location primaryLocation `xml:"location"`
Checksum primaryChecksum `xml:"checksum"`
Time primaryTime `xml:"time"`
}
type primaryLocation struct {
Href string `xml:"href,attr"`
}
type primaryChecksum struct {
Type string `xml:"type,attr"`
Value string `xml:",chardata"`
}
type mirrorChecksum struct {
Algo string
Value string
BuildTime int64
FileTime int64
}
type primaryTime struct {
File string `xml:"file,attr"`
Build string `xml:"build,attr"`
}
type mirrorHTTPConfig struct {
BaseURL string
ConnectHost string
HostHeader string
TLSServerName string
TLSInsecure bool
DefaultHost string
DefaultServer string
}
func NewMirrorManager(store *db.Store, logger *util.Logger, meta *MetaManager) *MirrorManager {
var m *MirrorManager
m = &MirrorManager{
store: store,
logger: logger,
meta: meta,
stopCh: make(chan struct{}),
cancelByKey: make(map[string]context.CancelFunc),
}
return m
}
func (m *MirrorManager) CancelTask(repoID string, path string) bool {
var key string
var cancel context.CancelFunc
if m == nil {
return false
}
key = mirrorTaskKey(repoID, path)
m.cancelMu.Lock()
cancel = m.cancelByKey[key]
m.cancelMu.Unlock()
if cancel == nil {
return false
}
cancel()
return true
}
func (m *MirrorManager) Start() {
var err error
var tasks []models.RPMMirrorTask
var i int
if m == nil {
return
}
err = m.store.ResetRunningRPMMirrorTasks()
if err != nil && m.logger != nil {
m.logger.Write("rpm-mirror", util.LOG_ERROR, "reset running tasks failed err=%v", err)
}
tasks, err = m.store.ListRPMMirrorPaths()
if err == nil {
for i = 0; i < len(tasks); i++ {
_ = m.store.CleanupRPMMirrorRunsRetention(tasks[i].RepoID, tasks[i].MirrorPath, 200, 30)
}
}
go m.loop()
}
func (m *MirrorManager) Stop() {
if m == nil {
return
}
close(m.stopCh)
}
func (m *MirrorManager) loop() {
var ticker *time.Ticker
ticker = time.NewTicker(10 * time.Second)
defer ticker.Stop()
m.runDue()
for {
select {
case <-m.stopCh:
return
case <-ticker.C:
m.runDue()
}
}
}
func (m *MirrorManager) runDue() {
var tasks []models.RPMMirrorTask
var started bool
var now int64
var i int
var err error
now = time.Now().UTC().Unix()
tasks, err = m.store.ListDueRPMMirrorTasks(now, 8)
if err != nil {
if m.logger != nil {
m.logger.Write("rpm-mirror", util.LOG_ERROR, "list due tasks failed err=%v", err)
}
return
}
for i = 0; i < len(tasks); i++ {
started, err = m.store.TryStartRPMMirrorTask(tasks[i].RepoID, tasks[i].MirrorPath, now)
if err != nil {
if m.logger != nil {
m.logger.Write("rpm-mirror", util.LOG_ERROR, "try start failed repo=%s path=%s err=%v", tasks[i].RepoID, tasks[i].MirrorPath, err)
}
continue
}
if !started {
continue
}
m.syncOne(tasks[i])
}
}
func (m *MirrorManager) syncOne(task models.RPMMirrorTask) {
var localRoot string
var cfg mirrorHTTPConfig
var client *http.Client
var repomdData []byte
var revision string
var primaryHref string
var primaryData []byte
var expected map[string]mirrorChecksum
var duplicateCount int
var runID string
var startedAt int64
var total int64
var done int64
var failed int64
var deleted int64
var changed int64
var err error
var syncCtx context.Context
var syncCancel context.CancelFunc
var canceled bool
var key string
localRoot = filepath.Join(task.RepoPath, filepath.FromSlash(task.MirrorPath))
startedAt = time.Now().UTC().Unix()
syncCtx, syncCancel = context.WithCancel(context.Background())
key = mirrorTaskKey(task.RepoID, task.MirrorPath)
m.cancelMu.Lock()
m.cancelByKey[key] = syncCancel
m.cancelMu.Unlock()
defer func() {
m.cancelMu.Lock()
delete(m.cancelByKey, key)
m.cancelMu.Unlock()
syncCancel()
}()
runID, err = m.store.CreateRPMMirrorRun(task.RepoID, task.MirrorPath, startedAt)
if err != nil {
_ = m.store.FinishRPMMirrorTask(task.RepoID, task.MirrorPath, false, "", err.Error())
return
}
cfg, err = buildMirrorHTTPConfig(task)
if err != nil {
if m.logger != nil {
m.logger.Write("rpm-mirror", util.LOG_ERROR, "sync failed repo=%s path=%s step=start err=%v", task.RepoID, task.MirrorPath, err)
}
_ = m.store.FinishRPMMirrorTask(task.RepoID, task.MirrorPath, false, "", err.Error())
_ = m.store.FinishRPMMirrorRun(runID, time.Now().UTC().Unix(), "failed", "start", 0, 0, 0, 0, "", err.Error())
return
}
client = buildMirrorHTTPClient(cfg)
if m.logger != nil {
m.logger.Write("rpm-mirror", util.LOG_INFO, "sync start repo=%s path=%s remote=%s", task.RepoID, task.MirrorPath, task.RemoteURL)
}
_ = m.store.UpdateRPMMirrorTaskProgress(task.RepoID, task.MirrorPath, "fetch_repodata", 0, 0, 0, 0)
repomdData, err = mirrorFetch(syncCtx, client, cfg, "repodata/repomd.xml")
if err != nil {
if m.logger != nil {
m.logger.Write("rpm-mirror", util.LOG_ERROR, "sync failed repo=%s path=%s step=fetch_repodata err=%v", task.RepoID, task.MirrorPath, err)
}
_ = m.store.FinishRPMMirrorTask(task.RepoID, task.MirrorPath, false, "", err.Error())
_ = m.store.FinishRPMMirrorRun(runID, time.Now().UTC().Unix(), "failed", "fetch_repodata", 0, 0, 0, 0, "", err.Error())
return
}
revision = sha256HexBytes(repomdData)
if !task.Dirty && task.LastSyncedRevision != "" && task.LastSyncedRevision == revision {
if m.meta != nil {
ensureRepodata(task, localRoot, m.meta, m.logger)
}
if m.logger != nil {
m.logger.Write("rpm-mirror", util.LOG_INFO, "sync done repo=%s path=%s status=no_change revision=%s", task.RepoID, task.MirrorPath, revision)
}
_ = m.store.FinishRPMMirrorTask(task.RepoID, task.MirrorPath, true, revision, "")
_ = m.store.FinishRPMMirrorRun(runID, time.Now().UTC().Unix(), "success", "no_change", 0, 0, 0, 0, revision, "")
return
}
primaryHref, err = parseRepomdPrimaryHref(repomdData)
if err != nil {
if m.logger != nil {
m.logger.Write("rpm-mirror", util.LOG_ERROR, "sync failed repo=%s path=%s step=parse_repodata err=%v", task.RepoID, task.MirrorPath, err)
}
_ = m.store.FinishRPMMirrorTask(task.RepoID, task.MirrorPath, false, "", err.Error())
_ = m.store.FinishRPMMirrorRun(runID, time.Now().UTC().Unix(), "failed", "fetch_repodata", 0, 0, 0, 0, "", err.Error())
return
}
_ = m.store.UpdateRPMMirrorTaskProgress(task.RepoID, task.MirrorPath, "fetch_primary", 0, 0, 0, 0)
primaryData, err = mirrorFetch(syncCtx, client, cfg, primaryHref)
if err != nil {
if m.logger != nil {
m.logger.Write("rpm-mirror", util.LOG_ERROR, "sync failed repo=%s path=%s step=fetch_primary err=%v", task.RepoID, task.MirrorPath, err)
}
_ = m.store.FinishRPMMirrorTask(task.RepoID, task.MirrorPath, false, "", err.Error())
_ = m.store.FinishRPMMirrorRun(runID, time.Now().UTC().Unix(), "failed", "fetch_primary", 0, 0, 0, 0, "", err.Error())
return
}
if strings.HasSuffix(strings.ToLower(primaryHref), ".gz") {
primaryData, err = gunzipBytes(primaryData)
if err != nil {
if m.logger != nil {
m.logger.Write("rpm-mirror", util.LOG_ERROR, "sync failed repo=%s path=%s step=decode_primary err=%v", task.RepoID, task.MirrorPath, err)
}
_ = m.store.FinishRPMMirrorTask(task.RepoID, task.MirrorPath, false, "", err.Error())
_ = m.store.FinishRPMMirrorRun(runID, time.Now().UTC().Unix(), "failed", "fetch_primary", 0, 0, 0, 0, "", err.Error())
return
}
}
expected, duplicateCount, err = parsePrimaryPackages(primaryData)
if err != nil {
if m.logger != nil {
m.logger.Write("rpm-mirror", util.LOG_ERROR, "sync failed repo=%s path=%s step=parse_primary err=%v", task.RepoID, task.MirrorPath, err)
}
_ = m.store.FinishRPMMirrorTask(task.RepoID, task.MirrorPath, false, "", err.Error())
_ = m.store.FinishRPMMirrorRun(runID, time.Now().UTC().Unix(), "failed", "fetch_primary", 0, 0, 0, 0, "", err.Error())
return
}
if m.logger != nil {
m.logger.Write("rpm-mirror", util.LOG_INFO, "primary parsed repo=%s path=%s primary_href=%s packages=%d", task.RepoID, task.MirrorPath, primaryHref, len(expected))
if duplicateCount > 0 {
m.logger.Write("rpm-mirror", util.LOG_WARN, "primary has duplicate package paths repo=%s path=%s primary_href=%s duplicates=%d", task.RepoID, task.MirrorPath, primaryHref, duplicateCount)
}
}
total, done, failed, deleted, changed, err = m.applyMirror(syncCtx, task, localRoot, client, cfg, expected)
if err != nil {
canceled = errors.Is(err, context.Canceled)
if m.logger != nil {
if canceled {
m.logger.Write("rpm-mirror", util.LOG_WARN, "sync canceled repo=%s path=%s step=apply total=%d done=%d failed=%d deleted=%d err=%v", task.RepoID, task.MirrorPath, total, done, failed, deleted, err)
} else {
m.logger.Write("rpm-mirror", util.LOG_ERROR, "sync failed repo=%s path=%s step=apply total=%d done=%d failed=%d deleted=%d err=%v", task.RepoID, task.MirrorPath, total, done, failed, deleted, err)
}
}
if canceled {
_ = m.store.FinishRPMMirrorTask(task.RepoID, task.MirrorPath, false, "", "sync canceled by user")
_ = m.store.FinishRPMMirrorRun(runID, time.Now().UTC().Unix(), "failed", "canceled", total, done, failed, deleted, "", "sync canceled by user")
} else {
_ = m.store.FinishRPMMirrorTask(task.RepoID, task.MirrorPath, false, "", err.Error())
_ = m.store.FinishRPMMirrorRun(runID, time.Now().UTC().Unix(), "failed", "apply", total, done, failed, deleted, "", err.Error())
}
return
}
if m.meta != nil && changed > 0 {
if m.logger != nil {
m.logger.Write("rpm-mirror", util.LOG_INFO, "repodata schedule repo=%s path=%s reason=sync_changed changed=%d", task.RepoID, task.MirrorPath, changed)
}
m.meta.Schedule(localRoot)
}
_ = m.store.FinishRPMMirrorTask(task.RepoID, task.MirrorPath, true, revision, "")
_ = m.store.FinishRPMMirrorRun(runID, time.Now().UTC().Unix(), "success", "done", total, done, failed, deleted, revision, "")
_ = m.store.CleanupRPMMirrorRunsRetention(task.RepoID, task.MirrorPath, 200, 30)
if m.logger != nil {
m.logger.Write("rpm-mirror", util.LOG_INFO, "sync done repo=%s path=%s status=success total=%d done=%d failed=%d deleted=%d revision=%s", task.RepoID, task.MirrorPath, total, done, failed, deleted, revision)
}
}
func (m *MirrorManager) applyMirror(ctx context.Context, task models.RPMMirrorTask, localRoot string, client *http.Client, cfg mirrorHTTPConfig, expected map[string]mirrorChecksum) (int64, int64, int64, int64, int64, error) {
var local map[string]bool
var total int64
var done int64
var failed int64
var deleted int64
var changed int64
var path string
var checksum mirrorChecksum
var fullPath string
var localSum string
var needDownload bool
var err error
local, err = listLocalRPMs(localRoot)
if err != nil {
return 0, 0, 0, 0, 0, err
}
total = int64(len(expected))
_ = m.store.UpdateRPMMirrorTaskProgress(task.RepoID, task.MirrorPath, "apply", total, 0, 0, 0)
for path = range local {
select {
case <-ctx.Done():
return total, done, failed, deleted, changed, ctx.Err()
default:
}
if expected[path].Value != "" {
continue
}
if m.logger != nil {
m.logger.Write("rpm-mirror", util.LOG_DEBUG, "delete local stale repo=%s path=%s file=%s", task.RepoID, task.MirrorPath, path)
}
err = os.Remove(filepath.Join(localRoot, filepath.FromSlash(path)))
if err == nil || os.IsNotExist(err) {
deleted = deleted + 1
changed = changed + 1
} else {
if m.logger != nil {
m.logger.Write("rpm-mirror", util.LOG_WARN, "delete local stale failed repo=%s path=%s file=%s err=%v", task.RepoID, task.MirrorPath, path, err)
}
}
}
for path, checksum = range expected {
select {
case <-ctx.Done():
return total, done, failed, deleted, changed, ctx.Err()
default:
}
fullPath = filepath.Join(localRoot, filepath.FromSlash(path))
needDownload = true
_, err = os.Stat(fullPath)
if err == nil {
localSum, err = fileHexByAlgo(fullPath, checksum.Algo)
if err == nil && (checksum.Value == "" || strings.EqualFold(localSum, checksum.Value)) {
needDownload = false
}
}
if needDownload {
if m.logger != nil {
m.logger.Write("rpm-mirror", util.LOG_DEBUG, "download start repo=%s path=%s file=%s checksum_type=%s checksum=%s", task.RepoID, task.MirrorPath, path, checksum.Algo, checksum.Value)
}
err = mirrorDownload(ctx, client, cfg, path, fullPath, checksum.Algo, checksum.Value)
if err != nil {
failed = failed + 1
if m.logger != nil {
m.logger.Write("rpm-mirror", util.LOG_WARN, "download failed repo=%s path=%s file=%s err=%v", task.RepoID, task.MirrorPath, path, err)
}
_ = m.store.UpdateRPMMirrorTaskProgress(task.RepoID, task.MirrorPath, "apply", total, done, failed, deleted)
continue
}
changed = changed + 1
if m.logger != nil {
m.logger.Write("rpm-mirror", util.LOG_DEBUG, "download done repo=%s path=%s file=%s", task.RepoID, task.MirrorPath, path)
}
} else {
if m.logger != nil {
m.logger.Write("rpm-mirror", util.LOG_DEBUG, "download skip repo=%s path=%s file=%s reason=up-to-date", task.RepoID, task.MirrorPath, path)
}
}
done = done + 1
_ = m.store.UpdateRPMMirrorTaskProgress(task.RepoID, task.MirrorPath, "apply", total, done, failed, deleted)
}
if failed > 0 {
return total, done, failed, deleted, changed, errors.New("some mirror files failed to sync")
}
return total, done, failed, deleted, changed, nil
}
func buildMirrorHTTPConfig(task models.RPMMirrorTask) (mirrorHTTPConfig, error) {
var cfg mirrorHTTPConfig
var u *url.URL
var err error
cfg = mirrorHTTPConfig{
BaseURL: strings.TrimRight(strings.TrimSpace(task.RemoteURL), "/"),
ConnectHost: strings.TrimSpace(task.ConnectHost),
HostHeader: strings.TrimSpace(task.HostHeader),
TLSServerName: strings.TrimSpace(task.TLSServerName),
TLSInsecure: task.TLSInsecureSkipVerify,
}
if cfg.BaseURL == "" {
return cfg, errors.New("remote url is empty")
}
u, err = url.Parse(cfg.BaseURL)
if err != nil {
return cfg, err
}
cfg.DefaultHost = u.Host
cfg.DefaultServer = u.Hostname()
if cfg.DefaultHost == "" {
return cfg, errors.New("remote url host is empty")
}
return cfg, nil
}
func buildMirrorHTTPClient(cfg mirrorHTTPConfig) *http.Client {
var transport *http.Transport
transport = &http.Transport{
Proxy: http.ProxyFromEnvironment,
TLSClientConfig: &tls.Config{
InsecureSkipVerify: cfg.TLSInsecure,
ServerName: effectiveServerName(cfg),
},
}
if cfg.ConnectHost != "" {
transport.DialContext = func(ctx context.Context, network string, addr string) (net.Conn, error) {
var d net.Dialer
_ = addr
return d.DialContext(ctx, network, cfg.ConnectHost)
}
}
return &http.Client{
Transport: transport,
Timeout: 60 * time.Second,
}
}
func effectiveHostHeader(cfg mirrorHTTPConfig) string {
if strings.TrimSpace(cfg.HostHeader) != "" {
return strings.TrimSpace(cfg.HostHeader)
}
return cfg.DefaultHost
}
func effectiveServerName(cfg mirrorHTTPConfig) string {
var host string
if strings.TrimSpace(cfg.TLSServerName) != "" {
return strings.TrimSpace(cfg.TLSServerName)
}
host = strings.TrimSpace(cfg.HostHeader)
if host != "" {
if strings.Contains(host, ":") {
return strings.Split(host, ":")[0]
}
return host
}
return cfg.DefaultServer
}
func mirrorFetch(ctx context.Context, client *http.Client, cfg mirrorHTTPConfig, rel string) ([]byte, error) {
var fullURL string
var req *http.Request
var res *http.Response
var body []byte
var err error
fullURL = joinRemoteURL(cfg.BaseURL, rel)
req, err = http.NewRequestWithContext(ctx, http.MethodGet, fullURL, nil)
if err != nil {
return nil, err
}
req.Host = effectiveHostHeader(cfg)
res, err = client.Do(req)
if err != nil {
return nil, err
}
defer res.Body.Close()
if res.StatusCode < 200 || res.StatusCode >= 300 {
return nil, errors.New("upstream request failed: " + res.Status)
}
body, err = io.ReadAll(res.Body)
if err != nil {
return nil, err
}
return body, nil
}
func mirrorDownload(ctx context.Context, client *http.Client, cfg mirrorHTTPConfig, rel string, dstPath string, checksumType string, checksum string) error {
var fullURL string
var req *http.Request
var res *http.Response
var tempPath string
var out *os.File
var hash hashWriter
var copied int64
var actualSum string
var contentType string
var finalURL string
var err error
err = os.MkdirAll(filepath.Dir(dstPath), 0o755)
if err != nil {
return err
}
fullURL = joinRemoteURL(cfg.BaseURL, rel)
req, err = http.NewRequestWithContext(ctx, http.MethodGet, fullURL, nil)
if err != nil {
return err
}
req.Host = effectiveHostHeader(cfg)
res, err = client.Do(req)
if err != nil {
return err
}
defer res.Body.Close()
if res.StatusCode < 200 || res.StatusCode >= 300 {
return errors.New("upstream request failed: " + res.Status)
}
contentType = strings.TrimSpace(res.Header.Get("Content-Type"))
finalURL = ""
if res.Request != nil && res.Request.URL != nil {
finalURL = res.Request.URL.String()
}
tempPath = dstPath + ".mirror.tmp"
out, err = os.Create(tempPath)
if err != nil {
return err
}
defer out.Close()
hash, err = newHashWriter(checksumType)
if err != nil {
_ = os.Remove(tempPath)
return err
}
copied, err = io.Copy(io.MultiWriter(out, hash), res.Body)
_ = copied
if err != nil {
_ = os.Remove(tempPath)
return err
}
err = out.Close()
if err != nil {
_ = os.Remove(tempPath)
return err
}
if strings.TrimSpace(checksum) != "" {
actualSum = hash.Sum()
if !strings.EqualFold(actualSum, strings.TrimSpace(checksum)) {
_ = os.Remove(tempPath)
return errors.New(
"download checksum mismatch for " + rel +
" type=" + normalizeChecksumAlgo(checksumType) +
" expected=" + strings.TrimSpace(checksum) +
" actual=" + actualSum +
" bytes=" + int64ToString(copied) +
" content_type=" + contentType +
" url=" + finalURL)
}
}
err = os.Rename(tempPath, dstPath)
if err != nil {
_ = os.Remove(tempPath)
return err
}
return nil
}
func joinRemoteURL(base string, rel string) string {
var baseURL *url.URL
var relURL *url.URL
var cleanRel string
var err error
cleanRel = strings.ReplaceAll(rel, "\\", "/")
baseURL, err = url.Parse(strings.TrimSpace(base))
if err != nil || baseURL == nil {
cleanRel = strings.TrimLeft(cleanRel, "/")
return strings.TrimRight(base, "/") + "/" + cleanRel
}
// Treat base as a directory root for repository-relative href resolution.
if !strings.HasSuffix(baseURL.Path, "/") {
baseURL.Path = baseURL.Path + "/"
}
relURL, err = url.Parse(strings.TrimSpace(cleanRel))
if err != nil || relURL == nil {
cleanRel = strings.TrimLeft(cleanRel, "/")
return strings.TrimRight(base, "/") + "/" + cleanRel
}
return baseURL.ResolveReference(relURL).String()
}
func parseRepomdPrimaryHref(data []byte) (string, error) {
var doc repomdDoc
var i int
var href string
var err error
err = xml.Unmarshal(data, &doc)
if err != nil {
return "", err
}
for i = 0; i < len(doc.Data); i++ {
if strings.EqualFold(strings.TrimSpace(doc.Data[i].Type), "primary") {
href = strings.TrimSpace(doc.Data[i].Location.Href)
if href != "" {
return href, nil
}
}
}
return "", errors.New("primary metadata not found in repomd")
}
func parsePrimaryPackages(data []byte) (map[string]mirrorChecksum, int, error) {
var doc primaryDoc
var out map[string]mirrorChecksum
var i int
var path string
var checksum string
var checksumType string
var fileTime int64
var buildTime int64
var existing mirrorChecksum
var ok bool
var duplicates int
var err error
err = xml.Unmarshal(data, &doc)
if err != nil {
return nil, 0, err
}
out = make(map[string]mirrorChecksum)
for i = 0; i < len(doc.Packages); i++ {
path = strings.TrimSpace(doc.Packages[i].Location.Href)
if path == "" {
continue
}
if !strings.HasSuffix(strings.ToLower(path), ".rpm") {
continue
}
checksum = strings.TrimSpace(doc.Packages[i].Checksum.Value)
checksumType = strings.TrimSpace(doc.Packages[i].Checksum.Type)
fileTime = parseTimeAttr(doc.Packages[i].Time.File)
buildTime = parseTimeAttr(doc.Packages[i].Time.Build)
if existing, ok = out[path]; ok {
duplicates = duplicates + 1
if !shouldReplaceDuplicate(existing, buildTime, fileTime, checksum) {
continue
}
}
out[path] = mirrorChecksum{
Algo: normalizeChecksumAlgo(checksumType),
Value: strings.ToLower(checksum),
BuildTime: buildTime,
FileTime: fileTime,
}
}
return out, duplicates, nil
}
func listLocalRPMs(root string) (map[string]bool, error) {
var out map[string]bool
var err error
out = make(map[string]bool)
err = filepath.WalkDir(root, func(path string, d fs.DirEntry, walkErr error) error {
var rel string
if walkErr != nil {
return walkErr
}
if d == nil {
return nil
}
if d.IsDir() {
if strings.EqualFold(d.Name(), "repodata") {
return filepath.SkipDir
}
return nil
}
if !strings.HasSuffix(strings.ToLower(d.Name()), ".rpm") {
return nil
}
rel, err = filepath.Rel(root, path)
if err != nil {
return err
}
out[filepath.ToSlash(rel)] = true
return nil
})
if err != nil {
return nil, err
}
return out, nil
}
func sha256HexBytes(data []byte) string {
var sum [32]byte
sum = sha256.Sum256(data)
return hex.EncodeToString(sum[:])
}
func fileHexByAlgo(path string, algo string) (string, error) {
var file *os.File
var hash hashWriter
var copied int64
var err error
file, err = os.Open(path)
if err != nil {
return "", err
}
defer file.Close()
hash, err = newHashWriter(algo)
if err != nil {
return "", err
}
copied, err = io.Copy(hash, file)
_ = copied
if err != nil {
return "", err
}
return hash.Sum(), nil
}
func gunzipBytes(data []byte) ([]byte, error) {
var reader *gzip.Reader
var input *bytes.Reader
var out []byte
var err error
input = bytes.NewReader(data)
reader, err = gzip.NewReader(input)
if err != nil {
return nil, err
}
defer reader.Close()
out, err = io.ReadAll(reader)
if err != nil {
return nil, err
}
return out, nil
}
type hashWriter interface {
io.Writer
Sum() string
}
type shaWriter struct {
h hash.Hash
}
func newHashWriter(algo string) (hashWriter, error) {
var w *shaWriter
var normalized string
var h hash.Hash
normalized = normalizeChecksumAlgo(algo)
switch normalized {
case "", "sha256":
h = sha256.New()
case "sha", "sha1":
h = sha1.New()
case "sha224":
h = sha256.New224()
case "sha384":
h = sha512.New384()
case "sha512":
h = sha512.New()
case "md5":
h = md5.New()
default:
return nil, errors.New("unsupported checksum type: " + normalized)
}
w = &shaWriter{h: h}
return w, nil
}
func (w *shaWriter) Write(p []byte) (int, error) {
return w.h.Write(p)
}
func (w *shaWriter) Sum() string {
var raw []byte
raw = w.h.Sum(nil)
return hex.EncodeToString(raw)
}
func normalizeChecksumAlgo(algo string) string {
var out string
out = strings.ToLower(strings.TrimSpace(algo))
out = strings.ReplaceAll(out, "-", "")
out = strings.ReplaceAll(out, "_", "")
if out == "sha1" {
return "sha1"
}
if out == "sha" {
return "sha"
}
if out == "sha224" {
return "sha224"
}
if out == "sha256" {
return "sha256"
}
if out == "sha384" {
return "sha384"
}
if out == "sha512" {
return "sha512"
}
if out == "md5" {
return "md5"
}
return out
}
func int64ToString(v int64) string {
return strconv.FormatInt(v, 10)
}
func mirrorTaskKey(repoID string, path string) string {
return repoID + "\x00" + path
}
func ensureRepodata(task models.RPMMirrorTask, localRoot string, meta *MetaManager, logger *util.Logger) {
var repomdPath string
var statErr error
repomdPath = filepath.Join(localRoot, "repodata", "repomd.xml")
_, statErr = os.Stat(repomdPath)
if statErr == nil {
return
}
if logger != nil {
logger.Write("rpm-mirror", util.LOG_INFO, "repodata schedule repo=%s path=%s reason=missing repomd=%s", task.RepoID, task.MirrorPath, repomdPath)
}
meta.Schedule(localRoot)
}
func parseTimeAttr(value string) int64 {
var trimmed string
var parsed int64
var err error
trimmed = strings.TrimSpace(value)
if trimmed == "" {
return 0
}
parsed, err = strconv.ParseInt(trimmed, 10, 64)
if err != nil {
return 0
}
return parsed
}
func shouldReplaceDuplicate(existing mirrorChecksum, newBuildTime int64, newFileTime int64, newChecksum string) bool {
var existingChecksum string
if newBuildTime > existing.BuildTime {
return true
}
if newBuildTime < existing.BuildTime {
return false
}
if newFileTime > existing.FileTime {
return true
}
if newFileTime < existing.FileTime {
return false
}
existingChecksum = strings.TrimSpace(existing.Value)
if existingChecksum == "" && strings.TrimSpace(newChecksum) != "" {
return true
}
return false
}