Skip to content

Commit a17b6f0

Browse files
Merge pull request #604 from Shopify/batch-csv-support
Support processing CSVs in batches
2 parents b00e2a4 + 84e0821 commit a17b6f0

File tree

9 files changed

+106
-3
lines changed

9 files changed

+106
-3
lines changed

README.md

+27
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,33 @@ The files uploaded to your Active Storage service provider will be renamed
118118
to include an ISO8601 timestamp and the Task name in snake case format.
119119
The CSV is expected to have a trailing newline at the end of the file.
120120

121+
#### Batch CSV Tasks
122+
123+
Tasks can process CSVs in batches. Add the `in_batches` option to your task's
124+
`csv_collection` macro:
125+
126+
```ruby
127+
# app/tasks/maintenance/batch_import_posts_task.rb
128+
129+
module Maintenance
130+
class BatchImportPostsTask < MaintenanceTasks::Task
131+
csv_collection(in_batches: 50)
132+
133+
def process(batch_of_rows)
134+
Post.insert_all(batch_of_rows.map(&:to_h))
135+
end
136+
end
137+
end
138+
```
139+
140+
As with a regular CSV task, ensure you've implemented the following method:
141+
142+
* `process`: do the work of your Task on a batch (array of `CSV::Row` objects).
143+
144+
Note that `#count` is calculated automatically based on the number of batches in
145+
your collection, and your Task's progress will be displayed in terms of batches
146+
(not the total number of rows in your CSV).
147+
121148
### Processing Batch Collections
122149

123150
The Maintenance Tasks gem supports processing Active Records in batches. This

app/jobs/concerns/maintenance_tasks/task_job_concern.rb

+5
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,11 @@ def build_enumerator(_run, cursor:)
5757
)
5858
when Array
5959
enumerator_builder.build_array_enumerator(collection, cursor: cursor)
60+
when BatchCsvCollectionBuilder::BatchCsv
61+
JobIteration::CsvEnumerator.new(collection.csv).batches(
62+
batch_size: collection.batch_size,
63+
cursor: cursor,
64+
)
6065
when CSV
6166
JobIteration::CsvEnumerator.new(collection).rows(cursor: cursor)
6267
else
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
# frozen_string_literal: true
2+
3+
require "csv"
4+
5+
module MaintenanceTasks
6+
# Strategy for building a Task that processes CSV files in batches.
7+
#
8+
# @api private
9+
class BatchCsvCollectionBuilder < CsvCollectionBuilder
10+
BatchCsv = Struct.new(:csv, :batch_size, keyword_init: true)
11+
12+
# Initialize a BatchCsvCollectionBuilder with a batch size.
13+
#
14+
# @param batch_size [Integer] the number of CSV rows in a batch.
15+
def initialize(batch_size)
16+
@batch_size = batch_size
17+
super()
18+
end
19+
20+
# Defines the collection to be iterated over, based on the provided CSV.
21+
# Includes the CSV and the batch size.
22+
def collection(task)
23+
BatchCsv.new(
24+
csv: CSV.new(task.csv_content, headers: true),
25+
batch_size: @batch_size
26+
)
27+
end
28+
29+
# The number of batches to be processed. Excludes the header row from the
30+
# count and assumes a trailing newline is at the end of the CSV file.
31+
# Note that this number is an approximation based on the number of
32+
# newlines.
33+
#
34+
# @return [Integer] the approximate number of batches to process.
35+
def count(task)
  # Each line ends with a newline, and one of those lines is the header, so
  # the number of data rows is the newline count minus one. Clamp at zero so
  # an empty upload never yields a negative count.
  row_count = [task.csv_content.count("\n") - 1, 0].max

  # Ceiling division: a final, partially filled batch still counts as a batch.
  (row_count + @batch_size - 1) / @batch_size
end
38+
end
39+
end

app/models/maintenance_tasks/task.rb

+10-3
Original file line numberDiff line numberDiff line change
@@ -53,16 +53,23 @@ def available_tasks
5353

5454
# Make this Task a task that handles CSV.
5555
#
56+
# @param in_batches [Integer] optionally, supply a batch size if the CSV
57+
# should be processed in batches.
58+
#
5659
# An input to upload a CSV will be added in the form to start a Run. The
5760
# collection and count method are implemented.
58-
def csv_collection
61+
def csv_collection(in_batches: nil)
5962
unless defined?(ActiveStorage)
6063
raise NotImplementedError, "Active Storage needs to be installed\n"\
6164
"To resolve this issue run: bin/rails active_storage:install"
6265
end
6366

64-
self.collection_builder_strategy =
65-
MaintenanceTasks::CsvCollectionBuilder.new
67+
if in_batches
68+
self.collection_builder_strategy =
69+
BatchCsvCollectionBuilder.new(in_batches)
70+
else
71+
self.collection_builder_strategy = CsvCollectionBuilder.new
72+
end
6673
end
6774

6875
# Make this a Task that calls #process once, instead of iterating over
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
# frozen_string_literal: true
2+
3+
module Maintenance
4+
class BatchImportPostsTask < MaintenanceTasks::Task
5+
csv_collection(in_batches: 2)
6+
7+
def process(post_rows)
8+
Post.insert_all(post_rows.map(&:to_h))
9+
end
10+
end
11+
end

test/jobs/maintenance_tasks/task_job_test.rb

+11
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,17 @@ class << self
263263
assert_predicate run.reload, :succeeded?
264264
end
265265

266+
test ".perform_now accepts a CSV collection to be performed in batches" do
267+
Maintenance::BatchImportPostsTask.any_instance.expects(:process).times(3)
268+
269+
run = Run.new(task_name: "Maintenance::BatchImportPostsTask")
270+
run.csv_file.attach(
271+
{ io: File.open(file_fixture("sample.csv")), filename: "sample.csv" }
272+
)
273+
run.save
274+
TaskJob.perform_now(run)
275+
end
276+
266277
test ".perform_now sets the Run as errored when the Task collection is invalid" do
267278
freeze_time
268279
Maintenance::TestTask.any_instance.stubs(collection: "not a collection")

test/models/maintenance_tasks/task_data_test.rb

+1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ class TaskDataTest < ActiveSupport::TestCase
2222

2323
test ".available_tasks returns a list of Tasks as TaskData, ordered alphabetically by name" do
2424
expected = [
25+
"Maintenance::BatchImportPostsTask",
2526
"Maintenance::CallbackTestTask",
2627
"Maintenance::CancelledEnqueueTask",
2728
"Maintenance::EnqueueErrorTask",

test/models/maintenance_tasks/task_test.rb

+1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ module MaintenanceTasks
66
class TaskTest < ActiveSupport::TestCase
77
test ".available_tasks returns list of tasks that inherit from the Task superclass" do
88
expected = [
9+
"Maintenance::BatchImportPostsTask",
910
"Maintenance::CallbackTestTask",
1011
"Maintenance::CancelledEnqueueTask",
1112
"Maintenance::EnqueueErrorTask",

test/system/maintenance_tasks/tasks_test.rb

+1
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ class TasksTest < ApplicationSystemTestCase
1818

1919
expected = [
2020
"New Tasks",
21+
"Maintenance::BatchImportPostsTask\nNew",
2122
"Maintenance::CallbackTestTask\nNew",
2223
"Maintenance::CancelledEnqueueTask\nNew",
2324
"Maintenance::EnqueueErrorTask\nNew",

0 commit comments

Comments
 (0)