Files
repokit/rpm-repo.go

646 lines
17 KiB
Go

package repokit
import "encoding/xml"
import "errors"
import "fmt"
import "io"
import "io/fs"
import "os"
import "path"
import "path/filepath"
import "sort"
import "strconv"
import "strings"
import "sync"
import "syscall"
import "time"
// RpmLockMode controls how rpm_lock_repo treats the lock on the output
// directory while repo metadata is being (re)generated.
type RpmLockMode int
const (
// RpmLockUnset means the caller did not pick a mode; rpm_create_rpm_repo
// resolves it to RpmLockNone (when options.IgnoreLock is set) or
// RpmLockFail.
RpmLockUnset RpmLockMode = iota
// RpmLockFail acquires the lock non-blocking and fails immediately if
// another process already holds it.
RpmLockFail
// RpmLockWait blocks until the lock can be acquired.
RpmLockWait
// RpmLockNone skips directory locking entirely.
RpmLockNone
)
// RpmRepoOptions bundles all knobs for repository metadata generation.
// Use RpmDefaultRepoOptions for a sensible starting point.
type RpmRepoOptions struct {
// OutputDir, when non-empty, receives the generated "repodata"
// directory instead of the scanned input directory. It must already
// exist.
OutputDir string
// FilelistsExt additionally generates filelists-ext metadata.
FilelistsExt bool
// Workers is the size of the package-parsing worker pool; values <= 0
// fall back to RpmDefaultWorkers.
Workers int
// ChangeLogLimit caps changelog entries kept per package; values <= 0
// fall back to RpmDefaultChangelogLimit.
ChangeLogLimit int
// UniqueMDFilenames prefixes each metadata file name with its own
// checksum.
UniqueMDFilenames bool
// IgnoreLock disables repo locking, but only when LockMode is left at
// RpmLockUnset.
IgnoreLock bool
// CompressionType selects the compression used for the XML metadata
// files.
CompressionType RpmCompressionType
// LockMode selects the locking behavior (see RpmLockMode).
LockMode RpmLockMode
// AllowMissingRepomd tolerates an old repodata directory that lacks
// repomd.xml during old-metadata retention.
AllowMissingRepomd bool
}
// rpm_pool_task describes one .rpm file queued for parsing by the
// worker pool.
type rpm_pool_task struct {
// id is the task's index after sorting by filename/path; it is used to
// restore a deterministic package order in the generated XML.
id int64
// media_id is always set to 0 here — presumably reserved for
// multi-media (split) repositories; TODO confirm.
media_id int64
// full_path is the file path as reported by filepath.WalkDir.
full_path string
// filename is the bare file name (no directory).
filename string
// path is the directory containing the file.
path string
}
// rpm_user_data is the state shared by all worker goroutines while
// packages are parsed.
type rpm_user_data struct {
// repo_dir_name_len is the length of the normalized input directory
// path; it is stripped from each package path to form the location
// href.
repo_dir_name_len int
// location_base is passed through to package metadata (empty in this
// file).
location_base string
// checksum_type selects the checksum used for per-package digests.
checksum_type RpmChecksumType
// change_log_limit caps changelog entries kept per package.
change_log_limit int
// filelists_ext enables collection of filelists-ext entries.
filelists_ext bool
// mutex_nevra_table guards nevra_table.
mutex_nevra_table sync.Mutex
// nevra_table maps a package NEVRA to its duplicate locations —
// populated outside this chunk; TODO confirm usage.
nevra_table map[string][]*RpmDuplicateLocation
// all_xml_structs accumulates the primary/filelists/other/ext package
// lists (each list guarded by its own mutex inside the struct).
all_xml_structs *RpmXmlStructAll
}
// rpm_repo_job pairs one package task with the shared worker state; it
// is the unit sent over the jobs channel to rpm_worker.
type rpm_repo_job struct {
// user_data points at the state shared by all workers.
user_data *rpm_user_data
// pool_task is the single package this job processes.
pool_task *rpm_pool_task
}
// RpmDefaultRepoOptions returns the canonical option set used when the
// caller does not want to tune anything: metadata written next to the
// packages, default worker pool and changelog limit, default
// compression, and fail-fast locking.
func RpmDefaultRepoOptions() RpmRepoOptions {
	return RpmRepoOptions{
		OutputDir:          "",
		FilelistsExt:       false,
		Workers:            RpmDefaultWorkers,
		ChangeLogLimit:     RpmDefaultChangelogLimit,
		UniqueMDFilenames:  false,
		IgnoreLock:         false,
		CompressionType:    RpmDefaultCompressionType,
		LockMode:           RpmLockFail,
		AllowMissingRepomd: false,
	}
}
func RpmCreateRepo(dir string) error {
var options RpmRepoOptions
var err error
options = RpmDefaultRepoOptions()
err = RpmCreateRepoWithOptions(dir, options)
if err != nil {
return err
}
return nil
}
// RpmCreateRepoWithOptions generates repository metadata for dir with
// explicit options; it is a thin public wrapper over
// rpm_create_rpm_repo.
//
// The original body assigned the error, tested it, and re-returned it;
// returning the callee's result directly is the idiomatic equivalent.
func RpmCreateRepoWithOptions(dir string, options RpmRepoOptions) error {
	return rpm_create_rpm_repo(dir, options)
}
// rpm_create_rpm_repo builds the "repodata" metadata for every *.rpm
// found under dir. Steps: scan and sort packages, lock the output
// directory, parse packages in a worker pool into shared XML
// structures, write the compressed primary/filelists/other (and
// optional filelists-ext) files plus repomd.xml into a temporary
// ".repodata.<pid>..." staging directory, carry over retained old
// metadata, then atomically swap the staging directory in as
// "repodata".
//
// ret_err is a named result so the deferred unlock can surface its own
// failure when the main body otherwise succeeded.
func rpm_create_rpm_repo(dir string, options RpmRepoOptions) (ret_err error) {
	var in_dir string
	var in_repo string
	var out_dir string
	var out_repo string
	var tmp_out_dir string
	var out_dir_exist bool
	var err error
	var pool_tasks []*rpm_pool_task
	var total_pkg_num int
	var jobs chan *rpm_repo_job
	var results chan error
	var wg sync.WaitGroup
	var i int
	var task *rpm_pool_task
	var job_item *rpm_repo_job
	var xml_compression_suffix string
	var pri_xml_filename string
	var file_xml_filename string
	var oth_xml_filename string
	var fex_xml_filename string
	var support_filelists_ext bool
	var primary_repomd_record *RpmRepomdRecord
	var file_repomd_record *RpmRepomdRecord
	var other_repomd_record *RpmRepomdRecord
	var fex_repomd_record *RpmRepomdRecord
	var revision string
	var repomd RpmRepomd
	var repomd_path string
	var exist bool
	var old_repo string
	var failed_pkg_num int
	var result error
	var worker_count int
	var data rpm_user_data
	var compression_type RpmCompressionType
	var lock_mode RpmLockMode
	var lock_handle *os.File
	in_dir = rpm_normalize_dir_path(dir)
	in_repo = path.Join(in_dir, "repodata")
	out_dir = options.OutputDir
	out_repo = ""
	tmp_out_dir = ""
	// Resolve where the generated repodata goes: an explicit OutputDir
	// (which must already exist) or the scanned directory itself.
	if out_dir != "" {
		out_dir_exist, err = rpm_path_exists(out_dir)
		if err != nil {
			return fmt.Errorf("error to check if outputdir %s exists: %v", out_dir, err)
		}
		if !out_dir_exist {
			return fmt.Errorf("specified outputdir \"%s\" doesn't exist", out_dir)
		}
		out_dir = rpm_normalize_dir_path(out_dir)
		out_repo = path.Join(out_dir, "repodata")
	} else {
		out_dir = in_dir
		out_repo = in_repo
	}
	pool_tasks, err = rpm_find_rpm_files(in_dir)
	if err != nil {
		return fmt.Errorf("error to walk through packages in directory %s", in_dir)
	}
	total_pkg_num = len(pool_tasks)
	// IgnoreLock only applies when the caller left the mode unset.
	lock_mode = options.LockMode
	if lock_mode == RpmLockUnset {
		if options.IgnoreLock {
			lock_mode = RpmLockNone
		} else {
			lock_mode = RpmLockFail
		}
	}
	lock_handle, tmp_out_dir, err = rpm_lock_repo(out_dir, lock_mode)
	if err != nil {
		return fmt.Errorf("error to lock repo: %v", err)
	}
	defer func() {
		var unlock_err error
		// Remove the staging dir (a no-op once it has been renamed to
		// the final repodata) and release the repo lock.
		rpm_exit_cleanup(tmp_out_dir)
		unlock_err = rpm_unlock_repo(lock_handle)
		if ret_err == nil && unlock_err != nil {
			// i can't use the return statement because it is
			// set in a deferred function.
			ret_err = fmt.Errorf("unlock repo lock failed: %w", unlock_err)
		}
	}()
	// BUGFIX: size both channels to the package count instead of a
	// fixed 100. results is only drained after wg.Wait(), so a fixed
	// buffer could fill up and block a worker forever; a fixed jobs
	// buffer could likewise block this producer if workers stop
	// consuming. Each task produces at most one error, so capacity
	// total_pkg_num guarantees no send below can ever block.
	jobs = make(chan *rpm_repo_job, total_pkg_num)
	results = make(chan error, total_pkg_num)
	worker_count = options.Workers
	if worker_count <= 0 {
		worker_count = RpmDefaultWorkers
	}
	for i = 0; i < worker_count; i++ {
		wg.Add(1)
		go rpm_worker(jobs, results, &wg)
	}
	data.location_base = ""
	data.filelists_ext = options.FilelistsExt
	data.change_log_limit = options.ChangeLogLimit
	if data.change_log_limit <= 0 {
		data.change_log_limit = RpmDefaultChangelogLimit
	}
	data.repo_dir_name_len = len(in_dir)
	data.nevra_table = make(map[string][]*RpmDuplicateLocation)
	data.all_xml_structs = rpm_init_xml_struct_all()
	data.checksum_type = RpmDefaultChecksum
	// Enqueue every package; with the channel sized above this loop
	// cannot block even if the workers are slow or failing.
	for _, task = range pool_tasks {
		job_item = &rpm_repo_job{}
		job_item.pool_task = task
		job_item.user_data = &data
		jobs <- job_item
	}
	close(jobs)
	wg.Wait()
	close(results)
	// Workers only send non-nil errors; count the failures.
	failed_pkg_num = 0
	for result = range results {
		if result != nil {
			failed_pkg_num++
		}
	}
	compression_type = options.CompressionType
	xml_compression_suffix = CompressionSuffix(compression_type)
	pri_xml_filename = tmp_out_dir + "/primary.xml" + xml_compression_suffix
	file_xml_filename = tmp_out_dir + "/filelists.xml" + xml_compression_suffix
	oth_xml_filename = tmp_out_dir + "/other.xml" + xml_compression_suffix
	fex_xml_filename = ""
	support_filelists_ext = options.FilelistsExt
	if support_filelists_ext {
		fex_xml_filename = tmp_out_dir + "/filelists-ext.xml" + xml_compression_suffix
	}
	// Restore the deterministic task order (workers append in whatever
	// order they finish) before serializing.
	data.all_xml_structs.SortPackageByTaskID()
	data.all_xml_structs.SetPackageNum()
	primary_repomd_record, err = rpm_get_repomd_record(pri_xml_filename, RpmRepomdTypePrimary, &data, options.UniqueMDFilenames, compression_type)
	if err != nil {
		return err
	}
	file_repomd_record, err = rpm_get_repomd_record(file_xml_filename, RpmRepomdTypeFilelists, &data, options.UniqueMDFilenames, compression_type)
	if err != nil {
		return err
	}
	other_repomd_record, err = rpm_get_repomd_record(oth_xml_filename, RpmRepomdTypeOther, &data, options.UniqueMDFilenames, compression_type)
	if err != nil {
		return err
	}
	revision = strconv.FormatInt(time.Now().Unix(), 10)
	repomd = RpmRepomd{}
	repomd.Xmlns = RPM_XML_NS_REPOMD
	repomd.XmlnsRpm = RPM_XML_NS_RPM
	repomd.Revision = revision
	repomd.Data = []*RpmRepomdRecord{primary_repomd_record, file_repomd_record, other_repomd_record}
	if support_filelists_ext {
		fex_repomd_record, err = rpm_get_repomd_record(fex_xml_filename, RpmRepomdTypeFilelistsExt, &data, options.UniqueMDFilenames, compression_type)
		if err != nil {
			return err
		}
		repomd.Data = append(repomd.Data, fex_repomd_record)
	}
	// Keep repomd.xml records in a stable order.
	sort.Slice(repomd.Data, func(i int, j int) bool {
		var cmp int
		cmp = rpm_repomd_record_cmp(repomd.Data[i], repomd.Data[j])
		return cmp < 0
	})
	repomd_path = filepath.Join(tmp_out_dir, "repomd.xml")
	err = rpm_write_xml(repomd, repomd_path)
	if err != nil {
		return fmt.Errorf("write %s failed: %v", repomd_path, err)
	}
	err = rpm_old_metadata_retention(out_repo, tmp_out_dir, options.AllowMissingRepomd)
	if err != nil {
		return fmt.Errorf("old metadata files retention failed: %v", err)
	}
	// Swap the staging dir in: move any existing repodata aside first,
	// then rename the staging dir into place and clean the old copy up
	// on exit.
	exist, err = rpm_path_exists(out_repo)
	if err != nil {
		return err
	}
	if exist {
		old_repo = path.Join(out_dir, rpm_append_pid_and_datetime("repodata.old", ""))
		err = os.Rename(out_repo, old_repo)
		if err != nil {
			return fmt.Errorf("rename %s to %s failed: %v", out_repo, old_repo, err)
		}
		defer rpm_exit_cleanup(old_repo)
	}
	err = os.Rename(tmp_out_dir, out_repo)
	if err != nil {
		return fmt.Errorf("rename %s to %s failed: %v", tmp_out_dir, out_repo, err)
	}
	if failed_pkg_num > 0 {
		return fmt.Errorf("finished with error: %d packages failed, %d packages succeeded", failed_pkg_num, total_pkg_num - failed_pkg_num)
	}
	return nil
}
// rpm_find_rpm_files walks dir_path recursively and returns one task
// per *.rpm file, sorted by filename (then full path) with sequential
// ids assigned after sorting. The walk error, if any, is returned
// alongside whatever was collected.
func rpm_find_rpm_files(dir_path string) ([]*rpm_pool_task, error) {
	var found []*rpm_pool_task
	walk_err := filepath.WalkDir(dir_path, func(current string, entry fs.DirEntry, err error) error {
		if err != nil {
			return err
		}
		if entry.IsDir() || !strings.HasSuffix(entry.Name(), ".rpm") {
			return nil
		}
		found = append(found, &rpm_pool_task{
			full_path: current,
			filename:  entry.Name(),
			path:      filepath.Dir(current),
		})
		return nil
	})
	rpm_sort_pool_tasks(found)
	// Ids follow the sorted order so downstream XML output is stable.
	for idx := range found {
		found[idx].id = int64(idx)
		found[idx].media_id = 0
	}
	return found, walk_err
}
// rpm_task_cmp orders two tasks by filename first and full path second,
// returning a negative/zero/positive value like strings.Compare.
func rpm_task_cmp(a *rpm_pool_task, b *rpm_pool_task) int {
	if by_name := strings.Compare(a.filename, b.filename); by_name != 0 {
		return by_name
	}
	// Identical filenames in different directories: break the tie on
	// the full path.
	return strings.Compare(a.full_path, b.full_path)
}
// rpm_sort_pool_tasks sorts the task slice in place using the
// filename/full-path ordering defined by rpm_task_cmp.
func rpm_sort_pool_tasks(package_tasks []*rpm_pool_task) {
	sort.Slice(package_tasks, func(left int, right int) bool {
		return rpm_task_cmp(package_tasks[left], package_tasks[right]) < 0
	})
}
func rpm_old_metadata_retention(old_repo string, new_repo string, allow_missing_repomd bool) error {
var existed bool
var err error
var old_repomd_path string
var xml_file *os.File
var xml_data []byte
var repomd_data RpmRepomd
var excluded_list []string
var repomd_record *RpmRepomdRecord
var href string
var entries []os.DirEntry
var entry os.DirEntry
var src_path string
var dst_path string
existed, err = rpm_path_exists(old_repo)
if err != nil {
return err
}
if !existed {
return nil
}
old_repomd_path = path.Join(old_repo, "repomd.xml")
xml_file, err = os.Open(old_repomd_path)
if err != nil {
if allow_missing_repomd && os.IsNotExist(err) {
return nil
}
return fmt.Errorf("error opening file %s: %v", old_repomd_path, err)
}
defer xml_file.Close()
xml_data, err = io.ReadAll(xml_file)
if err != nil {
return fmt.Errorf("error reading file %s: %v", old_repomd_path, err)
}
err = xml.Unmarshal(xml_data, &repomd_data)
if err != nil {
return fmt.Errorf("error unmarshaling XML to %s: %v", old_repomd_path, err)
}
excluded_list = []string{"repomd.xml"}
for _, repomd_record = range repomd_data.Data {
href = repomd_record.Location.Href
if href == "" {
continue
}
if repomd_record.Location.XMLBase != "" {
continue
}
excluded_list = append(excluded_list, filepath.Base(href))
}
entries, err = os.ReadDir(old_repo)
if err != nil {
return err
}
for _, entry = range entries {
if rpm_contains(excluded_list, entry.Name()) {
continue
}
src_path = path.Join(old_repo, entry.Name())
dst_path = path.Join(new_repo, entry.Name())
if entry.IsDir() {
err = rpm_copy_dir(src_path, dst_path)
if err != nil {
return err
}
} else {
err = rpm_copy_file(src_path, dst_path)
if err != nil {
return err
}
}
}
return nil
}
// rpm_get_repomd_record serializes one metadata stream (primary,
// filelists, other, or filelists-ext), writes it compressed to
// xml_compressed_file_path, and returns the repomd.xml record
// describing it: open (uncompressed) checksum/size, compressed
// checksum/size, location href, and file mtime. When
// unique_md_filenames is set the file is renamed to
// "<checksum>-<name>" first.
func rpm_get_repomd_record(xml_compressed_file_path string, repomd_type RpmRepomdType, data *rpm_user_data, unique_md_filenames bool, compression_type RpmCompressionType) (*RpmRepomdRecord, error) {
	var raw_xml []byte
	var err error
	switch repomd_type {
	case RpmRepomdTypePrimary:
		raw_xml, err = rpm_dump_xml(data.all_xml_structs.RpmPrimaryXMLData)
	case RpmRepomdTypeFilelists:
		raw_xml, err = rpm_dump_xml(data.all_xml_structs.RpmFilelistsXMLData)
	case RpmRepomdTypeOther:
		raw_xml, err = rpm_dump_xml(data.all_xml_structs.RpmOtherXMLData)
	case RpmRepomdTypeFilelistsExt:
		raw_xml, err = rpm_dump_xml(data.all_xml_structs.RpmFilelistsExtXMLData)
	default:
		return nil, fmt.Errorf("invalid repomd type %s", repomd_type)
	}
	if err != nil {
		return nil, err
	}
	// Checksum of the uncompressed payload ("open" checksum).
	open_checksum, err := rpm_checksum_bytes(raw_xml, RpmDefaultRepomdChecksum)
	if err != nil {
		return nil, err
	}
	if err = rpm_compress_xml(raw_xml, xml_compressed_file_path, compression_type); err != nil {
		return nil, err
	}
	// Checksum of the on-disk compressed file.
	packed_checksum, err := rpm_checksum_file(xml_compressed_file_path, RpmDefaultRepomdChecksum)
	if err != nil {
		return nil, err
	}
	record_filename := filepath.Base(xml_compressed_file_path)
	if unique_md_filenames {
		// Prefix the file name with its checksum so clients can cache
		// metadata files by name.
		record_filename = fmt.Sprintf("%s-%s", packed_checksum, record_filename)
		renamed_path := path.Join(filepath.Dir(xml_compressed_file_path), record_filename)
		if err = os.Rename(xml_compressed_file_path, renamed_path); err != nil {
			return nil, fmt.Errorf("rename %s to %s failed: %v", xml_compressed_file_path, renamed_path, err)
		}
		xml_compressed_file_path = renamed_path
	}
	mod_time, file_size, err := rpm_stat_file(xml_compressed_file_path)
	if err != nil {
		return nil, err
	}
	record := &RpmRepomdRecord{}
	record.Type = string(repomd_type)
	record.OpenChecksum = RpmChecksum{Type: RpmChecksumName(RpmDefaultRepomdChecksum), Value: open_checksum}
	record.OpenSize = int64(len(raw_xml))
	record.Checksum = RpmChecksum{Type: RpmChecksumName(RpmDefaultRepomdChecksum), Value: packed_checksum}
	record.Size = file_size
	record.Location = RpmLocation{Href: RpmLocationHrefPrefix + record_filename}
	record.Timestamp = mod_time
	return record, nil
}
// rpm_exit_cleanup best-effort removes a temporary directory tree (the
// ".repodata.<pid>..." staging dir or a renamed "repodata.old" copy).
// The removal error is deliberately ignored: cleanup runs on both
// success and failure paths and must not mask the primary result.
func rpm_exit_cleanup(lock_dir string) {
_ = os.RemoveAll(lock_dir)
}
// rpm_lock_repo takes an flock on repo_dir (unless lock_mode is
// RpmLockNone) and creates the temporary ".repodata.<pid>..." staging
// directory inside it. It returns the lock handle (nil when locking is
// disabled), the staging directory path, and an error. The caller
// releases the lock with rpm_unlock_repo.
func rpm_lock_repo(repo_dir string, lock_mode RpmLockMode) (*os.File, string, error) {
	var lock_handle *os.File
	if lock_mode != RpmLockNone {
		handle, err := os.Open(repo_dir)
		if err != nil {
			return nil, "", fmt.Errorf("cannot open repo dir %s for locking: %w", repo_dir, err)
		}
		var flock_flags int
		switch lock_mode {
		case RpmLockFail:
			// Non-blocking: error out immediately if already locked.
			flock_flags = syscall.LOCK_EX | syscall.LOCK_NB
		case RpmLockWait:
			flock_flags = syscall.LOCK_EX
		default:
			handle.Close()
			return nil, "", fmt.Errorf("unknown lock mode")
		}
		if err = syscall.Flock(int(handle.Fd()), flock_flags); err != nil {
			handle.Close()
			if errors.Is(err, syscall.EWOULDBLOCK) || errors.Is(err, syscall.EAGAIN) {
				return nil, "", fmt.Errorf("repo dir lock exists: %s", repo_dir)
			}
			return nil, "", fmt.Errorf("cannot lock repo dir %s: %w", repo_dir, err)
		}
		lock_handle = handle
	}
	staging_dir := filepath.Join(repo_dir, rpm_append_pid_and_datetime(".repodata", ""))
	if err := os.Mkdir(staging_dir, os.ModePerm); err != nil {
		rpm_unlock_repo(lock_handle)
		return nil, "", fmt.Errorf("cannot create %s: %w", staging_dir, err)
	}
	return lock_handle, staging_dir, nil
}
// rpm_unlock_repo releases the flock held on the repo directory and
// closes the handle. A nil handle (locking disabled, or rpm_lock_repo
// failed before locking) is a no-op.
func rpm_unlock_repo(lock_handle *os.File) error {
	if lock_handle == nil {
		return nil
	}
	if err := syscall.Flock(int(lock_handle.Fd()), syscall.LOCK_UN); err != nil {
		// Still close the descriptor, but report the unlock failure.
		lock_handle.Close()
		return err
	}
	return lock_handle.Close()
}
// rpm_append_pid_and_datetime builds a collision-resistant name of the
// form "<prefix>.<pid>.<YYYYMMDDhhmmss>.<microseconds>" with an
// optional ".<suffix>" appended. Used for staging and backup directory
// names.
func rpm_append_pid_and_datetime(prefix string, suffix string) string {
	now := time.Now()
	name := prefix +
		"." + strconv.Itoa(os.Getpid()) +
		"." + now.Format("20060102150405") +
		"." + strconv.Itoa(now.Nanosecond()/1000)
	if suffix != "" {
		name = name + "." + suffix
	}
	return name
}
// rpm_worker consumes parse jobs until the jobs channel is closed. For
// each package it loads the rpm and appends its XML representations to
// the shared accumulator structures; per-package failures are reported
// on the results channel.
//
// BUGFIX: the previous version returned from the worker on the first
// per-package error. That undercounted failures, silently dropped every
// package still queued for this worker from the generated metadata,
// and — once all workers had died — could stall the producer on a full
// jobs buffer. A bad package now only skips that package.
// NOTE(review): each job sends at most one error, so the producer must
// buffer results with at least one slot per job (it drains results only
// after wg.Wait()).
func rpm_worker(jobs <-chan *rpm_repo_job, results chan<- error, wg *sync.WaitGroup) {
	var job_item *rpm_repo_job
	var udata *rpm_user_data
	var task *rpm_pool_task
	var location_href string
	var location_base string
	var pkg *RpmPackage
	var err error
	defer wg.Done()
	for job_item = range jobs {
		udata = job_item.user_data
		task = job_item.pool_task
		// Strip the repo directory prefix to get the package's href
		// relative to the repo root.
		location_href = task.full_path[udata.repo_dir_name_len:]
		location_base = udata.location_base
		pkg, err = rpm_load_rpm(task.full_path, udata.checksum_type, location_href, location_base, udata.change_log_limit)
		if err != nil {
			results <- err
			continue
		}
		err = rpm_xml_dump(udata, task, pkg, udata.filelists_ext)
		if err != nil {
			results <- err
			continue
		}
	}
}
// rpm_xml_dump validates pkg and appends its primary, filelists, and
// other XML entries (plus filelists-ext when requested) to the shared
// accumulator structures, each under its own lock. Called concurrently
// by the worker pool. Returns an error only when the package metadata
// contains characters that would make the XML invalid.
func rpm_xml_dump(udata *rpm_user_data, task *rpm_pool_task, pkg *RpmPackage, is_filelists_ext bool) error {
var task_id int64
// Reject metadata that cannot be represented in XML 1.0.
if rpm_package_contains_forbidden_control_chars(pkg) {
return fmt.Errorf("forbidden control chars found (ASCII values <32 except 9, 10 and 13)")
}
// The task id is stored with each entry so the lists can later be
// re-sorted into the deterministic task order (SortPackageByTaskID),
// regardless of worker completion order.
task_id = task.id
udata.all_xml_structs.MutexPrimary.Lock()
udata.all_xml_structs.RpmPrimaryXMLData.PackageList = append(udata.all_xml_structs.RpmPrimaryXMLData.PackageList, rpm_get_primary_package(pkg, task_id))
udata.all_xml_structs.MutexPrimary.Unlock()
udata.all_xml_structs.MutexFilelists.Lock()
udata.all_xml_structs.RpmFilelistsXMLData.PackageList = append(udata.all_xml_structs.RpmFilelistsXMLData.PackageList, rpm_get_filelists_package(pkg, false, task_id))
udata.all_xml_structs.MutexFilelists.Unlock()
udata.all_xml_structs.MutexOtherData.Lock()
udata.all_xml_structs.RpmOtherXMLData.PackageList = append(udata.all_xml_structs.RpmOtherXMLData.PackageList, rpm_get_other_package(pkg, task_id))
udata.all_xml_structs.MutexOtherData.Unlock()
// filelists-ext entries are only collected when the caller asked for
// them (options.FilelistsExt).
if is_filelists_ext {
udata.all_xml_structs.MutexFilelistsExt.Lock()
udata.all_xml_structs.RpmFilelistsExtXMLData.PackageList = append(udata.all_xml_structs.RpmFilelistsExtXMLData.PackageList, rpm_get_filelists_package(pkg, true, task_id))
udata.all_xml_structs.MutexFilelistsExt.Unlock()
}
return nil
}