diff --git a/internal/services/crossseed/service.go b/internal/services/crossseed/service.go index e49c9babb..3140cd10d 100644 --- a/internal/services/crossseed/service.go +++ b/internal/services/crossseed/service.go @@ -219,23 +219,27 @@ type automationContext struct { } const ( - searchResultCacheTTL = 5 * time.Minute - indexerDomainCacheTTL = 1 * time.Minute - contentFilteringWaitTimeout = 5 * time.Second - contentFilteringPollInterval = 150 * time.Millisecond - selectedIndexerContentSkipReason = "selected indexers were filtered out" - selectedIndexerCapabilitySkipReason = "selected indexers do not support required caps" - crossSeedRenameWaitTimeout = 15 * time.Second - crossSeedRenamePollInterval = 200 * time.Millisecond - automationSettingsQueryTimeout = 5 * time.Second - recheckPollInterval = 3 * time.Second // Batch API calls per instance - recheckAbsoluteTimeout = 60 * time.Minute // Allow time for large recheck queues - recheckAPITimeout = 30 * time.Second - minSearchIntervalSecondsTorznab = 60 - minSearchIntervalSecondsGazelleOnly = 5 - minSearchCooldownMinutes = 720 - maxCompletionSearchAttempts = 3 - defaultCompletionRetryDelay = 30 * time.Second + searchResultCacheTTL = 5 * time.Minute + indexerDomainCacheTTL = 1 * time.Minute + contentFilteringWaitTimeout = 5 * time.Second + contentFilteringPollInterval = 150 * time.Millisecond + selectedIndexerContentSkipReason = "selected indexers were filtered out" + selectedIndexerCapabilitySkipReason = "selected indexers do not support required caps" + crossSeedRenameWaitTimeout = 15 * time.Second + crossSeedRenamePollInterval = 200 * time.Millisecond + automationSettingsQueryTimeout = 5 * time.Second + recheckPollInterval = 3 * time.Second // Batch API calls per instance + recheckAbsoluteTimeout = 60 * time.Minute // Allow time for large recheck queues + recheckAPITimeout = 30 * time.Second + minSearchIntervalSecondsTorznab = 60 + minSearchIntervalSecondsGazelleOnly = 5 + minSearchCooldownMinutes = 720 + maxCompletionSearchAttempts = 3 + maxCompletionCheckingAttempts = 3 + defaultCompletionRetryDelay = 30 * time.Second + defaultCompletionCheckingRetryDelay = 30 * time.Second + defaultCompletionCheckingPollInterval = 2 * time.Second + defaultCompletionCheckingTimeout = 5 * time.Minute // User-facing message when cross-seed is skipped due to recheck requirement skippedRecheckMessage = "Skipped: requires recheck. Disable 'Skip recheck' in Cross-Seed settings to allow" @@ -336,9 +340,15 @@ type Service struct { metrics *ServiceMetrics // Per-instance completion coordination. - // Ensures completion-triggered searches run serially per instance. + // Queue bookkeeping/polling and completion-triggered search serialization + // use separate mutexes so a slow search does not stall other waits. completionLaneMu sync.Mutex completionLanes map[int]*completionLane + // Completion polling timings are injectable for tests; zero values use package defaults. + completionPollInterval time.Duration + completionTimeout time.Duration + completionRetryDelay time.Duration + completionMaxAttempts int // test hooks crossSeedInvoker func(ctx context.Context, req *CrossSeedRequest) (*CrossSeedResponse, error) @@ -360,7 +370,28 @@ type pendingResume struct { } type completionLane struct { - mu sync.Mutex + mu sync.Mutex + searchMu sync.Mutex + waits map[string]*completionWaitState + polling bool +} + +type completionWaitState struct { + done chan struct{} + attempt int + retryAt time.Time + deadline time.Time + timeout time.Duration + eventTorrent qbt.Torrent + lastSeen *qbt.Torrent + result *qbt.Torrent + err error + checkingLogged bool +} + +type completionWaitSnapshot struct { + state *completionWaitState + retryAt time.Time } // NewService creates a new cross-seed service @@ -418,6 +449,10 @@ func NewService( dedupCache: dedupCache, metrics: NewServiceMetrics(), completionLanes: make(map[int]*completionLane), + completionPollInterval: defaultCompletionCheckingPollInterval, + completionTimeout: defaultCompletionCheckingTimeout, + completionRetryDelay: defaultCompletionCheckingRetryDelay, + completionMaxAttempts: maxCompletionCheckingAttempts, recheckResumeChan: make(chan *pendingResume, 100), recheckResumeCtx: recheckCtx, recheckResumeCancel: recheckCancel, @@ -429,6 +464,38 @@ func NewService( return svc } +func (s *Service) getCompletionPollInterval() time.Duration { + if s != nil && s.completionPollInterval > 0 { + return s.completionPollInterval + } + + return defaultCompletionCheckingPollInterval +} + +func (s *Service) getCompletionTimeout() time.Duration { + if s != nil && s.completionTimeout > 0 { + return s.completionTimeout + } + + return defaultCompletionCheckingTimeout +} + +func (s *Service) getCompletionRetryDelay() time.Duration { + if s != nil && s.completionRetryDelay > 0 { + return s.completionRetryDelay + } + + return defaultCompletionCheckingRetryDelay +} + +func (s *Service) getCompletionMaxAttempts() int { + if s != nil && s.completionMaxAttempts > 0 { + return s.completionMaxAttempts + } + + return maxCompletionCheckingAttempts +} + // HealthCheck performs comprehensive health checks on the cross-seed service func (s *Service) HealthCheck(ctx context.Context) error { // Check if we can list instances @@ -1410,34 +1477,62 @@ func (s *Service) HandleTorrentCompletion(ctx context.Context, instanceID int, t } if !completionSettings.Enabled { - log.Debug(). - Int("instanceID", instanceID). - Str("hash", torrent.Hash). - Str("name", torrent.Name). - Msg("[CROSSSEED-COMPLETION] Completion search disabled for this instance") + logCompletionSkip(instanceID, &torrent, "[CROSSSEED-COMPLETION] Completion search disabled for this instance") return } - if torrent.CompletionOn <= 0 || torrent.Hash == "" { - // Safety check – the qbittorrent completion hook should only fire for completed torrents. + if shouldSkipCompletionTorrent(instanceID, &torrent, completionSettings) { return } - if hasCrossSeedTag(torrent.Tags) { - log.Debug(). + readyTorrent, err := s.waitForCompletionTorrentReady(ctx, instanceID, torrent) + if err != nil { + log.Warn(). + Err(err). Int("instanceID", instanceID). Str("hash", torrent.Hash). Str("name", torrent.Name). - Msg("[CROSSSEED-COMPLETION] Skipping already tagged cross-seed torrent") + Msg("[CROSSSEED-COMPLETION] Failed to execute completion search") return } - if !matchesCompletionFilters(&torrent, completionSettings) { - log.Debug(). + lane := s.getCompletionLane(instanceID) + lane.searchMu.Lock() + defer lane.searchMu.Unlock() + + readyTorrent, err = s.getCompletionTorrent(ctx, instanceID, readyTorrent.Hash) + if err != nil { + log.Warn(). + Err(err). Int("instanceID", instanceID). Str("hash", torrent.Hash). Str("name", torrent.Name). - Msg("[CROSSSEED-COMPLETION] Torrent does not match completion filters") + Msg("[CROSSSEED-COMPLETION] Failed to reload completion torrent") + return + } + if isCompletionCheckingState(readyTorrent.State) { + logCompletionSkip(instanceID, readyTorrent, "[CROSSSEED-COMPLETION] Torrent resumed checking before completion search") + return + } + if readyTorrent.Progress < 1.0 { + logCompletionSkip(instanceID, readyTorrent, "[CROSSSEED-COMPLETION] Torrent is no longer fully downloaded") + return + } + + completionSettings, err = s.completionStore.Get(ctx, instanceID) + if err != nil { + log.Warn(). + Err(err). + Int("instanceID", instanceID). + Str("hash", readyTorrent.Hash). + Msg("[CROSSSEED-COMPLETION] Failed to reload instance completion settings") + return + } + if !completionSettings.Enabled { + logCompletionSkip(instanceID, readyTorrent, "[CROSSSEED-COMPLETION] Completion search disabled for this instance") + return + } + if shouldSkipCompletionTorrent(instanceID, readyTorrent, completionSettings) { return } @@ -1447,7 +1542,7 @@ func (s *Service) HandleTorrentCompletion(ctx context.Context, instanceID int, t log.Warn(). Err(err). Int("instanceID", instanceID). - Str("hash", torrent.Hash). + Str("hash", readyTorrent.Hash). Msg("[CROSSSEED-COMPLETION] Failed to load automation settings") return } @@ -1455,21 +1550,48 @@ func (s *Service) HandleTorrentCompletion(ctx context.Context, instanceID int, t settings = models.DefaultCrossSeedAutomationSettings() } - lane := s.getCompletionLane(instanceID) - lane.mu.Lock() - defer lane.mu.Unlock() - - err = s.executeCompletionSearchWithRetry(ctx, instanceID, &torrent, settings, completionSettings) + err = s.executeCompletionSearchWithRetry(ctx, instanceID, readyTorrent, settings, completionSettings) if err != nil { log.Warn(). Err(err). Int("instanceID", instanceID). - Str("hash", torrent.Hash). - Str("name", torrent.Name). + Str("hash", readyTorrent.Hash). + Str("name", readyTorrent.Name). Msg("[CROSSSEED-COMPLETION] Failed to execute completion search") } } +func shouldSkipCompletionTorrent(instanceID int, torrent *qbt.Torrent, completionSettings *models.InstanceCrossSeedCompletionSettings) bool { + if torrent == nil { + return true + } + + if torrent.CompletionOn <= 0 || torrent.Hash == "" { + // Safety check – the qbittorrent completion hook should only fire for completed torrents. + return true + } + + if hasCrossSeedTag(torrent.Tags) { + logCompletionSkip(instanceID, torrent, "[CROSSSEED-COMPLETION] Skipping already tagged cross-seed torrent") + return true + } + + if !matchesCompletionFilters(torrent, completionSettings) { + logCompletionSkip(instanceID, torrent, "[CROSSSEED-COMPLETION] Torrent does not match completion filters") + return true + } + + return false +} + +func logCompletionSkip(instanceID int, torrent *qbt.Torrent, message string) { + event := log.Debug().Int("instanceID", instanceID) + if torrent != nil { + event = event.Str("hash", torrent.Hash).Str("name", torrent.Name) + } + event.Msg(message) +} + func (s *Service) getCompletionLane(instanceID int) *completionLane { s.completionLaneMu.Lock() defer s.completionLaneMu.Unlock() @@ -1486,6 +1608,454 @@ func (s *Service) getCompletionLane(instanceID int) *completionLane { return lane } +func (s *Service) waitForCompletionTorrentReady(ctx context.Context, instanceID int, eventTorrent qbt.Torrent) (*qbt.Torrent, error) { + lane := s.getCompletionLane(instanceID) + lane.mu.Lock() + defer lane.mu.Unlock() + + return s.waitForCompletionTorrentReadyLocked(ctx, instanceID, lane, eventTorrent) +} + +func (s *Service) waitForCompletionTorrentReadyLocked( + ctx context.Context, + instanceID int, + lane *completionLane, + eventTorrent qbt.Torrent, +) (*qbt.Torrent, error) { + wait := s.registerCompletionWaitLocked(instanceID, lane, eventTorrent) + done := wait.done + + lane.mu.Unlock() + + var result *qbt.Torrent + var err error + + select { + case <-ctx.Done(): + err = ctx.Err() + case <-done: + err = wait.err + if wait.result != nil { + torrent := *wait.result + result = &torrent + } + } + + lane.mu.Lock() + + if err != nil { + return nil, err + } + + return result, nil +} + +func (s *Service) registerCompletionWaitLocked( + instanceID int, + lane *completionLane, + eventTorrent qbt.Torrent, +) *completionWaitState { + if lane.waits == nil { + lane.waits = make(map[string]*completionWaitState) + } + + hash := normalizeHash(eventTorrent.Hash) + timeout := s.getCompletionTimeout() + now := time.Now() + deadline := now.Add(timeout) + + wait, ok := lane.waits[hash] + if ok { + base := now + if wait.retryAt.After(base) { + base = wait.retryAt + } + deadline = base.Add(timeout) + if deadline.After(wait.deadline) { + wait.deadline = deadline + wait.timeout = timeout + } + s.startCompletionLanePollerLocked(instanceID, lane) + return wait + } + + wait = &completionWaitState{ + done: make(chan struct{}), + attempt: 1, + deadline: deadline, + timeout: timeout, + eventTorrent: eventTorrent, + } + lane.waits[hash] = wait + + s.startCompletionLanePollerLocked(instanceID, lane) + + return wait +} + +func (s *Service) startCompletionLanePollerLocked(instanceID int, lane *completionLane) { + if lane.polling { + return + } + + lane.polling = true + + go s.runCompletionLanePoller(instanceID, lane) +} + +func (s *Service) runCompletionLanePoller(instanceID int, lane *completionLane) { + timer := time.NewTimer(0) + defer timer.Stop() + + for { + <-timer.C + nextDelay, ok := s.pollCompletionLane(instanceID, lane) + if !ok { + return + } + timer.Reset(nextDelay) + } +} + +func (s *Service) pollCompletionLane(instanceID int, lane *completionLane) (time.Duration, bool) { + waits := s.snapshotCompletionWaits(lane) + if len(waits) == 0 { + return 0, false + } + + now := time.Now() + activeWaits := make(map[string]*completionWaitState, len(waits)) + hashes := make([]string, 0, len(waits)) + for hash, wait := range waits { + if wait.retryAt.After(now) { + continue + } + activeWaits[hash] = wait.state + hashes = append(hashes, hash) + } + + if len(activeWaits) == 0 { + lane.mu.Lock() + defer lane.mu.Unlock() + + return s.nextCompletionPollDelayLocked(lane, now) + } + + torrents, err := s.getCompletionTorrents(context.Background(), instanceID, hashes) + now = time.Now() + + lane.mu.Lock() + defer lane.mu.Unlock() + + if err != nil { + log.Warn(). + Err(err). + Int("instanceID", instanceID). + Int("torrents", len(hashes)). + Msg("[CROSSSEED-COMPLETION] Failed to refresh completion torrents while waiting for checking to finish") + s.expireCompletionWaitsLocked(instanceID, lane, now) + return s.nextCompletionPollDelayLocked(lane, now) + } + + s.applyCompletionPollResultsLocked(instanceID, lane, activeWaits, torrents, now) + + return s.nextCompletionPollDelayLocked(lane, now) +} + +func (s *Service) snapshotCompletionWaits(lane *completionLane) map[string]completionWaitSnapshot { + lane.mu.Lock() + defer lane.mu.Unlock() + + if len(lane.waits) == 0 { + lane.polling = false + return nil + } + + waits := make(map[string]completionWaitSnapshot, len(lane.waits)) + for hash, wait := range lane.waits { + waits[hash] = completionWaitSnapshot{ + state: wait, + retryAt: wait.retryAt, + } + } + + return waits +} + +func (s *Service) applyCompletionPollResultsLocked( + instanceID int, + lane *completionLane, + waits map[string]*completionWaitState, + torrents map[string]qbt.Torrent, + now time.Time, +) { + for hash, wait := range waits { + currentWait, ok := lane.waits[hash] + if !ok || currentWait != wait { + continue + } + + torrent, ok := torrents[hash] + if !ok { + s.failMissingCompletionWaitLocked(instanceID, lane, hash, wait) + continue + } + + current := torrent + wait.lastSeen = ¤t + + if s.keepWaitingForCompletion(instanceID, lane, hash, wait, current, now) { + continue + } + + if current.Progress < 1.0 { + log.Warn(). + Int("instanceID", instanceID). + Str("hash", current.Hash). + Str("name", current.Name). + Str("state", string(current.State)). + Float64("progress", current.Progress). + Msg("[CROSSSEED-COMPLETION] Torrent finished checking but is still incomplete") + s.completeCompletionWaitLocked( + lane, + hash, + wait, + nil, + fmt.Errorf("%w: torrent %s is not fully downloaded (progress %.2f)", ErrTorrentNotComplete, current.Name, current.Progress), + ) + continue + } + + s.completeCompletionWaitLocked(lane, hash, wait, ¤t, nil) + } +} + +func (s *Service) keepWaitingForCompletion( + instanceID int, + lane *completionLane, + hash string, + wait *completionWaitState, + current qbt.Torrent, + now time.Time, +) bool { + if !isCompletionCheckingState(current.State) { + return false + } + + if !wait.checkingLogged { + log.Debug(). + Int("instanceID", instanceID). + Str("hash", current.Hash). + Str("name", current.Name). + Str("state", string(current.State)). + Float64("progress", current.Progress). + Msg("[CROSSSEED-COMPLETION] Deferring completion search while torrent is checking") + wait.checkingLogged = true + } + + if now.Before(wait.deadline) { + return true + } + + if wait.attempt < s.getCompletionMaxAttempts() { + s.retryCompletionWaitLocked(instanceID, wait, current, now) + return true + } + + log.Warn(). + Int("instanceID", instanceID). + Str("hash", current.Hash). + Str("name", current.Name). + Str("state", string(current.State)). + Float64("progress", current.Progress). + Dur("timeout", wait.timeout). + Msg("[CROSSSEED-COMPLETION] Timed out waiting for torrent checking to finish") + s.completeCompletionWaitLocked( + lane, + hash, + wait, + nil, + fmt.Errorf("completion torrent %s still checking after %s", current.Name, wait.timeout), + ) + + return true +} + +func (s *Service) retryCompletionWaitLocked(instanceID int, wait *completionWaitState, current qbt.Torrent, now time.Time) { + retryAfter := s.getCompletionRetryDelay() + retryAt := now.Add(retryAfter) + nextAttempt := wait.attempt + 1 + + log.Warn(). + Int("instanceID", instanceID). + Str("hash", current.Hash). + Str("name", current.Name). + Str("state", string(current.State)). + Float64("progress", current.Progress). + Int("attempt", wait.attempt). + Int("nextAttempt", nextAttempt). + Int("maxAttempts", s.getCompletionMaxAttempts()). + Dur("timeout", wait.timeout). + Dur("retryAfter", retryAfter). + Msg("[CROSSSEED-COMPLETION] Timed out waiting for torrent checking to finish, retrying") + + wait.attempt = nextAttempt + wait.retryAt = retryAt + wait.deadline = retryAt.Add(wait.timeout) + wait.lastSeen = ¤t + wait.checkingLogged = false +} + +func (s *Service) failMissingCompletionWaitLocked( + instanceID int, + lane *completionLane, + hash string, + wait *completionWaitState, +) { + err := fmt.Errorf("%w: torrent %s not found in instance %d", ErrTorrentNotFound, wait.eventTorrent.Hash, instanceID) + + log.Warn(). + Int("instanceID", instanceID). + Str("hash", wait.eventTorrent.Hash). + Str("name", wait.eventTorrent.Name). + Err(err). + Msg("[CROSSSEED-COMPLETION] Completion torrent disappeared while waiting for checking to finish") + + s.completeCompletionWaitLocked(lane, hash, wait, nil, err) +} + +func (s *Service) expireCompletionWaitsLocked(instanceID int, lane *completionLane, now time.Time) { + for hash, wait := range lane.waits { + if now.Before(wait.deadline) { + continue + } + + s.failTimedOutCompletionWaitLocked(instanceID, lane, hash, wait) + } +} + +func (s *Service) failTimedOutCompletionWaitLocked( + instanceID int, + lane *completionLane, + hash string, + wait *completionWaitState, +) { + name := wait.eventTorrent.Name + state := qbt.TorrentState("") + progress := 0.0 + + if wait.lastSeen != nil { + name = wait.lastSeen.Name + state = wait.lastSeen.State + progress = wait.lastSeen.Progress + } + + log.Warn(). + Int("instanceID", instanceID). + Str("hash", wait.eventTorrent.Hash). + Str("name", name). + Str("state", string(state)). + Float64("progress", progress). + Dur("timeout", wait.timeout). + Msg("[CROSSSEED-COMPLETION] Timed out waiting for torrent checking to finish") + s.completeCompletionWaitLocked( + lane, + hash, + wait, + nil, + fmt.Errorf("completion torrent %s still checking after %s", name, wait.timeout), + ) +} + +func (s *Service) completeCompletionWaitLocked( + lane *completionLane, + hash string, + wait *completionWaitState, + result *qbt.Torrent, + err error, +) { + delete(lane.waits, hash) + + if result != nil { + torrent := *result + wait.result = &torrent + } + wait.err = err + + close(wait.done) +} + +func (s *Service) updateCompletionPollerStateLocked(lane *completionLane) bool { + if len(lane.waits) > 0 { + return true + } + + lane.polling = false + return false +} + +func (s *Service) nextCompletionPollDelayLocked(lane *completionLane, now time.Time) (time.Duration, bool) { + if !s.updateCompletionPollerStateLocked(lane) { + return 0, false + } + + pollInterval := s.getCompletionPollInterval() + nextDelay := pollInterval + + for _, wait := range lane.waits { + if !wait.retryAt.After(now) { + return pollInterval, true + } + + delay := wait.retryAt.Sub(now) + if delay < nextDelay { + nextDelay = delay + } + } + + return nextDelay, true +} + +func (s *Service) getCompletionTorrent(ctx context.Context, instanceID int, hash string) (*qbt.Torrent, error) { + torrents, err := s.getCompletionTorrents(ctx, instanceID, []string{hash}) + if err != nil { + return nil, err + } + + torrent, ok := torrents[normalizeHash(hash)] + if !ok { + return nil, fmt.Errorf("%w: torrent %s not found in instance %d", ErrTorrentNotFound, hash, instanceID) + } + + current := torrent + return ¤t, nil +} + +func (s *Service) getCompletionTorrents(ctx context.Context, instanceID int, hashes []string) (map[string]qbt.Torrent, error) { + apiCtx, cancel := context.WithTimeout(ctx, recheckAPITimeout) + defer cancel() + + torrents, err := s.syncManager.GetTorrents(apiCtx, instanceID, qbt.TorrentFilterOptions{ + Hashes: hashes, + }) + if err != nil { + return nil, fmt.Errorf("load torrents: %w", err) + } + + result := make(map[string]qbt.Torrent, len(torrents)) + for _, torrent := range torrents { + result[normalizeHash(torrent.Hash)] = torrent + } + + return result, nil +} + +func isCompletionCheckingState(state qbt.TorrentState) bool { + return state == qbt.TorrentStateCheckingDl || + state == qbt.TorrentStateCheckingUp || + state == qbt.TorrentStateCheckingResumeData +} + func (s *Service) executeCompletionSearchWithRetry( ctx context.Context, instanceID int, diff --git a/internal/services/crossseed/service_completion_queue_test.go b/internal/services/crossseed/service_completion_queue_test.go index 5fbe77ad7..b6c567ddc 100644 --- a/internal/services/crossseed/service_completion_queue_test.go +++ b/internal/services/crossseed/service_completion_queue_test.go @@ -16,6 +16,7 @@ import ( _ "modernc.org/sqlite" "github.com/autobrr/qui/internal/models" + internalqb "github.com/autobrr/qui/internal/qbittorrent" "github.com/autobrr/qui/internal/services/jackett" ) @@ -58,11 +59,156 @@ func setupCompletionStoreForQueueTests(t *testing.T) *models.InstanceCrossSeedCo return models.NewInstanceCrossSeedCompletionStore(q) } +type completionPollingSyncMock struct { + mu sync.Mutex + sequences map[string][]qbt.Torrent + hits map[string]int + delay time.Duration +} + +func newCompletionPollingSyncMock(sequences map[string][]qbt.Torrent) *completionPollingSyncMock { + normalized := make(map[string][]qbt.Torrent, len(sequences)) + for hash, sequence := range sequences { + normalized[normalizeHash(hash)] = sequence + } + + return &completionPollingSyncMock{ + sequences: normalized, + hits: make(map[string]int), + } +} + +func (m *completionPollingSyncMock) GetTorrents(_ context.Context, _ int, filter qbt.TorrentFilterOptions) ([]qbt.Torrent, error) { + if len(filter.Hashes) == 0 { + return nil, nil + } + + if m.delay > 0 { + time.Sleep(m.delay) + } + + hash := normalizeHash(filter.Hashes[0]) + + m.mu.Lock() + defer m.mu.Unlock() + + sequence, ok := m.sequences[hash] + if !ok || len(sequence) == 0 { + return nil, nil + } + + index := m.hits[hash] + if index >= len(sequence) { + index = len(sequence) - 1 + } + m.hits[hash]++ + + torrent := sequence[index] + return []qbt.Torrent{torrent}, nil +} + +func (m *completionPollingSyncMock) hitCount(hash string) int { + m.mu.Lock() + defer m.mu.Unlock() + + return m.hits[normalizeHash(hash)] +} + +func (m *completionPollingSyncMock) GetTorrentFilesBatch(context.Context, int, []string) (map[string]qbt.TorrentFiles, error) { + return nil, nil +} + +func (*completionPollingSyncMock) ExportTorrent(context.Context, int, string) ([]byte, string, string, error) { + return nil, "", "", nil +} + +func (*completionPollingSyncMock) HasTorrentByAnyHash(context.Context, int, []string) (*qbt.Torrent, bool, error) { + return nil, false, nil +} + +func (*completionPollingSyncMock) GetTorrentProperties(context.Context, int, string) (*qbt.TorrentProperties, error) { + return &qbt.TorrentProperties{}, nil +} + +func (*completionPollingSyncMock) GetAppPreferences(context.Context, int) (qbt.AppPreferences, error) { + return qbt.AppPreferences{}, nil +} + +func (*completionPollingSyncMock) AddTorrent(context.Context, int, []byte, map[string]string) error { + return nil +} + +func (*completionPollingSyncMock) BulkAction(context.Context, int, []string, string) error { + return nil +} + +func (*completionPollingSyncMock) GetCachedInstanceTorrents(context.Context, int) ([]internalqb.CrossInstanceTorrentView, error) { + return nil, nil +} + +func (*completionPollingSyncMock) ExtractDomainFromURL(string) string { + return "" +} + +func (*completionPollingSyncMock) GetQBittorrentSyncManager(context.Context, int) (*qbt.SyncManager, error) { + return nil, nil +} + +func (*completionPollingSyncMock) RenameTorrent(context.Context, int, string, string) error { + return nil +} + +func (*completionPollingSyncMock) RenameTorrentFile(context.Context, int, string, string, string) error { + return nil +} + +func (*completionPollingSyncMock) RenameTorrentFolder(context.Context, int, string, string, string) error { + return nil +} + +func (*completionPollingSyncMock) SetTags(context.Context, int, []string, string) error { + return nil +} + +func (*completionPollingSyncMock) GetCategories(context.Context, int) (map[string]qbt.Category, error) { + return map[string]qbt.Category{}, nil +} + +func (*completionPollingSyncMock) CreateCategory(context.Context, int, string, string) error { + return nil +} + +func setCompletionCheckingTimings(svc *Service, pollInterval time.Duration, timeout time.Duration) { + svc.completionPollInterval = pollInterval + svc.completionTimeout = timeout +} + +func setCompletionCheckingRetryPolicy(svc *Service, retryDelay time.Duration, maxAttempts int) { + svc.completionRetryDelay = retryDelay + svc.completionMaxAttempts = maxAttempts +} + func TestHandleTorrentCompletion_QueuesPerInstance(t *testing.T) { completionStore := setupCompletionStoreForQueueTests(t) firstHash := "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" secondHash := "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb" + syncMock := newCompletionPollingSyncMock(map[string][]qbt.Torrent{ + firstHash: {{ + Hash: firstHash, + Name: "first", + Progress: 1.0, + State: qbt.TorrentStateUploading, + CompletionOn: 123, + }}, + secondHash: {{ + Hash: secondHash, + Name: "second", + Progress: 1.0, + State: qbt.TorrentStateUploading, + CompletionOn: 124, + }}, + }) firstStarted := make(chan struct{}) secondStarted := make(chan struct{}) releaseFirst := make(chan struct{}) @@ -73,6 +219,7 @@ func TestHandleTorrentCompletion_QueuesPerInstance(t *testing.T) { svc := &Service{ completionStore: completionStore, + syncManager: syncMock, automationSettingsLoader: func(context.Context) (*models.CrossSeedAutomationSettings, error) { return models.DefaultCrossSeedAutomationSettings(), nil }, @@ -140,12 +287,153 @@ func TestHandleTorrentCompletion_QueuesPerInstance(t *testing.T) { require.Equal(t, []string{firstHash, secondHash}, invocationOrder) } +func TestHandleTorrentCompletion_ContinuesPollingWhileSearchIsSerialized(t *testing.T) { + completionStore := setupCompletionStoreForQueueTests(t) + + firstHash := "abababababababababababababababababababab" + secondHash := "cdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcd" + syncMock := newCompletionPollingSyncMock(map[string][]qbt.Torrent{ + firstHash: {{ + Hash: firstHash, + Name: "first", + Progress: 1.0, + State: qbt.TorrentStateUploading, + CompletionOn: 123, + }}, + secondHash: { + { + Hash: secondHash, + Name: "second", + Progress: 0.42, + State: qbt.TorrentStateCheckingResumeData, + CompletionOn: 124, + }, + { + Hash: secondHash, + Name: "second", + Progress: 1.0, + State: qbt.TorrentStateUploading, + CompletionOn: 124, + }, + }, + }) + + firstStarted := make(chan struct{}) + secondStarted := make(chan struct{}) + releaseFirst := make(chan struct{}) + var firstOnce sync.Once + var secondOnce sync.Once + + svc := &Service{ + completionStore: completionStore, + syncManager: syncMock, + automationSettingsLoader: func(context.Context) (*models.CrossSeedAutomationSettings, error) { + return models.DefaultCrossSeedAutomationSettings(), nil + }, + completionSearchInvoker: func(_ context.Context, _ int, torrent *qbt.Torrent, _ *models.CrossSeedAutomationSettings, _ *models.InstanceCrossSeedCompletionSettings) error { + switch torrent.Hash { + case firstHash: + firstOnce.Do(func() { close(firstStarted) }) + <-releaseFirst + case secondHash: + secondOnce.Do(func() { close(secondStarted) }) + } + return nil + }, + } + setCompletionCheckingTimings(svc, 5*time.Millisecond, 200*time.Millisecond) + + var wg sync.WaitGroup + wg.Add(2) + go func() { + defer wg.Done() + svc.HandleTorrentCompletion(context.Background(), 1, qbt.Torrent{ + Hash: firstHash, + Name: "first", + Progress: 1.0, + CompletionOn: 123, + }) + }() + + select { + case <-firstStarted: + case <-time.After(5 * time.Second): + t.Fatal("first completion search did not start") + } + + go func() { + defer wg.Done() + svc.HandleTorrentCompletion(context.Background(), 1, qbt.Torrent{ + Hash: secondHash, + Name: "second", + Progress: 1.0, + CompletionOn: 124, + }) + }() + + require.Eventually(t, func() bool { + return syncMock.hitCount(secondHash) >= 2 + }, time.Second, 10*time.Millisecond, "second wait was not polled while first search held the serialization lock") + + select { + case <-secondStarted: + t.Fatal("second completion search started before first released") + case <-time.After(50 * time.Millisecond): + } + + close(releaseFirst) + + select { + case <-secondStarted: + case <-time.After(5 * time.Second): + t.Fatal("second completion search did not start after first completed") + } + + wg.Wait() +} + +func TestSnapshotCompletionWaits_CopiesSchedulingFields(t *testing.T) { + lane := &completionLane{ + waits: make(map[string]*completionWaitState), + } + + initialRetryAt := time.Now().Add(15 * time.Second) + wait := &completionWaitState{ + done: make(chan struct{}), + retryAt: initialRetryAt, + } + lane.waits["abc"] = wait + + svc := &Service{} + snapshot := svc.snapshotCompletionWaits(lane) + + lane.mu.Lock() + updatedRetryAt := initialRetryAt.Add(30 * time.Second) + wait.retryAt = updatedRetryAt + lane.mu.Unlock() + + entry, ok := snapshot["abc"] + require.True(t, ok) + require.Same(t, wait, entry.state) + require.True(t, entry.retryAt.Equal(initialRetryAt)) + require.False(t, entry.retryAt.Equal(updatedRetryAt)) +} + func TestHandleTorrentCompletion_RetriesOnRateLimitError(t *testing.T) { completionStore := setupCompletionStoreForQueueTests(t) attempts := 0 svc := &Service{ completionStore: completionStore, + syncManager: newCompletionPollingSyncMock(map[string][]qbt.Torrent{ + "cccccccccccccccccccccccccccccccccccccccc": {{ + Hash: "cccccccccccccccccccccccccccccccccccccccc", + Name: "retry-me", + Progress: 1.0, + State: qbt.TorrentStateUploading, + CompletionOn: 125, + }}, + }), automationSettingsLoader: func(context.Context) (*models.CrossSeedAutomationSettings, error) { return models.DefaultCrossSeedAutomationSettings(), nil }, @@ -174,6 +462,376 @@ func TestHandleTorrentCompletion_RetriesOnRateLimitError(t *testing.T) { assert.Equal(t, 2, attempts) } +func TestHandleTorrentCompletion_DefersWhileChecking(t *testing.T) { + completionStore := setupCompletionStoreForQueueTests(t) + + hash := "dddddddddddddddddddddddddddddddddddddddd" + syncMock := newCompletionPollingSyncMock(map[string][]qbt.Torrent{ + hash: { + { + Hash: hash, + Name: "checking", + Progress: 0.27, + State: qbt.TorrentStateCheckingResumeData, + CompletionOn: 200, + }, + { + Hash: hash, + Name: "checking", + Progress: 1.0, + State: qbt.TorrentStateUploading, + CompletionOn: 200, + }, + }, + }) + + invoked := make(chan qbt.Torrent, 1) + svc := &Service{ + completionStore: completionStore, + syncManager: syncMock, + automationSettingsLoader: func(context.Context) (*models.CrossSeedAutomationSettings, error) { + return models.DefaultCrossSeedAutomationSettings(), nil + }, + completionSearchInvoker: func(_ context.Context, _ int, torrent *qbt.Torrent, _ *models.CrossSeedAutomationSettings, _ *models.InstanceCrossSeedCompletionSettings) error { + invoked <- *torrent + return nil + }, + } + setCompletionCheckingTimings(svc, 5*time.Millisecond, 50*time.Millisecond) + + svc.HandleTorrentCompletion(context.Background(), 1, qbt.Torrent{ + Hash: hash, + Name: "checking", + Progress: 0.27, + State: qbt.TorrentStateCheckingResumeData, + CompletionOn: 200, + }) + + select { + case torrent := <-invoked: + require.InDelta(t, 1.0, torrent.Progress, 0.0001) + require.Equal(t, qbt.TorrentStateUploading, torrent.State) + case <-time.After(time.Second): + t.Fatal("completion search was not invoked after checking finished") + } +} + +func TestHandleTorrentCompletion_RetriesAfterCheckingTimeout(t *testing.T) { + completionStore := setupCompletionStoreForQueueTests(t) + + hash := "cdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcdcd" + syncMock := newCompletionPollingSyncMock(map[string][]qbt.Torrent{ + hash: { + { + Hash: hash, + Name: "checking-retry", + Progress: 0.27, + State: qbt.TorrentStateCheckingResumeData, + CompletionOn: 220, + }, + { + Hash: hash, + Name: "checking-retry", + Progress: 0.27, + State: qbt.TorrentStateCheckingResumeData, + CompletionOn: 220, + }, + { + Hash: hash, + Name: "checking-retry", + Progress: 0.27, + State: qbt.TorrentStateCheckingResumeData, + CompletionOn: 220, + }, + { + Hash: hash, + Name: "checking-retry", + Progress: 0.27, + State: qbt.TorrentStateCheckingResumeData, + CompletionOn: 220, + }, + { + Hash: hash, + Name: "checking-retry", + Progress: 1.0, + State: qbt.TorrentStateUploading, + CompletionOn: 220, + }, + }, + }) + + invoked := make(chan qbt.Torrent, 1) + svc := &Service{ + completionStore: completionStore, + syncManager: syncMock, + automationSettingsLoader: func(context.Context) (*models.CrossSeedAutomationSettings, error) { + return models.DefaultCrossSeedAutomationSettings(), nil + }, + completionSearchInvoker: func(_ context.Context, _ int, torrent *qbt.Torrent, _ *models.CrossSeedAutomationSettings, _ *models.InstanceCrossSeedCompletionSettings) error { + invoked <- *torrent + return nil + }, + } + setCompletionCheckingTimings(svc, 5*time.Millisecond, 12*time.Millisecond) + setCompletionCheckingRetryPolicy(svc, 8*time.Millisecond, 3) + + svc.HandleTorrentCompletion(context.Background(), 1, qbt.Torrent{ + Hash: hash, + Name: "checking-retry", + Progress: 0.27, + State: qbt.TorrentStateCheckingResumeData, + CompletionOn: 220, + }) + + select { + case torrent := <-invoked: + require.InDelta(t, 1.0, torrent.Progress, 0.0001) + require.Equal(t, qbt.TorrentStateUploading, torrent.State) + case <-time.After(time.Second): + t.Fatal("completion search was not invoked after checking retry") + } + + require.Equal(t, 6, syncMock.hitCount(hash)) +} + +func TestHandleTorrentCompletion_RechecksSkipConditionsAfterWaiting(t *testing.T) { + completionStore := setupCompletionStoreForQueueTests(t) + + hash := "abababababababababababababababababababab" + syncMock := newCompletionPollingSyncMock(map[string][]qbt.Torrent{ + hash: { + { + Hash: hash, + Name: "checking-then-tagged", + Progress: 0.27, + State: qbt.TorrentStateCheckingResumeData, + CompletionOn: 300, + }, + { + Hash: hash, + Name: "checking-then-tagged", + Progress: 1.0, + State: qbt.TorrentStateUploading, + CompletionOn: 300, + Tags: "cross-seed", + }, + }, + }) + + invoked := make(chan struct{}, 1) + svc := &Service{ + completionStore: completionStore, + syncManager: syncMock, + automationSettingsLoader: func(context.Context) (*models.CrossSeedAutomationSettings, error) { + return models.DefaultCrossSeedAutomationSettings(), nil + }, + completionSearchInvoker: func(_ context.Context, _ int, _ *qbt.Torrent, _ *models.CrossSeedAutomationSettings, _ *models.InstanceCrossSeedCompletionSettings) error { + invoked <- struct{}{} + return nil + }, + } + setCompletionCheckingTimings(svc, 5*time.Millisecond, 50*time.Millisecond) + + svc.HandleTorrentCompletion(context.Background(), 1, qbt.Torrent{ + Hash: hash, + Name: "checking-then-tagged", + Progress: 0.27, + State: qbt.TorrentStateCheckingResumeData, + CompletionOn: 300, + }) + + select { + case <-invoked: + t.Fatal("completion search should be skipped after refreshed torrent gains cross-seed tag") + case <-time.After(100 * time.Millisecond): + } +} + +func TestWaitForCompletionTorrentReady_ReturnsNotCompleteAfterChecking(t *testing.T) { + hash := "eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee" + svc := &Service{ + syncManager: newCompletionPollingSyncMock(map[string][]qbt.Torrent{ + hash: { + { + Hash: hash, + Name: "partial", + Progress: 0.27, + State: qbt.TorrentStateCheckingResumeData, + }, + { + Hash: hash, + Name: "partial", + Progress: 0.27, + State: qbt.TorrentStatePausedUp, + }, + }, + }), + } + setCompletionCheckingTimings(svc, 5*time.Millisecond, 50*time.Millisecond) + + _, err := svc.waitForCompletionTorrentReady(context.Background(), 1, qbt.Torrent{ + Hash: hash, + Name: "partial", + }) + require.ErrorIs(t, err, ErrTorrentNotComplete) + require.Contains(t, err.Error(), "progress 0.27") +} + +func TestWaitForCompletionTorrentReady_TimesOutWhileChecking(t *testing.T) { + hash := "ffffffffffffffffffffffffffffffffffffffff" + svc := &Service{ + syncManager: newCompletionPollingSyncMock(map[string][]qbt.Torrent{ + hash: {{ + Hash: hash, + Name: "stuck-checking", + Progress: 0.27, + State: qbt.TorrentStateCheckingResumeData, + }}, + }), + } + setCompletionCheckingTimings(svc, 5*time.Millisecond, 20*time.Millisecond) + setCompletionCheckingRetryPolicy(svc, 5*time.Millisecond, 1) + + _, err := svc.waitForCompletionTorrentReady(context.Background(), 1, qbt.Torrent{ + Hash: hash, + Name: "stuck-checking", + }) + require.EqualError(t, err, "completion torrent stuck-checking still checking after 20ms") +} + +func TestWaitForCompletionTorrentReady_DeduplicatesConcurrentWaiters(t *testing.T) { + hash := "9999999999999999999999999999999999999999" + syncMock := newCompletionPollingSyncMock(map[string][]qbt.Torrent{ + hash: { + { + Hash: hash, + Name: "shared-wait", + Progress: 0.27, + State: qbt.TorrentStateCheckingResumeData, + }, + { + Hash: hash, + Name: "shared-wait", + Progress: 1.0, + State: qbt.TorrentStateUploading, + }, + }, + }) + syncMock.delay = 2 * time.Millisecond + + svc := &Service{ + syncManager: syncMock, + } + setCompletionCheckingTimings(svc, 5*time.Millisecond, 50*time.Millisecond) + + start := make(chan struct{}) + errs := make(chan error, 2) + + for range 2 { + go func() { + <-start + _, err := svc.waitForCompletionTorrentReady(context.Background(), 1, qbt.Torrent{ + Hash: hash, + Name: "shared-wait", + }) + errs <- err + }() + } + + close(start) + + for range 2 { + require.NoError(t, <-errs) + } + + require.Equal(t, 2, syncMock.hitCount(hash)) +} + +func TestWaitForCompletionTorrentReady_TimesOutAfterCheckingRetries(t *testing.T) { + hash := "1212121212121212121212121212121212121212" + syncMock := newCompletionPollingSyncMock(map[string][]qbt.Torrent{ + hash: {{ + Hash: hash, + Name: "retry-timeout", + Progress: 0.27, + State: qbt.TorrentStateCheckingResumeData, + }}, + }) + + svc := &Service{ + syncManager: syncMock, + } + setCompletionCheckingTimings(svc, 5*time.Millisecond, 10*time.Millisecond) + setCompletionCheckingRetryPolicy(svc, 8*time.Millisecond, 3) + + _, err := svc.waitForCompletionTorrentReady(context.Background(), 1, qbt.Torrent{ + Hash: hash, + Name: "retry-timeout", + }) + require.EqualError(t, err, "completion torrent retry-timeout still checking after 10ms") + require.GreaterOrEqual(t, syncMock.hitCount(hash), 5) +} + +func TestWaitForCompletionTorrentReady_DeduplicatesConcurrentWaitersDuringRetryBackoff(t *testing.T) { + hash := "3434343434343434343434343434343434343434" + syncMock := newCompletionPollingSyncMock(map[string][]qbt.Torrent{ + hash: { + { + Hash: hash, + Name: "shared-retry-wait", + Progress: 0.27, + State: qbt.TorrentStateCheckingResumeData, + }, + { + Hash: hash, + Name: "shared-retry-wait", + Progress: 0.27, + State: qbt.TorrentStateCheckingResumeData, + }, + { + Hash: hash, + Name: "shared-retry-wait", + Progress: 0.27, + State: qbt.TorrentStateCheckingResumeData, + }, + { + Hash: hash, + Name: "shared-retry-wait", + Progress: 1.0, + State: qbt.TorrentStateUploading, + }, + }, + }) + + svc := &Service{ + syncManager: syncMock, + } + setCompletionCheckingTimings(svc, 5*time.Millisecond, 10*time.Millisecond) + setCompletionCheckingRetryPolicy(svc, 8*time.Millisecond, 3) + + start := make(chan struct{}) + errs := make(chan error, 2) + + for range 2 { + go func() { + <-start + _, err := svc.waitForCompletionTorrentReady(context.Background(), 1, qbt.Torrent{ + Hash: hash, + Name: "shared-retry-wait", + }) + errs <- err + }() + } + + close(start) + + for range 2 { + require.NoError(t, <-errs) + } + + require.Equal(t, 4, syncMock.hitCount(hash)) +} + func TestCompletionRetryDelay_FallbackRateLimitMessages(t *testing.T) { t.Parallel()