feat: SolidQueueAdapter — async broadcast queue for bsv-wallet-postgres#410
feat: SolidQueueAdapter — async broadcast queue for bsv-wallet-postgres#410
Conversation
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
40 examples covering constructor guards, interface compliance, enqueue (auto-fund and finalize paths), idempotent duplicate handling, status queries, worker success/failure/recovery paths, drain/shutdown, and thread safety. All specs tagged :postgres, use real PostgresStore, and mock the broadcaster to keep tests fast (poll_interval: 0.1s). Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Codecov Report❌ Patch coverage is
📢 Thoughts on this report? Let us know! |
There was a problem hiding this comment.
Pull request overview
Adds a PostgreSQL-backed asynchronous broadcast queue adapter to the bsv-wallet-postgres gem so wallets can defer transaction broadcast/promote/rollback work to a background worker thread, persisting jobs in the database for multi-process safety and restart recovery.
Changes:
- Introduces
BSV::Wallet::SolidQueueAdapterwith a polling worker usingFOR UPDATE SKIP LOCKED. - Adds migration
006to createwallet_broadcast_jobsfor persisted broadcast jobs + polling index. - Adds a dedicated spec suite for the adapter and updates PG spec helper cleanup to include the new table.
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
| gem/bsv-wallet-postgres/lib/bsv/wallet_postgres/solid_queue_adapter.rb | New async broadcast queue adapter + worker loop + promote/rollback integration |
| gem/bsv-wallet-postgres/lib/bsv/wallet_postgres/migrations/006_create_broadcast_jobs.rb | New persisted jobs table for async broadcasting |
| gem/bsv-wallet-postgres/lib/bsv/wallet_postgres.rb | Autoload registration for SolidQueueAdapter |
| gem/bsv-wallet-postgres/spec/bsv/wallet_postgres/solid_queue_adapter_spec.rb | New specs covering enqueue/status/worker/recovery/drain/thread-safety |
| gem/bsv-wallet-postgres/spec/support/postgres_helper.rb | Adds wallet_broadcast_jobs to per-example truncation list |
| .claude/plans/20260412-403-solid-queue-adapter.md | Implementation plan documentation for Phase 2 |
| .claude/plans/20260412-380-broadcast-queue-phase1.md | Phase 1 plan documentation (added in this PR) |
| # for the same transaction. | ||
| # | ||
| # * +status+ — TEXT NOT NULL DEFAULT 'unsent'. State machine values: | ||
| # 'unsent', 'broadcasting', 'sent', 'failed'. |
| # supports the hot-path poll query: | ||
| # | ||
| # WHERE status = 'unsent' AND locked_at IS NULL | ||
| # ORDER BY created_at |
| DateTime :locked_at | ||
| DateTime :created_at, null: false, default: Sequel::CURRENT_TIMESTAMP | ||
| DateTime :updated_at, null: false, default: Sequel::CURRENT_TIMESTAMP |
| # Stops the worker and blocks until the current poll cycle completes. | ||
| # | ||
| # Safe to call when +start+ has not been called (+@worker_thread+ is nil). | ||
| # | ||
| # @return [void] | ||
| def drain | ||
| stop | ||
| @worker_thread&.join | ||
| end |
| def poll_once | ||
| @db.transaction do | ||
| job = @db[:wallet_broadcast_jobs] | ||
| .where(Sequel.lit( | ||
| "status = 'unsent' OR " \ | ||
| "(status = 'sending' AND locked_at < NOW() - interval '? second')", | ||
| @stale_threshold | ||
| )) | ||
| .order(:created_at) | ||
| .limit(1) | ||
| .for_update | ||
| .skip_locked | ||
| .first | ||
| return unless job | ||
|
|
||
| process_job(job) | ||
| end |
| return if running? | ||
|
|
||
| @mutex.synchronize { @running = true } | ||
| @worker_thread = Thread.new { worker_loop } |
| @db[:wallet_broadcast_jobs].where(id: job[:id]).update( | ||
| status: 'sending', | ||
| locked_at: Sequel.lit('NOW()'), | ||
| attempts: Sequel.lit('attempts + 1'), | ||
| updated_at: Sequel.lit('NOW()') | ||
| ) | ||
|
|
||
| tx = BSV::Transaction::Transaction.from_beef_hex(job[:beef_hex]) | ||
|
|
||
| input_outpoints = job[:input_outpoints]&.to_a | ||
| change_outpoints = job[:change_outpoints]&.to_a | ||
| txid = job[:txid] | ||
| fund_ref = job[:fund_ref] | ||
|
|
||
| begin | ||
| @broadcaster.broadcast(tx) | ||
| rescue StandardError => e | ||
| if input_outpoints | ||
| rollback(input_outpoints, change_outpoints, txid, fund_ref) | ||
| elsif txid | ||
| @storage.update_action_status(txid, 'failed') | ||
| end | ||
| @db[:wallet_broadcast_jobs].where(id: job[:id]).update( | ||
| status: 'failed', | ||
| last_error: e.message, | ||
| updated_at: Sequel.lit('NOW()') | ||
| ) | ||
| return | ||
| end | ||
|
|
||
| promote(input_outpoints, change_outpoints, txid) | ||
| @db[:wallet_broadcast_jobs].where(id: job[:id]).update( | ||
| status: 'completed', | ||
| updated_at: Sequel.lit('NOW()') | ||
| ) |
| allow(broadcaster).to receive(:broadcast).and_return(double('result')) # rubocop:disable RSpec/VerifiedDoubles | ||
| allow(BSV::Transaction::Transaction).to receive(:from_beef_hex).and_return(mock_tx) | ||
| adapter.start | ||
| sleep(0.5) | ||
| adapter.drain | ||
| end |
- Add MAX_ATTEMPTS (5) enforcement to poll query — permanently-failing jobs no longer retry indefinitely - Replace Sequel.lit placeholder inside SQL quotes with explicit integer interpolation for the interval expression - Guard empty-array outpoints in rollback path (nil vs [] semantics) - Document that process_job holds a DB connection during broadcast Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Fix migration comments: status values now match adapter code
('unsent', 'sending', 'completed', 'failed')
- Fix poll query example in migration to reflect actual query shape
- Use :timestamptz for locked_at/created_at/updated_at (matches
pending_since pattern from migration 004)
- Make start() check-and-set atomic under mutex (prevents TOCTOU
double-spawn)
- Guard from_beef_hex deserialization — corrupt jobs fail immediately
instead of retrying in a tight loop
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
Pull request overview
Adds a PostgreSQL-backed async broadcast queue adapter (BSV::Wallet::SolidQueueAdapter) to the bsv-wallet-postgres gem, enabling accept_delayed_broadcast: true to defer broadcast/promotion work to a background worker persisted in the database.
Changes:
- Introduces
wallet_broadcast_jobstable (migration 006) to persist async broadcast jobs. - Adds
BSV::Wallet::SolidQueueAdapterwith a polling worker thread usingFOR UPDATE SKIP LOCKED. - Adds a comprehensive spec file for enqueue/status/worker success+failure/recovery/shutdown behaviors and updates postgres spec cleanup tables.
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
| gem/bsv-wallet-postgres/lib/bsv/wallet_postgres/solid_queue_adapter.rb | Implements the async PG-backed broadcast queue adapter and worker loop. |
| gem/bsv-wallet-postgres/lib/bsv/wallet_postgres/migrations/006_create_broadcast_jobs.rb | Adds schema for persisted broadcast jobs. |
| gem/bsv-wallet-postgres/spec/bsv/wallet_postgres/solid_queue_adapter_spec.rb | New specs covering adapter behavior and worker paths. |
| gem/bsv-wallet-postgres/spec/support/postgres_helper.rb | Adds wallet_broadcast_jobs to per-example truncation list. |
| gem/bsv-wallet-postgres/lib/bsv/wallet_postgres.rb | Autoloads SolidQueueAdapter. |
| .claude/plans/20260412-403-solid-queue-adapter.md | Implementation plan documentation for the adapter/migration/specs. |
| .claude/plans/20260412-380-broadcast-queue-phase1.md | Phase 1 plan documentation (added to repo). |
| # NOTE: The entire +process_job+ call (including the network broadcast) | ||
| # runs inside this transaction, holding the row lock and a database | ||
| # connection for the duration. This is acceptable for a single worker | ||
| # thread with an 8-second poll interval but would need restructuring | ||
| # for high-throughput multi-worker deployments. | ||
| # | ||
| # @return [void] | ||
| def poll_once | ||
| @db.transaction do | ||
| job = @db[:wallet_broadcast_jobs] | ||
| .where(Sequel.lit( | ||
| "status = 'unsent' OR " \ | ||
| "(status = 'sending' AND locked_at < NOW() - " \ | ||
| "#{@stale_threshold.to_i} * interval '1 second' " \ | ||
| "AND attempts < #{MAX_ATTEMPTS})" | ||
| )) | ||
| .order(:created_at) | ||
| .limit(1) | ||
| .for_update | ||
| .skip_locked | ||
| .first | ||
| return unless job | ||
|
|
||
| process_job(job) | ||
| end | ||
| end |
| # Stops the worker and blocks until the current poll cycle completes. | ||
| # | ||
| # Safe to call when +start+ has not been called (+@worker_thread+ is nil). | ||
| # | ||
| # @return [void] | ||
| def drain | ||
| stop | ||
| @worker_thread&.join | ||
| end |
| begin | ||
| @broadcaster.broadcast(tx) | ||
| rescue StandardError => e | ||
| if input_outpoints && !input_outpoints.empty? | ||
| rollback(input_outpoints, change_outpoints, txid, fund_ref) | ||
| elsif txid | ||
| @storage.update_action_status(txid, 'failed') | ||
| end | ||
| @db[:wallet_broadcast_jobs].where(id: job[:id]).update( |
| # Maximum number of broadcast attempts before a job is abandoned. | ||
| # After this many failures the job remains +failed+ permanently. | ||
| MAX_ATTEMPTS = 5 | ||
|
|
| def initialize(db:, storage:, broadcaster:, poll_interval: DEFAULT_POLL_INTERVAL, stale_threshold: STALE_THRESHOLD) | ||
| if storage.is_a?(BSV::Wallet::MemoryStore) | ||
| raise ArgumentError, 'SolidQueueAdapter requires a persistent storage adapter — MemoryStore is not supported' | ||
| end | ||
| raise ArgumentError, 'SolidQueueAdapter requires a broadcaster' if broadcaster.nil? | ||
|
|
||
| @db = db | ||
| @storage = storage | ||
| @broadcaster = broadcaster | ||
| @poll_interval = poll_interval | ||
| @stale_threshold = stale_threshold | ||
|
|
||
| @mutex = Mutex.new | ||
| @running = false | ||
| @worker_thread = nil | ||
| end |
| # Tracks outbound transactions that need to be broadcast to the network, | ||
| # with state machine, retry counters, and advisory locking support so that | ||
| # multiple worker processes can poll the queue without double-processing. | ||
| # | ||
| # === Columns | ||
| # | ||
| # * +txid+ — TEXT NOT NULL UNIQUE. Prevents duplicate queue entries | ||
| # for the same transaction. | ||
| # | ||
| # * +status+ — TEXT NOT NULL DEFAULT 'unsent'. State machine values: | ||
| # 'unsent', 'sending', 'completed', 'failed'. | ||
| # | ||
| # * +beef_hex+ — TEXT NOT NULL. Serialised BEEF payload to broadcast. | ||
| # | ||
| # * +input_outpoints+ — text[], nullable. Outpoints consumed by the tx, used | ||
| # to release UTXOs on success or rollback on failure. | ||
| # | ||
| # * +change_outpoints+ — text[], nullable. Change outpoints created by the tx, | ||
| # used to mark outputs spendable after confirmation. | ||
| # | ||
| # * +fund_ref+ — TEXT, nullable. Caller-supplied funding reference for | ||
| # observability and reconciliation. | ||
| # | ||
| # * +attempts+ — INTEGER NOT NULL DEFAULT 0. Incremented on each | ||
| # broadcast attempt; gates retry logic. | ||
| # | ||
| # * +last_error+ — TEXT, nullable. Message from the last failed attempt. | ||
| # | ||
| # * +locked_at+ — TIMESTAMPTZ, nullable. Set when a worker claims the | ||
| # job; cleared on completion or stale-lock recovery. | ||
| # |
| allow(broadcaster).to receive(:broadcast).and_return(double('result')) # rubocop:disable RSpec/VerifiedDoubles | ||
| allow(BSV::Transaction::Transaction).to receive(:from_beef_hex).and_return(mock_tx) | ||
| adapter.start | ||
| sleep(0.5) | ||
| adapter.drain | ||
| end |
Summary
BSV::Wallet::SolidQueueAdapter, a PostgreSQL-backed async broadcast queue implementing theBroadcastQueueinterface from [HLR] Implement accept_delayed_broadcast background processing #380wallet_broadcast_jobstable withFOR UPDATE SKIP LOCKEDpolling supportlocked_atdetection — no special recovery code neededTest plan
Closes #403
Generated with Claude Code