Skip to content

Commit 6d4ae0c

Browse files
sgbettclaude
andcommitted
fix: address Copilot review findings
- Fix migration comments: status values now match adapter code ('unsent', 'sending', 'completed', 'failed') - Fix poll query example in migration to reflect actual query shape - Use :timestamptz for locked_at/created_at/updated_at (matches pending_since pattern from migration 004) - Make start() check-and-set atomic under mutex (prevents TOCTOU double-spawn) - Guard from_beef_hex deserialization — corrupt jobs fail immediately instead of retrying in a tight loop Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 6f24596 commit 6d4ae0c

File tree

2 files changed

+27
-8
lines changed

2 files changed

+27
-8
lines changed

gem/bsv-wallet-postgres/lib/bsv/wallet_postgres/migrations/006_create_broadcast_jobs.rb

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
# for the same transaction.
1313
#
1414
# * +status+ — TEXT NOT NULL DEFAULT 'unsent'. State machine values:
15-
# 'unsent', 'broadcasting', 'sent', 'failed'.
15+
# 'unsent', 'sending', 'completed', 'failed'.
1616
#
1717
# * +beef_hex+ — TEXT NOT NULL. Serialised BEEF payload to broadcast.
1818
#
@@ -42,8 +42,11 @@
4242
# A composite index on +(status, locked_at)+ (named +broadcast_jobs_poll_idx+)
4343
# supports the hot-path poll query:
4444
#
45-
# WHERE status = 'unsent' AND locked_at IS NULL
45+
# WHERE status = 'unsent'
46+
# OR (status = 'sending' AND locked_at < NOW() - interval '300 seconds'
47+
# AND attempts < 5)
4648
# ORDER BY created_at
49+
# FOR UPDATE SKIP LOCKED
4750
Sequel.migration do
4851
change do
4952
create_table(:wallet_broadcast_jobs) do
@@ -56,9 +59,9 @@
5659
String :fund_ref
5760
Integer :attempts, null: false, default: 0
5861
String :last_error, text: true
59-
DateTime :locked_at
60-
DateTime :created_at, null: false, default: Sequel::CURRENT_TIMESTAMP
61-
DateTime :updated_at, null: false, default: Sequel::CURRENT_TIMESTAMP
62+
column :locked_at, :timestamptz
63+
column :created_at, :timestamptz, null: false, default: Sequel::CURRENT_TIMESTAMP
64+
column :updated_at, :timestamptz, null: false, default: Sequel::CURRENT_TIMESTAMP
6265
index %i[status locked_at], name: :broadcast_jobs_poll_idx
6366
end
6467
end

gem/bsv-wallet-postgres/lib/bsv/wallet_postgres/solid_queue_adapter.rb

Lines changed: 19 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -131,12 +131,16 @@ def status(txid)
131131
# Spawns the background worker thread.
132132
#
133133
# Safe to call multiple times — returns immediately if already running.
134+
# The check-and-set is atomic under the mutex to prevent two concurrent
135+
# +start+ calls from spawning duplicate worker threads.
134136
#
135137
# @return [void]
136138
def start
137-
return if running?
139+
@mutex.synchronize do
140+
return if @running || @worker_thread&.alive?
138141

139-
@mutex.synchronize { @running = true }
142+
@running = true
143+
end
140144
@worker_thread = Thread.new { worker_loop }
141145
end
142146

@@ -221,6 +225,9 @@ def poll_once
221225
# Marks a job as +sending+, broadcasts the transaction, and promotes or
222226
# rolls back wallet state based on the outcome.
223227
#
228+
# Deserialization failures (corrupt +beef_hex+) are caught and mark the
229+
# job as +failed+ immediately to prevent infinite retry loops.
230+
#
224231
# @param job [Hash] row from +wallet_broadcast_jobs+
225232
# @return [void]
226233
def process_job(job)
@@ -231,7 +238,16 @@ def process_job(job)
231238
updated_at: Sequel.lit('NOW()')
232239
)
233240

234-
tx = BSV::Transaction::Transaction.from_beef_hex(job[:beef_hex])
241+
begin
242+
tx = BSV::Transaction::Transaction.from_beef_hex(job[:beef_hex])
243+
rescue StandardError => e
244+
@db[:wallet_broadcast_jobs].where(id: job[:id]).update(
245+
status: 'failed',
246+
last_error: "Deserialization failed: #{e.message}",
247+
updated_at: Sequel.lit('NOW()')
248+
)
249+
return
250+
end
235251

236252
input_outpoints = job[:input_outpoints]&.to_a
237253
change_outpoints = job[:change_outpoints]&.to_a

0 commit comments

Comments
 (0)