package repokit

import (
	"encoding/xml"
	"errors"
	"fmt"
	"io"
	"io/fs"
	"os"
	"path"
	"path/filepath"
	"sort"
	"strconv"
	"strings"
	"sync"
	"syscall"
	"time"
)

// RpmLockMode selects how repo creation behaves when another process may
// already hold the repository lock.
type RpmLockMode int

const (
	RpmLockUnset RpmLockMode = iota // not specified; resolved from IgnoreLock
	RpmLockFail                     // fail immediately if the lock is held
	RpmLockWait                     // block until the lock becomes available
	RpmLockNone                     // do not take a lock at all
)

// RpmRepoOptions configures repository metadata generation.
type RpmRepoOptions struct {
	OutputDir          string             // destination dir; "" means the input dir
	FilelistsExt       bool               // also generate filelists-ext metadata
	Workers            int                // parallel package readers; <=0 uses the default
	ChangeLogLimit     int                // max changelog entries per package; <=0 uses the default
	UniqueMDFilenames  bool               // prefix metadata filenames with their checksum
	IgnoreLock         bool               // legacy flag; only consulted when LockMode is RpmLockUnset
	CompressionType    RpmCompressionType // compression for the generated XML files
	LockMode           RpmLockMode        // repo locking behavior
	AllowMissingRepomd bool               // tolerate an old repodata dir without repomd.xml
}

// rpm_pool_task describes a single .rpm file queued for processing.
type rpm_pool_task struct {
	id        int64 // stable position after sorting; used to order XML output
	media_id  int64
	full_path string
	filename  string
	path      string
}

// rpm_user_data is the state shared by all worker goroutines.
type rpm_user_data struct {
	repo_dir_name_len int // length of the repo root path; used to derive location hrefs
	location_base     string
	checksum_type     RpmChecksumType
	change_log_limit  int
	filelists_ext     bool
	mutex_nevra_table sync.Mutex
	nevra_table       map[string][]*RpmDuplicateLocation
	all_xml_structs   *RpmXmlStructAll
}

// rpm_repo_job pairs one task with the shared worker state.
type rpm_repo_job struct {
	user_data *rpm_user_data
	pool_task *rpm_pool_task
}

// RpmDefaultRepoOptions returns the default option set used by RpmCreateRepo.
func RpmDefaultRepoOptions() RpmRepoOptions {
	return RpmRepoOptions{
		OutputDir:          "",
		FilelistsExt:       false,
		Workers:            RpmDefaultWorkers,
		ChangeLogLimit:     RpmDefaultChangelogLimit,
		UniqueMDFilenames:  false,
		IgnoreLock:         false,
		CompressionType:    RpmDefaultCompressionType,
		LockMode:           RpmLockFail,
		AllowMissingRepomd: false,
	}
}

// RpmCreateRepo generates repodata for dir using the default options.
func RpmCreateRepo(dir string) error {
	return RpmCreateRepoWithOptions(dir, RpmDefaultRepoOptions())
}

// RpmCreateRepoWithOptions generates repodata for dir with explicit options.
func RpmCreateRepoWithOptions(dir string, options RpmRepoOptions) error {
	return rpm_create_rpm_repo(dir, options)
}

// rpm_create_rpm_repo walks dir for *.rpm files, extracts their metadata in
// parallel, writes primary/filelists/other (and optionally filelists-ext)
// XML plus repomd.xml into a fresh temporary repodata directory, and then
// atomically replaces any previous repodata directory with it.
//
// The named return is required so the deferred unlock can surface an error.
func rpm_create_rpm_repo(dir string, options RpmRepoOptions) (ret_err error) {
	in_dir := rpm_normalize_dir_path(dir)
	in_repo := path.Join(in_dir, "repodata")

	out_dir := options.OutputDir
	var out_repo string
	if out_dir != "" {
		out_dir_exist, err := rpm_path_exists(out_dir)
		if err != nil {
			return fmt.Errorf("error to check if outputdir %s exists: %v", out_dir, err)
		}
		if !out_dir_exist {
			return fmt.Errorf("specified outputdir \"%s\" doesn't exist", out_dir)
		}
		out_dir = rpm_normalize_dir_path(out_dir)
		out_repo = path.Join(out_dir, "repodata")
	} else {
		out_dir = in_dir
		out_repo = in_repo
	}

	pool_tasks, err := rpm_find_rpm_files(in_dir)
	if err != nil {
		return fmt.Errorf("error to walk through packages in directory %s", in_dir)
	}
	total_pkg_num := len(pool_tasks)

	lock_mode := options.LockMode
	if lock_mode == RpmLockUnset {
		if options.IgnoreLock {
			lock_mode = RpmLockNone
		} else {
			lock_mode = RpmLockFail
		}
	}
	lock_handle, tmp_out_dir, err := rpm_lock_repo(out_dir, lock_mode)
	if err != nil {
		return fmt.Errorf("error to lock repo: %v", err)
	}
	defer func() {
		rpm_exit_cleanup(tmp_out_dir)
		unlock_err := rpm_unlock_repo(lock_handle)
		if ret_err == nil && unlock_err != nil {
			// i can't use the return statement because it is
			// set in a deferred function.
			ret_err = fmt.Errorf("unlock repo lock failed: %w", unlock_err)
		}
	}()

	jobs := make(chan *rpm_repo_job, 100)
	// BUGFIX: size the results channel for the worst case of one error per
	// package.  With the previous fixed buffer of 100, more than 100 error
	// sends would block the workers, wg.Wait() would never return, and the
	// whole call would deadlock.
	results := make(chan error, total_pkg_num+1)
	worker_count := options.Workers
	if worker_count <= 0 {
		worker_count = RpmDefaultWorkers
	}
	var wg sync.WaitGroup
	for i := 0; i < worker_count; i++ {
		wg.Add(1)
		go rpm_worker(jobs, results, &wg)
	}

	var data rpm_user_data
	data.location_base = ""
	data.filelists_ext = options.FilelistsExt
	data.change_log_limit = options.ChangeLogLimit
	if data.change_log_limit <= 0 {
		data.change_log_limit = RpmDefaultChangelogLimit
	}
	data.repo_dir_name_len = len(in_dir)
	data.nevra_table = make(map[string][]*RpmDuplicateLocation)
	data.all_xml_structs = rpm_init_xml_struct_all()
	data.checksum_type = RpmDefaultChecksum

	for _, task := range pool_tasks {
		jobs <- &rpm_repo_job{user_data: &data, pool_task: task}
	}
	close(jobs)
	wg.Wait()
	close(results)

	failed_pkg_num := 0
	for result := range results {
		if result != nil {
			failed_pkg_num++
		}
	}

	compression_type := options.CompressionType
	xml_compression_suffix := CompressionSuffix(compression_type)
	pri_xml_filename := tmp_out_dir + "/primary.xml" + xml_compression_suffix
	file_xml_filename := tmp_out_dir + "/filelists.xml" + xml_compression_suffix
	oth_xml_filename := tmp_out_dir + "/other.xml" + xml_compression_suffix
	fex_xml_filename := ""
	support_filelists_ext := options.FilelistsExt
	if support_filelists_ext {
		fex_xml_filename = tmp_out_dir + "/filelists-ext.xml" + xml_compression_suffix
	}

	// Workers append packages in completion order; restore input order
	// before serializing.
	data.all_xml_structs.SortPackageByTaskID()
	data.all_xml_structs.SetPackageNum()

	primary_repomd_record, err := rpm_get_repomd_record(pri_xml_filename, RpmRepomdTypePrimary, &data, options.UniqueMDFilenames, compression_type)
	if err != nil {
		return err
	}
	file_repomd_record, err := rpm_get_repomd_record(file_xml_filename, RpmRepomdTypeFilelists, &data, options.UniqueMDFilenames, compression_type)
	if err != nil {
		return err
	}
	other_repomd_record, err := rpm_get_repomd_record(oth_xml_filename, RpmRepomdTypeOther, &data, options.UniqueMDFilenames, compression_type)
	if err != nil {
		return err
	}

	repomd := RpmRepomd{}
	repomd.Xmlns = RPM_XML_NS_REPOMD
	repomd.XmlnsRpm = RPM_XML_NS_RPM
	repomd.Revision = strconv.FormatInt(time.Now().Unix(), 10)
	repomd.Data = []*RpmRepomdRecord{primary_repomd_record, file_repomd_record, other_repomd_record}
	if support_filelists_ext {
		fex_repomd_record, fex_err := rpm_get_repomd_record(fex_xml_filename, RpmRepomdTypeFilelistsExt, &data, options.UniqueMDFilenames, compression_type)
		if fex_err != nil {
			return fex_err
		}
		repomd.Data = append(repomd.Data, fex_repomd_record)
	}
	sort.Slice(repomd.Data, func(i int, j int) bool {
		return rpm_repomd_record_cmp(repomd.Data[i], repomd.Data[j]) < 0
	})

	repomd_path := filepath.Join(tmp_out_dir, "repomd.xml")
	err = rpm_write_xml(repomd, repomd_path)
	if err != nil {
		return fmt.Errorf("write %s failed: %v", repomd_path, err)
	}

	// Carry over non-regenerated files (e.g. extra metadata) from the old
	// repodata dir into the new one.
	err = rpm_old_metadata_retention(out_repo, tmp_out_dir, options.AllowMissingRepomd)
	if err != nil {
		return fmt.Errorf("old metadata files retention failed: %v", err)
	}

	exist, err := rpm_path_exists(out_repo)
	if err != nil {
		return err
	}
	if exist {
		// Move the live repodata aside first so the final rename below is
		// a simple atomic swap; the old copy is removed on function exit.
		old_repo := path.Join(out_dir, rpm_append_pid_and_datetime("repodata.old", ""))
		err = os.Rename(out_repo, old_repo)
		if err != nil {
			return fmt.Errorf("rename %s to %s failed: %v", out_repo, old_repo, err)
		}
		defer rpm_exit_cleanup(old_repo)
	}
	err = os.Rename(tmp_out_dir, out_repo)
	if err != nil {
		return fmt.Errorf("rename %s to %s failed: %v", tmp_out_dir, out_repo, err)
	}

	if failed_pkg_num > 0 {
		return fmt.Errorf("finished with error: %d packages failed, %d packages succeeded", failed_pkg_num, total_pkg_num-failed_pkg_num)
	}
	return nil
}

// rpm_find_rpm_files recursively collects every *.rpm file under dir_path,
// sorted by filename (then full path), with task ids assigned in that order.
// On a walk error the files gathered so far are returned alongside the error.
func rpm_find_rpm_files(dir_path string) ([]*rpm_pool_task, error) {
	var rpm_files []*rpm_pool_task
	err := filepath.WalkDir(dir_path, func(walk_path string, d fs.DirEntry, walk_err error) error {
		if walk_err != nil {
			return walk_err
		}
		if !d.IsDir() && strings.HasSuffix(d.Name(), ".rpm") {
			rpm_files = append(rpm_files, &rpm_pool_task{
				full_path: walk_path,
				filename:  d.Name(),
				path:      filepath.Dir(walk_path),
			})
		}
		return nil
	})
	rpm_sort_pool_tasks(rpm_files)
	for i := range rpm_files {
		rpm_files[i].id = int64(i)
		rpm_files[i].media_id = 0
	}
	return rpm_files, err
}

// rpm_task_cmp orders tasks by filename, breaking ties with the full path.
func rpm_task_cmp(a *rpm_pool_task, b *rpm_pool_task) int {
	if cmp := strings.Compare(a.filename, b.filename); cmp != 0 {
		return cmp
	}
	return strings.Compare(a.full_path, b.full_path)
}

// rpm_sort_pool_tasks sorts tasks in place using rpm_task_cmp.
func rpm_sort_pool_tasks(package_tasks []*rpm_pool_task) {
	sort.Slice(package_tasks, func(i int, j int) bool {
		return rpm_task_cmp(package_tasks[i], package_tasks[j]) < 0
	})
}

// rpm_old_metadata_retention copies every entry from the old repodata dir
// into the new one, except repomd.xml and the local metadata files that
// repomd.xml references (those are regenerated).  A missing old repo is not
// an error; a missing old repomd.xml is tolerated only when
// allow_missing_repomd is set.
func rpm_old_metadata_retention(old_repo string, new_repo string, allow_missing_repomd bool) error {
	existed, err := rpm_path_exists(old_repo)
	if err != nil {
		return err
	}
	if !existed {
		return nil
	}

	old_repomd_path := path.Join(old_repo, "repomd.xml")
	xml_file, err := os.Open(old_repomd_path)
	if err != nil {
		if allow_missing_repomd && os.IsNotExist(err) {
			return nil
		}
		return fmt.Errorf("error opening file %s: %v", old_repomd_path, err)
	}
	defer xml_file.Close()

	xml_data, err := io.ReadAll(xml_file)
	if err != nil {
		return fmt.Errorf("error reading file %s: %v", old_repomd_path, err)
	}
	var repomd_data RpmRepomd
	err = xml.Unmarshal(xml_data, &repomd_data)
	if err != nil {
		return fmt.Errorf("error unmarshaling XML to %s: %v", old_repomd_path, err)
	}

	excluded_list := []string{"repomd.xml"}
	for _, repomd_record := range repomd_data.Data {
		href := repomd_record.Location.Href
		if href == "" {
			continue
		}
		// Records with an xml:base live outside old_repo; nothing local
		// to exclude for them.
		if repomd_record.Location.XMLBase != "" {
			continue
		}
		excluded_list = append(excluded_list, filepath.Base(href))
	}

	entries, err := os.ReadDir(old_repo)
	if err != nil {
		return err
	}
	for _, entry := range entries {
		if rpm_contains(excluded_list, entry.Name()) {
			continue
		}
		src_path := path.Join(old_repo, entry.Name())
		dst_path := path.Join(new_repo, entry.Name())
		if entry.IsDir() {
			err = rpm_copy_dir(src_path, dst_path)
		} else {
			err = rpm_copy_file(src_path, dst_path)
		}
		if err != nil {
			return err
		}
	}
	return nil
}

// rpm_get_repomd_record serializes the XML struct selected by repomd_type,
// writes it compressed to xml_compressed_file_path, and returns the repomd
// record (open/compressed checksums and sizes, location, timestamp)
// describing the result.  When unique_md_filenames is set, the output file
// is renamed to "<checksum>-<name>".
func rpm_get_repomd_record(xml_compressed_file_path string, repomd_type RpmRepomdType, data *rpm_user_data, unique_md_filenames bool, compression_type RpmCompressionType) (*RpmRepomdRecord, error) {
	var xml_bytes []byte
	var err error
	switch repomd_type {
	case RpmRepomdTypePrimary:
		xml_bytes, err = rpm_dump_xml(data.all_xml_structs.RpmPrimaryXMLData)
	case RpmRepomdTypeFilelists:
		xml_bytes, err = rpm_dump_xml(data.all_xml_structs.RpmFilelistsXMLData)
	case RpmRepomdTypeOther:
		xml_bytes, err = rpm_dump_xml(data.all_xml_structs.RpmOtherXMLData)
	case RpmRepomdTypeFilelistsExt:
		xml_bytes, err = rpm_dump_xml(data.all_xml_structs.RpmFilelistsExtXMLData)
	default:
		return nil, fmt.Errorf("invalid repomd type %s", repomd_type)
	}
	if err != nil {
		return nil, err
	}

	// The "open" checksum covers the uncompressed XML; the plain checksum
	// covers the compressed file as stored on disk.
	xml_data_checksum, err := rpm_checksum_bytes(xml_bytes, RpmDefaultRepomdChecksum)
	if err != nil {
		return nil, err
	}
	err = rpm_compress_xml(xml_bytes, xml_compressed_file_path, compression_type)
	if err != nil {
		return nil, err
	}
	compressed_file_checksum, err := rpm_checksum_file(xml_compressed_file_path, RpmDefaultRepomdChecksum)
	if err != nil {
		return nil, err
	}

	compressed_filename := filepath.Base(xml_compressed_file_path)
	if unique_md_filenames {
		base_dir := filepath.Dir(xml_compressed_file_path)
		compressed_filename = fmt.Sprintf("%s-%s", compressed_file_checksum, compressed_filename)
		old_xml_compressed_file_path := xml_compressed_file_path
		xml_compressed_file_path = path.Join(base_dir, compressed_filename)
		err = os.Rename(old_xml_compressed_file_path, xml_compressed_file_path)
		if err != nil {
			return nil, fmt.Errorf("rename %s to %s failed: %v", old_xml_compressed_file_path, xml_compressed_file_path, err)
		}
	}

	stat_mod_time, stat_size, err := rpm_stat_file(xml_compressed_file_path)
	if err != nil {
		return nil, err
	}

	repomd_rec := &RpmRepomdRecord{}
	repomd_rec.Type = string(repomd_type)
	repomd_rec.OpenChecksum = RpmChecksum{Type: RpmChecksumName(RpmDefaultRepomdChecksum), Value: xml_data_checksum}
	repomd_rec.OpenSize = int64(len(xml_bytes))
	repomd_rec.Checksum = RpmChecksum{Type: RpmChecksumName(RpmDefaultRepomdChecksum), Value: compressed_file_checksum}
	repomd_rec.Size = stat_size
	repomd_rec.Location = RpmLocation{Href: RpmLocationHrefPrefix + compressed_filename}
	repomd_rec.Timestamp = stat_mod_time
	return repomd_rec, nil
}

// rpm_exit_cleanup best-effort removes a temporary directory tree.
func rpm_exit_cleanup(lock_dir string) {
	_ = os.RemoveAll(lock_dir)
}

// rpm_lock_repo optionally flock()s repo_dir according to lock_mode and
// creates a unique temporary ".repodata.<pid>.<ts>..." working directory
// inside it.  It returns the lock handle (nil when lock_mode is RpmLockNone)
// and the temporary directory path.
func rpm_lock_repo(repo_dir string, lock_mode RpmLockMode) (*os.File, string, error) {
	var lock_handle *os.File
	if lock_mode != RpmLockNone {
		var open_err error
		lock_handle, open_err = os.Open(repo_dir)
		if open_err != nil {
			return nil, "", fmt.Errorf("cannot open repo dir %s for locking: %w", repo_dir, open_err)
		}
		flock_flags := syscall.LOCK_EX
		if lock_mode == RpmLockFail {
			// Non-blocking: fail right away if someone else holds it.
			flock_flags |= syscall.LOCK_NB
		} else if lock_mode != RpmLockWait {
			lock_handle.Close()
			return nil, "", fmt.Errorf("unknown lock mode")
		}
		if flock_err := syscall.Flock(int(lock_handle.Fd()), flock_flags); flock_err != nil {
			lock_handle.Close()
			if errors.Is(flock_err, syscall.EWOULDBLOCK) || errors.Is(flock_err, syscall.EAGAIN) {
				return nil, "", fmt.Errorf("repo dir lock exists: %s", repo_dir)
			}
			return nil, "", fmt.Errorf("cannot lock repo dir %s: %w", repo_dir, flock_err)
		}
	}
	tmp_repodata_dir := filepath.Join(repo_dir, rpm_append_pid_and_datetime(".repodata", ""))
	err := os.Mkdir(tmp_repodata_dir, os.ModePerm)
	if err != nil {
		rpm_unlock_repo(lock_handle)
		return nil, "", fmt.Errorf("cannot create %s: %w", tmp_repodata_dir, err)
	}
	return lock_handle, tmp_repodata_dir, nil
}

// rpm_unlock_repo releases the flock taken by rpm_lock_repo and closes the
// handle.  A nil handle (no lock was taken, or locking failed) is a no-op.
func rpm_unlock_repo(lock_handle *os.File) error {
	if lock_handle == nil {
		// in case rpm_lock_repo() didn't really lock a repo or did fail.
		return nil
	}
	if err := syscall.Flock(int(lock_handle.Fd()), syscall.LOCK_UN); err != nil {
		lock_handle.Close()
		return err
	}
	return lock_handle.Close()
}

// rpm_append_pid_and_datetime builds
// "<prefix>.<pid>.<yyyymmddhhmmss>.<microseconds>" (plus ".<suffix>" when
// suffix is non-empty), used to make temp names unique across processes.
func rpm_append_pid_and_datetime(prefix string, suffix string) string {
	now := time.Now()
	result := fmt.Sprintf("%s.%d.%s.%d", prefix, os.Getpid(), now.Format("20060102150405"), now.Nanosecond()/1000)
	if suffix != "" {
		result = fmt.Sprintf("%s.%s", result, suffix)
	}
	return result
}

// rpm_worker consumes jobs until the channel is closed, loading each RPM and
// appending its metadata to the shared XML structs.  Per-package failures are
// reported on results.
//
// BUGFIX: the worker previously returned on the first error, which stopped
// it from draining jobs; if every worker errored out while the feeder still
// had more jobs than the channel buffer, the feeder blocked forever.  Now
// the worker reports the error and keeps consuming.
func rpm_worker(jobs <-chan *rpm_repo_job, results chan<- error, wg *sync.WaitGroup) {
	defer wg.Done()
	for job_item := range jobs {
		udata := job_item.user_data
		task := job_item.pool_task
		// The href is the package path relative to the repo root.
		location_href := task.full_path[udata.repo_dir_name_len:]
		pkg, err := rpm_load_rpm(task.full_path, udata.checksum_type, location_href, udata.location_base, udata.change_log_limit)
		if err != nil {
			results <- err
			continue
		}
		err = rpm_xml_dump(udata, task, pkg, udata.filelists_ext)
		if err != nil {
			results <- err
			continue
		}
	}
}

// rpm_xml_dump appends pkg's primary, filelists, other (and optionally
// filelists-ext) representations to the shared XML structs, each under its
// own mutex.  Packages containing characters that are invalid in XML are
// rejected.
func rpm_xml_dump(udata *rpm_user_data, task *rpm_pool_task, pkg *RpmPackage, is_filelists_ext bool) error {
	if rpm_package_contains_forbidden_control_chars(pkg) {
		return fmt.Errorf("forbidden control chars found (ASCII values <32 except 9, 10 and 13)")
	}
	task_id := task.id

	udata.all_xml_structs.MutexPrimary.Lock()
	udata.all_xml_structs.RpmPrimaryXMLData.PackageList = append(udata.all_xml_structs.RpmPrimaryXMLData.PackageList, rpm_get_primary_package(pkg, task_id))
	udata.all_xml_structs.MutexPrimary.Unlock()

	udata.all_xml_structs.MutexFilelists.Lock()
	udata.all_xml_structs.RpmFilelistsXMLData.PackageList = append(udata.all_xml_structs.RpmFilelistsXMLData.PackageList, rpm_get_filelists_package(pkg, false, task_id))
	udata.all_xml_structs.MutexFilelists.Unlock()

	udata.all_xml_structs.MutexOtherData.Lock()
	udata.all_xml_structs.RpmOtherXMLData.PackageList = append(udata.all_xml_structs.RpmOtherXMLData.PackageList, rpm_get_other_package(pkg, task_id))
	udata.all_xml_structs.MutexOtherData.Unlock()

	if is_filelists_ext {
		udata.all_xml_structs.MutexFilelistsExt.Lock()
		udata.all_xml_structs.RpmFilelistsExtXMLData.PackageList = append(udata.all_xml_structs.RpmFilelistsExtXMLData.PackageList, rpm_get_filelists_package(pkg, true, task_id))
		udata.all_xml_structs.MutexFilelistsExt.Unlock()
	}
	return nil
}