60 changes: 60 additions & 0 deletions app/models/maintenance_tasks/parallel_executor.rb
@@ -0,0 +1,60 @@
# frozen_string_literal: true

module MaintenanceTasks
# Executes items in parallel using a thread pool.
#
# Handles thread creation, error collection, and ensures all threads
# complete before raising exceptions.
#
# @api private
class ParallelExecutor
class << self
# Executes a block for each item in parallel.
#
# @param items [Array] items to process
# @yield [item] block to execute for each item
# @return [void]
# @raise [StandardError] the first exception encountered during execution
def execute(items, &block)
exceptions = []
exception_mutex = Mutex.new

threads = items.map do |item|
Contributor:
Batches can be of arbitrary size, e.g. 1000+ items. There are risks of performance degradation / system instability in generating an unbounded number of threads. Should we implement some sort of thread pool with a configurable size?

Contributor:
(We may also want to coordinate with Rails' connection pool size, which defaults to 5 connections)

Contributor:
Another idea is to make the thread count part of the API, i.e. parallelize(threads: 5). I don't think we should tie the thread count to the batch size, though.

Contributor:
+1, I don't want to let people spawn an unbounded number of threads just by following the conventions, which for in_batches means 1000 elements per batch.

Thread.new do
ActiveRecord::Base.connection_pool.with_connection do
block.call(item)
rescue => error
exception_mutex.synchronize do
exceptions << { item: item, error: error }
end
end
end
end

# Wait for all threads to complete
threads.each(&:join)

# Raise first exception if any occurred
raise_first_exception(exceptions) if exceptions.any?
end

private

# Raises the first exception from the collection.
#
# @param exceptions [Array<Hash>] array of {item:, error:} hashes
# @return [void]
# @raise [StandardError] the first error from the collection
def raise_first_exception(exceptions)
first_exception = exceptions.first

# Store context for error reporting (matches maintenance_tasks convention)
# The calling task will set @errored_element for error context
error = first_exception[:error]
error.define_singleton_method(:errored_item) { first_exception[:item] }

raise error
end
end
end
end
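
The thread-per-item design discussed in the review thread above could be bounded without changing callers much. Below is a minimal sketch, not part of this PR, of how execute might cap concurrency with a fixed pool of workers pulling items off a queue; the threads: keyword and its default of 5 (to stay within Rails' default connection pool size) are assumptions for illustration only.

module MaintenanceTasks
  class ParallelExecutor
    class << self
      # Hypothetical bounded variant: `threads:` caps concurrency regardless of batch size.
      def execute(items, threads: 5, &block)
        exceptions = []
        exception_mutex = Mutex.new

        # Enqueue every item, then close the queue so workers exit once it drains.
        queue = Queue.new
        items.each { |item| queue << item }
        queue.close

        workers = Array.new(threads) do
          Thread.new do
            while (item = queue.pop)
              ActiveRecord::Base.connection_pool.with_connection do
                block.call(item)
              rescue => error
                exception_mutex.synchronize { exceptions << { item: item, error: error } }
              end
            end
          end
        end

        workers.each(&:join)

        raise_first_exception(exceptions) if exceptions.any?
      end
    end
  end
end

A queue-fed worker pool keeps the thread count independent of batch size, which also keeps database connection checkouts within whatever pool the app has configured.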
112 changes: 108 additions & 4 deletions app/models/maintenance_tasks/task.rb
@@ -39,6 +39,11 @@ class NotFoundError < NameError; end
# @api private
class_attribute :status_reload_frequency, default: MaintenanceTasks.status_reload_frequency

# Whether this Task processes items in parallel.
#
# @api private
class_attribute :parallelized, default: false

define_callbacks :start, :complete, :error, :cancel, :pause, :interrupt

attr_accessor :metadata
@@ -113,6 +118,54 @@ def no_collection
self.collection_builder_strategy = MaintenanceTasks::NoCollectionBuilder.new
end

# Enable parallel processing for this Task.
#
# When enabled, the Task processes items in parallel using threads.
# Task authors define their collection with batching (using in_batches,
# csv_collection(in_batches:), or each_slice), and implement
# process_item(item) instead of process(item).
#
# @example ActiveRecord with batching
# class Maintenance::UpdateUsersTask < MaintenanceTasks::Task
# parallelize
#
# def collection
# User.where(status: 'pending').in_batches(of: 10)
# end
#
# def process_item(user)
# # This will be called in parallel (10 concurrent threads per batch)
# user.update!(status: 'processed')
# end
# end
#
# @note Cursor granularity: The cursor tracks batches, not individual items.
# If the task is interrupted mid-batch, items from that batch will be
# reprocessed on resume. Ensure your process_item method is idempotent.
#
# @note Thread safety requirements:
# - Your process_item method MUST be thread-safe
# - Avoid shared mutable state between items
# - Most ActiveRecord operations are thread-safe if each thread gets its own connection
# - ActiveRecord handles connection pooling automatically
#
# @note Error handling: If any thread raises an exception, the entire batch
# fails and the exception is propagated to the maintenance task's error handler.
# The first exception encountered is raised.
#
# @note Progress tracking: Progress is tracked per batch, not per item.
# The UI will show "X batches processed" rather than "X items processed".
def parallelize
self.parallelized = true
end

# Returns whether this Task processes items in parallel.
#
# @return [Boolean] whether the Task is parallelized.
def parallelized?
parallelized
end

delegate :has_csv_content?, :no_collection?, to: :collection_builder_strategy

# Processes one item.
@@ -306,12 +359,35 @@ def cursor_columns
# Placeholder method to raise in case a subclass fails to implement the
# expected instance method.
#
# @param _item [Object] the current item from the enumerator being iterated.
# When the Task is parallelized, this method processes a batch by spawning
# threads for parallel execution. Otherwise, it raises an error advising
# subclasses to implement an override.
#
# @raise [NotImplementedError] with a message advising subclasses to
# @param item_or_batch [Object] the current item from the enumerator being
# iterated, or a batch when parallelized.
#
# @raise [NoMethodError] with a message advising subclasses to
# implement an override for this method.
def process(item_or_batch)
if self.class.parallelized?
process_batch_in_parallel(item_or_batch)
else
raise NoMethodError, "#{self.class.name} must implement `process`."
end
end

# Task authors implement this method instead of process(item) when using
# parallelize. It will be called in parallel for each item in a batch.
#
# @param _item [Object] the individual item to process
#
# @raise [NoMethodError] with a message advising subclasses to
# implement an override for this method.
def process(_item)
raise NoMethodError, "#{self.class.name} must implement `process`."
def process_item(_item)
Contributor:
I'm not sure exactly what to name this, but I think we need an API that's more distinct from #process that indicates that this is for parallel processing in a batch. Maybe #process_for_batch?

raise NoMethodError, <<~MSG.squish
#{self.class.name} must implement `process_item(item)` when using
parallelize.
MSG
end

# Total count of iterations to be performed, delegated to the strategy.
@@ -331,5 +407,33 @@ def count
def enumerator_builder(cursor:)
nil
end

# Returns whether this Task processes items in parallel.
#
# @return [Boolean] whether the Task is parallelized.
def parallelized?
self.class.parallelized?
end

private

# Process a batch by spawning threads for parallel execution.
# This is called by the process method when the Task is parallelized.
#
# @param batch [Object] batch (ActiveRecord::Relation, Array of items/rows)
def process_batch_in_parallel(batch)
# Convert batch to array of items
# ActiveRecord::Relation responds to to_a, arrays are already arrays
items = batch.respond_to?(:to_a) ? batch.to_a : Array(batch)

# Execute items in parallel, storing errored item for context
ParallelExecutor.execute(items) do |item|
Contributor:
I feel like we could return the exceptions array ([{ item: <item>, error: <error> }]) directly from .execute instead of raising the error. This would simplify things a lot, i.e.:

class ParallelExecutor
  class << self
    def execute(items, &block)
      ...

      threads = items.map do |item|
        Thread.new do
          ActiveRecord::Base.connection_pool.with_connection do
            block.call(item)
          rescue => error
            exception_mutex.synchronize do
              exceptions << { item: item, error: error }
            end
          end
        end
      end

      threads.each(&:join)

      exceptions
    end
...

And then here:

exceptions = ParallelExecutor.execute(items) do |item|
  process_item(item)
end

if exceptions.any?
  @errored_element = exceptions.first[:item]
  raise exceptions.first[:error]
end

process_item(item)
end
rescue => error
# Store the errored item for maintenance tasks error reporting
@errored_element = error.errored_item if error.respond_to?(:errored_item)
raise
end
end
end
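
The parallelize documentation above mentions csv_collection(in_batches:) and each_slice as batching options but only shows an ActiveRecord example. As a hedged sketch (the task name, CSV headers, and batch size are invented for illustration), a parallelized CSV task could look like this:

module Maintenance
  class BackfillUserNamesTask < MaintenanceTasks::Task
    parallelize

    # Each yielded batch holds up to 50 CSV rows; with parallelize, process_item
    # receives one row at a time on its own thread.
    csv_collection(in_batches: 50)

    # Must be idempotent and thread-safe: if the run is interrupted mid-batch,
    # rows from that batch are reprocessed on resume.
    def process_item(row)
      user = User.find_by(email: row["email"])
      user&.update!(name: row["name"])
    end
  end
end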
3 changes: 2 additions & 1 deletion app/models/maintenance_tasks/task_data_index.rb
@@ -25,7 +25,8 @@ class << self
def available_tasks
tasks = []

task_names = Task.load_all.map(&:name)
# Filter out anonymous classes (nil names) that may exist from test suites
task_names = Task.load_all.map(&:name).compact

active_runs = Run.with_attached_csv.active.where(task_name: task_names)
active_runs.each do |run|
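
For context on the compact above (not part of the diff): anonymous Task subclasses, such as those built with Class.new in test suites, report a nil name, which would otherwise flow into the where(task_name: task_names) query.

MaintenanceTasks::Task.name            # => "MaintenanceTasks::Task"
Class.new(MaintenanceTasks::Task).name # => nil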
108 changes: 108 additions & 0 deletions test/models/maintenance_tasks/parallel_executor_test.rb
@@ -0,0 +1,108 @@
# frozen_string_literal: true

require "test_helper"

module MaintenanceTasks
class ParallelExecutorTest < ActiveSupport::TestCase
test "executes block for each item in parallel" do
results = Concurrent::Array.new

items = [1, 2, 3, 4, 5]
ParallelExecutor.execute(items) do |item|
sleep(0.001) # Simulate work
results << item
end

assert_equal 5, results.size
assert_equal [1, 2, 3, 4, 5].sort, results.sort
end

test "uses multiple threads" do
thread_ids = Concurrent::Array.new

items = [1, 2, 3, 4, 5]
ParallelExecutor.execute(items) do |_item|
thread_ids << Thread.current.object_id
end

# Should use more than one thread
assert thread_ids.uniq.size > 1, "Expected multiple threads, got #{thread_ids.uniq.size}"
end

test "waits for all threads to complete" do
completion_order = Concurrent::Array.new

items = [1, 2, 3]
ParallelExecutor.execute(items) do |item|
# Item 2 finishes last
sleep(0.01) if item == 2
completion_order << item
end

# All 3 items should be completed
assert_equal 3, completion_order.size
end

test "raises first exception after all threads complete" do
processed = Concurrent::Array.new

items = [1, 2, 3, 4, 5]
error = assert_raises(StandardError) do
ParallelExecutor.execute(items) do |item|
sleep(0.001) # Give other threads time to run
processed << item
raise StandardError, "Error on item #{item}" if item == 3
end
end

assert_equal "Error on item 3", error.message

# All threads should have attempted to run
assert processed.size >= 4, "Expected at least 4 items processed"
end

test "attaches errored item to exception" do
items = [1, 2, 3]
error = assert_raises(StandardError) do
ParallelExecutor.execute(items) do |item|
raise StandardError, "Error" if item == 2
end
end

assert_respond_to error, :errored_item
assert_equal 2, error.errored_item
end

test "handles exceptions from ActiveRecord operations" do
Post.delete_all
post = Post.create!(title: "Test", content: "Content")

items = [post.id, post.id + 1, post.id + 2]
error = assert_raises(ActiveRecord::RecordNotFound) do
ParallelExecutor.execute(items) do |post_id|
Post.find(post_id) # Will raise for non-existent IDs
end
end

# Should capture the error
assert_kind_of(ActiveRecord::RecordNotFound, error)
ensure
Post.delete_all
end

test "each thread gets own database connection" do
connection_ids = Concurrent::Array.new

items = [1, 2, 3, 4, 5]
ParallelExecutor.execute(items) do |_item|
# Each thread should have its own connection from the pool
conn = ActiveRecord::Base.connection
connection_ids << conn.object_id
end

# Connections may be reused from the pool, but should be managed safely
# At minimum, verify we didn't crash due to connection issues
assert_equal 5, connection_ids.size
end
end
end