package repokit import "fmt" import "os" import "path/filepath" import "runtime" import "strings" import "testing" import "time" func TestRpmNormalizeDirPath(t *testing.T) { var input string var output string var expected string var sep string sep = string(filepath.Separator) input = "" output = rpm_normalize_dir_path(input) expected = "./" if output != expected { t.Fatalf("rpm_normalize_dir_path(\"\")=%q, want %q", output, expected) } input = "foo/bar" output = rpm_normalize_dir_path(input) if !strings.HasSuffix(output, sep) { t.Fatalf("rpm_normalize_dir_path(%q)=%q, expected trailing separator", input, output) } input = "foo/bar/" output = rpm_normalize_dir_path(input) if !strings.HasSuffix(output, sep) { t.Fatalf("rpm_normalize_dir_path(%q)=%q, expected trailing separator", input, output) } } func TestRpmFindRpmFilesSortAndIDs(t *testing.T) { var dir string var err error var subdir string var files []*rpm_pool_task var names []string var i int dir = t.TempDir() subdir = filepath.Join(dir, "sub") err = os.MkdirAll(subdir, 0775) if err != nil { t.Fatalf("mkdir: %v", err) } err = os.WriteFile(filepath.Join(dir, "b.rpm"), []byte(""), 0644) if err != nil { t.Fatalf("write b.rpm: %v", err) } err = os.WriteFile(filepath.Join(dir, "a.rpm"), []byte(""), 0644) if err != nil { t.Fatalf("write a.rpm: %v", err) } err = os.WriteFile(filepath.Join(subdir, "a.rpm"), []byte(""), 0644) if err != nil { t.Fatalf("write sub/a.rpm: %v", err) } files, err = rpm_find_rpm_files(dir) if err != nil { t.Fatalf("rpm_find_rpm_files: %v", err) } if len(files) != 3 { t.Fatalf("rpm_find_rpm_files count=%d, want 3", len(files)) } names = make([]string, 0, len(files)) for i = 0; i < len(files); i++ { names = append(names, files[i].filename) if files[i].id != int64(i) { t.Fatalf("task id at %d=%d, want %d", i, files[i].id, i) } } if names[0] != "a.rpm" || names[1] != "a.rpm" || names[2] != "b.rpm" { t.Fatalf("sorted filenames=%v, want [a.rpm a.rpm b.rpm]", names) } } func TestRpmRecordCmp(t *testing.T) { var a RpmRepomdRecord var b RpmRepomdRecord var cmp int a.Type = "primary" a.Location = RpmLocation{Href: "repodata/primary.xml.zst"} b.Type = "filelists" b.Location = RpmLocation{Href: "repodata/filelists.xml.zst"} cmp = rpm_repomd_record_cmp(&a, &b) if cmp >= 0 { t.Fatalf("rpm_repomd_record_cmp primary vs filelists=%d, want < 0", cmp) } a.Type = "filelists" b.Type = "filelists" a.Location = RpmLocation{Href: "b"} b.Location = RpmLocation{Href: "a"} cmp = rpm_repomd_record_cmp(&a, &b) if cmp <= 0 { t.Fatalf("rpm_repomd_record_cmp href order=%d, want > 0", cmp) } } func TestRpmChecksumBytes(t *testing.T) { var sum string var err error var input []byte input = []byte("abc") sum, err = rpm_checksum_bytes(input, RPM_CHECKSUM_SHA256) if err != nil { t.Fatalf("rpm_checksum_bytes error: %v", err) } if sum != "ba7816bf8f01cfea414140de5dae2223b00361a396177a9cb410ff61f20015ad" { t.Fatalf("rpm_checksum_bytes=%s, want sha256 of abc", sum) } } func TestRpmPathExists(t *testing.T) { var dir string var exists bool var err error dir = t.TempDir() exists, err = rpm_path_exists(dir) if err != nil { t.Fatalf("rpm_path_exists error: %v", err) } if !exists { t.Fatalf("rpm_path_exists returned false for temp dir") } exists, err = rpm_path_exists(filepath.Join(dir, "missing")) if err != nil { t.Fatalf("rpm_path_exists error: %v", err) } if exists { t.Fatalf("rpm_path_exists returned true for missing path") } } func TestRpmNormalizeDirPathWindowsDrive(t *testing.T) { if runtime.GOOS != "windows" { t.Skip("windows path behavior") } var input string var output string input = "C:\\temp" output = rpm_normalize_dir_path(input) if !strings.HasSuffix(output, "\\") { t.Fatalf("rpm_normalize_dir_path windows=%q, expected trailing \\", output) } } func TestRpmLockRepoModes(t *testing.T) { var dir string var lock_handle *os.File var tmp_dir string var err error var wait_done chan error var exists bool var wait_err error var lock_err error dir = t.TempDir() lock_handle, tmp_dir, err = rpm_lock_repo(dir, RpmLockWait) if err != nil { t.Fatalf("initial lock: %v", err) } defer rpm_exit_cleanup(tmp_dir) _, _, err = rpm_lock_repo(dir, RpmLockFail) if err == nil { t.Fatalf("RpmLockFail expected error when lock exists") } wait_done = make(chan error, 1) go func() { var tmp_lock *os.File var tmp_out string var inner_err error tmp_lock, tmp_out, inner_err = rpm_lock_repo(dir, RpmLockWait) if inner_err == nil { if tmp_lock == nil || tmp_out == "" { inner_err = fmt.Errorf("RpmLockWait returned empty paths") } _ = rpm_unlock_repo(tmp_lock) rpm_exit_cleanup(tmp_out) } wait_done <- inner_err }() time.Sleep(150 * time.Millisecond) lock_err = rpm_unlock_repo(lock_handle) if lock_err != nil { t.Fatalf("unlock lock handle: %v", lock_err) } wait_err = <-wait_done if wait_err != nil { t.Fatalf("RpmLockWait error: %v", wait_err) } lock_handle, tmp_dir, err = rpm_lock_repo(dir, RpmLockNone) if err != nil { t.Fatalf("RpmLockNone error: %v", err) } if lock_handle != nil { t.Fatalf("RpmLockNone should not return lock handle") } exists, err = rpm_path_exists(tmp_dir) if err != nil { t.Fatalf("path exists error: %v", err) } if !exists { t.Fatalf("RpmLockNone tmp dir missing: %q", tmp_dir) } rpm_exit_cleanup(tmp_dir) } func TestRpmCreateRepoWithOptionsLockNone(t *testing.T) { var dir string var options RpmRepoOptions var err error var repodata_dir string var repomd_path string var exists bool var entries []os.DirEntry var entry os.DirEntry dir = t.TempDir() options = RpmDefaultRepoOptions() options.LockMode = RpmLockNone options.AllowMissingRepomd = true err = RpmCreateRepoWithOptions(dir, options) if err != nil { t.Fatalf("RpmCreateRepoWithOptions lock none: %v", err) } repodata_dir = filepath.Join(dir, "repodata") repomd_path = filepath.Join(repodata_dir, "repomd.xml") exists, err = rpm_path_exists(repomd_path) if err != nil { t.Fatalf("repomd exists check: %v", err) } if !exists { t.Fatalf("repomd not found at %q", repomd_path) } entries, err = os.ReadDir(dir) if err != nil { t.Fatalf("readdir repo dir: %v", err) } for _, entry = range entries { if strings.HasPrefix(entry.Name(), ".repodata.") { t.Fatalf("temporary repodata dir leaked: %q", entry.Name()) } } }