Files
repokit/rpm-util.go

694 lines
17 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package repokit
import "compress/gzip"
import "encoding/xml"
import "fmt"
import "github.com/klauspost/compress/zstd"
import "github.com/sassoftware/go-rpmutils"
import "io"
import "os"
import "path/filepath"
import "regexp"
import "strconv"
import "strings"
import "syscall"
func rpm_normalize_dir_path(path_value string) string {
var normalized string
if path_value == "" {
return "./"
}
normalized = filepath.Clean(path_value)
if !strings.HasSuffix(normalized, string(filepath.Separator)) && normalized != "/" {
normalized = normalized + string(filepath.Separator)
}
return normalized
}
func rpm_path_exists(path_value string) (bool, error) {
var err error
_, err = os.Stat(path_value)
if err == nil {
return true, nil
}
if os.IsNotExist(err) {
return false, nil
}
return false, err
}
func rpm_copy_dir(src string, dst string) error {
var src_info os.FileInfo
var err error
var entries []os.DirEntry
var entry os.DirEntry
var src_path string
var dst_path string
src_info, err = os.Stat(src)
if err != nil {
return err
}
err = os.MkdirAll(dst, src_info.Mode())
if err != nil {
return err
}
entries, err = os.ReadDir(src)
if err != nil {
return err
}
for _, entry = range entries {
src_path = filepath.Join(src, entry.Name())
dst_path = filepath.Join(dst, entry.Name())
if entry.IsDir() {
err = rpm_copy_dir(src_path, dst_path)
if err != nil {
return err
}
} else {
err = rpm_copy_file(src_path, dst_path)
if err != nil {
return err
}
}
}
return nil
}
func rpm_copy_file(src string, dst string) error {
var source_file *os.File
var dest_file *os.File
var err error
source_file, err = os.Open(src)
if err != nil {
return fmt.Errorf("could not open source file: %v", err)
}
defer source_file.Close()
dest_file, err = os.Create(dst)
if err != nil {
return fmt.Errorf("could not create destination file: %v", err)
}
defer dest_file.Close()
_, err = io.Copy(dest_file, source_file)
if err != nil {
return fmt.Errorf("could not copy file contents: %v", err)
}
err = dest_file.Close()
if err != nil {
return fmt.Errorf("could not close destination file: %v", err)
}
err = rpm_copy_metadata_and_ownership(src, dst)
if err != nil {
return fmt.Errorf("could not copy metadata: %v", err)
}
return nil
}
func rpm_copy_metadata_and_ownership(src string, dst string) error {
var src_info os.FileInfo
var err error
var src_sys *syscall.Stat_t
src_info, err = os.Stat(src)
if err != nil {
return fmt.Errorf("could not get source file info: %v", err)
}
err = os.Chmod(dst, src_info.Mode())
if err != nil {
return fmt.Errorf("could not set file permissions: %v", err)
}
err = os.Chtimes(dst, src_info.ModTime(), src_info.ModTime())
if err != nil {
return fmt.Errorf("could not set file timestamps: %v", err)
}
src_sys = src_info.Sys().(*syscall.Stat_t)
err = os.Chown(dst, int(src_sys.Uid), int(src_sys.Gid))
if err != nil {
return fmt.Errorf("could not change file ownership: %v", err)
}
return nil
}
func rpm_contains(slice []string, elem string) bool {
var v string
for _, v = range slice {
if v == elem {
return true
}
}
return false
}
func rpm_flag_to_str(flags uint64) string {
var result string
flags = flags & 0xf
switch flags {
case 0:
result = ""
case 2:
result = "LT"
case 4:
result = "GT"
case 8:
result = "EQ"
case 10:
result = "LE"
case 12:
result = "GE"
default:
result = ""
}
return result
}
func rpm_is_primary_file(file_path string) bool {
// it mirrors the historical createrepo/createrepo_c definition of “primary” files. In RPM repodata,
// primary.xml only lists a subset of files (to keep metadata smaller). The heuristic is:
//
// - /etc/* - configuration files are important for dependency and discovery.
// - /usr/lib/sendmail - special legacy binary that historically needed to be listed even though it
// doesnt live in /bin.
// - any path containing bin/ - executables are treated as primary.
if strings.HasPrefix(file_path, "/etc/") {
return true
}
if file_path == "/usr/lib/sendmail" {
return true
}
if strings.Contains(file_path, "bin/") {
return true
}
return false
}
func rpm_compare_dependency(dep1 string, dep2 string) int {
// NOTE: The function assume first parts must be same!
// libc.so.6() < libc.so.6(GLIBC_2.3.4)(64 bit) < libc.so.6(GLIBC_2.4)
// Returns -1 if first < second, 1 if first > second, and 0 if first == second.
var regex *regexp.Regexp
var matches1 []string
var matches2 []string
regex = regexp.MustCompile(`\((?i:glibc)_([^)]+)\)`)
matches1 = regex.FindStringSubmatch(dep1)
matches2 = regex.FindStringSubmatch(dep2)
if len(matches1) == 0 && len(matches2) == 0 { return 0 }
if len(matches1) == 0 { return -1 }
if len(matches2) == 0 { return 1 }
return rpmutils.Vercmp(matches1[1], matches2[1])
}
func rpm_package_nevra(pkg *RpmPackage) string {
var epoch string
epoch = pkg.Epoch
if epoch == "" {
epoch = "0"
}
return fmt.Sprintf("%s-%s:%s-%s.%s", pkg.Name, epoch, pkg.Version, pkg.Release, pkg.Arch)
}
func rpm_str_to_evr(input string) *RpmEVR {
var evr *RpmEVR
var epoch_end int
var epoch_str string
var err error
var bad_epoch bool
var version_release []string
evr = &RpmEVR{}
if input == "" {
return evr
}
bad_epoch = false
epoch_end = strings.Index(input, ":")
if epoch_end != -1 {
epoch_str = input[:epoch_end]
_, err = strconv.Atoi(epoch_str)
if err != nil {
bad_epoch = true
} else {
evr.Epoch = epoch_str
}
input = input[epoch_end+1:]
}
if evr.Epoch == "" && !bad_epoch {
evr.Epoch = "0"
}
version_release = strings.SplitN(input, "-", 2)
if len(version_release) > 0 {
evr.Version = version_release[0]
if len(version_release) == 2 {
evr.Release = version_release[1]
}
}
return evr
}
func rpm_has_control_chars(value string) bool {
var i int
for i = 0; i < len(value); i++ {
if value[i] < 32 && value[i] != 9 && value[i] != 10 && value[i] != 13 {
return true
}
}
return false
}
func rpm_dependency_list_contains_forbidden_control_chars(deps []RpmDependency) bool {
var ret bool
var dep RpmDependency
ret = false
for _, dep = range deps {
if dep.Name != "" && rpm_has_control_chars(dep.Name) {
ret = true
break
}
if dep.Epoch != "" && rpm_has_control_chars(dep.Epoch) {
ret = true
break
}
if dep.Version != "" && rpm_has_control_chars(dep.Version) {
ret = true
break
}
if dep.Release != "" && rpm_has_control_chars(dep.Release) {
ret = true
break
}
}
return ret
}
func rpm_package_contains_forbidden_control_chars(pkg *RpmPackage) bool {
var ret bool
var file_item RpmPackageFile
var change_log RpmChangelogEntry
ret = false
if pkg.Name != "" && rpm_has_control_chars(pkg.Name) {
ret = true
goto done
}
if pkg.Epoch != "" && rpm_has_control_chars(pkg.Epoch) {
ret = true
goto done
}
if pkg.Version != "" && rpm_has_control_chars(pkg.Version) {
ret = true
goto done
}
if pkg.Release != "" && rpm_has_control_chars(pkg.Release) {
ret = true
goto done
}
if pkg.Arch != "" && rpm_has_control_chars(pkg.Arch) {
ret = true
goto done
}
if pkg.Summary != "" && rpm_has_control_chars(pkg.Summary) {
ret = true
goto done
}
if pkg.Description != "" && rpm_has_control_chars(pkg.Description) {
ret = true
goto done
}
if pkg.Url != "" && rpm_has_control_chars(pkg.Url) {
ret = true
goto done
}
if pkg.RpmLicense != "" && rpm_has_control_chars(pkg.RpmLicense) {
ret = true
goto done
}
if pkg.RpmVendor != "" && rpm_has_control_chars(pkg.RpmVendor) {
ret = true
goto done
}
if pkg.RpmGroup != "" && rpm_has_control_chars(pkg.RpmGroup) {
ret = true
goto done
}
if pkg.RpmBuildHost != "" && rpm_has_control_chars(pkg.RpmBuildHost) {
ret = true
goto done
}
if pkg.RpmSourceRpm != "" && rpm_has_control_chars(pkg.RpmSourceRpm) {
ret = true
goto done
}
if pkg.RpmPackager != "" && rpm_has_control_chars(pkg.RpmPackager) {
ret = true
goto done
}
if pkg.LocationHref != "" && rpm_has_control_chars(pkg.LocationHref) {
ret = true
goto done
}
if pkg.LocationBase != "" && rpm_has_control_chars(pkg.LocationBase) {
ret = true
goto done
}
if rpm_dependency_list_contains_forbidden_control_chars(pkg.Requires) {
ret = true
goto done
}
if rpm_dependency_list_contains_forbidden_control_chars(pkg.Provides) {
ret = true
goto done
}
if rpm_dependency_list_contains_forbidden_control_chars(pkg.Conflicts) {
ret = true
goto done
}
if rpm_dependency_list_contains_forbidden_control_chars(pkg.Obsoletes) {
ret = true
goto done
}
if rpm_dependency_list_contains_forbidden_control_chars(pkg.Suggests) {
ret = true
goto done
}
if rpm_dependency_list_contains_forbidden_control_chars(pkg.Enhances) {
ret = true
goto done
}
if rpm_dependency_list_contains_forbidden_control_chars(pkg.Recommends) {
ret = true
goto done
}
if rpm_dependency_list_contains_forbidden_control_chars(pkg.Supplements) {
ret = true
goto done
}
for _, file_item = range pkg.Files {
if file_item.FullPath != "" && rpm_has_control_chars(file_item.FullPath) {
ret = true
goto done
}
}
for _, change_log = range pkg.Changelogs {
if change_log.Author != "" && rpm_has_control_chars(change_log.Author) {
ret = true
goto done
}
if change_log.Changelog != "" && rpm_has_control_chars(change_log.Changelog) {
ret = true
goto done
}
}
done:
return ret
}
func rpm_get_primary_package(pkg *RpmPackage, task_id int64) *RpmPrimaryPackage {
var primary_pkg *RpmPrimaryPackage
primary_pkg = &RpmPrimaryPackage{}
primary_pkg.ID = task_id
primary_pkg.Type = "rpm"
primary_pkg.Name = pkg.Name
primary_pkg.Arch = pkg.Arch
primary_pkg.Version = RpmVersion{Epoch: pkg.Epoch, Ver: pkg.Version, Rel: pkg.Release}
primary_pkg.Checksum = RpmChecksum{Type: pkg.RpmChecksumType, PkgID: "YES", Value: pkg.PkgId}
primary_pkg.Summary = pkg.Summary
primary_pkg.Description = pkg.Description
primary_pkg.Packager = pkg.RpmPackager
primary_pkg.URL = pkg.Url
primary_pkg.Time = RpmTime{File: fmt.Sprintf("%d", pkg.TimeFile), Build: fmt.Sprintf("%d", pkg.TimeBuild)}
primary_pkg.Size = RpmSize{Package: fmt.Sprintf("%d", pkg.SizePackage), Installed: fmt.Sprintf("%d", pkg.SizeInstalled), Archive: fmt.Sprintf("%d", pkg.SizeArchive)}
primary_pkg.Location = RpmLocation{Href: pkg.LocationHref, XMLBase: rpm_prepend_protocol(pkg.LocationBase)}
primary_pkg.Format = RpmFormat{}
primary_pkg.Format.License = pkg.RpmLicense
primary_pkg.Format.Vendor = pkg.RpmVendor
primary_pkg.Format.Group = pkg.RpmGroup
primary_pkg.Format.BuildHost = pkg.RpmBuildHost
primary_pkg.Format.SourceRPM = pkg.RpmSourceRpm
primary_pkg.Format.HeaderRange = RpmHeaderRange{Start: fmt.Sprintf("%d", pkg.RpmHeaderStart), End: fmt.Sprintf("%d", pkg.RpmHeaderEnd)}
primary_pkg.Format.Provides = rpm_xml_dump_primary_dump_pcor(pkg.Provides, RPM_DEP_PROVIDES)
primary_pkg.Format.Conflicts = rpm_xml_dump_primary_dump_pcor(pkg.Conflicts, RPM_DEP_CONFLICTS)
primary_pkg.Format.Obsoletes = rpm_xml_dump_primary_dump_pcor(pkg.Obsoletes, RPM_DEP_OBSOLETES)
primary_pkg.Format.Requires = rpm_xml_dump_primary_dump_pcor(pkg.Requires, RPM_DEP_REQUIRES)
primary_pkg.Format.Suggests = rpm_xml_dump_primary_dump_pcor(pkg.Suggests, RPM_DEP_SUGGESTS)
primary_pkg.Format.Enhances = rpm_xml_dump_primary_dump_pcor(pkg.Enhances, RPM_DEP_ENHANCES)
primary_pkg.Format.Recommends = rpm_xml_dump_primary_dump_pcor(pkg.Recommends, RPM_DEP_RECOMMENDS)
primary_pkg.Format.Supplements = rpm_xml_dump_primary_dump_pcor(pkg.Supplements, RPM_DEP_SUPPLEMENTS)
primary_pkg.Format.Files = rpm_xml_dump_files(pkg.Files, true, false)
return primary_pkg
}
func rpm_get_filelists_package(pkg *RpmPackage, is_filelists_ext bool, task_id int64) *RpmFilelistsPackage {
var filelists_pkg *RpmFilelistsPackage
filelists_pkg = &RpmFilelistsPackage{}
filelists_pkg.ID = task_id
filelists_pkg.PkgID = pkg.PkgId
filelists_pkg.Name = pkg.Name
filelists_pkg.Arch = pkg.Arch
filelists_pkg.Version = RpmVersion{Epoch: pkg.Epoch, Ver: pkg.Version, Rel: pkg.Release}
filelists_pkg.File = rpm_xml_dump_files(pkg.Files, false, is_filelists_ext)
if is_filelists_ext {
filelists_pkg.Checksum = &RpmFilelistsChecksum{Type: pkg.RpmChecksumType}
}
return filelists_pkg
}
func rpm_get_other_package(pkg *RpmPackage, task_id int64) *RpmOtherPackage {
var other_pkg *RpmOtherPackage
var change_log_list []*RpmChangelog
var c_log RpmChangelogEntry
var change_log *RpmChangelog
other_pkg = &RpmOtherPackage{}
other_pkg.ID = task_id
other_pkg.PkgID = pkg.PkgId
other_pkg.Name = pkg.Name
other_pkg.Arch = pkg.Arch
other_pkg.Version = RpmVersion{Epoch: pkg.Epoch, Ver: pkg.Version, Rel: pkg.Release}
change_log_list = nil
for _, c_log = range pkg.Changelogs {
change_log = &RpmChangelog{Content: c_log.Changelog, Author: c_log.Author, Date: fmt.Sprintf("%d", c_log.Date)}
change_log_list = append(change_log_list, change_log)
}
if change_log_list != nil {
other_pkg.Changelog = change_log_list
}
return other_pkg
}
func rpm_xml_dump_files(files []RpmPackageFile, is_primary bool, is_filelists_ext bool) []*RpmFile {
var xml_files []*RpmFile
var file_item RpmPackageFile
var file_value *RpmFile
xml_files = nil
for _, file_item = range files {
if file_item.FullPath == "" {
continue
}
if is_primary && !rpm_is_primary_file(file_item.FullPath) {
continue
}
file_value = &RpmFile{}
file_value.Value = file_item.FullPath
if file_item.Type != "" && file_item.Type != "file" {
file_value.Type = file_item.Type
}
if is_filelists_ext && file_item.Digest != "" {
file_value.Hash = file_item.Digest
}
xml_files = append(xml_files, file_value)
}
return xml_files
}
func rpm_xml_dump_primary_dump_pcor(dependencies []RpmDependency, dep_type RpmDepType) *RpmDepEntryList {
var dep_entry_list *RpmDepEntryList
var dep RpmDependency
var entry RpmDepEntry
dep_entry_list = nil
if len(dependencies) > 0 {
dep_entry_list = &RpmDepEntryList{}
for _, dep = range dependencies {
if dep.Name == "" {
continue
}
entry = RpmDepEntry{Name: dep.Name}
if dep.Flags != "" {
entry.Flags = dep.Flags
if dep.Epoch != "" {
entry.Epoch = dep.Epoch
}
if dep.Version != "" {
entry.Ver = dep.Version
}
if dep.Release != "" {
entry.Rel = dep.Release
}
}
if dep_type == RPM_DEP_REQUIRES && dep.Pre {
entry.Pre = "1"
}
dep_entry_list.Entries = append(dep_entry_list.Entries, entry)
}
}
return dep_entry_list
}
func rpm_prepend_protocol(url_value string) string {
if strings.HasPrefix(url_value, "/") {
return "file://" + url_value
}
return url_value
}
func rpm_dump_xml(obj any) ([]byte, error) {
var xml_data []byte
var err error
var combined []byte
xml_data, err = xml.MarshalIndent(obj, "", " ")
if err != nil {
return nil, err
}
combined = []byte(xml.Header + string(xml_data))
return combined, nil
}
func rpm_compress_xml(xml_data []byte, output_path string, compression_type RpmCompressionType) error {
var output_file *os.File
var err error
var encoder *zstd.Encoder
var gzip_writer *gzip.Writer
var writer io.Writer
output_file, err = os.Create(output_path)
if err != nil {
return err
}
defer output_file.Close()
if compression_type == RPM_COMPRESSION_NONE {
_, err = output_file.Write(xml_data)
if err != nil {
return err
}
return nil
}
if compression_type == RPM_COMPRESSION_GZ {
gzip_writer = gzip.NewWriter(output_file)
defer gzip_writer.Close()
writer = gzip_writer
} else if compression_type == RPM_COMPRESSION_ZSTD {
encoder, err = zstd.NewWriter(output_file, zstd.WithEncoderLevel(zstd.EncoderLevelFromZstd(7)))
if err != nil {
return err
}
defer encoder.Close()
writer = encoder
} else {
return fmt.Errorf("unsupported compression type")
}
_, err = writer.Write(xml_data)
if err != nil {
return err
}
return nil
}
func rpm_write_xml(obj any, path_value string) error {
var xml_data []byte
var err error
var file *os.File
var combined []byte
xml_data, err = xml.MarshalIndent(obj, "", " ")
if err != nil {
return fmt.Errorf("error marshalling to XML: %v", err)
}
combined = []byte(xml.Header + string(xml_data))
file, err = os.Create(path_value)
if err != nil {
return fmt.Errorf("error creating XML file %s: %v", path_value, err)
}
defer file.Close()
_, err = file.Write(combined)
if err != nil {
return fmt.Errorf("error writing XML data to file %s: %v", path_value, err)
}
return nil
}
func rpm_stat_file(full_path string) (int64, int64, error) {
var stat_buf os.FileInfo
var err error
var path_err *os.PathError
var ok bool
stat_buf, err = os.Stat(full_path)
if err != nil {
if os.IsNotExist(err) {
return 0, 0, fmt.Errorf("file does not exist: %s", full_path)
}
path_err, ok = err.(*os.PathError)
if ok && path_err.Err == syscall.ENOENT {
return 0, 0, fmt.Errorf("stat(%s) failed: %s", full_path, path_err.Err)
}
return 0, 0, fmt.Errorf("stat(%s) failed: %v", full_path, err)
}
return stat_buf.ModTime().Unix(), stat_buf.Size(), nil
}
func rpm_reverse_array[T any](slice []T) {
var i int
var j int
j = len(slice) - 1
for i = 0; i < j; i, j = i+1, j-1 {
slice[i], slice[j] = slice[j], slice[i]
}
}