Skip to content
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
236 changes: 196 additions & 40 deletions internal/services/crossseed/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,23 +219,25 @@ type automationContext struct {
}

const (
searchResultCacheTTL = 5 * time.Minute
indexerDomainCacheTTL = 1 * time.Minute
contentFilteringWaitTimeout = 5 * time.Second
contentFilteringPollInterval = 150 * time.Millisecond
selectedIndexerContentSkipReason = "selected indexers were filtered out"
selectedIndexerCapabilitySkipReason = "selected indexers do not support required caps"
crossSeedRenameWaitTimeout = 15 * time.Second
crossSeedRenamePollInterval = 200 * time.Millisecond
automationSettingsQueryTimeout = 5 * time.Second
recheckPollInterval = 3 * time.Second // Batch API calls per instance
recheckAbsoluteTimeout = 60 * time.Minute // Allow time for large recheck queues
recheckAPITimeout = 30 * time.Second
minSearchIntervalSecondsTorznab = 60
minSearchIntervalSecondsGazelleOnly = 5
minSearchCooldownMinutes = 720
maxCompletionSearchAttempts = 3
defaultCompletionRetryDelay = 30 * time.Second
searchResultCacheTTL = 5 * time.Minute
indexerDomainCacheTTL = 1 * time.Minute
contentFilteringWaitTimeout = 5 * time.Second
contentFilteringPollInterval = 150 * time.Millisecond
selectedIndexerContentSkipReason = "selected indexers were filtered out"
selectedIndexerCapabilitySkipReason = "selected indexers do not support required caps"
crossSeedRenameWaitTimeout = 15 * time.Second
crossSeedRenamePollInterval = 200 * time.Millisecond
automationSettingsQueryTimeout = 5 * time.Second
recheckPollInterval = 3 * time.Second // Batch API calls per instance
recheckAbsoluteTimeout = 60 * time.Minute // Allow time for large recheck queues
recheckAPITimeout = 30 * time.Second
minSearchIntervalSecondsTorznab = 60
minSearchIntervalSecondsGazelleOnly = 5
minSearchCooldownMinutes = 720
maxCompletionSearchAttempts = 3
defaultCompletionRetryDelay = 30 * time.Second
defaultCompletionCheckingPollInterval = 2 * time.Second
defaultCompletionCheckingTimeout = 5 * time.Minute

// User-facing message when cross-seed is skipped due to recheck requirement
skippedRecheckMessage = "Skipped: requires recheck. Disable 'Skip recheck' in Cross-Seed settings to allow"
Expand Down Expand Up @@ -339,6 +341,9 @@ type Service struct {
// Ensures completion-triggered searches run serially per instance.
completionLaneMu sync.Mutex
completionLanes map[int]*completionLane
// Completion polling timings are injectable for tests; zero values use package defaults.
completionPollInterval time.Duration
completionTimeout time.Duration

// test hooks
crossSeedInvoker func(ctx context.Context, req *CrossSeedRequest) (*CrossSeedResponse, error)
Expand Down Expand Up @@ -418,6 +423,8 @@ func NewService(
dedupCache: dedupCache,
metrics: NewServiceMetrics(),
completionLanes: make(map[int]*completionLane),
completionPollInterval: defaultCompletionCheckingPollInterval,
completionTimeout: defaultCompletionCheckingTimeout,
recheckResumeChan: make(chan *pendingResume, 100),
recheckResumeCtx: recheckCtx,
recheckResumeCancel: recheckCancel,
Expand All @@ -429,6 +436,22 @@ func NewService(
return svc
}

func (s *Service) getCompletionPollInterval() time.Duration {
if s != nil && s.completionPollInterval > 0 {
return s.completionPollInterval
}

return defaultCompletionCheckingPollInterval
}

func (s *Service) getCompletionTimeout() time.Duration {
if s != nil && s.completionTimeout > 0 {
return s.completionTimeout
}

return defaultCompletionCheckingTimeout
}

// HealthCheck performs comprehensive health checks on the cross-seed service
func (s *Service) HealthCheck(ctx context.Context) error {
// Check if we can list instances
Expand Down Expand Up @@ -1410,34 +1433,43 @@ func (s *Service) HandleTorrentCompletion(ctx context.Context, instanceID int, t
}

if !completionSettings.Enabled {
log.Debug().
Int("instanceID", instanceID).
Str("hash", torrent.Hash).
Str("name", torrent.Name).
Msg("[CROSSSEED-COMPLETION] Completion search disabled for this instance")
logCompletionSkip(instanceID, &torrent, "[CROSSSEED-COMPLETION] Completion search disabled for this instance")
return
}

if torrent.CompletionOn <= 0 || torrent.Hash == "" {
// Safety check – the qbittorrent completion hook should only fire for completed torrents.
if shouldSkipCompletionTorrent(instanceID, &torrent, completionSettings) {
return
}

if hasCrossSeedTag(torrent.Tags) {
log.Debug().
lane := s.getCompletionLane(instanceID)
lane.mu.Lock()
defer lane.mu.Unlock()

readyTorrent, err := s.waitForCompletionTorrentReady(ctx, instanceID, torrent)
if err != nil {
log.Warn().
Err(err).
Int("instanceID", instanceID).
Str("hash", torrent.Hash).
Str("name", torrent.Name).
Msg("[CROSSSEED-COMPLETION] Skipping already tagged cross-seed torrent")
Msg("[CROSSSEED-COMPLETION] Failed to execute completion search")
return
}

if !matchesCompletionFilters(&torrent, completionSettings) {
log.Debug().
completionSettings, err = s.completionStore.Get(ctx, instanceID)
if err != nil {
log.Warn().
Err(err).
Int("instanceID", instanceID).
Str("hash", torrent.Hash).
Str("name", torrent.Name).
Msg("[CROSSSEED-COMPLETION] Torrent does not match completion filters")
Str("hash", readyTorrent.Hash).
Msg("[CROSSSEED-COMPLETION] Failed to reload instance completion settings")
return
}
if !completionSettings.Enabled {
logCompletionSkip(instanceID, readyTorrent, "[CROSSSEED-COMPLETION] Completion search disabled for this instance")
return
}
if shouldSkipCompletionTorrent(instanceID, readyTorrent, completionSettings) {
return
}

Expand All @@ -1447,29 +1479,56 @@ func (s *Service) HandleTorrentCompletion(ctx context.Context, instanceID int, t
log.Warn().
Err(err).
Int("instanceID", instanceID).
Str("hash", torrent.Hash).
Str("hash", readyTorrent.Hash).
Msg("[CROSSSEED-COMPLETION] Failed to load automation settings")
return
}
if settings == nil {
settings = models.DefaultCrossSeedAutomationSettings()
}

lane := s.getCompletionLane(instanceID)
lane.mu.Lock()
defer lane.mu.Unlock()

err = s.executeCompletionSearchWithRetry(ctx, instanceID, &torrent, settings, completionSettings)
err = s.executeCompletionSearchWithRetry(ctx, instanceID, readyTorrent, settings, completionSettings)
if err != nil {
log.Warn().
Err(err).
Int("instanceID", instanceID).
Str("hash", torrent.Hash).
Str("name", torrent.Name).
Str("hash", readyTorrent.Hash).
Str("name", readyTorrent.Name).
Msg("[CROSSSEED-COMPLETION] Failed to execute completion search")
}
}

func shouldSkipCompletionTorrent(instanceID int, torrent *qbt.Torrent, completionSettings *models.InstanceCrossSeedCompletionSettings) bool {
if torrent == nil {
return true
}

if torrent.CompletionOn <= 0 || torrent.Hash == "" {
// Safety check – the qbittorrent completion hook should only fire for completed torrents.
return true
}

if hasCrossSeedTag(torrent.Tags) {
logCompletionSkip(instanceID, torrent, "[CROSSSEED-COMPLETION] Skipping already tagged cross-seed torrent")
return true
}

if !matchesCompletionFilters(torrent, completionSettings) {
logCompletionSkip(instanceID, torrent, "[CROSSSEED-COMPLETION] Torrent does not match completion filters")
return true
}

return false
}

func logCompletionSkip(instanceID int, torrent *qbt.Torrent, message string) {
event := log.Debug().Int("instanceID", instanceID)
if torrent != nil {
event = event.Str("hash", torrent.Hash).Str("name", torrent.Name)
}
event.Msg(message)
}

func (s *Service) getCompletionLane(instanceID int) *completionLane {
s.completionLaneMu.Lock()
defer s.completionLaneMu.Unlock()
Expand All @@ -1486,6 +1545,103 @@ func (s *Service) getCompletionLane(instanceID int) *completionLane {
return lane
}

func (s *Service) waitForCompletionTorrentReady(ctx context.Context, instanceID int, eventTorrent qbt.Torrent) (*qbt.Torrent, error) {
current, err := s.getCompletionTorrent(ctx, instanceID, eventTorrent.Hash)
if err != nil {
return nil, err
}

completionTimeout := s.getCompletionTimeout()
completionPollInterval := s.getCompletionPollInterval()

if !isCompletionCheckingState(current.State) {
if current.Progress < 1.0 {
return nil, fmt.Errorf("%w: torrent %s is not fully downloaded (progress %.2f)", ErrTorrentNotComplete, current.Name, current.Progress)
}
return current, nil
}

log.Debug().
Int("instanceID", instanceID).
Str("hash", current.Hash).
Str("name", current.Name).
Str("state", string(current.State)).
Float64("progress", current.Progress).
Msg("[CROSSSEED-COMPLETION] Deferring completion search while torrent is checking")

timeout := time.NewTimer(completionTimeout)
defer timeout.Stop()

ticker := time.NewTicker(completionPollInterval)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-timeout.C:
log.Warn().
Int("instanceID", instanceID).
Str("hash", current.Hash).
Str("name", current.Name).
Str("state", string(current.State)).
Float64("progress", current.Progress).
Dur("timeout", completionTimeout).
Msg("[CROSSSEED-COMPLETION] Timed out waiting for torrent checking to finish")
return nil, fmt.Errorf("completion torrent %s still checking after %s", current.Name, completionTimeout)
case <-ticker.C:
current, err = s.getCompletionTorrent(ctx, instanceID, eventTorrent.Hash)
if err != nil {
log.Warn().
Int("instanceID", instanceID).
Str("hash", eventTorrent.Hash).
Str("name", eventTorrent.Name).
Err(err).
Msg("[CROSSSEED-COMPLETION] Failed to refresh completion torrent while waiting for checking to finish")
continue
}
if isCompletionCheckingState(current.State) {
continue
}
if current.Progress < 1.0 {
log.Warn().
Int("instanceID", instanceID).
Str("hash", current.Hash).
Str("name", current.Name).
Str("state", string(current.State)).
Float64("progress", current.Progress).
Msg("[CROSSSEED-COMPLETION] Torrent finished checking but is still incomplete")
return nil, fmt.Errorf("%w: torrent %s is not fully downloaded (progress %.2f)", ErrTorrentNotComplete, current.Name, current.Progress)
}
return current, nil
}
}
}

func (s *Service) getCompletionTorrent(ctx context.Context, instanceID int, hash string) (*qbt.Torrent, error) {
apiCtx, cancel := context.WithTimeout(ctx, recheckAPITimeout)
defer cancel()

torrents, err := s.syncManager.GetTorrents(apiCtx, instanceID, qbt.TorrentFilterOptions{
Hashes: []string{hash},
})
if err != nil {
return nil, fmt.Errorf("load torrents: %w", err)
}
if len(torrents) == 0 {
return nil, fmt.Errorf("%w: torrent %s not found in instance %d", ErrTorrentNotFound, hash, instanceID)
}

torrent := torrents[0]
return &torrent, nil
}

func isCompletionCheckingState(state qbt.TorrentState) bool {
return state == qbt.TorrentStateCheckingDl ||
state == qbt.TorrentStateCheckingUp ||
state == qbt.TorrentStateCheckingResumeData
}

func (s *Service) executeCompletionSearchWithRetry(
ctx context.Context,
instanceID int,
Expand Down
Loading
Loading