Skip to content

Commit 6f24596

Browse files
sgbettclaude
andcommitted
fix: address review findings in SolidQueueAdapter
- Add MAX_ATTEMPTS (5) enforcement to poll query — permanently-failing jobs no longer retry indefinitely - Replace Sequel.lit placeholder inside SQL quotes with explicit integer interpolation for the interval expression - Guard empty-array outpoints in rollback path (nil vs [] semantics) - Document that process_job holds a DB connection during broadcast Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 6cc2fca commit 6f24596

File tree

1 file changed

+16
-4
lines changed

1 file changed

+16
-4
lines changed

gem/bsv-wallet-postgres/lib/bsv/wallet_postgres/solid_queue_adapter.rb

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,10 @@ class SolidQueueAdapter
4747
# and eligible for retry. Configurable for testability.
4848
STALE_THRESHOLD = 300
4949

50+
# Maximum number of broadcast attempts before a job is abandoned.
51+
# After this many failures the job remains +failed+ permanently.
52+
MAX_ATTEMPTS = 5
53+
5054
# @param db [Sequel::Database] a Sequel database handle (shared with PostgresStore)
5155
# @param storage [StorageAdapter] wallet storage adapter (must not be MemoryStore)
5256
# @param broadcaster [#broadcast] broadcaster object
@@ -184,16 +188,24 @@ def running?
184188
#
185189
# Uses +SELECT ... FOR UPDATE SKIP LOCKED+ so concurrent workers do not
186190
# double-process the same job. Stale +sending+ jobs (those whose +locked_at+
187-
# is older than +stale_threshold+ seconds) are also eligible for retry.
191+
# is older than +stale_threshold+ seconds) are also eligible for retry,
192+
# up to +MAX_ATTEMPTS+.
193+
#
194+
# NOTE: The entire +process_job+ call (including the network broadcast)
195+
# runs inside this transaction, holding the row lock and a database
196+
# connection for the duration. This is acceptable for a single worker
197+
# thread with an 8-second poll interval but would need restructuring
198+
# for high-throughput multi-worker deployments.
188199
#
189200
# @return [void]
190201
def poll_once
191202
@db.transaction do
192203
job = @db[:wallet_broadcast_jobs]
193204
.where(Sequel.lit(
194205
"status = 'unsent' OR " \
195-
"(status = 'sending' AND locked_at < NOW() - interval '? second')",
196-
@stale_threshold
206+
"(status = 'sending' AND locked_at < NOW() - " \
207+
"#{@stale_threshold.to_i} * interval '1 second' " \
208+
"AND attempts < #{MAX_ATTEMPTS})"
197209
))
198210
.order(:created_at)
199211
.limit(1)
@@ -229,7 +241,7 @@ def process_job(job)
229241
begin
230242
@broadcaster.broadcast(tx)
231243
rescue StandardError => e
232-
if input_outpoints
244+
if input_outpoints && !input_outpoints.empty?
233245
rollback(input_outpoints, change_outpoints, txid, fund_ref)
234246
elsif txid
235247
@storage.update_action_status(txid, 'failed')

0 commit comments

Comments
 (0)