From 1d2768d218fdfc2deefd5deed92ae98e7e6555e1 Mon Sep 17 00:00:00 2001 From: Simon Bettison Date: Sun, 12 Apr 2026 20:08:02 +0100 Subject: [PATCH 1/8] chore: add implementation plans for broadcast queue phases Co-Authored-By: Claude Opus 4.6 (1M context) --- .../20260412-380-broadcast-queue-phase1.md | 258 ++++++++++++++++++ .../plans/20260412-403-solid-queue-adapter.md | 167 ++++++++++++ 2 files changed, 425 insertions(+) create mode 100644 .claude/plans/20260412-380-broadcast-queue-phase1.md create mode 100644 .claude/plans/20260412-403-solid-queue-adapter.md diff --git a/.claude/plans/20260412-380-broadcast-queue-phase1.md b/.claude/plans/20260412-380-broadcast-queue-phase1.md new file mode 100644 index 00000000..0e4cd416 --- /dev/null +++ b/.claude/plans/20260412-380-broadcast-queue-phase1.md @@ -0,0 +1,258 @@ +# Plan: HLR #380 Phase 1 — BroadcastQueue Interface, InlineQueue, MemoryStore Demotion + +## Context + +`accept_delayed_broadcast: true` is stubbed — it logs a warning and broadcasts synchronously. The TS SDK uses a full server-side `Monitor` + database-backed state machine, but analysis shows this is over-engineered for Ruby's single-process wallet. The pragmatic approach: a pluggable `BroadcastQueue` interface with an `InlineQueue` default (synchronous), enabling async adapters (SolidQueue, Sidekiq) in follow-up gems. + +MemoryStore is simultaneously demoted to test-only since it has no persistence guarantees for production wallet use, and async broadcast requires a persistent storage backend. + +**Goal:** Zero functional change for existing consumers. The stubs become real code, the architecture becomes extensible, and the foundation is laid for Phase 2/3 async adapters. + +## Architecture + +``` +WalletClient + └── @broadcast_queue (BroadcastQueue) + ├── InlineQueue (bsv-wallet, default — synchronous) + ├── SolidQueueAdapter (bsv-wallet-postgres, Phase 2) + └── SidekiqAdapter (bsv-wallet-redis, Phase 3) +``` + +The queue receives a payload hash containing everything needed to broadcast and promote/rollback. InlineQueue executes immediately; async adapters persist the job for a worker. + +## Tasks + +### Task 1: BroadcastQueue interface module + +**New file:** `gem/bsv-wallet/lib/bsv/wallet_interface/broadcast_queue.rb` + +Duck-typed module following the `StorageAdapter` pattern: + +```ruby +module BSV::Wallet::BroadcastQueue + def enqueue(payload) # => Hash (result) + raise NotImplementedError, "#{self.class}#enqueue not implemented" + end + + def async? # => Boolean + false + end + + def status(txid) # => String or nil + raise NotImplementedError, "#{self.class}#status not implemented" + end + + # Shared helper: map broadcast exceptions to status strings. + # Moved from WalletClient#broadcast_status_for. + def self.status_for_error(error) + return 'serviceError' unless error.is_a?(BSV::Network::BroadcastError) + arc = error.arc_status.to_s.upcase + return 'doubleSpend' if arc == 'DOUBLE_SPEND_ATTEMPTED' + invalid = %w[REJECTED INVALID MALFORMED MINED_IN_STALE_BLOCK] + return 'invalidTx' if invalid.include?(arc) || arc.include?('ORPHAN') + 'serviceError' + end +end +``` + +**Payload contract** (the `enqueue` argument): + +```ruby +{ + tx: Transaction, # Signed tx object (for InlineQueue) + txid: String, # Hex txid + beef_binary: String, # Raw BEEF bytes (for serialisation/return) + input_outpoints: Array, # Locked input outpoints (nil for finalize path) + change_outpoints: Array, # Change outpoints (nil for finalize path) + fund_ref: String, # Fund reference for rollback (nil for finalize path) + accept_delayed_broadcast: Boolean # From caller options +} +``` + +**Add autoload** in `gem/bsv-wallet/lib/bsv/wallet_interface.rb`. + +### Task 2: InlineQueue default adapter + +**New file:** `gem/bsv-wallet/lib/bsv/wallet_interface/inline_queue.rb` + +```ruby +class BSV::Wallet::InlineQueue + include BSV::Wallet::BroadcastQueue + + def initialize(broadcaster: nil, storage:) + @broadcaster = broadcaster + @storage = storage + end + + def async? + false + end + + def status(txid) + actions = @storage.find_actions({ txid: txid, limit: 1, offset: 0 }) + actions.first&.dig(:status) + end + + def enqueue(payload) + if @broadcaster + broadcast_and_promote(payload) + else + promote_without_broadcast(payload) + end + end +end +``` + +`broadcast_and_promote` consolidates the logic currently at `wallet_client.rb:1233-1251`: +- Call `@broadcaster.broadcast(payload[:tx])` +- On success: promote inputs → `:spent`, change → `:spendable`, action → `'completed'` +- On failure: rollback inputs → `:spendable`, delete change outputs, action → `'failed'` +- Return result hash with `:txid`, `:tx`, `:broadcast_result`/`:broadcast_error`, `:broadcast_status` + +`promote_without_broadcast` consolidates the no-broadcaster fallback at `wallet_client.rb:733-748`: +- Promote inputs → `:spent`, change → `:spendable` +- Set action status to `'unproven'` if `accept_delayed_broadcast`, else `'completed'` +- Return `{ txid:, tx: }` (BEEF for manual broadcast) + +The rollback logic (`release_pending_utxos` pattern) is replicated here (~8 lines) rather than calling back into WalletClient. This avoids circular coupling. + +**Add autoload** in `gem/bsv-wallet/lib/bsv/wallet_interface.rb`. + +### Task 3: Wire WalletClient to use the queue + +**Modify:** `gem/bsv-wallet/lib/bsv/wallet_interface/wallet_client.rb` + +**Constructor** (line ~63): Add `broadcast_queue: nil` parameter. Auto-create if not provided: +```ruby +@broadcast_queue = broadcast_queue || InlineQueue.new( + broadcaster: @broadcaster, + storage: @storage +) +``` + +Add `attr_reader :broadcast_queue`. + +**Auto-fund path** (lines 729-749): Replace the `if broadcast_enabled? ... else ...` block with: +```ruby +@broadcast_queue.enqueue( + tx: tx, txid: txid, beef_binary: beef_binary, + input_outpoints: selected_outpoints, + change_outpoints: change_outpoints, + fund_ref: fund_ref, + accept_delayed_broadcast: args.dig(:options, :accept_delayed_broadcast) +) +``` + +This eliminates the `accept_delayed_broadcast` stub warning and the manual promote-without-broadcast fallback. + +**Finalize path** (lines 1372-1395): Replace the `elsif broadcast_enabled? ... else ...` block with: +```ruby +result.merge!( + @broadcast_queue.enqueue( + tx: tx, txid: txid, beef_binary: beef_binary, + input_outpoints: nil, change_outpoints: nil, fund_ref: nil, + accept_delayed_broadcast: delayed + ) +) +``` + +InlineQueue detects `input_outpoints: nil` and skips UTXO state transitions (finalize path has no locked inputs/change). + +**`broadcast_and_promote` private method**: Becomes a thin delegate to `@broadcast_queue.enqueue` for any remaining internal callers, or is removed if `promote_no_send` is also refactored. + +**`promote_no_send`** (line ~1299): This is called from `send_with` and promotes to `'unproven'` not `'completed'`. Keep this method as-is for now — it has different promotion semantics (unproven + pending index cleanup) that don't fit the queue pattern cleanly. Phase 2 can address this. + +**`broadcast_status_for`**: Delegate to `BroadcastQueue.status_for_error(e)`. Keep the private method as a thin wrapper for backward compat. + +**`broadcast_enabled?`**: Unchanged — still returns `!@broadcaster.nil?`. + +### Task 4: MemoryStore demotion + +**Modify:** `gem/bsv-wallet/lib/bsv/wallet_interface/memory_store.rb` + +Add to `initialize`: +```ruby +if self.class.warn_in_production? && production_env? + warn '[bsv-wallet] MemoryStore is intended for testing only. ' \ + 'Use PostgresStore for production wallets. ' \ + 'Set BSV_MEMORY_STORE_OK=1 to silence this warning.' +end +``` + +Add class methods: +```ruby +def self.warn_in_production? = @warn_in_production != false +def self.warn_in_production=(val) = (@warn_in_production = val) +``` + +Add private helper: +```ruby +def production_env? + env = ENV['RACK_ENV'] || ENV['RAILS_ENV'] || ENV['APP_ENV'] + env && %w[production staging].include?(env) && !ENV['BSV_MEMORY_STORE_OK'] +end +``` + +No impact on test suites (they don't set production env vars). + +### Task 5: Specs + +**New:** `gem/bsv-wallet/spec/bsv/wallet_interface/broadcast_queue_spec.rb` +- Module raises `NotImplementedError` for `enqueue` and `status` +- `async?` defaults to `false` +- `BroadcastQueue.status_for_error` maps errors correctly + +**New:** `gem/bsv-wallet/spec/bsv/wallet_interface/inline_queue_spec.rb` +- With broadcaster, success: promotes state, returns `broadcast_status: 'success'` +- With broadcaster, failure: rolls back, returns `broadcast_error` +- Without broadcaster: promotes immediately, returns BEEF +- Without broadcaster + `accept_delayed_broadcast`: status `'unproven'` +- `async?` returns `false` +- `status(txid)` returns action status from storage +- Finalize path (nil outpoints): only updates action status, no UTXO transitions + +**Modify:** `gem/bsv-wallet/spec/bsv/wallet_interface/wallet_client_spec.rb` +- Remove `warn` expectations from `accept_delayed_broadcast` tests (stubs gone) +- Add test: custom `broadcast_queue:` is used when provided +- Add test: auto-created InlineQueue is used by default +- Verify `broadcast_enabled?` still reflects `@broadcaster` + +**Modify:** `gem/bsv-wallet/spec/bsv/wallet_interface/memory_store_spec.rb` +- Test production warning emitted when `RACK_ENV=production` +- Test warning suppressed by `BSV_MEMORY_STORE_OK=1` +- Test warning suppressed by `MemoryStore.warn_in_production = false` +- Test no warning in test/development/unset env + +**Existing specs** (`broadcast_rollback_spec.rb`, `auto_funding_spec.rb`): Should pass without changes since InlineQueue replicates the same logic. Run full suite to verify. + +## Critical files + +| File | Action | +|------|--------| +| `gem/bsv-wallet/lib/bsv/wallet_interface/broadcast_queue.rb` | Create | +| `gem/bsv-wallet/lib/bsv/wallet_interface/inline_queue.rb` | Create | +| `gem/bsv-wallet/lib/bsv/wallet_interface.rb` | Modify (autoloads) | +| `gem/bsv-wallet/lib/bsv/wallet_interface/wallet_client.rb` | Modify (constructor, auto-fund path, finalize path) | +| `gem/bsv-wallet/lib/bsv/wallet_interface/memory_store.rb` | Modify (production warning) | + +## Verification + +```bash +cd /opt/ruby/bsv-ruby-sdk +bundle exec rake spec:wallet # All wallet specs pass +bundle exec rubocop gem/bsv-wallet/ # Clean +``` + +Specific checks: +1. Existing `broadcast_rollback_spec.rb` passes unchanged (InlineQueue produces identical results) +2. Existing `accept_delayed_broadcast` specs pass with updated expectations (no warn, same status) +3. `auto_funding_spec.rb` passes unchanged +4. New InlineQueue specs cover all four paths (broadcast+success, broadcast+fail, no-broadcast, no-broadcast+delayed) +5. MemoryStore warning only fires in production-like environments + +## Sequencing + +``` +Task 1 (BroadcastQueue module) ──→ Task 2 (InlineQueue) ──→ Task 3 (WalletClient wiring) ──→ Task 5 (specs) +Task 4 (MemoryStore demotion) ── independent, parallel with any task +``` diff --git a/.claude/plans/20260412-403-solid-queue-adapter.md b/.claude/plans/20260412-403-solid-queue-adapter.md new file mode 100644 index 00000000..ea0fd4c0 --- /dev/null +++ b/.claude/plans/20260412-403-solid-queue-adapter.md @@ -0,0 +1,167 @@ +# Plan: HLR #403 — SolidQueueAdapter for bsv-wallet-postgres + +## Context + +Phase 1 (#380) delivered the pluggable `BroadcastQueue` interface and synchronous `InlineQueue` default. Phase 2 delivers `SolidQueueAdapter` — a PostgreSQL-backed async adapter so `accept_delayed_broadcast: true` actually defers broadcast to a background worker thread. + +The adapter uses the same pattern as PostgresStore: caller owns the `Sequel::Database`, adapter is stateless beyond its dependencies. A background `Thread` polls a `wallet_broadcast_jobs` table, broadcasts transactions, and promotes/rolls back wallet state using the same logic as `InlineQueue`. + +## Architecture + +``` +WalletClient + └── @broadcast_queue = SolidQueueAdapter.new(db:, storage:, broadcaster:) + ├── enqueue(payload) → INSERT into wallet_broadcast_jobs, return immediately + ├── poll_once() → SELECT FOR UPDATE SKIP LOCKED, broadcast, promote/rollback + └── drain() → stop + join worker thread +``` + +## Tasks + +### Task 1: Migration 006 — broadcast jobs table + +**New file:** `gem/bsv-wallet-postgres/lib/bsv/wallet_postgres/migrations/006_create_broadcast_jobs.rb` + +```ruby +create_table(:wallet_broadcast_jobs) do + primary_key :id, type: :Bignum + String :txid, null: false, unique: true + String :status, null: false, default: 'unsent' # unsent|sending|completed|failed + String :beef_hex, text: true, null: false + column :input_outpoints, 'text[]' # nil for finalize path + column :change_outpoints, 'text[]' # nil for finalize path + String :fund_ref + Integer :attempts, null: false, default: 0 + String :last_error, text: true + DateTime :locked_at + DateTime :created_at, null: false, default: Sequel::CURRENT_TIMESTAMP + DateTime :updated_at, null: false, default: Sequel::CURRENT_TIMESTAMP + index [:status, :locked_at], name: :broadcast_jobs_poll_idx +end +``` + +Key decisions: +- `beef_hex` stores BEEF as hex (not bytea) — human-readable for debugging, reconstructible via `Transaction.from_beef_hex` +- `text[]` for outpoints matches the existing `tags` column pattern in `wallet_outputs` +- `FOR UPDATE SKIP LOCKED`-friendly via `locked_at` + `status` index (multi-process safe) +- Unique on `txid` prevents duplicate enqueues + +### Task 2: SolidQueueAdapter class + +**New file:** `gem/bsv-wallet-postgres/lib/bsv/wallet_postgres/solid_queue_adapter.rb` + +**Namespace:** `BSV::Wallet::SolidQueueAdapter` (consistent with `BSV::Wallet::PostgresStore`) + +**Constructor:** +```ruby +def initialize(db:, storage:, broadcaster:, poll_interval: 8) +``` +- Raises `ArgumentError` if `storage.is_a?(BSV::Wallet::MemoryStore)` (MemoryStore guard) +- Raises `ArgumentError` if `broadcaster.nil?` (async without broadcaster is meaningless) +- Shares the same `db` as PostgresStore — Sequel's connection pool is thread-safe + +**Public methods:** + +| Method | Behaviour | +|--------|-----------| +| `async?` | Returns `true` | +| `enqueue(payload)` | INSERT into `wallet_broadcast_jobs` with status `'unsent'`; returns `{ txid:, broadcast_status: 'sending' }` | +| `status(txid)` | `@db[:wallet_broadcast_jobs].where(txid:).get(:status)` | +| `start` | Spawns `@worker_thread` with polling loop | +| `stop` | Sets `@running = false` (non-blocking) | +| `drain` | Sets `@running = false` + `@worker_thread.join` (blocks until done) | + +**Worker loop (`poll_once` private):** +1. `SELECT ... WHERE status = 'unsent' OR (status = 'sending' AND locked_at < NOW() - 300s) ORDER BY created_at LIMIT 1 FOR UPDATE SKIP LOCKED` +2. Mark row as `'sending'`, set `locked_at`, increment `attempts` +3. Reconstruct tx via `Transaction.from_beef_hex(job[:beef_hex])` +4. Call `@broadcaster.broadcast(tx)` +5. **Success:** promote state (inputs → spent, change → spendable, action → completed), mark job `'completed'` +6. **Failure:** rollback state (release inputs matching fund_ref, delete change, action → failed), mark job `'failed'` with `last_error` + +**Promote/rollback** are private methods duplicating InlineQueue's ~8-line implementations. Both operate through `@storage` calls which are individually atomic at the SQL level. No wrapping DB transaction needed — if process crashes mid-promote, recovery re-broadcasts (idempotent via ARC) and re-promotes (already-spent is a no-op). + +**Recovery:** On `start`, the worker's first poll naturally finds stale `'sending'` jobs via the `locked_at < threshold` clause. No special recovery code needed. + +**Thread safety:** `@mutex` protects `@running` flag. All DB access goes through Sequel's thread-safe connection pool. `FOR UPDATE SKIP LOCKED` prevents two poll threads from claiming the same job. + +### Task 3: Autoload registration + +**Modify:** `gem/bsv-wallet-postgres/lib/bsv/wallet_postgres.rb` + +Add one line: +```ruby +module Wallet + autoload :PostgresStore, 'bsv/wallet_postgres/postgres_store' + autoload :SolidQueueAdapter, 'bsv/wallet_postgres/solid_queue_adapter' # NEW +end +``` + +### Task 4: Test helper update + +**Modify:** `gem/bsv-wallet-postgres/spec/support/postgres_helper.rb` + +Add `wallet_broadcast_jobs` to `POSTGRES_WALLET_TABLES` for per-test cleanup. + +### Task 5: Specs + +**New file:** `gem/bsv-wallet-postgres/spec/bsv/wallet_postgres/solid_queue_adapter_spec.rb` + +Coverage map (maps to acceptance criteria): + +| Spec group | AC | +|------------|-----| +| Constructor raises for MemoryStore | Guard: refuses to attach when storage is MemoryStore | +| Constructor raises for nil broadcaster | Constructor validation | +| `async?` returns `true` | `async?` returns `true` | +| `enqueue` inserts row, returns `{ broadcast_status: 'sending' }` | `enqueue` persists to PG, returns status | +| `enqueue` handles nil outpoints (finalize path) | Finalize path support | +| Concurrent enqueue from multiple threads (unique txid) | Thread safety | +| `status` returns status / nil | `status(txid)` returns broadcast status | +| Worker success: promotes inputs, change, action | Worker promotes on success | +| Worker failure: rolls back inputs, change, action, sets last_error | Worker rolls back on failure | +| Worker finalize path (nil outpoints) | Both paths covered | +| Stale 'sending' job retried on poll | Recovery on restart | +| `drain` blocks until worker finishes | Graceful shutdown | +| Existing specs pass | Run full suite | + +Test strategy: Use real PostgreSQL (same `postgres_helper.rb` pattern), mock broadcaster, real PostgresStore. Start/drain worker within each test to avoid thread leaks. + +## Critical files + +| File | Action | +|------|--------| +| `gem/bsv-wallet-postgres/lib/bsv/wallet_postgres/migrations/006_create_broadcast_jobs.rb` | Create | +| `gem/bsv-wallet-postgres/lib/bsv/wallet_postgres/solid_queue_adapter.rb` | Create | +| `gem/bsv-wallet-postgres/lib/bsv/wallet_postgres.rb` | Modify (add autoload) | +| `gem/bsv-wallet-postgres/spec/support/postgres_helper.rb` | Modify (add table) | +| `gem/bsv-wallet-postgres/spec/bsv/wallet_postgres/solid_queue_adapter_spec.rb` | Create | + +## Reference files (read-only) + +| File | Why | +|------|-----| +| `gem/bsv-wallet/lib/bsv/wallet_interface/broadcast_queue.rb` | Interface contract | +| `gem/bsv-wallet/lib/bsv/wallet_interface/inline_queue.rb` | Promote/rollback reference | +| `gem/bsv-wallet-postgres/lib/bsv/wallet_postgres/postgres_store.rb` | DB patterns, Sequel conventions | +| `gem/bsv-wallet-postgres/spec/support/postgres_helper.rb` | Test helper pattern | + +## Verification + +```bash +cd /opt/ruby/bsv-ruby-sdk +bundle exec rake spec:wallet # Existing wallet specs still pass +cd gem/bsv-wallet-postgres +bundle exec rspec spec/bsv/wallet_postgres/solid_queue_adapter_spec.rb # New specs pass +bundle exec rspec # All postgres specs pass +cd /opt/ruby/bsv-ruby-sdk +bundle exec rubocop gem/bsv-wallet-postgres/ # Clean +``` + +## Sequencing + +``` +Task 1 (migration) ──→ Task 2 (adapter class) ──→ Task 5 (specs) +Task 3 (autoload) ── independent, parallel with Task 2 +Task 4 (test helper) ── independent, must precede Task 5 +``` From be103a9a63596dd3e0201b1230f57df2d6794cc6 Mon Sep 17 00:00:00 2001 From: Simon Bettison Date: Sun, 12 Apr 2026 20:18:32 +0100 Subject: [PATCH 2/8] feat: add SolidQueueAdapter autoload registration (#407) Co-Authored-By: Claude Opus 4.6 (1M context) --- gem/bsv-wallet-postgres/lib/bsv/wallet_postgres.rb | 1 + 1 file changed, 1 insertion(+) diff --git a/gem/bsv-wallet-postgres/lib/bsv/wallet_postgres.rb b/gem/bsv-wallet-postgres/lib/bsv/wallet_postgres.rb index 54c17a79..b35784fa 100644 --- a/gem/bsv-wallet-postgres/lib/bsv/wallet_postgres.rb +++ b/gem/bsv-wallet-postgres/lib/bsv/wallet_postgres.rb @@ -8,5 +8,6 @@ module WalletPostgres module Wallet autoload :PostgresStore, 'bsv/wallet_postgres/postgres_store' + autoload :SolidQueueAdapter, 'bsv/wallet_postgres/solid_queue_adapter' end end From 0d676f3c760354e6dde17495868e20046f2f2fc2 Mon Sep 17 00:00:00 2001 From: Simon Bettison Date: Sun, 12 Apr 2026 20:18:32 +0100 Subject: [PATCH 3/8] feat: add broadcast jobs table to test helper cleanup (#408) Co-Authored-By: Claude Opus 4.6 (1M context) --- gem/bsv-wallet-postgres/spec/support/postgres_helper.rb | 1 + 1 file changed, 1 insertion(+) diff --git a/gem/bsv-wallet-postgres/spec/support/postgres_helper.rb b/gem/bsv-wallet-postgres/spec/support/postgres_helper.rb index 465d3cb5..3d75462c 100644 --- a/gem/bsv-wallet-postgres/spec/support/postgres_helper.rb +++ b/gem/bsv-wallet-postgres/spec/support/postgres_helper.rb @@ -30,6 +30,7 @@ wallet_proofs wallet_transactions wallet_settings + wallet_broadcast_jobs ].freeze # Apply the shipped wallet schema once per suite load. Idempotent — no From cf994883365e6dcc7c338d33969996267ccc72f0 Mon Sep 17 00:00:00 2001 From: Simon Bettison Date: Sun, 12 Apr 2026 20:18:38 +0100 Subject: [PATCH 4/8] feat: add migration 006 for broadcast jobs table (#405) Co-Authored-By: Claude Opus 4.6 (1M context) --- .../migrations/006_create_broadcast_jobs.rb | 65 +++++++++++++++++++ 1 file changed, 65 insertions(+) create mode 100644 gem/bsv-wallet-postgres/lib/bsv/wallet_postgres/migrations/006_create_broadcast_jobs.rb diff --git a/gem/bsv-wallet-postgres/lib/bsv/wallet_postgres/migrations/006_create_broadcast_jobs.rb b/gem/bsv-wallet-postgres/lib/bsv/wallet_postgres/migrations/006_create_broadcast_jobs.rb new file mode 100644 index 00000000..1b3f1b72 --- /dev/null +++ b/gem/bsv-wallet-postgres/lib/bsv/wallet_postgres/migrations/006_create_broadcast_jobs.rb @@ -0,0 +1,65 @@ +# frozen_string_literal: true + +# Broadcast job queue table for the SolidQueueAdapter. +# +# Tracks outbound transactions that need to be broadcast to the network, +# with state machine, retry counters, and advisory locking support so that +# multiple worker processes can poll the queue without double-processing. +# +# === Columns +# +# * +txid+ — TEXT NOT NULL UNIQUE. Prevents duplicate queue entries +# for the same transaction. +# +# * +status+ — TEXT NOT NULL DEFAULT 'unsent'. State machine values: +# 'unsent', 'broadcasting', 'sent', 'failed'. +# +# * +beef_hex+ — TEXT NOT NULL. Serialised BEEF payload to broadcast. +# +# * +input_outpoints+ — text[], nullable. Outpoints consumed by the tx, used +# to release UTXOs on success or rollback on failure. +# +# * +change_outpoints+ — text[], nullable. Change outpoints created by the tx, +# used to mark outputs spendable after confirmation. +# +# * +fund_ref+ — TEXT, nullable. Caller-supplied funding reference for +# observability and reconciliation. +# +# * +attempts+ — INTEGER NOT NULL DEFAULT 0. Incremented on each +# broadcast attempt; gates retry logic. +# +# * +last_error+ — TEXT, nullable. Message from the last failed attempt. +# +# * +locked_at+ — TIMESTAMPTZ, nullable. Set when a worker claims the +# job; cleared on completion or stale-lock recovery. +# +# * +created_at+ — TIMESTAMPTZ NOT NULL DEFAULT NOW(). +# +# * +updated_at+ — TIMESTAMPTZ NOT NULL DEFAULT NOW(). +# +# === Index +# +# A composite index on +(status, locked_at)+ (named +broadcast_jobs_poll_idx+) +# supports the hot-path poll query: +# +# WHERE status = 'unsent' AND locked_at IS NULL +# ORDER BY created_at +Sequel.migration do + change do + create_table(:wallet_broadcast_jobs) do + primary_key :id, type: :Bignum + String :txid, null: false, unique: true + String :status, null: false, default: 'unsent' + String :beef_hex, text: true, null: false + column :input_outpoints, 'text[]' + column :change_outpoints, 'text[]' + String :fund_ref + Integer :attempts, null: false, default: 0 + String :last_error, text: true + DateTime :locked_at + DateTime :created_at, null: false, default: Sequel::CURRENT_TIMESTAMP + DateTime :updated_at, null: false, default: Sequel::CURRENT_TIMESTAMP + index %i[status locked_at], name: :broadcast_jobs_poll_idx + end + end +end From cd8c62b59952c62396809149073486d26bff449d Mon Sep 17 00:00:00 2001 From: Simon Bettison Date: Sun, 12 Apr 2026 20:20:32 +0100 Subject: [PATCH 5/8] feat: implement SolidQueueAdapter for async broadcast (#406) Co-Authored-By: Claude Opus 4.6 (1M context) --- .../wallet_postgres/solid_queue_adapter.rb | 289 ++++++++++++++++++ 1 file changed, 289 insertions(+) create mode 100644 gem/bsv-wallet-postgres/lib/bsv/wallet_postgres/solid_queue_adapter.rb diff --git a/gem/bsv-wallet-postgres/lib/bsv/wallet_postgres/solid_queue_adapter.rb b/gem/bsv-wallet-postgres/lib/bsv/wallet_postgres/solid_queue_adapter.rb new file mode 100644 index 00000000..5fe46879 --- /dev/null +++ b/gem/bsv-wallet-postgres/lib/bsv/wallet_postgres/solid_queue_adapter.rb @@ -0,0 +1,289 @@ +# frozen_string_literal: true + +require 'sequel' + +module BSV + module Wallet + # PostgreSQL-backed asynchronous broadcast queue adapter. + # + # Persists outbound transactions to the +wallet_broadcast_jobs+ table and + # processes them in a background worker thread. Designed for multi-process + # deployments where +InlineQueue+'s synchronous broadcast is unacceptable. + # + # The worker uses +SELECT ... FOR UPDATE SKIP LOCKED+ so multiple processes + # can run +SolidQueueAdapter+ against the same database safely — only one + # worker will claim a given job. + # + # === Lifecycle + # + # adapter = BSV::Wallet::SolidQueueAdapter.new( + # db: sequel_db, + # storage: postgres_store, + # broadcaster: arc_broadcaster + # ) + # adapter.start # spawns background worker thread + # # ... process transactions ... + # adapter.drain # stop + join (blocks until current poll cycle completes) + # + # === Recovery + # + # On restart, the worker's first poll naturally finds stale +sending+ jobs + # (those whose +locked_at+ is older than +stale_threshold+ seconds) and + # re-broadcasts them. No special recovery code is needed. + # + # === Drain warning + # + # +drain+ blocks until the current poll cycle completes. If a job is + # mid-broadcast this may take several seconds. Jobs enqueued after + # +@running = false+ but before the worker thread exits will remain + # +unsent+ until the next +start+. + class SolidQueueAdapter + include BSV::Wallet::BroadcastQueue + + # Default number of seconds between poll cycles. + DEFAULT_POLL_INTERVAL = 8 + + # Default number of seconds before a +sending+ job is considered stale + # and eligible for retry. Configurable for testability. + STALE_THRESHOLD = 300 + + # @param db [Sequel::Database] a Sequel database handle (shared with PostgresStore) + # @param storage [StorageAdapter] wallet storage adapter (must not be MemoryStore) + # @param broadcaster [#broadcast] broadcaster object + # @param poll_interval [Integer] seconds between worker poll cycles + # @param stale_threshold [Integer] seconds before a +sending+ job is retried + # @raise [ArgumentError] if +storage+ is a +BSV::Wallet::MemoryStore+ + # @raise [ArgumentError] if +broadcaster+ is +nil+ + def initialize(db:, storage:, broadcaster:, poll_interval: DEFAULT_POLL_INTERVAL, stale_threshold: STALE_THRESHOLD) + if storage.is_a?(BSV::Wallet::MemoryStore) + raise ArgumentError, 'SolidQueueAdapter requires a persistent storage adapter — MemoryStore is not supported' + end + raise ArgumentError, 'SolidQueueAdapter requires a broadcaster' if broadcaster.nil? + + @db = db + @storage = storage + @broadcaster = broadcaster + @poll_interval = poll_interval + @stale_threshold = stale_threshold + + @mutex = Mutex.new + @running = false + @worker_thread = nil + end + + # Returns +true+ — this adapter executes broadcast asynchronously. + # + # @return [Boolean] + def async? + true + end + + # Persists a transaction to the broadcast job queue and returns immediately. + # + # Inserts a row into +wallet_broadcast_jobs+ with status +unsent+. If a row + # already exists for the same +txid+ (e.g. after a crash and restart), the + # +UniqueConstraintViolation+ is rescued and the existing status is returned + # instead. + # + # @param payload [Hash] broadcast payload (see +BroadcastQueue+ module docs) + # @return [Hash] +{ txid: String, broadcast_status: 'sending' }+ + def enqueue(payload) + txid = payload[:txid] + beef_binary = payload[:beef_binary] + input_outpoints = payload[:input_outpoints] + change_outpoints = payload[:change_outpoints] + fund_ref = payload[:fund_ref] + + row = { + txid: txid, + beef_hex: beef_binary.unpack1('H*'), + input_outpoints: input_outpoints ? Sequel.pg_array(input_outpoints, :text) : nil, + change_outpoints: change_outpoints ? Sequel.pg_array(change_outpoints, :text) : nil, + fund_ref: fund_ref, + status: 'unsent' + } + + begin + @db[:wallet_broadcast_jobs].insert(row) + rescue Sequel::UniqueConstraintViolation + existing_status = @db[:wallet_broadcast_jobs].where(txid: txid).get(:status) + return { txid: txid, broadcast_status: existing_status || 'sending' } + end + + { txid: txid, broadcast_status: 'sending' } + end + + # Returns the broadcast status for a previously enqueued transaction. + # + # Reads directly from the jobs table — the authoritative source of truth + # for async broadcast status. + # + # @param txid [String] hex transaction identifier + # @return [String, nil] status string or +nil+ if no job exists + def status(txid) + @db[:wallet_broadcast_jobs].where(txid: txid).get(:status) + end + + # Spawns the background worker thread. + # + # Safe to call multiple times — returns immediately if already running. + # + # @return [void] + def start + return if running? + + @mutex.synchronize { @running = true } + @worker_thread = Thread.new { worker_loop } + end + + # Signals the worker to stop after the current poll cycle. + # + # Non-blocking — returns immediately without waiting for the thread. + # + # @return [void] + def stop + @mutex.synchronize { @running = false } + end + + # Stops the worker and blocks until the current poll cycle completes. + # + # Safe to call when +start+ has not been called (+@worker_thread+ is nil). + # + # @return [void] + def drain + stop + @worker_thread&.join + end + + private + + # Main loop for the background worker thread. + # + # Calls +poll_once+ in a loop, sleeping +poll_interval+ seconds between + # cycles. Unexpected errors from +poll_once+ are logged to stderr and the + # loop continues — the worker thread must not die silently. + def worker_loop + while running? + begin + poll_once + rescue StandardError => e + warn "[bsv-wallet] SolidQueueAdapter worker error: #{e.message}" + end + sleep(@poll_interval) if running? + end + end + + # Thread-safe check of the running flag. + # + # @return [Boolean] + def running? + @mutex.synchronize { @running } + end + + # Claims and processes a single pending broadcast job. + # + # Uses +SELECT ... FOR UPDATE SKIP LOCKED+ so concurrent workers do not + # double-process the same job. Stale +sending+ jobs (those whose +locked_at+ + # is older than +stale_threshold+ seconds) are also eligible for retry. + # + # @return [void] + def poll_once + @db.transaction do + job = @db[:wallet_broadcast_jobs] + .where(Sequel.lit( + "status = 'unsent' OR " \ + "(status = 'sending' AND locked_at < NOW() - interval '? second')", + @stale_threshold + )) + .order(:created_at) + .limit(1) + .for_update + .skip_locked + .first + return unless job + + process_job(job) + end + end + + # Marks a job as +sending+, broadcasts the transaction, and promotes or + # rolls back wallet state based on the outcome. + # + # @param job [Hash] row from +wallet_broadcast_jobs+ + # @return [void] + def process_job(job) + @db[:wallet_broadcast_jobs].where(id: job[:id]).update( + status: 'sending', + locked_at: Sequel.lit('NOW()'), + attempts: Sequel.lit('attempts + 1'), + updated_at: Sequel.lit('NOW()') + ) + + tx = BSV::Transaction::Transaction.from_beef_hex(job[:beef_hex]) + + input_outpoints = job[:input_outpoints]&.to_a + change_outpoints = job[:change_outpoints]&.to_a + txid = job[:txid] + fund_ref = job[:fund_ref] + + begin + @broadcaster.broadcast(tx) + rescue StandardError => e + if input_outpoints + rollback(input_outpoints, change_outpoints, txid, fund_ref) + elsif txid + @storage.update_action_status(txid, 'failed') + end + @db[:wallet_broadcast_jobs].where(id: job[:id]).update( + status: 'failed', + last_error: e.message, + updated_at: Sequel.lit('NOW()') + ) + return + end + + promote(input_outpoints, change_outpoints, txid) + @db[:wallet_broadcast_jobs].where(id: job[:id]).update( + status: 'completed', + updated_at: Sequel.lit('NOW()') + ) + end + + # Promotes UTXO state after a successful broadcast. + # + # Marks inputs as +:spent+, change as +:spendable+, and updates the action + # status to +completed+. When outpoints are +nil+ (finalize path), UTXO + # transitions are skipped. + # + # @param input_outpoints [Array, nil] + # @param change_outpoints [Array, nil] + # @param txid [String, nil] + def promote(input_outpoints, change_outpoints, txid) + Array(input_outpoints).each { |op| @storage.update_output_state(op, :spent) } + Array(change_outpoints).each { |op| @storage.update_output_state(op, :spendable) } + @storage.update_action_status(txid, 'completed') if txid + end + + # Rolls back wallet state after a failed broadcast. + # + # Releases locked inputs (only those matching +fund_ref+), deletes phantom + # change outputs, and marks the action as +failed+. + # + # @param input_outpoints [Array] + # @param change_outpoints [Array] + # @param txid [String, nil] + # @param fund_ref [String] fund reference used when locking inputs + def rollback(input_outpoints, change_outpoints, txid, fund_ref) + Array(input_outpoints).each do |op| + outputs = @storage.find_outputs({ outpoint: op, include_spent: true, limit: 1, offset: 0 }) + next if outputs.empty? + next unless outputs.first[:pending_reference] == fund_ref + + @storage.update_output_state(op, :spendable) + end + Array(change_outpoints).each { |op| @storage.delete_output(op) } + @storage.update_action_status(txid, 'failed') if txid + end + end + end +end From 6cc2fcac383c0aa00d9e53ed0aa5de9b2094ca5c Mon Sep 17 00:00:00 2001 From: Simon Bettison Date: Sun, 12 Apr 2026 20:26:47 +0100 Subject: [PATCH 6/8] test: add comprehensive SolidQueueAdapter specs (#409) 40 examples covering constructor guards, interface compliance, enqueue (auto-fund and finalize paths), idempotent duplicate handling, status queries, worker success/failure/recovery paths, drain/shutdown, and thread safety. All specs tagged :postgres, use real PostgresStore, and mock the broadcaster to keep tests fast (poll_interval: 0.1s). Co-Authored-By: Claude Opus 4.6 (1M context) --- .../solid_queue_adapter_spec.rb | 513 ++++++++++++++++++ 1 file changed, 513 insertions(+) create mode 100644 gem/bsv-wallet-postgres/spec/bsv/wallet_postgres/solid_queue_adapter_spec.rb diff --git a/gem/bsv-wallet-postgres/spec/bsv/wallet_postgres/solid_queue_adapter_spec.rb b/gem/bsv-wallet-postgres/spec/bsv/wallet_postgres/solid_queue_adapter_spec.rb new file mode 100644 index 00000000..1e75c372 --- /dev/null +++ b/gem/bsv-wallet-postgres/spec/bsv/wallet_postgres/solid_queue_adapter_spec.rb @@ -0,0 +1,513 @@ +# frozen_string_literal: true + +require 'spec_helper' +require 'bsv-wallet-postgres' + +RSpec.describe 'BSV::Wallet::SolidQueueAdapter', :postgres do + let(:db) { POSTGRES_TEST_DB } + let(:storage) { BSV::Wallet::PostgresStore.new(db) } + let(:broadcaster) { double('broadcaster') } # rubocop:disable RSpec/VerifiedDoubles + let(:adapter) do + BSV::Wallet::SolidQueueAdapter.new( + db: db, + storage: storage, + broadcaster: broadcaster, + poll_interval: 0.1, + stale_threshold: 1 + ) + end + + # Minimal valid txids + let(:txid_a) { 'a' * 64 } + let(:txid_b) { 'b' * 64 } + + # Minimal BEEF binary — an empty transaction serialised as BEEF + let(:beef_binary) { BSV::Transaction::Transaction.new.to_beef } + let(:beef_hex) { beef_binary.unpack1('H*') } + + before do + POSTGRES_WALLET_TABLES.each { |t| db[t].truncate(cascade: true) } + end + + after do + adapter.drain rescue nil # rubocop:disable Style/RescueModifier + end + + # --------------------------------------------------------------------------- + # Seeding helpers (mirror InlineQueue spec patterns) + # --------------------------------------------------------------------------- + + def seed_pending_input(outpoint:, fund_ref: 'ref-001') + storage.store_output( + outpoint: outpoint, + satoshis: 1000, + state: :pending, + basket: 'default', + pending_reference: fund_ref + ) + end + + def seed_pending_change(outpoint:) + storage.store_output( + outpoint: outpoint, + satoshis: 500, + state: :pending, + basket: 'default' + ) + end + + def seed_action(txid:, status: 'signed') + storage.store_action(txid: txid, description: 'test action', status: status) + end + + def enqueue_auto_fund(txid:, input_outpoints:, change_outpoints:, fund_ref: 'ref-001') + adapter.enqueue( + txid: txid, + beef_binary: beef_binary, + input_outpoints: input_outpoints, + change_outpoints: change_outpoints, + fund_ref: fund_ref + ) + end + + def enqueue_finalize(txid:) + adapter.enqueue( + txid: txid, + beef_binary: beef_binary, + input_outpoints: nil, + change_outpoints: nil, + fund_ref: nil + ) + end + + def job_row(txid) + db[:wallet_broadcast_jobs].where(txid: txid).first + end + + def output_state(outpoint) + storage.find_outputs({ outpoint: outpoint, include_spent: true, limit: 1, offset: 0 }).first&.dig(:state) + end + + def action_status(txid) + storage.find_actions({ txid: txid, limit: 1, offset: 0 }).first&.dig(:status) + end + + # --------------------------------------------------------------------------- + # 1. Constructor guards + # --------------------------------------------------------------------------- + describe 'constructor validation' do + it 'raises ArgumentError when storage is MemoryStore' do + expect do + BSV::Wallet::SolidQueueAdapter.new( + db: db, + storage: BSV::Wallet::MemoryStore.new, + broadcaster: broadcaster + ) + end.to raise_error(ArgumentError, /MemoryStore/) + end + + it 'raises ArgumentError when broadcaster is nil' do + expect do + BSV::Wallet::SolidQueueAdapter.new(db: db, storage: storage, broadcaster: nil) + end.to raise_error(ArgumentError, /broadcaster/) + end + + it 'accepts PostgresStore with a broadcaster without raising' do + expect do + BSV::Wallet::SolidQueueAdapter.new(db: db, storage: storage, broadcaster: broadcaster) + end.not_to raise_error + end + end + + # --------------------------------------------------------------------------- + # 2. Interface compliance + # --------------------------------------------------------------------------- + describe '#async?' do + it 'returns true' do + expect(adapter.async?).to be(true) + end + end + + describe 'BroadcastQueue inclusion' do + it 'includes BroadcastQueue' do + expect(described_class).to be_nil # no-op; verify via ancestors + expect(BSV::Wallet::SolidQueueAdapter.ancestors).to include(BSV::Wallet::BroadcastQueue) + end + end + + # --------------------------------------------------------------------------- + # 3. Enqueue — auto-fund path + # --------------------------------------------------------------------------- + describe '#enqueue (auto-fund path)' do + it 'inserts a row into wallet_broadcast_jobs with status unsent' do + enqueue_auto_fund(txid: txid_a, input_outpoints: ['abc:0'], change_outpoints: ['xyz:0']) + row = job_row(txid_a) + expect(row).not_to be_nil + expect(row[:status]).to eq('unsent') + end + + it 'stores beef_hex as hex encoding of beef_binary' do + enqueue_auto_fund(txid: txid_a, input_outpoints: ['abc:0'], change_outpoints: ['xyz:0']) + expect(job_row(txid_a)[:beef_hex]).to eq(beef_hex) + end + + it 'stores input_outpoints as a postgres text array' do + enqueue_auto_fund(txid: txid_a, input_outpoints: %w[abc:0 def:1], change_outpoints: []) + stored = job_row(txid_a)[:input_outpoints].to_a + expect(stored).to eq(%w[abc:0 def:1]) + end + + it 'stores change_outpoints as a postgres text array' do + enqueue_auto_fund(txid: txid_a, input_outpoints: ['abc:0'], change_outpoints: %w[xyz:0 xyz:1]) + stored = job_row(txid_a)[:change_outpoints].to_a + expect(stored).to eq(%w[xyz:0 xyz:1]) + end + + it 'stores fund_ref' do + enqueue_auto_fund(txid: txid_a, input_outpoints: ['abc:0'], change_outpoints: [], fund_ref: 'ref-xyz') + expect(job_row(txid_a)[:fund_ref]).to eq('ref-xyz') + end + + it 'returns { txid:, broadcast_status: "sending" }' do + result = enqueue_auto_fund(txid: txid_a, input_outpoints: [], change_outpoints: []) + expect(result).to eq(txid: txid_a, broadcast_status: 'sending') + end + end + + # --------------------------------------------------------------------------- + # 4. Enqueue — finalize path (nil outpoints) + # --------------------------------------------------------------------------- + describe '#enqueue (finalize path)' do + it 'stores nil input_outpoints and change_outpoints' do + enqueue_finalize(txid: txid_a) + row = job_row(txid_a) + expect(row[:input_outpoints]).to be_nil + expect(row[:change_outpoints]).to be_nil + end + + it 'stores nil fund_ref' do + enqueue_finalize(txid: txid_a) + expect(job_row(txid_a)[:fund_ref]).to be_nil + end + end + + # --------------------------------------------------------------------------- + # 5. Enqueue — duplicate txid (idempotent crash recovery) + # --------------------------------------------------------------------------- + describe '#enqueue (duplicate txid)' do + it 'does not raise on duplicate txid and returns existing status' do + enqueue_auto_fund(txid: txid_a, input_outpoints: [], change_outpoints: []) + result = nil + expect { result = enqueue_auto_fund(txid: txid_a, input_outpoints: [], change_outpoints: []) }.not_to raise_error + expect(result[:txid]).to eq(txid_a) + expect(result[:broadcast_status]).to be_a(String) + end + + it 'inserts exactly one row on duplicate enqueue' do + enqueue_auto_fund(txid: txid_a, input_outpoints: [], change_outpoints: []) + enqueue_auto_fund(txid: txid_a, input_outpoints: [], change_outpoints: []) + expect(db[:wallet_broadcast_jobs].where(txid: txid_a).count).to eq(1) + end + end + + # --------------------------------------------------------------------------- + # 6. Status + # --------------------------------------------------------------------------- + describe '#status' do + it 'returns the job status for a known txid' do + enqueue_auto_fund(txid: txid_a, input_outpoints: [], change_outpoints: []) + expect(adapter.status(txid_a)).to eq('unsent') + end + + it 'returns nil for an unknown txid' do + expect(adapter.status('0' * 64)).to be_nil + end + end + + # --------------------------------------------------------------------------- + # 7. Worker — success path (auto-fund) + # --------------------------------------------------------------------------- + describe 'worker success path (auto-fund)' do + let(:mock_tx) { double('tx') } # rubocop:disable RSpec/VerifiedDoubles + + before do + seed_pending_input(outpoint: 'inp:0', fund_ref: 'ref-001') + seed_pending_change(outpoint: 'chg:0') + seed_action(txid: txid_a) + enqueue_auto_fund( + txid: txid_a, + input_outpoints: ['inp:0'], + change_outpoints: ['chg:0'], + fund_ref: 'ref-001' + ) + allow(broadcaster).to receive(:broadcast).and_return(double('result')) # rubocop:disable RSpec/VerifiedDoubles + allow(BSV::Transaction::Transaction).to receive(:from_beef_hex).and_return(mock_tx) + adapter.start + sleep(0.5) + adapter.drain + end + + it 'marks the job as completed' do + expect(job_row(txid_a)[:status]).to eq('completed') + end + + it 'promotes input outputs to spent' do + expect(output_state('inp:0').to_s).to eq('spent') + end + + it 'promotes change outputs to spendable' do + expect(output_state('chg:0').to_s).to eq('spendable') + end + + it 'updates action status to completed' do + expect(action_status(txid_a)).to eq('completed') + end + end + + # --------------------------------------------------------------------------- + # 8. Worker — failure path (auto-fund) + # --------------------------------------------------------------------------- + describe 'worker failure path (auto-fund)' do + let(:mock_tx) { double('tx') } # rubocop:disable RSpec/VerifiedDoubles + + before do + seed_pending_input(outpoint: 'inp:0', fund_ref: 'ref-001') + seed_pending_change(outpoint: 'chg:0') + seed_action(txid: txid_a) + enqueue_auto_fund( + txid: txid_a, + input_outpoints: ['inp:0'], + change_outpoints: ['chg:0'], + fund_ref: 'ref-001' + ) + allow(broadcaster).to receive(:broadcast).and_raise(StandardError, 'network down') + allow(BSV::Transaction::Transaction).to receive(:from_beef_hex).and_return(mock_tx) + adapter.start + sleep(0.5) + adapter.drain + end + + it 'marks the job as failed' do + expect(job_row(txid_a)[:status]).to eq('failed') + end + + it 'populates last_error on the job' do + expect(job_row(txid_a)[:last_error]).to eq('network down') + end + + it 'rolls back input outputs to spendable' do + expect(output_state('inp:0').to_s).to eq('spendable') + end + + it 'deletes change outputs' do + all = storage.find_outputs({ include_spent: true, limit: 100, offset: 0 }) + expect(all.map { |o| o[:outpoint] }).not_to include('chg:0') + end + + it 'updates action status to failed' do + expect(action_status(txid_a)).to eq('failed') + end + + it 'increments attempts' do + expect(job_row(txid_a)[:attempts]).to eq(1) + end + end + + # --------------------------------------------------------------------------- + # 9. Worker — finalize path success + # --------------------------------------------------------------------------- + describe 'worker finalize path, broadcast succeeds' do + let(:mock_tx) { double('tx') } # rubocop:disable RSpec/VerifiedDoubles + + before do + seed_action(txid: txid_a) + enqueue_finalize(txid: txid_a) + allow(broadcaster).to receive(:broadcast).and_return(double('result')) # rubocop:disable RSpec/VerifiedDoubles + allow(BSV::Transaction::Transaction).to receive(:from_beef_hex).and_return(mock_tx) + adapter.start + sleep(0.5) + adapter.drain + end + + it 'updates action status to completed' do + expect(action_status(txid_a)).to eq('completed') + end + + it 'marks job as completed' do + expect(job_row(txid_a)[:status]).to eq('completed') + end + + it 'does not create or remove any outputs' do + all = storage.find_outputs({ include_spent: true, limit: 100, offset: 0 }) + expect(all).to be_empty + end + end + + # --------------------------------------------------------------------------- + # 10. Worker — finalize path failure + # --------------------------------------------------------------------------- + describe 'worker finalize path, broadcast fails' do + let(:mock_tx) { double('tx') } # rubocop:disable RSpec/VerifiedDoubles + + before do + seed_action(txid: txid_a) + enqueue_finalize(txid: txid_a) + allow(broadcaster).to receive(:broadcast).and_raise(StandardError, 'timeout') + allow(BSV::Transaction::Transaction).to receive(:from_beef_hex).and_return(mock_tx) + adapter.start + sleep(0.5) + adapter.drain + end + + it 'updates action status to failed' do + expect(action_status(txid_a)).to eq('failed') + end + + it 'marks job as failed with last_error' do + row = job_row(txid_a) + expect(row[:status]).to eq('failed') + expect(row[:last_error]).to eq('timeout') + end + + it 'does not create or remove any outputs' do + all = storage.find_outputs({ include_spent: true, limit: 100, offset: 0 }) + expect(all).to be_empty + end + end + + # --------------------------------------------------------------------------- + # 11. Recovery — stale sending job + # --------------------------------------------------------------------------- + describe 'recovery of stale sending jobs' do + let(:mock_tx) { double('tx') } # rubocop:disable RSpec/VerifiedDoubles + + it 'retries a stale sending job (locked_at well past threshold)' do + db[:wallet_broadcast_jobs].insert( + txid: txid_a, + beef_hex: beef_hex, + input_outpoints: nil, + change_outpoints: nil, + fund_ref: nil, + status: 'sending', + locked_at: Sequel.lit('NOW() - interval \'600 second\''), + attempts: 1, + created_at: Sequel.lit('NOW()'), + updated_at: Sequel.lit('NOW()') + ) + seed_action(txid: txid_a) + allow(broadcaster).to receive(:broadcast).and_return(double('result')) # rubocop:disable RSpec/VerifiedDoubles + allow(BSV::Transaction::Transaction).to receive(:from_beef_hex).and_return(mock_tx) + + adapter.start + sleep(0.5) + adapter.drain + + expect(job_row(txid_a)[:status]).to eq('completed') + end + + it 'does not retry a recently-locked sending job' do + # Insert a job locked just now (within the 60s stale_threshold we will use) + db[:wallet_broadcast_jobs].insert( + txid: txid_a, + beef_hex: beef_hex, + input_outpoints: nil, + change_outpoints: nil, + fund_ref: nil, + status: 'sending', + locked_at: Sequel.lit('NOW()'), + attempts: 1, + created_at: Sequel.lit('NOW()'), + updated_at: Sequel.lit('NOW()') + ) + allow(BSV::Transaction::Transaction).to receive(:from_beef_hex).and_return(mock_tx) + allow(broadcaster).to receive(:broadcast).and_return(double('result')) # rubocop:disable RSpec/VerifiedDoubles + + # Use a 60s threshold — a job locked NOW() is not yet stale + adapter2 = BSV::Wallet::SolidQueueAdapter.new( + db: db, + storage: storage, + broadcaster: broadcaster, + poll_interval: 0.1, + stale_threshold: 60 + ) + adapter2.start + sleep(0.4) + adapter2.drain + + # Job should remain 'sending' — it was not stale enough to be retried + expect(job_row(txid_a)[:status]).to eq('sending') + end + end + + # --------------------------------------------------------------------------- + # 12. Drain / shutdown + # --------------------------------------------------------------------------- + describe '#drain' do + it 'blocks until the worker thread finishes' do + allow(broadcaster).to receive(:broadcast) + adapter.start + thread = adapter.instance_variable_get(:@worker_thread) + adapter.drain + expect(thread.alive?).to be(false) + end + + it 'worker thread is not alive after drain' do + adapter.start + adapter.drain + thread = adapter.instance_variable_get(:@worker_thread) + expect(thread&.alive?).not_to be(true) + end + end + + # --------------------------------------------------------------------------- + # 13. Thread safety + # --------------------------------------------------------------------------- + describe 'thread safety' do + it 'handles concurrent enqueue from multiple threads with unique txids' do + txids = (1..5).map { |i| i.to_s.rjust(64, '0') } + threads = txids.map do |id| + Thread.new do + adapter.enqueue( + txid: id, + beef_binary: beef_binary, + input_outpoints: [], + change_outpoints: [], + fund_ref: 'ref' + ) + end + end + threads.each(&:join) + expect(db[:wallet_broadcast_jobs].count).to eq(5) + end + + it 'handles concurrent enqueue with the same txid without raising' do + errors = [] + threads = Array.new(5) do + Thread.new do + adapter.enqueue( + txid: txid_a, + beef_binary: beef_binary, + input_outpoints: [], + change_outpoints: [], + fund_ref: 'ref' + ) + rescue StandardError => e + errors << e + end + end + threads.each(&:join) + expect(errors).to be_empty + expect(db[:wallet_broadcast_jobs].where(txid: txid_a).count).to eq(1) + end + + it 'start is idempotent — calling twice does not spawn two threads' do + allow(broadcaster).to receive(:broadcast) + adapter.start + thread_before = adapter.instance_variable_get(:@worker_thread) + adapter.start + thread_after = adapter.instance_variable_get(:@worker_thread) + expect(thread_after).to equal(thread_before) + adapter.drain + end + end +end From 6f2459681d1b957e9bcf0673b7cd73e47a41c5b2 Mon Sep 17 00:00:00 2001 From: Simon Bettison Date: Sun, 12 Apr 2026 21:47:47 +0100 Subject: [PATCH 7/8] fix: address review findings in SolidQueueAdapter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add MAX_ATTEMPTS (5) enforcement to poll query — permanently-failing jobs no longer retry indefinitely - Replace Sequel.lit placeholder inside SQL quotes with explicit integer interpolation for the interval expression - Guard empty-array outpoints in rollback path (nil vs [] semantics) - Document that process_job holds a DB connection during broadcast Co-Authored-By: Claude Opus 4.6 (1M context) --- .../wallet_postgres/solid_queue_adapter.rb | 20 +++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/gem/bsv-wallet-postgres/lib/bsv/wallet_postgres/solid_queue_adapter.rb b/gem/bsv-wallet-postgres/lib/bsv/wallet_postgres/solid_queue_adapter.rb index 5fe46879..57062377 100644 --- a/gem/bsv-wallet-postgres/lib/bsv/wallet_postgres/solid_queue_adapter.rb +++ b/gem/bsv-wallet-postgres/lib/bsv/wallet_postgres/solid_queue_adapter.rb @@ -47,6 +47,10 @@ class SolidQueueAdapter # and eligible for retry. Configurable for testability. STALE_THRESHOLD = 300 + # Maximum number of broadcast attempts before a job is abandoned. + # After this many failures the job remains +failed+ permanently. + MAX_ATTEMPTS = 5 + # @param db [Sequel::Database] a Sequel database handle (shared with PostgresStore) # @param storage [StorageAdapter] wallet storage adapter (must not be MemoryStore) # @param broadcaster [#broadcast] broadcaster object @@ -184,7 +188,14 @@ def running? # # Uses +SELECT ... FOR UPDATE SKIP LOCKED+ so concurrent workers do not # double-process the same job. Stale +sending+ jobs (those whose +locked_at+ - # is older than +stale_threshold+ seconds) are also eligible for retry. + # is older than +stale_threshold+ seconds) are also eligible for retry, + # up to +MAX_ATTEMPTS+. + # + # NOTE: The entire +process_job+ call (including the network broadcast) + # runs inside this transaction, holding the row lock and a database + # connection for the duration. This is acceptable for a single worker + # thread with an 8-second poll interval but would need restructuring + # for high-throughput multi-worker deployments. # # @return [void] def poll_once @@ -192,8 +203,9 @@ def poll_once job = @db[:wallet_broadcast_jobs] .where(Sequel.lit( "status = 'unsent' OR " \ - "(status = 'sending' AND locked_at < NOW() - interval '? second')", - @stale_threshold + "(status = 'sending' AND locked_at < NOW() - " \ + "#{@stale_threshold.to_i} * interval '1 second' " \ + "AND attempts < #{MAX_ATTEMPTS})" )) .order(:created_at) .limit(1) @@ -229,7 +241,7 @@ def process_job(job) begin @broadcaster.broadcast(tx) rescue StandardError => e - if input_outpoints + if input_outpoints && !input_outpoints.empty? rollback(input_outpoints, change_outpoints, txid, fund_ref) elsif txid @storage.update_action_status(txid, 'failed') From 6d4ae0c42694c62611280b3a686faeecb0bbe0b1 Mon Sep 17 00:00:00 2001 From: Simon Bettison Date: Sun, 12 Apr 2026 22:06:22 +0100 Subject: [PATCH 8/8] fix: address Copilot review findings MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Fix migration comments: status values now match adapter code ('unsent', 'sending', 'completed', 'failed') - Fix poll query example in migration to reflect actual query shape - Use :timestamptz for locked_at/created_at/updated_at (matches pending_since pattern from migration 004) - Make start() check-and-set atomic under mutex (prevents TOCTOU double-spawn) - Guard from_beef_hex deserialization — corrupt jobs fail immediately instead of retrying in a tight loop Co-Authored-By: Claude Opus 4.6 (1M context) --- .../migrations/006_create_broadcast_jobs.rb | 13 ++++++----- .../wallet_postgres/solid_queue_adapter.rb | 22 ++++++++++++++++--- 2 files changed, 27 insertions(+), 8 deletions(-) diff --git a/gem/bsv-wallet-postgres/lib/bsv/wallet_postgres/migrations/006_create_broadcast_jobs.rb b/gem/bsv-wallet-postgres/lib/bsv/wallet_postgres/migrations/006_create_broadcast_jobs.rb index 1b3f1b72..a2405f13 100644 --- a/gem/bsv-wallet-postgres/lib/bsv/wallet_postgres/migrations/006_create_broadcast_jobs.rb +++ b/gem/bsv-wallet-postgres/lib/bsv/wallet_postgres/migrations/006_create_broadcast_jobs.rb @@ -12,7 +12,7 @@ # for the same transaction. # # * +status+ — TEXT NOT NULL DEFAULT 'unsent'. State machine values: -# 'unsent', 'broadcasting', 'sent', 'failed'. +# 'unsent', 'sending', 'completed', 'failed'. # # * +beef_hex+ — TEXT NOT NULL. Serialised BEEF payload to broadcast. # @@ -42,8 +42,11 @@ # A composite index on +(status, locked_at)+ (named +broadcast_jobs_poll_idx+) # supports the hot-path poll query: # -# WHERE status = 'unsent' AND locked_at IS NULL +# WHERE status = 'unsent' +# OR (status = 'sending' AND locked_at < NOW() - interval '300 seconds' +# AND attempts < 5) # ORDER BY created_at +# FOR UPDATE SKIP LOCKED Sequel.migration do change do create_table(:wallet_broadcast_jobs) do @@ -56,9 +59,9 @@ String :fund_ref Integer :attempts, null: false, default: 0 String :last_error, text: true - DateTime :locked_at - DateTime :created_at, null: false, default: Sequel::CURRENT_TIMESTAMP - DateTime :updated_at, null: false, default: Sequel::CURRENT_TIMESTAMP + column :locked_at, :timestamptz + column :created_at, :timestamptz, null: false, default: Sequel::CURRENT_TIMESTAMP + column :updated_at, :timestamptz, null: false, default: Sequel::CURRENT_TIMESTAMP index %i[status locked_at], name: :broadcast_jobs_poll_idx end end diff --git a/gem/bsv-wallet-postgres/lib/bsv/wallet_postgres/solid_queue_adapter.rb b/gem/bsv-wallet-postgres/lib/bsv/wallet_postgres/solid_queue_adapter.rb index 57062377..11c58a21 100644 --- a/gem/bsv-wallet-postgres/lib/bsv/wallet_postgres/solid_queue_adapter.rb +++ b/gem/bsv-wallet-postgres/lib/bsv/wallet_postgres/solid_queue_adapter.rb @@ -131,12 +131,16 @@ def status(txid) # Spawns the background worker thread. # # Safe to call multiple times — returns immediately if already running. + # The check-and-set is atomic under the mutex to prevent two concurrent + # +start+ calls from spawning duplicate worker threads. # # @return [void] def start - return if running? + @mutex.synchronize do + return if @running || @worker_thread&.alive? - @mutex.synchronize { @running = true } + @running = true + end @worker_thread = Thread.new { worker_loop } end @@ -221,6 +225,9 @@ def poll_once # Marks a job as +sending+, broadcasts the transaction, and promotes or # rolls back wallet state based on the outcome. # + # Deserialization failures (corrupt +beef_hex+) are caught and mark the + # job as +failed+ immediately to prevent infinite retry loops. + # # @param job [Hash] row from +wallet_broadcast_jobs+ # @return [void] def process_job(job) @@ -231,7 +238,16 @@ def process_job(job) updated_at: Sequel.lit('NOW()') ) - tx = BSV::Transaction::Transaction.from_beef_hex(job[:beef_hex]) + begin + tx = BSV::Transaction::Transaction.from_beef_hex(job[:beef_hex]) + rescue StandardError => e + @db[:wallet_broadcast_jobs].where(id: job[:id]).update( + status: 'failed', + last_error: "Deserialization failed: #{e.message}", + updated_at: Sequel.lit('NOW()') + ) + return + end input_outpoints = job[:input_outpoints]&.to_a change_outpoints = job[:change_outpoints]&.to_a