package rpm

import (
	"log"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"time"

	repokit "repokit"
)

// MetaManager coordinates RPM repository metadata builds per directory.
//
// At most one build goroutine is started per directory at a time. A Schedule
// call that arrives while a build for the same directory is in flight does not
// start a second goroutine; it marks the directory as pending so the running
// job rebuilds once more after the current build succeeds.
type MetaManager struct {
	// mutex guards states and every metaState reachable from it.
	mutex sync.Mutex

	// states holds the scheduling state for each directory passed to Schedule.
	states map[string]*metaState
}

// metaState is the scheduling state of a single repository directory.
type metaState struct {
	// inProgress is true while a run goroutine owns the directory.
	inProgress bool

	// pending is true when another build has been requested and not yet served.
	pending bool
}

// NewMetaManager returns a MetaManager with no directories registered.
func NewMetaManager() *MetaManager {
	return &MetaManager{states: make(map[string]*metaState)}
}

// IsRunning reports whether a metadata job is currently marked in progress
// for dir. Directories that were never scheduled report false.
func (m *MetaManager) IsRunning(dir string) bool {
	m.mutex.Lock()
	defer m.mutex.Unlock()

	state, ok := m.states[dir]
	return ok && state != nil && state.inProgress
}

// Schedule requests a metadata build for dir and returns immediately.
//
// If no job is in progress for dir, a background goroutine is started to run
// the build. Otherwise the request is recorded as pending and picked up by the
// running job.
func (m *MetaManager) Schedule(dir string) {
	m.mutex.Lock()
	if m.states == nil {
		// Keep a zero-value MetaManager usable instead of panicking on the
		// map write below.
		m.states = make(map[string]*metaState)
	}
	state, ok := m.states[dir]
	if !ok || state == nil {
		state = &metaState{}
		m.states[dir] = state
	}
	if state.inProgress {
		state.pending = true
		m.mutex.Unlock()
		return
	}
	state.inProgress = true
	m.mutex.Unlock()

	go m.run(dir)
}

// run builds the metadata for dir, looping for as long as a successful build
// finds a pending request recorded for the directory.
//
// The manager mutex is taken only around state transitions; the build itself,
// the post-build filesystem check and all logging happen without the lock so
// that IsRunning and Schedule calls for any directory are not blocked on I/O.
//
// Outcomes:
//   - lock busy: the state is marked pending and not in progress, and a new
//     Schedule call is queued after lockRetryDelay;
//   - any other build error: the state is marked not in progress and the job
//     ends without a retry;
//   - success: the job reruns if a request is pending, otherwise it ends.
func (m *MetaManager) run(dir string) {
	const lockRetryDelay = 2 * time.Second

	for {
		log.Printf("rpm metadata: job begin dir=%s", dir)

		opts := repokit.RpmDefaultRepoOptions()
		opts.LockMode = repokit.RpmLockFail
		opts.AllowMissingRepomd = true
		opts.CompressionType = repokit.RPM_COMPRESSION_GZ

		log.Printf("rpm metadata: build start dir=%s", dir)
		err := repokit.RpmCreateRepoWithOptions(dir, opts)

		if err != nil {
			if isLockError(err) {
				log.Printf("rpm metadata: lock busy dir=%s err=%v", dir, err)
				log.Printf("rpm metadata: job end dir=%s result=lock_busy", dir)

				m.mutex.Lock()
				if state, ok := m.states[dir]; ok && state != nil {
					// NOTE(review): pending stays set across the retry, so the
					// retried job rebuilds twice once it succeeds — confirm
					// this is intended.
					state.pending = true
					state.inProgress = false
				}
				m.mutex.Unlock()

				time.AfterFunc(lockRetryDelay, func() { m.Schedule(dir) })
				return
			}

			log.Printf("rpm metadata: build failed dir=%s err=%v", dir, err)
			log.Printf("rpm metadata: job end dir=%s result=failed err=%v", dir, err)

			m.mutex.Lock()
			if state, ok := m.states[dir]; ok && state != nil {
				// NOTE(review): a pending request recorded during the failed
				// build is not served here; it only takes effect on the next
				// Schedule call — confirm this is intended.
				state.inProgress = false
			}
			m.mutex.Unlock()
			return
		}

		logRepodataPostCheck(dir)
		log.Printf("rpm metadata: build done dir=%s", dir)

		m.mutex.Lock()
		state, ok := m.states[dir]
		known := ok && state != nil
		if known && state.pending {
			// Consume the pending request and keep ownership of the directory
			// (inProgress stays true) for the next iteration.
			state.pending = false
			m.mutex.Unlock()
			log.Printf("rpm metadata: job end dir=%s result=pending_rerun", dir)
			continue
		}
		if known {
			state.inProgress = false
		}
		m.mutex.Unlock()

		log.Printf("rpm metadata: job end dir=%s result=success", dir)
		return
	}
}

// logRepodataPostCheck logs a diagnostic snapshot of dir/repodata after a
// build: the number of directory entries and the size of repomd.xml, or the
// error encountered while reading either. It only logs; errors are not
// returned.
func logRepodataPostCheck(dir string) {
	repodataDir := filepath.Join(dir, "repodata")
	repomdPath := filepath.Join(repodataDir, "repomd.xml")

	entries, err := os.ReadDir(repodataDir)
	if err != nil {
		log.Printf("rpm metadata: post-check dir=%s repodata_dir=%s read_err=%v", dir, repodataDir, err)
		return
	}

	info, err := os.Stat(repomdPath)
	if err != nil {
		log.Printf("rpm metadata: post-check dir=%s repodata_entries=%d repomd_path=%s repomd_err=%v", dir, len(entries), repomdPath, err)
		return
	}
	log.Printf("rpm metadata: post-check dir=%s repodata_entries=%d repomd_path=%s repomd_size=%d", dir, len(entries), repomdPath, info.Size())
}

// isLockError reports whether err's message contains the text
// "repodata lock exists". A nil error is never a lock error.
func isLockError(err error) bool {
	if err == nil {
		return false
	}
	return strings.Contains(err.Error(), "repodata lock exists")
}