Skip to content

Commit 77cf21b

Browse files
authored
Merge pull request #2167 from broadinstitute/development
Release 1.85.0
2 parents fa3e063 + 5db0893 commit 77cf21b

File tree

4 files changed

+98
-29
lines changed

4 files changed

+98
-29
lines changed

app/models/delete_queue_job.rb

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,50 @@ def self.prepare_file_for_deletion(study_file_id)
175175
file_type: 'DELETE')
176176
end
177177

178+
# do a targeted cleanup of data to allow a file to retry ingest after OOM failure
# only removes data associated with the failed ingest action, leaving prior valid ingested data in place
#
# * *params*
#   - +study_file+ (StudyFile) => file that failed ingest and is about to be retried
#   - +action+ (String, Symbol) => ingest action that failed, e.g. :ingest_expression
#   - +cluster_name+ (String) => name of associated cluster group (only used for :ingest_cluster)
#
# * *returns*
#   - (Boolean) => false when no cleanup was performed (file queued for deletion, missing cluster,
#     or an action like ingest_subsample that needs no cleanup)
def self.prepare_file_for_retry(study_file, action, cluster_name: nil)
  study_file.reload
  # never clean up a file that is already queued for deletion
  return false if study_file.queued_for_deletion

  study = study_file.study
  case action.to_sym
  when :ingest_anndata
    # delete raw counts 'all cells' array since this may be extracted during this phase
    query = {
      name: 'h5ad_frag.matrix.raw.mtx.gz Cells', array_type: 'cells', linear_data_type: 'Study',
      linear_data_id: study.id, study_id: study.id, study_file_id: study_file.id, cluster_group_id: nil,
      subsample_annotation: nil, subsample_threshold: nil
    }
    DataArray.where(query).delete_all
  when :ingest_cell_metadata
    metadata = CellMetadatum.where(study:, study_file:)
    delete_arrays_by_query(study, study_file, 'CellMetadatum', metadata.pluck(:id))
    metadata.delete_all
    # now delete 'all cells' array
    query = {
      name: 'All Cells', array_type: 'cells', linear_data_type: 'Study', linear_data_id: study.id,
      study_id: study.id, study_file_id: study_file.id, cluster_group_id: nil, subsample_annotation: nil,
      subsample_threshold: nil
    }
    DataArray.where(query).delete_all
  when :ingest_expression
    genes = Gene.where(study:, study_file:)
    delete_arrays_by_query(study, study_file, 'Gene', genes.pluck(:id))
    genes.delete_all
  when :ingest_cluster
    cluster_group = study.cluster_groups.by_name(cluster_name)
    # fix: guard against a cluster that was never created before the failure (or a bad cluster_name) —
    # calling cluster_group.id on nil would raise NoMethodError and abort the retry
    return false if cluster_group.nil?

    delete_arrays_by_query(study, study_file, 'ClusterGroup', [cluster_group.id])
    cluster_group.delete
  else
    # ingest_subsample and differential_expression require no extra data cleanup
    return false
  end
end
217+
218+
# remove all DataArray documents for a study file that hang off the given parent documents
# (parent type + list of parent ids), e.g. all arrays belonging to a set of Gene documents
def self.delete_arrays_by_query(study, study_file, linear_data_type, ids = [])
  arrays = DataArray.where(study:, study_file:, linear_data_type:).in(linear_data_id: ids)
  arrays.delete_all
end
221+
178222
private
179223

180224
# remove a study_file from a study_file_bundle, and clean original_file_list up as necessary

app/models/ingest_job.rb

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -398,20 +398,19 @@ def poll_for_completion(run_at: 1.minute.from_now)
398398
log_to_mixpanel # log before queuing file for deletion to preserve properties
399399
# don't delete files or notify users if this is a 'special action', like DE or image pipeline jobs
400400
subject = "Error: #{study_file.file_type} file: '#{study_file.upload_file_name}' parse has failed"
401-
retry_job = should_retry?
402-
handle_ingest_failure(subject, retry_job:) unless special_action?
401+
handle_ingest_failure(subject) unless (special_action? || should_retry?)
402+
403403
admin_email_content = generate_error_email_body(email_type: :dev)
404-
if retry_job && params_object&.next_machine_type
404+
if should_retry? && params_object&.next_machine_type
405405
new_machine = params_object.next_machine_type
406406
params_object.machine_type = new_machine
407-
cloned_file = study_file.clone
408-
# free up filename and other values so cloned_file can save properly, including deleting nested documents
409-
# this prevents Frozen BSON::Document error
410-
DeleteQueueJob.prepare_file_for_deletion(study_file.id)
411-
cloned_file.update!(parse_status: 'parsing')
412-
file_identifier = "#{cloned_file.upload_file_name}:#{cloned_file.id} (#{study.accession})"
407+
# run a selective cleanup to allow file to retry ingest on the next machine type
408+
# this leaves any prior ingested valid data in place, and only removes data associated with this exact run
409+
DeleteQueueJob.prepare_file_for_retry(study_file, action, cluster_name: params_object.try(:name))
410+
study_file.update!(parse_status: 'parsing')
411+
file_identifier = "#{study_file.upload_file_name}:#{study_file.id} (#{study.accession})"
413412
Rails.logger.info "Retrying #{action} after #{exit_code} failure for #{file_identifier} with machine_type: #{new_machine}"
414-
retry_job = IngestJob.new(study:, study_file: cloned_file, user:, params_object:, action:, persist_on_fail:)
413+
retry_job = IngestJob.new(study:, study_file:, user:, params_object:, action:, persist_on_fail:)
415414
retry_job.push_remote_and_launch_ingest
416415
# notify admins that the parse failed for visibility purposes
417416
SingleCellMailer.notify_admin_parse_fail(user.email, subject, admin_email_content).deliver_now
@@ -428,7 +427,7 @@ def poll_for_completion(run_at: 1.minute.from_now)
428427
# will automatically clean up data and notify user
429428
# in case of subsampling, only subsampled data cleanup is run and all other data is left in place
430429
# this reduces churn for study owners as full-resolution data is still valid
431-
def handle_ingest_failure(email_subject, retry_job: false)
430+
def handle_ingest_failure(email_subject)
432431
if action.to_sym == :ingest_subsample
433432
study_file.update(parse_status: 'parsed') # reset parse flag
434433
cluster_name = cluster_name_by_file_type
@@ -439,17 +438,15 @@ def handle_ingest_failure(email_subject, retry_job: false)
439438
create_study_file_copy
440439
study_file.update(parse_status: 'failed')
441440
DeleteQueueJob.new(study_file).delay.perform
442-
unless persist_on_fail || retry_job
441+
unless persist_on_fail
443442
ApplicationController.firecloud_client.delete_workspace_file(study.bucket_id, study_file.bucket_location)
444443
study_file.bundled_files.each do |bundled_file|
445444
ApplicationController.firecloud_client.delete_workspace_file(study.bucket_id, bundled_file.bucket_location)
446445
end
447446
end
448447
end
449-
unless retry_job
450-
user_email_content = generate_error_email_body
451-
SingleCellMailer.notify_user_parse_fail(user.email, email_subject, user_email_content, study).deliver_now
452-
end
448+
user_email_content = generate_error_email_body
449+
SingleCellMailer.notify_user_parse_fail(user.email, email_subject, user_email_content, study).deliver_now
453450
end
454451

455452
# TODO (SCP-4709, SCP-4710) Processed and Raw expression files

test/models/delete_queue_job_test.rb

Lines changed: 37 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -228,9 +228,9 @@ class DeleteQueueJobTest < ActiveSupport::TestCase
228228
assert_not DataArray.where(study_id: @basic_study.id, name: 'All Cells').exists?
229229
end
230230

231-
test 'should prepare file for deletion to allow cloning' do
231+
test 'should prepare file for deletion' do
232232
study = FactoryBot.create(:detached_study,
233-
name_prefix: 'Clone Test',
233+
name_prefix: 'Deletion Prepare Test',
234234
user: @user,
235235
test_array: @@studies_to_clean)
236236
matrix = FactoryBot.create(:ann_data_file,
@@ -247,12 +247,43 @@ class DeleteQueueJobTest < ActiveSupport::TestCase
247247
'phex' => [['A', 0.3], ['B', 1.0], ['C', 0.5], ['D', 0.1]]
248248
})
249249

250-
cloned_matrix = matrix.clone
251250
DeleteQueueJob.prepare_file_for_deletion(matrix.id)
252251
matrix.reload
253252
assert matrix.is_deleting?
254-
assert cloned_matrix.valid?
255-
cloned_matrix.save!
256-
assert cloned_matrix.persisted?
253+
end
254+
255+
# verifies prepare_file_for_retry performs targeted cleanup per ingest action
test 'should prepare file for retry' do
  study = FactoryBot.create(:detached_study,
                            name_prefix: 'Retry Prepare Test',
                            user: @user,
                            test_array: @@studies_to_clean)
  matrix = FactoryBot.create(:ann_data_file,
                             name: 'matrix.h5ad',
                             study:,
                             cell_input: %w[A B C D],
                             annotation_input: [
                               { name: 'disease', type: 'group', values: %w[cancer cancer normal normal] }
                             ],
                             coordinate_input: [
                               { tsne: { x: [1, 2, 3, 4], y: [5, 6, 7, 8] } }
                             ],
                             expression_input: {
                               'phex' => [['A', 0.3], ['B', 1.0], ['C', 0.5], ['D', 0.1]]
                             })
  assert_equal 1, study.cell_metadata.count
  assert_equal 1, study.genes.count
  assert_equal 1, study.cluster_groups.count
  DeleteQueueJob.prepare_file_for_retry(matrix, :ingest_cell_metadata)
  assert_empty CellMetadatum.where(study:, study_file: matrix)
  # fix: linear_data_type is a DataArray field — the original assertions queried CellMetadatum,
  # which never matches, so the DataArray cleanup was not actually being verified
  assert_empty DataArray.where(study:, study_file: matrix, linear_data_type: 'CellMetadatum')
  DeleteQueueJob.prepare_file_for_retry(matrix, :ingest_expression)
  assert_empty Gene.where(study:, study_file: matrix)
  assert_empty DataArray.where(study:, study_file: matrix, linear_data_type: 'Gene')
  DeleteQueueJob.prepare_file_for_retry(matrix, :ingest_cluster, cluster_name: 'tsne')
  assert_empty ClusterGroup.where(study:, study_file: matrix, name: 'tsne')
  assert_empty DataArray.where(study:, study_file: matrix, linear_data_type: 'ClusterGroup')
  DeleteQueueJob.prepare_file_for_retry(matrix, :ingest_anndata)
  study.reload
  assert_empty study.expression_matrix_cells(matrix, matrix_type: 'raw')
end
258289
end

test/models/ingest_job_test.rb

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -929,12 +929,12 @@ class IngestJobTest < ActiveSupport::TestCase
929929
}.with_indifferent_access
930930
# first pipeline mock represents the failed OOM job
931931
pipeline_mock = Minitest::Mock.new
932-
8.times { pipeline_mock.expect :metadata, mock_metadata }
933-
5.times { pipeline_mock.expect :done?, true }
932+
9.times { pipeline_mock.expect :metadata, mock_metadata }
933+
6.times { pipeline_mock.expect :done?, true }
934934
3.times { pipeline_mock.expect :error, { code: 137, message: 'OOM exception' } }
935935
# must mock life_sciences_api_client getting pipeline metadata
936936
client_mock = Minitest::Mock.new
937-
16.times { client_mock.expect :get_pipeline, pipeline_mock, [{ name: pipeline_name }] }
937+
18.times { client_mock.expect :get_pipeline, pipeline_mock, [{ name: pipeline_name }] }
938938
# new pipeline mock is resubmitted job with larger machine_type
939939
new_pipeline = Minitest::Mock.new
940940
new_op = Google::Apis::LifesciencesV2beta::Operation.new(name: 'oom-retry')
@@ -947,17 +947,14 @@ class IngestJobTest < ActiveSupport::TestCase
947947
# also prevents mock 'unexpected arguments' errors that can happen
948948
client_mock.expect :run_pipeline, new_op do |args|
949949
args[:study_file].upload_file_name == study_file.upload_file_name &&
950-
args[:study_file].id.to_s != study_file.id.to_s && # this should be a new file with the same name
950+
args[:study_file].id.to_s == study_file.id.to_s && # this should be the exact same file
951951
args[:action] == :ingest_anndata &&
952952
args[:params_object].machine_type == 'n2d-highmem-16'
953953
end
954954
terra_mock = Minitest::Mock.new
955955
terra_mock.expect :get_workspace_file,
956956
Google::Cloud::Storage::File.new,
957957
[bucket, study_file.bucket_location]
958-
terra_mock.expect :execute_gcloud_method,
959-
Google::Cloud::Storage::File.new,
960-
[:copy_workspace_file, 0, bucket, study_file.bucket_location, study_file.parse_fail_bucket_location]
961958
terra_mock.expect :workspace_file_exists?,
962959
false,
963960
[bucket, String]

0 commit comments

Comments (0)