Skip to content

Commit 9b2ec64

Browse files
committed
kvdb: implement ApplyScanBatch
Add recovery scan batch types and route kvdb scan writes through the legacy address and transaction managers so horizons, tx notifications, and synced blocks remain atomic.
1 parent b95cad0 commit 9b2ec64

2 files changed

Lines changed: 310 additions & 0 deletions

File tree

wallet/internal/db/kvdb/txstore.go

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -373,6 +373,85 @@ func (s *Store) applyLegacyTxBatch(addrmgrNs walletdb.ReadWriteBucket,
373373
return nil
374374
}
375375

376+
// ApplyScanBatch atomically applies recovery scan writes through the legacy
377+
// walletdb managers.
378+
func (s *Store) ApplyScanBatch(_ context.Context,
379+
params db.ScanBatchParams) error {
380+
381+
if s.addrStore == nil {
382+
return fmt.Errorf("kvdb.Store.ApplyScanBatch: %w",
383+
errMissingAddrStore)
384+
}
385+
386+
err := walletdb.Update(s.db, func(tx walletdb.ReadWriteTx) error {
387+
addrmgrNs := tx.ReadWriteBucket(waddrmgr.NamespaceKey)
388+
if addrmgrNs == nil {
389+
return errMissingAddrmgrNamespace
390+
}
391+
392+
txmgrNs := tx.ReadWriteBucket(wtxmgrNamespaceKey)
393+
if txmgrNs == nil {
394+
return errMissingTxmgrNamespace
395+
}
396+
397+
// Horizons are extended first because the transaction batch
398+
// below resolves every scan-discovered credit against the
399+
// address manager, so a freshly derived address must already be
400+
// written to the addrmgr bucket before its crediting transaction
401+
// is recorded.
402+
err := s.applyLegacyScanHorizons(addrmgrNs, params.Horizons)
403+
if err != nil {
404+
return err
405+
}
406+
407+
err = s.applyLegacyTxBatch(
408+
addrmgrNs, txmgrNs, params.Transactions,
409+
)
410+
if err != nil {
411+
return err
412+
}
413+
414+
return s.applyLegacySyncedBlocks(addrmgrNs, params.SyncedBlocks)
415+
})
416+
if err != nil {
417+
// ExtendAddresses advances the scoped manager's in-memory next
418+
// indices and cached account info as a side effect of writing
419+
// the addrmgr bucket. walletdb has now rolled that bucket back,
420+
// but those in-memory mutations survive, so a failed batch would
421+
// otherwise leave the live manager reporting an unpersisted
422+
// horizon advance. Invalidate the touched account caches so the
423+
// next read reloads the persisted (rolled-back) indices.
424+
s.rollbackScanHorizonCaches(params.Horizons)
425+
426+
return fmt.Errorf("kvdb.Store.ApplyScanBatch: %w", err)
427+
}
428+
429+
return nil
430+
}
431+
432+
// rollbackScanHorizonCaches drops the scoped-manager account caches advanced by
433+
// a failed ApplyScanBatch horizon extension. ExtendAddresses bumps the
434+
// manager's in-memory next indices and cached account info before the
435+
// surrounding walletdb transaction commits; once that transaction rolls back,
436+
// invalidating these caches forces the persisted (rolled-back) indices to be
437+
// reloaded on the next access, so no unpersisted horizon advance stays
438+
// observable from the live manager.
439+
func (s *Store) rollbackScanHorizonCaches(horizons []db.ScanHorizon) {
440+
for _, horizon := range horizons {
441+
scopedMgr, err := s.addrStore.FetchScopedKeyManager(
442+
waddrmgr.KeyScope(horizon.Scope),
443+
)
444+
if err != nil {
445+
// The scope resolved while applying the batch, so a
446+
// lookup miss here is unexpected; skip it as there is no
447+
// cache to invalidate for an unknown scope.
448+
continue
449+
}
450+
451+
scopedMgr.InvalidateAccountCache(horizon.Account)
452+
}
453+
}
454+
376455
// GetTx retrieves one wallet-scoped transaction snapshot through the legacy
377456
// wtxmgr query path.
378457
func (s *Store) GetTx(_ context.Context, query db.GetTxQuery) (
@@ -606,6 +685,48 @@ func (s *Store) RollbackToBlock(_ context.Context, height uint32) error {
606685
return nil
607686
}
608687

688+
// applyLegacyScanHorizons extends legacy address branches for scan hits.
689+
func (s *Store) applyLegacyScanHorizons(ns walletdb.ReadWriteBucket,
690+
horizons []db.ScanHorizon) error {
691+
692+
for _, horizon := range horizons {
693+
scopedMgr, err := s.addrStore.FetchScopedKeyManager(
694+
waddrmgr.KeyScope(horizon.Scope),
695+
)
696+
if err != nil {
697+
return fmt.Errorf("fetch scoped manager: %w", err)
698+
}
699+
700+
err = scopedMgr.ExtendAddresses(
701+
ns, horizon.Account, horizon.Index, horizon.Branch,
702+
)
703+
if err != nil {
704+
return fmt.Errorf("extend addresses: %w", err)
705+
}
706+
}
707+
708+
return nil
709+
}
710+
711+
// applyLegacySyncedBlocks connects a sequence of legacy synced blocks.
712+
func (s *Store) applyLegacySyncedBlocks(ns walletdb.ReadWriteBucket,
713+
blocks []db.Block) error {
714+
715+
for i := range blocks {
716+
block, err := db.BlockStampFromBlock(&blocks[i])
717+
if err != nil {
718+
return err
719+
}
720+
721+
err = s.addrStore.SetSyncedTo(ns, &block)
722+
if err != nil {
723+
return fmt.Errorf("set synced block %d: %w", i, err)
724+
}
725+
}
726+
727+
return nil
728+
}
729+
609730
// applyLegacyTxNotification records one relevant transaction notification using
610731
// legacy wtxmgr semantics.
611732
func (s *Store) applyLegacyTxNotification(addrmgrNs walletdb.ReadWriteBucket,

wallet/internal/db/kvdb/txstore_test.go

Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package kvdb
22

33
import (
4+
"math"
45
"testing"
56
"time"
67

@@ -323,6 +324,86 @@ func TestApplyTxBatchCreditNilFallback(t *testing.T) {
323324
addrStore.AssertNotCalled(t, "MarkUsed", mock.Anything, mock.Anything)
324325
}
325326

327+
// TestApplyScanBatchRecordsTxAndSyncedBlocks verifies that kvdb.Store applies
328+
// scan-discovered transactions and connected sync blocks in one write batch.
329+
func TestApplyScanBatchRecordsTxAndSyncedBlocks(t *testing.T) {
330+
t.Parallel()
331+
332+
dbConn, cleanup := newTestDB(t)
333+
t.Cleanup(cleanup)
334+
335+
newAddrmgrNamespace(t, dbConn)
336+
txStore := newTxStore(t, dbConn)
337+
338+
addr, script := newTestAddressScript(t)
339+
managedAddr := &bwmock.ManagedAddress{}
340+
managedAddr.On("Internal").Return(false).Maybe()
341+
managedAddr.On("Address").Return(addr).Maybe()
342+
managedAddr.On("AddrType").Return(waddrmgr.WitnessPubKey).Maybe()
343+
managedAddr.On("Imported").Return(false).Maybe()
344+
managedAddr.On("InternalAccount").Return(uint32(0)).Maybe()
345+
managedAddr.On("Compressed").Return(true).Maybe()
346+
managedAddr.On("AddrHash").Return([]byte(nil)).Maybe()
347+
managedAddr.On("Used", mock.Anything).Return(false).Maybe()
348+
349+
addrStore := &bwmock.AddrStore{}
350+
addrStore.On("ChainParams").Return(&chaincfg.RegressionNetParams)
351+
addrStore.On("Address", mock.Anything, mock.Anything).
352+
Return(managedAddr, nil)
353+
addrStore.On("MarkUsed", mock.Anything, mock.Anything).Return(nil)
354+
addrStore.On("SetSyncedTo", mock.Anything, mock.Anything).Return(nil)
355+
store := NewStore(dbConn, txStore, addrStore)
356+
357+
txMsg := &wire.MsgTx{Version: 1}
358+
txMsg.AddTxIn(&wire.TxIn{PreviousOutPoint: wire.OutPoint{
359+
Hash: chainhash.Hash{65},
360+
}})
361+
txMsg.AddTxOut(&wire.TxOut{Value: 11_000, PkScript: script})
362+
363+
syncedBlocks := []db.Block{{
364+
Hash: chainhash.Hash{66},
365+
Height: 145,
366+
Timestamp: time.Unix(1710003300, 0),
367+
}, {
368+
Hash: chainhash.Hash{67},
369+
Height: 146,
370+
Timestamp: time.Unix(1710003400, 0),
371+
}}
372+
err := store.ApplyScanBatch(t.Context(), db.ScanBatchParams{
373+
WalletID: 0,
374+
Transactions: []db.CreateTxParams{{
375+
WalletID: 0,
376+
Tx: txMsg,
377+
Received: time.Unix(1710003350, 0),
378+
Status: db.TxStatusPublished,
379+
Credits: map[uint32]btcutil.Address{0: addr},
380+
}},
381+
SyncedBlocks: syncedBlocks,
382+
})
383+
require.NoError(t, err)
384+
385+
txid := txMsg.TxHash()
386+
err = walletdb.View(dbConn, func(tx walletdb.ReadTx) error {
387+
ns := tx.ReadBucket(wtxmgrNamespaceKey)
388+
require.NotNil(t, ns)
389+
390+
details, err := txStore.TxDetails(ns, &txid)
391+
require.NoError(t, err)
392+
require.NotNil(t, details)
393+
require.Len(t, details.Credits, 1)
394+
395+
return nil
396+
})
397+
require.NoError(t, err)
398+
addrStore.AssertCalled(t, "MarkUsed", mock.Anything, mock.Anything)
399+
addrStore.AssertCalled(
400+
t, "SetSyncedTo", mock.Anything,
401+
mock.MatchedBy(func(bs *waddrmgr.BlockStamp) bool {
402+
return bs.Height == int32(146)
403+
}),
404+
)
405+
}
406+
326407
// TestCreateTxCreditAddrMismatch verifies that crediting an output with an
327408
// address that the output script does not pay to is rejected, so a caller
328409
// cannot corrupt UTXO ownership by mislabeling a credit.
@@ -1169,3 +1250,111 @@ func newMultisigScript(t *testing.T) (btcutil.Address, []byte) {
11691250

11701251
return memberAddr, script
11711252
}
1253+
1254+
// TestApplyScanBatchHorizonRollbackSafety verifies that when a recovery scan
1255+
// batch fails after its horizon extension has already advanced the scoped
1256+
// manager's in-memory address state, the live manager does not keep reporting
1257+
// the unpersisted horizon advance once walletdb rolls the batch back.
1258+
func TestApplyScanBatchHorizonRollbackSafety(t *testing.T) {
1259+
t.Parallel()
1260+
1261+
dbConn, cleanup := newTestDB(t)
1262+
t.Cleanup(cleanup)
1263+
1264+
// newSpendableAddrMgr creates the addrmgr namespace and a real manager
1265+
// with the default account already present on every default scope.
1266+
addrStore := newSpendableAddrMgr(t, dbConn)
1267+
t.Cleanup(func() {
1268+
_ = addrStore.Lock()
1269+
addrStore.Close()
1270+
})
1271+
txStore := newTxStore(t, dbConn)
1272+
store := NewStore(dbConn, txStore, addrStore)
1273+
1274+
err := walletdb.View(dbConn, func(tx walletdb.ReadTx) error {
1275+
ns := tx.ReadBucket(waddrmgr.NamespaceKey)
1276+
1277+
return addrStore.Unlock(ns, testPrivPass)
1278+
})
1279+
require.NoError(t, err)
1280+
1281+
const account = waddrmgr.DefaultAccountNum
1282+
1283+
scope := waddrmgr.KeyScopeBIP0084
1284+
scopedMgr, err := addrStore.FetchScopedKeyManager(scope)
1285+
require.NoError(t, err)
1286+
1287+
// The fresh default account has not derived any external addresses yet.
1288+
require.Zero(t, externalKeyCount(t, dbConn, scopedMgr, account))
1289+
1290+
const horizonIndex = 9
1291+
1292+
horizon := db.ScanHorizon{
1293+
Scope: db.KeyScope(scope),
1294+
Account: account,
1295+
Branch: waddrmgr.ExternalBranch,
1296+
Index: horizonIndex,
1297+
}
1298+
1299+
// A synced block whose height overflows the legacy int32 height domain
1300+
// fails the synced-block step, which runs after the horizon extension
1301+
// has already mutated the manager's in-memory address state.
1302+
overflowBlock := db.Block{
1303+
Hash: chainhash.Hash{0xAB},
1304+
Height: uint32(math.MaxInt32) + 1,
1305+
Timestamp: time.Unix(1_700_000_000, 0),
1306+
}
1307+
1308+
err = store.ApplyScanBatch(t.Context(), db.ScanBatchParams{
1309+
WalletID: 0,
1310+
Horizons: []db.ScanHorizon{horizon},
1311+
SyncedBlocks: []db.Block{overflowBlock},
1312+
})
1313+
require.Error(t, err)
1314+
1315+
// The live manager must not observe the rolled-back horizon advance, and
1316+
// the persisted addrmgr bucket must not record any derived external
1317+
// addresses either.
1318+
require.Zero(t, externalKeyCount(t, dbConn, scopedMgr, account))
1319+
1320+
// As a positive control, the same horizon extension applied without the
1321+
// failing block must succeed and advance the external next index past the
1322+
// recovered child, proving the rollback path above did not simply skip the
1323+
// extension and that the invalidated cache reloads cleanly.
1324+
err = store.ApplyScanBatch(t.Context(), db.ScanBatchParams{
1325+
WalletID: 0,
1326+
Horizons: []db.ScanHorizon{horizon},
1327+
})
1328+
require.NoError(t, err)
1329+
require.Equal(
1330+
t, uint32(horizonIndex+1),
1331+
externalKeyCount(t, dbConn, scopedMgr, account),
1332+
)
1333+
}
1334+
1335+
// externalKeyCount reports the number of external keys the scoped manager
1336+
// considers derived for the account, read through the live manager so any
1337+
// unpersisted in-memory advance would be observable.
1338+
func externalKeyCount(t *testing.T, dbConn walletdb.DB,
1339+
scopedMgr waddrmgr.AccountStore, account uint32) uint32 {
1340+
1341+
t.Helper()
1342+
1343+
var count uint32
1344+
1345+
err := walletdb.View(dbConn, func(tx walletdb.ReadTx) error {
1346+
ns := tx.ReadBucket(waddrmgr.NamespaceKey)
1347+
1348+
props, err := scopedMgr.AccountProperties(ns, account)
1349+
if err != nil {
1350+
return err
1351+
}
1352+
1353+
count = props.ExternalKeyCount
1354+
1355+
return nil
1356+
})
1357+
require.NoError(t, err)
1358+
1359+
return count
1360+
}

0 commit comments

Comments
 (0)