909 lines
25 KiB
Go
909 lines
25 KiB
Go
package rpm
|
|
|
|
import "compress/gzip"
|
|
import "context"
|
|
import "crypto/md5"
|
|
import "crypto/sha1"
|
|
import "crypto/sha256"
|
|
import "crypto/sha512"
|
|
import "crypto/tls"
|
|
import "bytes"
|
|
import "encoding/hex"
|
|
import "encoding/xml"
|
|
import "errors"
|
|
import "hash"
|
|
import "io"
|
|
import "io/fs"
|
|
import "net"
|
|
import "net/http"
|
|
import "net/url"
|
|
import "os"
|
|
import "path/filepath"
|
|
import "strconv"
|
|
import "strings"
|
|
import "sync"
|
|
import "time"
|
|
|
|
import "codit/internal/db"
|
|
import "codit/internal/models"
|
|
import "codit/internal/util"
|
|
|
|
type MirrorManager struct {
|
|
store *db.Store
|
|
logger *util.Logger
|
|
meta *MetaManager
|
|
stopCh chan struct{}
|
|
cancelMu sync.Mutex
|
|
cancelByKey map[string]context.CancelFunc
|
|
}
|
|
|
|
// XML shapes decoded from upstream repository metadata, plus the in-memory
// records derived from them. Only the elements and attributes the mirror
// needs are declared; encoding/xml ignores everything else.
type (
	// repomdDoc is the subset of repodata/repomd.xml that lists metadata files.
	repomdDoc struct {
		Data []repomdData `xml:"data"`
	}

	// repomdData is one <data type="..."> entry of repomd.xml.
	repomdData struct {
		Type     string         `xml:"type,attr"`
		Location repomdLocation `xml:"location"`
	}

	// repomdLocation carries the repository-relative href of a metadata file.
	repomdLocation struct {
		Href string `xml:"href,attr"`
	}

	// primaryDoc is the subset of the primary metadata listing packages.
	primaryDoc struct {
		Packages []primaryPackage `xml:"package"`
	}

	// primaryPackage is one <package> entry of the primary metadata.
	primaryPackage struct {
		Location primaryLocation `xml:"location"`
		Checksum primaryChecksum `xml:"checksum"`
		Time     primaryTime     `xml:"time"`
	}

	// primaryLocation carries the repository-relative href of a package file.
	primaryLocation struct {
		Href string `xml:"href,attr"`
	}

	// primaryChecksum is a <checksum type="..."> element; the digest is its text.
	primaryChecksum struct {
		Type  string `xml:"type,attr"`
		Value string `xml:",chardata"`
	}

	// mirrorChecksum is the per-package record the sync works from: the
	// normalized checksum algorithm and value plus the parsed time attributes
	// used to choose between duplicate entries.
	mirrorChecksum struct {
		Algo      string
		Value     string
		BuildTime int64
		FileTime  int64
	}

	// primaryTime holds the raw file/build time attributes of a package.
	primaryTime struct {
		File  string `xml:"file,attr"`
		Build string `xml:"build,attr"`
	}

	// mirrorHTTPConfig describes how to reach the upstream repository:
	// the base URL plus optional overrides for the dialed address, the Host
	// header and the TLS server name. DefaultHost/DefaultServer are derived
	// from BaseURL (host with port / hostname only).
	mirrorHTTPConfig struct {
		BaseURL       string
		ConnectHost   string
		HostHeader    string
		TLSServerName string
		TLSInsecure   bool
		DefaultHost   string
		DefaultServer string
	}
)
|
|
|
|
func NewMirrorManager(store *db.Store, logger *util.Logger, meta *MetaManager) *MirrorManager {
|
|
var m *MirrorManager
|
|
m = &MirrorManager{
|
|
store: store,
|
|
logger: logger,
|
|
meta: meta,
|
|
stopCh: make(chan struct{}),
|
|
cancelByKey: make(map[string]context.CancelFunc),
|
|
}
|
|
return m
|
|
}
|
|
|
|
func (m *MirrorManager) CancelTask(repoID string, path string) bool {
|
|
var key string
|
|
var cancel context.CancelFunc
|
|
if m == nil {
|
|
return false
|
|
}
|
|
key = mirrorTaskKey(repoID, path)
|
|
m.cancelMu.Lock()
|
|
cancel = m.cancelByKey[key]
|
|
m.cancelMu.Unlock()
|
|
if cancel == nil {
|
|
return false
|
|
}
|
|
cancel()
|
|
return true
|
|
}
|
|
|
|
func (m *MirrorManager) Start() {
|
|
var err error
|
|
var tasks []models.RPMMirrorTask
|
|
var i int
|
|
if m == nil {
|
|
return
|
|
}
|
|
err = m.store.ResetRunningRPMMirrorTasks()
|
|
if err != nil && m.logger != nil {
|
|
m.logger.Write("rpm-mirror", util.LOG_ERROR, "reset running tasks failed err=%v", err)
|
|
}
|
|
tasks, err = m.store.ListRPMMirrorPaths()
|
|
if err == nil {
|
|
for i = 0; i < len(tasks); i++ {
|
|
_ = m.store.CleanupRPMMirrorRunsRetention(tasks[i].RepoID, tasks[i].MirrorPath, 200, 30)
|
|
}
|
|
}
|
|
go m.loop()
|
|
}
|
|
|
|
func (m *MirrorManager) Stop() {
|
|
if m == nil {
|
|
return
|
|
}
|
|
close(m.stopCh)
|
|
}
|
|
|
|
func (m *MirrorManager) loop() {
|
|
var ticker *time.Ticker
|
|
ticker = time.NewTicker(10 * time.Second)
|
|
defer ticker.Stop()
|
|
m.runDue()
|
|
for {
|
|
select {
|
|
case <-m.stopCh:
|
|
return
|
|
case <-ticker.C:
|
|
m.runDue()
|
|
}
|
|
}
|
|
}
|
|
|
|
func (m *MirrorManager) runDue() {
|
|
var tasks []models.RPMMirrorTask
|
|
var started bool
|
|
var now int64
|
|
var i int
|
|
var err error
|
|
now = time.Now().UTC().Unix()
|
|
tasks, err = m.store.ListDueRPMMirrorTasks(now, 8)
|
|
if err != nil {
|
|
if m.logger != nil {
|
|
m.logger.Write("rpm-mirror", util.LOG_ERROR, "list due tasks failed err=%v", err)
|
|
}
|
|
return
|
|
}
|
|
for i = 0; i < len(tasks); i++ {
|
|
started, err = m.store.TryStartRPMMirrorTask(tasks[i].RepoID, tasks[i].MirrorPath, now)
|
|
if err != nil {
|
|
if m.logger != nil {
|
|
m.logger.Write("rpm-mirror", util.LOG_ERROR, "try start failed repo=%s path=%s err=%v", tasks[i].RepoID, tasks[i].MirrorPath, err)
|
|
}
|
|
continue
|
|
}
|
|
if !started {
|
|
continue
|
|
}
|
|
m.syncOne(tasks[i])
|
|
}
|
|
}
|
|
|
|
func (m *MirrorManager) syncOne(task models.RPMMirrorTask) {
|
|
var localRoot string
|
|
var cfg mirrorHTTPConfig
|
|
var client *http.Client
|
|
var repomdData []byte
|
|
var revision string
|
|
var primaryHref string
|
|
var primaryData []byte
|
|
var expected map[string]mirrorChecksum
|
|
var duplicateCount int
|
|
var runID string
|
|
var startedAt int64
|
|
var total int64
|
|
var done int64
|
|
var failed int64
|
|
var deleted int64
|
|
var changed int64
|
|
var err error
|
|
var syncCtx context.Context
|
|
var syncCancel context.CancelFunc
|
|
var canceled bool
|
|
var key string
|
|
localRoot = filepath.Join(task.RepoPath, filepath.FromSlash(task.MirrorPath))
|
|
startedAt = time.Now().UTC().Unix()
|
|
syncCtx, syncCancel = context.WithCancel(context.Background())
|
|
key = mirrorTaskKey(task.RepoID, task.MirrorPath)
|
|
m.cancelMu.Lock()
|
|
m.cancelByKey[key] = syncCancel
|
|
m.cancelMu.Unlock()
|
|
defer func() {
|
|
m.cancelMu.Lock()
|
|
delete(m.cancelByKey, key)
|
|
m.cancelMu.Unlock()
|
|
syncCancel()
|
|
}()
|
|
runID, err = m.store.CreateRPMMirrorRun(task.RepoID, task.MirrorPath, startedAt)
|
|
if err != nil {
|
|
_ = m.store.FinishRPMMirrorTask(task.RepoID, task.MirrorPath, false, "", err.Error())
|
|
return
|
|
}
|
|
cfg, err = buildMirrorHTTPConfig(task)
|
|
if err != nil {
|
|
if m.logger != nil {
|
|
m.logger.Write("rpm-mirror", util.LOG_ERROR, "sync failed repo=%s path=%s step=start err=%v", task.RepoID, task.MirrorPath, err)
|
|
}
|
|
_ = m.store.FinishRPMMirrorTask(task.RepoID, task.MirrorPath, false, "", err.Error())
|
|
_ = m.store.FinishRPMMirrorRun(runID, time.Now().UTC().Unix(), "failed", "start", 0, 0, 0, 0, "", err.Error())
|
|
return
|
|
}
|
|
client = buildMirrorHTTPClient(cfg)
|
|
if m.logger != nil {
|
|
m.logger.Write("rpm-mirror", util.LOG_INFO, "sync start repo=%s path=%s remote=%s", task.RepoID, task.MirrorPath, task.RemoteURL)
|
|
}
|
|
_ = m.store.UpdateRPMMirrorTaskProgress(task.RepoID, task.MirrorPath, "fetch_repodata", 0, 0, 0, 0)
|
|
repomdData, err = mirrorFetch(syncCtx, client, cfg, "repodata/repomd.xml")
|
|
if err != nil {
|
|
if m.logger != nil {
|
|
m.logger.Write("rpm-mirror", util.LOG_ERROR, "sync failed repo=%s path=%s step=fetch_repodata err=%v", task.RepoID, task.MirrorPath, err)
|
|
}
|
|
_ = m.store.FinishRPMMirrorTask(task.RepoID, task.MirrorPath, false, "", err.Error())
|
|
_ = m.store.FinishRPMMirrorRun(runID, time.Now().UTC().Unix(), "failed", "fetch_repodata", 0, 0, 0, 0, "", err.Error())
|
|
return
|
|
}
|
|
revision = sha256HexBytes(repomdData)
|
|
if !task.Dirty && task.LastSyncedRevision != "" && task.LastSyncedRevision == revision {
|
|
if m.meta != nil {
|
|
ensureRepodata(task, localRoot, m.meta, m.logger)
|
|
}
|
|
if m.logger != nil {
|
|
m.logger.Write("rpm-mirror", util.LOG_INFO, "sync done repo=%s path=%s status=no_change revision=%s", task.RepoID, task.MirrorPath, revision)
|
|
}
|
|
_ = m.store.FinishRPMMirrorTask(task.RepoID, task.MirrorPath, true, revision, "")
|
|
_ = m.store.FinishRPMMirrorRun(runID, time.Now().UTC().Unix(), "success", "no_change", 0, 0, 0, 0, revision, "")
|
|
return
|
|
}
|
|
primaryHref, err = parseRepomdPrimaryHref(repomdData)
|
|
if err != nil {
|
|
if m.logger != nil {
|
|
m.logger.Write("rpm-mirror", util.LOG_ERROR, "sync failed repo=%s path=%s step=parse_repodata err=%v", task.RepoID, task.MirrorPath, err)
|
|
}
|
|
_ = m.store.FinishRPMMirrorTask(task.RepoID, task.MirrorPath, false, "", err.Error())
|
|
_ = m.store.FinishRPMMirrorRun(runID, time.Now().UTC().Unix(), "failed", "fetch_repodata", 0, 0, 0, 0, "", err.Error())
|
|
return
|
|
}
|
|
_ = m.store.UpdateRPMMirrorTaskProgress(task.RepoID, task.MirrorPath, "fetch_primary", 0, 0, 0, 0)
|
|
primaryData, err = mirrorFetch(syncCtx, client, cfg, primaryHref)
|
|
if err != nil {
|
|
if m.logger != nil {
|
|
m.logger.Write("rpm-mirror", util.LOG_ERROR, "sync failed repo=%s path=%s step=fetch_primary err=%v", task.RepoID, task.MirrorPath, err)
|
|
}
|
|
_ = m.store.FinishRPMMirrorTask(task.RepoID, task.MirrorPath, false, "", err.Error())
|
|
_ = m.store.FinishRPMMirrorRun(runID, time.Now().UTC().Unix(), "failed", "fetch_primary", 0, 0, 0, 0, "", err.Error())
|
|
return
|
|
}
|
|
if strings.HasSuffix(strings.ToLower(primaryHref), ".gz") {
|
|
primaryData, err = gunzipBytes(primaryData)
|
|
if err != nil {
|
|
if m.logger != nil {
|
|
m.logger.Write("rpm-mirror", util.LOG_ERROR, "sync failed repo=%s path=%s step=decode_primary err=%v", task.RepoID, task.MirrorPath, err)
|
|
}
|
|
_ = m.store.FinishRPMMirrorTask(task.RepoID, task.MirrorPath, false, "", err.Error())
|
|
_ = m.store.FinishRPMMirrorRun(runID, time.Now().UTC().Unix(), "failed", "fetch_primary", 0, 0, 0, 0, "", err.Error())
|
|
return
|
|
}
|
|
}
|
|
expected, duplicateCount, err = parsePrimaryPackages(primaryData)
|
|
if err != nil {
|
|
if m.logger != nil {
|
|
m.logger.Write("rpm-mirror", util.LOG_ERROR, "sync failed repo=%s path=%s step=parse_primary err=%v", task.RepoID, task.MirrorPath, err)
|
|
}
|
|
_ = m.store.FinishRPMMirrorTask(task.RepoID, task.MirrorPath, false, "", err.Error())
|
|
_ = m.store.FinishRPMMirrorRun(runID, time.Now().UTC().Unix(), "failed", "fetch_primary", 0, 0, 0, 0, "", err.Error())
|
|
return
|
|
}
|
|
if m.logger != nil {
|
|
m.logger.Write("rpm-mirror", util.LOG_INFO, "primary parsed repo=%s path=%s primary_href=%s packages=%d", task.RepoID, task.MirrorPath, primaryHref, len(expected))
|
|
if duplicateCount > 0 {
|
|
m.logger.Write("rpm-mirror", util.LOG_WARN, "primary has duplicate package paths repo=%s path=%s primary_href=%s duplicates=%d", task.RepoID, task.MirrorPath, primaryHref, duplicateCount)
|
|
}
|
|
}
|
|
total, done, failed, deleted, changed, err = m.applyMirror(syncCtx, task, localRoot, client, cfg, expected)
|
|
if err != nil {
|
|
canceled = errors.Is(err, context.Canceled)
|
|
if m.logger != nil {
|
|
if canceled {
|
|
m.logger.Write("rpm-mirror", util.LOG_WARN, "sync canceled repo=%s path=%s step=apply total=%d done=%d failed=%d deleted=%d err=%v", task.RepoID, task.MirrorPath, total, done, failed, deleted, err)
|
|
} else {
|
|
m.logger.Write("rpm-mirror", util.LOG_ERROR, "sync failed repo=%s path=%s step=apply total=%d done=%d failed=%d deleted=%d err=%v", task.RepoID, task.MirrorPath, total, done, failed, deleted, err)
|
|
}
|
|
}
|
|
if canceled {
|
|
_ = m.store.FinishRPMMirrorTask(task.RepoID, task.MirrorPath, false, "", "sync canceled by user")
|
|
_ = m.store.FinishRPMMirrorRun(runID, time.Now().UTC().Unix(), "failed", "canceled", total, done, failed, deleted, "", "sync canceled by user")
|
|
} else {
|
|
_ = m.store.FinishRPMMirrorTask(task.RepoID, task.MirrorPath, false, "", err.Error())
|
|
_ = m.store.FinishRPMMirrorRun(runID, time.Now().UTC().Unix(), "failed", "apply", total, done, failed, deleted, "", err.Error())
|
|
}
|
|
return
|
|
}
|
|
if m.meta != nil && changed > 0 {
|
|
if m.logger != nil {
|
|
m.logger.Write("rpm-mirror", util.LOG_INFO, "repodata schedule repo=%s path=%s reason=sync_changed changed=%d", task.RepoID, task.MirrorPath, changed)
|
|
}
|
|
m.meta.Schedule(localRoot)
|
|
}
|
|
_ = m.store.FinishRPMMirrorTask(task.RepoID, task.MirrorPath, true, revision, "")
|
|
_ = m.store.FinishRPMMirrorRun(runID, time.Now().UTC().Unix(), "success", "done", total, done, failed, deleted, revision, "")
|
|
_ = m.store.CleanupRPMMirrorRunsRetention(task.RepoID, task.MirrorPath, 200, 30)
|
|
if m.logger != nil {
|
|
m.logger.Write("rpm-mirror", util.LOG_INFO, "sync done repo=%s path=%s status=success total=%d done=%d failed=%d deleted=%d revision=%s", task.RepoID, task.MirrorPath, total, done, failed, deleted, revision)
|
|
}
|
|
}
|
|
|
|
func (m *MirrorManager) applyMirror(ctx context.Context, task models.RPMMirrorTask, localRoot string, client *http.Client, cfg mirrorHTTPConfig, expected map[string]mirrorChecksum) (int64, int64, int64, int64, int64, error) {
|
|
var local map[string]bool
|
|
var total int64
|
|
var done int64
|
|
var failed int64
|
|
var deleted int64
|
|
var changed int64
|
|
var path string
|
|
var checksum mirrorChecksum
|
|
var fullPath string
|
|
var localSum string
|
|
var needDownload bool
|
|
var err error
|
|
local, err = listLocalRPMs(localRoot)
|
|
if err != nil {
|
|
return 0, 0, 0, 0, 0, err
|
|
}
|
|
total = int64(len(expected))
|
|
_ = m.store.UpdateRPMMirrorTaskProgress(task.RepoID, task.MirrorPath, "apply", total, 0, 0, 0)
|
|
for path = range local {
|
|
select {
|
|
case <-ctx.Done():
|
|
return total, done, failed, deleted, changed, ctx.Err()
|
|
default:
|
|
}
|
|
if expected[path].Value != "" {
|
|
continue
|
|
}
|
|
if m.logger != nil {
|
|
m.logger.Write("rpm-mirror", util.LOG_DEBUG, "delete local stale repo=%s path=%s file=%s", task.RepoID, task.MirrorPath, path)
|
|
}
|
|
err = os.Remove(filepath.Join(localRoot, filepath.FromSlash(path)))
|
|
if err == nil || os.IsNotExist(err) {
|
|
deleted = deleted + 1
|
|
changed = changed + 1
|
|
} else {
|
|
if m.logger != nil {
|
|
m.logger.Write("rpm-mirror", util.LOG_WARN, "delete local stale failed repo=%s path=%s file=%s err=%v", task.RepoID, task.MirrorPath, path, err)
|
|
}
|
|
}
|
|
}
|
|
for path, checksum = range expected {
|
|
select {
|
|
case <-ctx.Done():
|
|
return total, done, failed, deleted, changed, ctx.Err()
|
|
default:
|
|
}
|
|
fullPath = filepath.Join(localRoot, filepath.FromSlash(path))
|
|
needDownload = true
|
|
_, err = os.Stat(fullPath)
|
|
if err == nil {
|
|
localSum, err = fileHexByAlgo(fullPath, checksum.Algo)
|
|
if err == nil && (checksum.Value == "" || strings.EqualFold(localSum, checksum.Value)) {
|
|
needDownload = false
|
|
}
|
|
}
|
|
if needDownload {
|
|
if m.logger != nil {
|
|
m.logger.Write("rpm-mirror", util.LOG_DEBUG, "download start repo=%s path=%s file=%s checksum_type=%s checksum=%s", task.RepoID, task.MirrorPath, path, checksum.Algo, checksum.Value)
|
|
}
|
|
err = mirrorDownload(ctx, client, cfg, path, fullPath, checksum.Algo, checksum.Value)
|
|
if err != nil {
|
|
failed = failed + 1
|
|
if m.logger != nil {
|
|
m.logger.Write("rpm-mirror", util.LOG_WARN, "download failed repo=%s path=%s file=%s err=%v", task.RepoID, task.MirrorPath, path, err)
|
|
}
|
|
_ = m.store.UpdateRPMMirrorTaskProgress(task.RepoID, task.MirrorPath, "apply", total, done, failed, deleted)
|
|
continue
|
|
}
|
|
changed = changed + 1
|
|
if m.logger != nil {
|
|
m.logger.Write("rpm-mirror", util.LOG_DEBUG, "download done repo=%s path=%s file=%s", task.RepoID, task.MirrorPath, path)
|
|
}
|
|
} else {
|
|
if m.logger != nil {
|
|
m.logger.Write("rpm-mirror", util.LOG_DEBUG, "download skip repo=%s path=%s file=%s reason=up-to-date", task.RepoID, task.MirrorPath, path)
|
|
}
|
|
}
|
|
done = done + 1
|
|
_ = m.store.UpdateRPMMirrorTaskProgress(task.RepoID, task.MirrorPath, "apply", total, done, failed, deleted)
|
|
}
|
|
if failed > 0 {
|
|
return total, done, failed, deleted, changed, errors.New("some mirror files failed to sync")
|
|
}
|
|
return total, done, failed, deleted, changed, nil
|
|
}
|
|
|
|
func buildMirrorHTTPConfig(task models.RPMMirrorTask) (mirrorHTTPConfig, error) {
|
|
var cfg mirrorHTTPConfig
|
|
var u *url.URL
|
|
var err error
|
|
cfg = mirrorHTTPConfig{
|
|
BaseURL: strings.TrimRight(strings.TrimSpace(task.RemoteURL), "/"),
|
|
ConnectHost: strings.TrimSpace(task.ConnectHost),
|
|
HostHeader: strings.TrimSpace(task.HostHeader),
|
|
TLSServerName: strings.TrimSpace(task.TLSServerName),
|
|
TLSInsecure: task.TLSInsecureSkipVerify,
|
|
}
|
|
if cfg.BaseURL == "" {
|
|
return cfg, errors.New("remote url is empty")
|
|
}
|
|
u, err = url.Parse(cfg.BaseURL)
|
|
if err != nil {
|
|
return cfg, err
|
|
}
|
|
cfg.DefaultHost = u.Host
|
|
cfg.DefaultServer = u.Hostname()
|
|
if cfg.DefaultHost == "" {
|
|
return cfg, errors.New("remote url host is empty")
|
|
}
|
|
return cfg, nil
|
|
}
|
|
|
|
func buildMirrorHTTPClient(cfg mirrorHTTPConfig) *http.Client {
|
|
var transport *http.Transport
|
|
transport = &http.Transport{
|
|
Proxy: http.ProxyFromEnvironment,
|
|
TLSClientConfig: &tls.Config{
|
|
InsecureSkipVerify: cfg.TLSInsecure,
|
|
ServerName: effectiveServerName(cfg),
|
|
},
|
|
}
|
|
if cfg.ConnectHost != "" {
|
|
transport.DialContext = func(ctx context.Context, network string, addr string) (net.Conn, error) {
|
|
var d net.Dialer
|
|
_ = addr
|
|
return d.DialContext(ctx, network, cfg.ConnectHost)
|
|
}
|
|
}
|
|
return &http.Client{
|
|
Transport: transport,
|
|
Timeout: 60 * time.Second,
|
|
}
|
|
}
|
|
|
|
func effectiveHostHeader(cfg mirrorHTTPConfig) string {
|
|
if strings.TrimSpace(cfg.HostHeader) != "" {
|
|
return strings.TrimSpace(cfg.HostHeader)
|
|
}
|
|
return cfg.DefaultHost
|
|
}
|
|
|
|
func effectiveServerName(cfg mirrorHTTPConfig) string {
|
|
var host string
|
|
if strings.TrimSpace(cfg.TLSServerName) != "" {
|
|
return strings.TrimSpace(cfg.TLSServerName)
|
|
}
|
|
host = strings.TrimSpace(cfg.HostHeader)
|
|
if host != "" {
|
|
if strings.Contains(host, ":") {
|
|
return strings.Split(host, ":")[0]
|
|
}
|
|
return host
|
|
}
|
|
return cfg.DefaultServer
|
|
}
|
|
|
|
func mirrorFetch(ctx context.Context, client *http.Client, cfg mirrorHTTPConfig, rel string) ([]byte, error) {
|
|
var fullURL string
|
|
var req *http.Request
|
|
var res *http.Response
|
|
var body []byte
|
|
var err error
|
|
fullURL = joinRemoteURL(cfg.BaseURL, rel)
|
|
req, err = http.NewRequestWithContext(ctx, http.MethodGet, fullURL, nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
req.Host = effectiveHostHeader(cfg)
|
|
res, err = client.Do(req)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer res.Body.Close()
|
|
if res.StatusCode < 200 || res.StatusCode >= 300 {
|
|
return nil, errors.New("upstream request failed: " + res.Status)
|
|
}
|
|
body, err = io.ReadAll(res.Body)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return body, nil
|
|
}
|
|
|
|
func mirrorDownload(ctx context.Context, client *http.Client, cfg mirrorHTTPConfig, rel string, dstPath string, checksumType string, checksum string) error {
|
|
var fullURL string
|
|
var req *http.Request
|
|
var res *http.Response
|
|
var tempPath string
|
|
var out *os.File
|
|
var hash hashWriter
|
|
var copied int64
|
|
var actualSum string
|
|
var contentType string
|
|
var finalURL string
|
|
var err error
|
|
err = os.MkdirAll(filepath.Dir(dstPath), 0o755)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
fullURL = joinRemoteURL(cfg.BaseURL, rel)
|
|
req, err = http.NewRequestWithContext(ctx, http.MethodGet, fullURL, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
req.Host = effectiveHostHeader(cfg)
|
|
res, err = client.Do(req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer res.Body.Close()
|
|
if res.StatusCode < 200 || res.StatusCode >= 300 {
|
|
return errors.New("upstream request failed: " + res.Status)
|
|
}
|
|
contentType = strings.TrimSpace(res.Header.Get("Content-Type"))
|
|
finalURL = ""
|
|
if res.Request != nil && res.Request.URL != nil {
|
|
finalURL = res.Request.URL.String()
|
|
}
|
|
tempPath = dstPath + ".mirror.tmp"
|
|
out, err = os.Create(tempPath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer out.Close()
|
|
hash, err = newHashWriter(checksumType)
|
|
if err != nil {
|
|
_ = os.Remove(tempPath)
|
|
return err
|
|
}
|
|
copied, err = io.Copy(io.MultiWriter(out, hash), res.Body)
|
|
_ = copied
|
|
if err != nil {
|
|
_ = os.Remove(tempPath)
|
|
return err
|
|
}
|
|
err = out.Close()
|
|
if err != nil {
|
|
_ = os.Remove(tempPath)
|
|
return err
|
|
}
|
|
if strings.TrimSpace(checksum) != "" {
|
|
actualSum = hash.Sum()
|
|
if !strings.EqualFold(actualSum, strings.TrimSpace(checksum)) {
|
|
_ = os.Remove(tempPath)
|
|
return errors.New(
|
|
"download checksum mismatch for " + rel +
|
|
" type=" + normalizeChecksumAlgo(checksumType) +
|
|
" expected=" + strings.TrimSpace(checksum) +
|
|
" actual=" + actualSum +
|
|
" bytes=" + int64ToString(copied) +
|
|
" content_type=" + contentType +
|
|
" url=" + finalURL)
|
|
}
|
|
}
|
|
err = os.Rename(tempPath, dstPath)
|
|
if err != nil {
|
|
_ = os.Remove(tempPath)
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// joinRemoteURL resolves the repository-relative href rel against base, which
// is treated as a directory: a trailing '/' is added to its path before
// resolution. Backslashes in rel become forward slashes. When base or rel
// does not parse as a URL, it falls back to joining them with a single '/'.
func joinRemoteURL(base string, rel string) string {
	normalized := strings.ReplaceAll(rel, "\\", "/")

	// naiveJoin is the string-level fallback for unparsable inputs.
	naiveJoin := func() string {
		return strings.TrimRight(base, "/") + "/" + strings.TrimLeft(normalized, "/")
	}

	root, err := url.Parse(strings.TrimSpace(base))
	if err != nil || root == nil {
		return naiveJoin()
	}
	if !strings.HasSuffix(root.Path, "/") {
		root.Path += "/"
	}

	ref, err := url.Parse(strings.TrimSpace(normalized))
	if err != nil || ref == nil {
		return naiveJoin()
	}
	return root.ResolveReference(ref).String()
}
|
|
|
|
func parseRepomdPrimaryHref(data []byte) (string, error) {
|
|
var doc repomdDoc
|
|
var i int
|
|
var href string
|
|
var err error
|
|
err = xml.Unmarshal(data, &doc)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
for i = 0; i < len(doc.Data); i++ {
|
|
if strings.EqualFold(strings.TrimSpace(doc.Data[i].Type), "primary") {
|
|
href = strings.TrimSpace(doc.Data[i].Location.Href)
|
|
if href != "" {
|
|
return href, nil
|
|
}
|
|
}
|
|
}
|
|
return "", errors.New("primary metadata not found in repomd")
|
|
}
|
|
|
|
func parsePrimaryPackages(data []byte) (map[string]mirrorChecksum, int, error) {
|
|
var doc primaryDoc
|
|
var out map[string]mirrorChecksum
|
|
var i int
|
|
var path string
|
|
var checksum string
|
|
var checksumType string
|
|
var fileTime int64
|
|
var buildTime int64
|
|
var existing mirrorChecksum
|
|
var ok bool
|
|
var duplicates int
|
|
var err error
|
|
err = xml.Unmarshal(data, &doc)
|
|
if err != nil {
|
|
return nil, 0, err
|
|
}
|
|
out = make(map[string]mirrorChecksum)
|
|
for i = 0; i < len(doc.Packages); i++ {
|
|
path = strings.TrimSpace(doc.Packages[i].Location.Href)
|
|
if path == "" {
|
|
continue
|
|
}
|
|
if !strings.HasSuffix(strings.ToLower(path), ".rpm") {
|
|
continue
|
|
}
|
|
checksum = strings.TrimSpace(doc.Packages[i].Checksum.Value)
|
|
checksumType = strings.TrimSpace(doc.Packages[i].Checksum.Type)
|
|
fileTime = parseTimeAttr(doc.Packages[i].Time.File)
|
|
buildTime = parseTimeAttr(doc.Packages[i].Time.Build)
|
|
if existing, ok = out[path]; ok {
|
|
duplicates = duplicates + 1
|
|
if !shouldReplaceDuplicate(existing, buildTime, fileTime, checksum) {
|
|
continue
|
|
}
|
|
}
|
|
out[path] = mirrorChecksum{
|
|
Algo: normalizeChecksumAlgo(checksumType),
|
|
Value: strings.ToLower(checksum),
|
|
BuildTime: buildTime,
|
|
FileTime: fileTime,
|
|
}
|
|
}
|
|
return out, duplicates, nil
|
|
}
|
|
|
|
// listLocalRPMs returns the set of files under root whose names end in ".rpm"
// (case-insensitive). Keys are slash-separated paths relative to root.
// Directories named "repodata" (any case) are not descended into.
//
// A root that does not exist yet yields an empty set instead of an error.
// Previously the walk failed on the missing root, so the first sync of a new
// mirror path (whose directory is only created by the downloads) could never
// succeed. Any other walk error is returned.
func listLocalRPMs(root string) (map[string]bool, error) {
	found := make(map[string]bool)

	err := filepath.WalkDir(root, func(path string, d fs.DirEntry, walkErr error) error {
		if walkErr != nil {
			if path == root && errors.Is(walkErr, fs.ErrNotExist) {
				return nil
			}
			return walkErr
		}
		if d == nil {
			return nil
		}
		if d.IsDir() {
			if strings.EqualFold(d.Name(), "repodata") {
				return filepath.SkipDir
			}
			return nil
		}
		if !strings.HasSuffix(strings.ToLower(d.Name()), ".rpm") {
			return nil
		}

		rel, relErr := filepath.Rel(root, path)
		if relErr != nil {
			return relErr
		}
		found[filepath.ToSlash(rel)] = true
		return nil
	})
	if err != nil {
		return nil, err
	}
	return found, nil
}
|
|
|
|
// sha256HexBytes returns the lowercase hex SHA-256 digest of data.
func sha256HexBytes(data []byte) string {
	digest := sha256.Sum256(data)
	return hex.EncodeToString(digest[:])
}
|
|
|
|
func fileHexByAlgo(path string, algo string) (string, error) {
|
|
var file *os.File
|
|
var hash hashWriter
|
|
var copied int64
|
|
var err error
|
|
file, err = os.Open(path)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
defer file.Close()
|
|
hash, err = newHashWriter(algo)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
copied, err = io.Copy(hash, file)
|
|
_ = copied
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
return hash.Sum(), nil
|
|
}
|
|
|
|
// gunzipBytes decompresses a complete gzip stream held in memory. Header,
// corruption and checksum errors from compress/gzip are returned unchanged.
func gunzipBytes(data []byte) ([]byte, error) {
	zr, err := gzip.NewReader(bytes.NewReader(data))
	if err != nil {
		return nil, err
	}
	defer zr.Close()

	plain, err := io.ReadAll(zr)
	if err != nil {
		return nil, err
	}
	return plain, nil
}
|
|
|
|
// hashWriter is an io.Writer that accumulates a digest of everything written
// to it.
type hashWriter interface {
	io.Writer
	// Sum returns the lowercase hex digest of the bytes written so far.
	Sum() string
}

// shaWriter adapts a hash.Hash to hashWriter. Despite its name it wraps every
// supported algorithm, md5 included.
type shaWriter struct {
	h hash.Hash
}

// checksumHashConstructors maps normalized checksum names (as produced by
// normalizeChecksumAlgo) to their hash constructors. An empty name means
// SHA-256, and "sha" is the legacy repodata spelling of SHA-1.
var checksumHashConstructors = map[string]func() hash.Hash{
	"":       sha256.New,
	"sha256": sha256.New,
	"sha":    sha1.New,
	"sha1":   sha1.New,
	"sha224": sha256.New224,
	"sha384": sha512.New384,
	"sha512": sha512.New,
	"md5":    md5.New,
}

// newHashWriter returns a hashWriter for the checksum algorithm named algo
// (normalized first). Unknown algorithms yield an error naming the
// normalized value.
func newHashWriter(algo string) (hashWriter, error) {
	name := normalizeChecksumAlgo(algo)
	newHash, known := checksumHashConstructors[name]
	if !known {
		return nil, errors.New("unsupported checksum type: " + name)
	}
	return &shaWriter{h: newHash()}, nil
}

// Write feeds p into the underlying hash.
func (w *shaWriter) Write(p []byte) (int, error) {
	return w.h.Write(p)
}

// Sum returns the hex encoding of the current digest.
func (w *shaWriter) Sum() string {
	return hex.EncodeToString(w.h.Sum(nil))
}

// normalizeChecksumAlgo canonicalizes a checksum algorithm name: trimmed,
// lowercased, with every '-' and '_' removed (so "SHA-256" becomes "sha256").
// Unknown names pass through in that normalized form.
func normalizeChecksumAlgo(algo string) string {
	stripSeparators := strings.NewReplacer("-", "", "_", "")
	return stripSeparators.Replace(strings.ToLower(strings.TrimSpace(algo)))
}
|
|
|
|
// int64ToString formats v in base 10.
func int64ToString(v int64) string {
	return string(strconv.AppendInt(nil, v, 10))
}
|
|
|
|
// mirrorTaskKey builds the cancelByKey map key for a task. A NUL separator
// keeps distinct (repoID, path) pairs from colliding when concatenated.
func mirrorTaskKey(repoID string, path string) string {
	const separator = "\x00"
	return strings.Join([]string{repoID, path}, separator)
}
|
|
|
|
func ensureRepodata(task models.RPMMirrorTask, localRoot string, meta *MetaManager, logger *util.Logger) {
|
|
var repomdPath string
|
|
var statErr error
|
|
repomdPath = filepath.Join(localRoot, "repodata", "repomd.xml")
|
|
_, statErr = os.Stat(repomdPath)
|
|
if statErr == nil {
|
|
return
|
|
}
|
|
if logger != nil {
|
|
logger.Write("rpm-mirror", util.LOG_INFO, "repodata schedule repo=%s path=%s reason=missing repomd=%s", task.RepoID, task.MirrorPath, repomdPath)
|
|
}
|
|
meta.Schedule(localRoot)
|
|
}
|
|
|
|
// parseTimeAttr parses a base-10 integer time attribute, ignoring surrounding
// whitespace. Empty, malformed or out-of-range values yield 0.
func parseTimeAttr(value string) int64 {
	parsed, err := strconv.ParseInt(strings.TrimSpace(value), 10, 64)
	if err != nil {
		return 0
	}
	return parsed
}
|
|
|
|
func shouldReplaceDuplicate(existing mirrorChecksum, newBuildTime int64, newFileTime int64, newChecksum string) bool {
|
|
var existingChecksum string
|
|
if newBuildTime > existing.BuildTime {
|
|
return true
|
|
}
|
|
if newBuildTime < existing.BuildTime {
|
|
return false
|
|
}
|
|
if newFileTime > existing.FileTime {
|
|
return true
|
|
}
|
|
if newFileTime < existing.FileTime {
|
|
return false
|
|
}
|
|
existingChecksum = strings.TrimSpace(existing.Value)
|
|
if existingChecksum == "" && strings.TrimSpace(newChecksum) != "" {
|
|
return true
|
|
}
|
|
return false
|
|
}
|