Skip to content

Commit 4d46b1e

Browse files
authored
Safer and more efficient email sends (forem#22784)
1 parent aecd525 commit 4d46b1e

File tree

6 files changed

+270
-115
lines changed

6 files changed

+270
-115
lines changed

app/models/email.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ class Email < ApplicationRecord
33
belongs_to :user_query, optional: true
44
has_many :email_messages
55

6-
after_save :deliver_to_users
6+
after_commit :deliver_to_users, on: [:create, :update]
77

88
validates :subject, presence: true
99
validates :body, presence: true
@@ -91,7 +91,7 @@ def deliver_to_test_emails(addresses_string)
9191

9292
def deliver_to_users
9393
return if type_of == "onboarding_drip"
94-
return if status != "active"
94+
return unless saved_change_to_status? && active?
9595

9696
Emails::EnqueueCustomBatchSendWorker.perform_async(id)
9797
update_columns(status: "delivered")

app/services/user_query_executor.rb

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,41 @@ def execute
4949
execute_safe_query
5050
end
5151

52+
def each_id_batch(batch_size: 1000)
53+
return unless block_given?
54+
return [] unless valid?
55+
56+
ReadOnlyDatabaseService.with_connection do |conn|
57+
setup_execution_environment(conn)
58+
final_query = user_query.substitute_variables(variables)
59+
safe_query = build_safe_query(final_query)
60+
61+
begin
62+
result = execute_with_timeout(conn, safe_query)
63+
return unless result.is_a?(PG::Result)
64+
65+
current_batch = []
66+
result.each do |row|
67+
user_id = row["id"] || row["user_id"] || row["users.id"]
68+
next unless user_id
69+
70+
current_batch << user_id.to_i
71+
72+
if current_batch.size >= batch_size
73+
yield current_batch
74+
current_batch = []
75+
end
76+
end
77+
78+
yield current_batch if current_batch.any?
79+
80+
update_execution_tracking
81+
rescue StandardError => e
82+
handle_execution_error(e)
83+
end
84+
end
85+
end
86+
5287
def test_execute(limit: MAX_TEST_USER_LIMIT)
5388
@limit = limit
5489
execute

app/workers/emails/enqueue_custom_batch_send_worker.rb

Lines changed: 92 additions & 105 deletions
Original file line numberDiff line numberDiff line change
@@ -5,124 +5,111 @@ class EnqueueCustomBatchSendWorker
55

66
sidekiq_options queue: :medium_priority, retry: 15
77

8-
BATCH_SIZE = Rails.env.production? ? 500 : 10
8+
BATCH_SIZE = Rails.env.production? ? 1000 : 10 # Increased batch size is safe with pluck
99

10-
def perform(email_id)
11-
# 1) Remember the old ENV timeout (milliseconds)
12-
original_timeout = ENV.fetch("STATEMENT_TIMEOUT", 10_000).to_i
10+
def perform(email_id, min_id = nil, max_id = nil)
11+
# 1) Define timeout locally (do not touch ENV)
12+
custom_timeout = 10_000
1313

14-
# 2) Check out exactly one connection for the entire block
15-
# Use read-only database if available, otherwise fall back to main database
14+
# 2) Open the connection block
1615
ReadOnlyDatabaseService.with_connection do |conn|
17-
# 3) Ensure subsequent PG SETs inherit that same original timeout
18-
# (in case some middleware or initializer reads ENV["STATEMENT_TIMEOUT"])
19-
ENV["STATEMENT_TIMEOUT"] = original_timeout.to_s
20-
conn.execute("SET statement_timeout TO #{original_timeout}")
16+
17+
# SAFE: Set timeout only on this current connection thread
18+
conn.execute("SET statement_timeout TO #{custom_timeout}")
2119

2220
email = Email.find_by(id: email_id)
2321
return unless email
2422

25-
user_scope = if email.user_query.present?
26-
# Use custom user query for targeting
27-
begin
28-
# Extract variables from email if available
29-
variables = extract_email_variables(email)
30-
executor = UserQueryExecutor.new(email.user_query, variables: variables)
31-
# Apply the same filtering as other paths to ensure proper notification settings and roles
32-
executor.execute
33-
.registered
34-
.joins(:notification_setting)
35-
.without_role(:suspended)
36-
.without_role(:spam)
37-
.where(notification_setting: { email_newsletter: true })
38-
.where.not(email: "")
39-
rescue StandardError => e
40-
Rails.logger.error("UserQuery execution failed for email #{email.id}: #{e.message}")
41-
User.none
42-
end
43-
elsif email.audience_segment
44-
email.audience_segment.users
45-
.registered
46-
.joins(:notification_setting)
47-
.without_role(:suspended)
48-
.without_role(:spam)
49-
.where(notification_setting: { email_newsletter: true })
50-
.where.not(email: "")
51-
else
52-
User.registered
53-
.joins(:notification_setting)
54-
.without_role(:suspended)
55-
.without_role(:spam)
56-
.where(notification_setting: { email_newsletter: true })
57-
.where.not(email: "")
58-
end
59-
60-
# 4) Now open a transaction. Everything inside here uses "SET LOCAL statement_timeout = 0"
61-
conn.transaction do
62-
# This sets statement_timeout=0 for *this* transaction only.
63-
# No matter how many queries ActiveRecord fires inside this block,
64-
# they all see infinite timeout.
65-
conn.execute("SET LOCAL statement_timeout TO 0")
66-
67-
# 5) Run your batches inside the same transaction, so every SELECT is "no timeout"
68-
batch_count = 0
69-
total_users = 0
70-
71-
user_scope.find_in_batches(batch_size: BATCH_SIZE) do |users_batch|
72-
batch_count += 1
73-
74-
# Skip empty batches
75-
next if users_batch.empty?
76-
77-
# Validate we have valid user IDs
78-
user_ids = users_batch.map(&:id).compact
79-
next if user_ids.empty?
80-
81-
total_users += user_ids.size
82-
83-
Rails.logger.info("Processing email batch #{batch_count} for email #{email.id}: #{user_ids.size} users (first ID: #{user_ids.first})")
84-
85-
begin
86-
Emails::BatchCustomSendWorker.perform_async(
87-
user_ids,
88-
email.subject,
89-
email.body,
90-
email.type_of,
91-
email.id,
92-
)
93-
rescue StandardError => e
94-
Rails.logger.error("Failed to enqueue batch #{batch_count} for email #{email.id}: #{e.message}")
95-
# Continue processing other batches even if one fails
96-
next
97-
end
98-
end
99-
100-
Rails.logger.info("Completed email processing for email #{email.id}: #{batch_count} batches, #{total_users} total users")
23+
if email.user_query.present?
24+
process_custom_query(email, min_id, max_id)
25+
else
26+
process_standard_scope(email, min_id, max_id)
10127
end
102-
# As soon as this transaction block ends, Postgres automatically reverts
103-
# statement_timeout back to the previous session value (original_timeout).
104-
ensure
105-
# 6) Just to be safe, set ENV back and explicitly restore on the connection
106-
ENV["STATEMENT_TIMEOUT"] = original_timeout.to_s
107-
conn.execute("SET statement_timeout TO #{ENV.fetch('STATEMENT_TIMEOUT', 10_000)}")
28+
29+
# Connection is returned to pool automatically;
30+
# Ideally, the pool creates new connections or resets them,
31+
# but resetting the timeout here is good practice if connections are reused immediately.
32+
conn.execute("RESET statement_timeout")
10833
end
10934
end
110-
end
111-
112-
private
11335

114-
def extract_email_variables(email)
115-
variables = {}
36+
private
37+
38+
def process_custom_query(email, min_id = nil, max_id = nil)
39+
variables = extract_email_variables(email)
40+
executor = UserQueryExecutor.new(email.user_query, variables: variables)
41+
42+
executor.each_id_batch(batch_size: 1000) do |id_batch|
43+
# Optional: Apply ID range filtering in Ruby for custom query results
44+
filtered_ids = id_batch
45+
filtered_ids = filtered_ids.select { |id| id >= min_id.to_i } if min_id
46+
filtered_ids = filtered_ids.select { |id| id <= max_id.to_i } if max_id
47+
48+
next if filtered_ids.empty?
49+
50+
# Load ONLY ids, don't instantiate User objects if possible
51+
filtered_user_ids = User.where(id: filtered_ids)
52+
.registered
53+
.joins(:notification_setting)
54+
.without_role(:suspended)
55+
.without_role(:spam)
56+
.where(notification_setting: { email_newsletter: true })
57+
.where.not(email: "")
58+
.pluck(:id)
59+
60+
enqueue_batch(email, filtered_user_ids, "Custom Query")
61+
end
62+
end
11663

117-
# Extract variables from email's variables field if it exists
118-
if email.respond_to?(:variables) && email.variables.present?
119-
begin
120-
variables.merge!(JSON.parse(email.variables))
121-
rescue JSON::ParserError
122-
Rails.logger.warn("Invalid variables JSON in email #{email.id}")
64+
def process_standard_scope(email, min_id = nil, max_id = nil)
65+
# Build the relation
66+
base_scope = User.registered
67+
.joins(:notification_setting)
68+
.without_role(:suspended)
69+
.without_role(:spam)
70+
.where(notification_setting: { email_newsletter: true })
71+
.where.not(email: "")
72+
73+
# Apply ID range filtering at the DB level for standard scopes
74+
base_scope = base_scope.where("users.id >= ?", min_id) if min_id
75+
base_scope = base_scope.where("users.id <= ?", max_id) if max_id
76+
77+
user_scope = if email.audience_segment
78+
base_scope.merge(email.audience_segment.users)
79+
else
80+
base_scope
81+
end
82+
83+
# OPTIMIZED: Use in_batches + pluck to avoid loading User models
84+
user_scope.in_batches(of: BATCH_SIZE) do |relation|
85+
# relation is an ActiveRecord::Relation for the batch (e.g. "WHERE id BETWEEN 1 and 1000")
86+
# .pluck(:id) executes "SELECT id FROM ..." directly
87+
user_ids = relation.pluck(:id)
88+
89+
enqueue_batch(email, user_ids, "Segment/Default")
12390
end
12491
end
12592

126-
variables
93+
def enqueue_batch(email, user_ids, source_label)
94+
return if user_ids.empty?
95+
96+
Rails.logger.info("Processing #{source_label} batch for email #{email.id}: #{user_ids.size} users")
97+
98+
Emails::BatchCustomSendWorker.perform_async(
99+
user_ids,
100+
email.subject,
101+
email.body,
102+
email.type_of,
103+
email.id,
104+
)
105+
end
106+
107+
def extract_email_variables(email)
108+
return {} unless email.respond_to?(:variables) && email.variables.present?
109+
JSON.parse(email.variables)
110+
rescue JSON::ParserError
111+
Rails.logger.warn("Invalid variables JSON in email #{email.id}")
112+
{}
113+
end
127114
end
128-
end
115+
end

spec/models/email_spec.rb

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,11 @@
77
end
88

99
describe "Callbacks" do
10-
it "calls #deliver_to_users after create" do
11-
email = build(:email)
12-
expect(email).to receive(:deliver_to_users)
13-
email.save
10+
it "registers #deliver_to_users as an after_commit callback" do
11+
# Verify the callback is registered in the chain
12+
# Note: checking private internal Rails structure is brittle but confirms configuration
13+
callback_names = Email._commit_callbacks.select { |cb| cb.kind == :after }.map(&:filter)
14+
expect(callback_names).to include(:deliver_to_users)
1415
end
1516
end
1617

@@ -27,7 +28,8 @@
2728

2829
it "does not enqueue any jobs to EnqueueCustomBatchSendWorker" do
2930
expect(Emails::EnqueueCustomBatchSendWorker).not_to receive(:perform_async)
30-
email.send(:deliver_to_users)
31+
# Manually trigger since after_commit doesn't run in transactional tests
32+
email.deliver_to_users
3133
end
3234
end
3335

@@ -36,7 +38,7 @@
3638

3739
it "does not enqueue any jobs to EnqueueCustomBatchSendWorker" do
3840
expect(Emails::EnqueueCustomBatchSendWorker).not_to receive(:perform_async)
39-
email.send(:deliver_to_users)
41+
email.deliver_to_users
4042
end
4143
end
4244

@@ -45,18 +47,28 @@
4547

4648
it "enqueues jobs to EnqueueCustomBatchSendWorker" do
4749
email.update(status: "active")
50+
email.deliver_to_users
4851
expect(Emails::EnqueueCustomBatchSendWorker).to have_received(:perform_async).with(email.id)
4952
end
5053

5154
it "only enqueues once even if re-saved" do
5255
email.update(status: "active")
56+
email.deliver_to_users
57+
expect(Emails::EnqueueCustomBatchSendWorker).to have_received(:perform_async).with(email.id)
58+
59+
# Clear expectations
60+
RSpec::Mocks.space.proxy_for(Emails::EnqueueCustomBatchSendWorker).reset
61+
allow(Emails::EnqueueCustomBatchSendWorker).to receive(:perform_async)
62+
5363
email.reload.save
54-
expect(Emails::EnqueueCustomBatchSendWorker).to have_received(:perform_async).with(email.id).once
64+
email.deliver_to_users
65+
expect(Emails::EnqueueCustomBatchSendWorker).not_to have_received(:perform_async)
5566
end
5667
end
5768

5869
it "updates the email status to 'delivered'" do
59-
email = create(:email, status: "active")
70+
email = create(:email, status: "draft") # Start as draft
71+
email.update(status: "active") # Make it active/dirty
6072
email.deliver_to_users
6173
expect(email.reload.status).to eq("delivered")
6274
end

0 commit comments

Comments
 (0)