Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 4 additions & 10 deletions app/services/prediction_results/process.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
module PredictionResults
class Process
SUBJECT_ACTION_API_BATCH_SIZE = ENV.fetch('SUBJECT_ACTION_API_BATCH_SIZE', '10').to_i
COMPLETION_NOTIFICATION_THRESHOLD = ENV.fetch('COMPLETION_NOTIFICATION_THRESHOLD', '0.95').to_f

attr_accessor :results_url, :subject_set_id, :probability_threshold,
:over_threshold_subject_ids, :under_threshold_subject_ids,
Expand All @@ -20,7 +19,6 @@ def initialize(results_url:, subject_set_id:, probability_threshold: 0.8, random
@under_threshold_subject_ids = []
@random_spice_subject_ids = []
@prediction_data = nil
@total_subjects = 0
end

def run
Expand All @@ -38,10 +36,11 @@ def run
remove_under_threshold_subjects_from_active_set
add_random_spice_subjects_to_active_set
end

schedule_subjects_retirement_check
end

def partition_results
@total_subjects = prediction_data.count
prediction_data.each do |subject_id, prediction_samples|
# data schema format is published in the file
# and https://github.com/zooniverse/bajor/blob/main/azure/batch/scripts/predict_on_catalog.py
Expand All @@ -53,7 +52,6 @@ def partition_results
@over_threshold_subject_ids << subject_id if probability >= probability_threshold
@under_threshold_subject_ids << subject_id if probability < probability_threshold
end
check_completion_and_notify
# now add some 'spice' to the results by adding some random under threshold subject ids
# but don't skew the prediction results by adding too many under threshold images
# ensure we only use apply the randomisation factor to the count of over threshold subject ids
Expand Down Expand Up @@ -90,12 +88,8 @@ def api_batch_bulk_job_args(subject_ids)
end

private
def check_completion_and_notify
total_under_threshold_subjects = @under_threshold_subject_ids.count
completion_rate = (total_under_threshold_subjects.to_f / @total_subjects.to_f)
if completion_rate >= COMPLETION_NOTIFICATION_THRESHOLD
NotifyProjectOwnerJob.perform_async(subject_set_id, completion_rate)
end
# Enqueues a SubjectsRetirementWorker job (via perform_async) for this
# service's subject_set_id. Called at the end of #run.
def schedule_subjects_retirement_check
SubjectsRetirementWorker.perform_async(subject_set_id)
end
end
end
17 changes: 4 additions & 13 deletions app/sidekiq/add_subject_to_subject_set_job.rb
Original file line number Diff line number Diff line change
@@ -1,23 +1,14 @@
# frozen_string_literal: true
require 'panoptes/api'
require 'concerns/panoptes_retry'

# Sidekiq job that links one or more subjects to a Panoptes subject set.
class AddSubjectToSubjectSetJob
  include Sidekiq::Job
  include PanoptesRetry

  # @param subject_ids a single subject id or a list of ids (wrapped with Array.wrap)
  # @param subject_set_id id of the subject set the subjects are added to
  # @param max_retries attempt budget forwarded to with_panoptes_retry
  def perform(subject_ids, subject_set_id, max_retries = 3)
    # with_panoptes_retry handles intermittent API errors like the following
    # e.g. {"errors"=>[{"message"=>"Attempted to update a stale object: SubjectSet."}]}
    with_panoptes_retry(max_retries: max_retries) do
      Panoptes::Api.client.add_subjects_to_subject_set(subject_set_id, Array.wrap(subject_ids))
    end
  end
end
21 changes: 21 additions & 0 deletions app/sidekiq/concerns/panoptes_retry.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# frozen_string_literal: true

# Mixin for Sidekiq jobs that call the Panoptes API and want a bounded retry
# around intermittent server errors.
module PanoptesRetry
  extend ActiveSupport::Concern

  # Runs the given block inside a Sync block, retrying when the block raises
  # Panoptes::Client::ServerError.
  #
  # @param max_retries [Integer] total number of attempts before the error is re-raised
  # @yield the Panoptes API operation to perform
  # @raise [Panoptes::Client::ServerError] once the attempt budget is used up
  def with_panoptes_retry(max_retries: 3)
    failures = 0

    Sync do
      begin
        yield
      rescue Panoptes::Client::ServerError => e
        failures += 1
        raise e if failures >= max_retries

        # pause for a random duration below max_retries before the next attempt
        sleep(rand(max_retries))
        retry
      ensure
        # runs after every attempt, successful or not
        Faraday.default_connection.close
      end
    end
  end
end
16 changes: 3 additions & 13 deletions app/sidekiq/remove_subject_from_subject_set_job.rb
Original file line number Diff line number Diff line change
@@ -1,32 +1,22 @@
# frozen_string_literal: true

require 'panoptes/api'
require 'concerns/panoptes_retry'

# Sidekiq job that unlinks one or more subjects from a Panoptes subject set.
class RemoveSubjectFromSubjectSetJob
  include Sidekiq::Job
  include PanoptesRetry

  # @param subject_ids a single subject id or a list of ids (wrapped with Array.wrap)
  # @param subject_set_id id of the subject set the subjects are removed from
  # @param max_retries attempt budget forwarded to with_panoptes_retry
  def perform(subject_ids, subject_set_id, max_retries = 3)
    # format the subject ids into a comma separated string
    # which the API expects for the destroy_links action
    linked_subject_ids = Array.wrap(subject_ids).join(',')
    # as the client subject set resource doesn't offer this method
    # use the underlying client implementation to achieve it
    # longer term this should be added to the client itself
    #
    # with_panoptes_retry handles intermittent API errors like the following
    # e.g. {"errors"=>[{"message"=>"Attempted to update a stale object: SubjectSet."}]}
    with_panoptes_retry(max_retries: max_retries) do
      Panoptes::Api.client.panoptes.delete("/subject_sets/#{subject_set_id}/links/subjects/#{linked_subject_ids}")
    end
  end
end
33 changes: 33 additions & 0 deletions app/sidekiq/subjects_retirement_worker.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# frozen_string_literal: true
require 'panoptes/api'
require 'concerns/panoptes_retry'

# Sidekiq job that reads a subject set's completeness from the Panoptes API
# and enqueues NotifyProjectOwnerJob once the value for the context's workflow
# reaches COMPLETION_NOTIFICATION_THRESHOLD.
class SubjectsRetirementWorker
  include Sidekiq::Job
  include PanoptesRetry

  # Completeness value at or above which the notification job is enqueued.
  # Read from the COMPLETION_NOTIFICATION_THRESHOLD env var, defaulting to 0.95.
  COMPLETION_NOTIFICATION_THRESHOLD = ENV.fetch('COMPLETION_NOTIFICATION_THRESHOLD', '0.95').to_f

  # @param subject_set_id id of the active subject set to check
  # @param max_retries attempt budget forwarded to with_panoptes_retry
  def perform(subject_set_id, max_retries = 3)
    context = Context.find_by(active_subject_set_id: subject_set_id)
    # nothing to check without a context that names a workflow
    return unless context&.workflow_id

    workflow_id = context.workflow_id
    with_panoptes_retry(max_retries: max_retries) do
      resp = Panoptes::Api.client.subject_set(subject_set_id)
      # completeness is keyed by the workflow id as a string; dig also copes
      # with a response whose 'completeness' entry is missing or nil
      completeness = resp.dig('completeness', workflow_id.to_s)
      notify_if_completion_threshold_met(subject_set_id, completeness)
    end
  end

  private

  # Enqueues NotifyProjectOwnerJob when completion_rate (coerced with to_f)
  # is at or above the threshold; a nil rate is treated as "no data" and ignored.
  def notify_if_completion_threshold_met(subject_set_id, completion_rate)
    return if completion_rate.nil?

    completion_rate = completion_rate.to_f
    return unless completion_rate >= COMPLETION_NOTIFICATION_THRESHOLD

    NotifyProjectOwnerJob.perform_async(subject_set_id, completion_rate)
  end
end
17 changes: 6 additions & 11 deletions spec/services/prediction_results/process_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@
process_results_service.run
expect(process_results_service).to have_received(:add_random_spice_subjects_to_active_set)
end

it 'schedules the subject retirement worker' do
allow(SubjectsRetirementWorker).to receive(:perform_async)
process_results_service.run
expect(SubjectsRetirementWorker).to have_received(:perform_async).with(active_subject_set_id)
end
end

describe '#partition_results' do
Expand Down Expand Up @@ -94,17 +100,6 @@
expect(process_results_service.random_spice_subject_ids).to match_array([under_threshold_subject_id])
end

context 'when completion hits the notification threshold' do
before do
stub_const("PredictionResults::Process::COMPLETION_NOTIFICATION_THRESHOLD", 0.5)
allow(NotifyProjectOwnerJob).to receive(:perform_async)
end

it 'calls NotifyProjectOwnerJob for almost retired subjects' do
process_results_service.partition_results
expect(NotifyProjectOwnerJob).to have_received(:perform_async).with(active_subject_set_id, 0.5)
end
end
end

describe '#move_over_threshold_subjects_to_active_set' do
Expand Down
71 changes: 71 additions & 0 deletions spec/sidekiq/subjects_retirement_worker_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
# frozen_string_literal: true

require 'rails_helper'

RSpec.describe SubjectsRetirementWorker, type: :job do
  describe '#perform' do
    let(:context) { Context.first }
    let(:subject_set_id) { context.active_subject_set_id }
    let(:worker) { described_class.new }

    before do
      allow(NotifyProjectOwnerJob).to receive(:perform_async)
      # yield straight through the retry wrapper so only the worker's own logic runs
      allow(worker).to receive(:with_panoptes_retry).and_yield
    end

    context 'when a context does not exist' do
      # presumably no loaded Context uses this active subject set id; verify against the test data
      let(:unknown_subject_set_id) { 20 }

      it 'returns early when no context is found' do
        worker.perform(unknown_subject_set_id)

        expect(worker).not_to have_received(:with_panoptes_retry)
        expect(NotifyProjectOwnerJob).not_to have_received(:perform_async)
      end
    end

    context 'when a context with a workflow id exists' do
      let(:workflow_id) { context.workflow_id }
      let(:panoptes_client) { instance_double(Panoptes::Client) }
      # each nested context supplies its own `completeness` hash
      let(:subject_set_response) { { 'completeness' => completeness } }

      before do
        allow(Panoptes::Api).to receive(:client).and_return(panoptes_client)
        allow(panoptes_client).to receive(:subject_set).with(subject_set_id).and_return(subject_set_response)
      end

      context 'and the Panoptes response lacks completeness' do
        let(:completeness) { {} }

        it 'does not enqueue a notification' do
          worker.perform(subject_set_id)
          expect(NotifyProjectOwnerJob).not_to have_received(:perform_async)
        end
      end

      context 'and the completion rate is below the threshold' do
        let(:completeness) { { workflow_id.to_s => 0.25 } }

        it 'does not enqueue a notification' do
          worker.perform(subject_set_id)
          expect(NotifyProjectOwnerJob).not_to have_received(:perform_async)
        end
      end

      context 'and the completion rate meets the threshold' do
        let(:completion_rate) { 0.96 }
        let(:completeness) { { workflow_id.to_s => completion_rate } }

        it 'enqueues a notification with the completion rate' do
          worker.perform(subject_set_id)
          expect(NotifyProjectOwnerJob).to have_received(:perform_async).with(subject_set_id, completion_rate)
          expect(worker).to have_received(:with_panoptes_retry).with(max_retries: 3)
        end
      end
    end
  end
end