Skip to content

Commit a0e58b7

Browse files
committed
contractcourt: update startup log reads to take optional read tx
When we startup large nodes, each arbitrator needs a minimum of 2 db transactions to lookup its current state. This commit updates our arb log interface and implementation to allow passing in an optional read transaction. This will allow us to batch these reads in a single db tx in the following commit.
1 parent 158c9b5 commit a0e58b7

File tree

5 files changed

+94
-68
lines changed

5 files changed

+94
-68
lines changed

contractcourt/briefcase.go

+64-38
Original file line numberDiff line numberDiff line change
@@ -54,8 +54,10 @@ type ArbitratorLog interface {
5454
// TODO(roasbeef): document on interface the errors expected to be
5555
// returned
5656

57-
// CurrentState returns the current state of the ChannelArbitrator.
58-
CurrentState() (ArbitratorState, error)
57+
// CurrentState returns the current state of the ChannelArbitrator. It
58+
// takes an optional database transaction, which will be used if it is
59+
// non-nil, otherwise the lookup will be done in its own transaction.
60+
CurrentState(tx kvdb.RTx) (ArbitratorState, error)
5961

6062
// CommitState persists, the current state of the chain attendant.
6163
CommitState(ArbitratorState) error
@@ -96,8 +98,10 @@ type ArbitratorLog interface {
9698
InsertConfirmedCommitSet(c *CommitSet) error
9799

98100
// FetchConfirmedCommitSet fetches the known confirmed active HTLC set
99-
// from the database.
100-
FetchConfirmedCommitSet() (*CommitSet, error)
101+
// from the database. It takes an optional database transaction, which
102+
// will be used if it is non-nil, otherwise the lookup will be done in
103+
// its own transaction.
104+
FetchConfirmedCommitSet(tx kvdb.RTx) (*CommitSet, error)
101105

102106
// FetchChainActions attempts to fetch the set of previously stored
103107
// chain actions. We'll use this upon restart to properly advance our
@@ -412,32 +416,47 @@ func (b *boltArbitratorLog) writeResolver(contractBucket kvdb.RwBucket,
412416
return contractBucket.Put(resKey, buf.Bytes())
413417
}
414418

415-
// CurrentState returns the current state of the ChannelArbitrator.
419+
// CurrentState returns the current state of the ChannelArbitrator. It takes an
420+
// optional database transaction, which will be used if it is non-nil, otherwise
421+
// the lookup will be done in its own transaction.
416422
//
417423
// NOTE: Part of the ContractResolver interface.
418-
func (b *boltArbitratorLog) CurrentState() (ArbitratorState, error) {
419-
var s ArbitratorState
420-
err := kvdb.View(b.db, func(tx kvdb.RTx) error {
421-
scopeBucket := tx.ReadBucket(b.scopeKey[:])
422-
if scopeBucket == nil {
423-
return errScopeBucketNoExist
424-
}
424+
func (b *boltArbitratorLog) CurrentState(tx kvdb.RTx) (ArbitratorState, error) {
425+
var (
426+
s ArbitratorState
427+
err error
428+
)
425429

426-
stateBytes := scopeBucket.Get(stateKey)
427-
if stateBytes == nil {
428-
return nil
429-
}
430+
if tx != nil {
431+
s, err = b.currentState(tx)
432+
} else {
433+
err = kvdb.View(b.db, func(tx kvdb.RTx) error {
434+
s, err = b.currentState(tx)
435+
return err
436+
})
437+
}
430438

431-
s = ArbitratorState(stateBytes[0])
432-
return nil
433-
})
434439
if err != nil && err != errScopeBucketNoExist {
435440
return s, err
436441
}
437442

438443
return s, nil
439444
}
440445

446+
func (b *boltArbitratorLog) currentState(tx kvdb.RTx) (ArbitratorState, error) {
447+
scopeBucket := tx.ReadBucket(b.scopeKey[:])
448+
if scopeBucket == nil {
449+
return 0, errScopeBucketNoExist
450+
}
451+
452+
stateBytes := scopeBucket.Get(stateKey)
453+
if stateBytes == nil {
454+
return 0, nil
455+
}
456+
457+
return ArbitratorState(stateBytes[0]), nil
458+
}
459+
441460
// CommitState persists, the current state of the chain attendant.
442461
//
443462
// NOTE: Part of the ContractResolver interface.
@@ -843,29 +862,20 @@ func (b *boltArbitratorLog) InsertConfirmedCommitSet(c *CommitSet) error {
843862
}
844863

845864
// FetchConfirmedCommitSet fetches the known confirmed active HTLC set from the
846-
// database.
865+
// database. It takes an optional database transaction, which will be used if it
866+
// is non-nil, otherwise the lookup will be done in its own transaction.
847867
//
848868
// NOTE: Part of the ContractResolver interface.
849-
func (b *boltArbitratorLog) FetchConfirmedCommitSet() (*CommitSet, error) {
869+
func (b *boltArbitratorLog) FetchConfirmedCommitSet(tx kvdb.RTx) (*CommitSet, error) {
870+
if tx != nil {
871+
return b.fetchConfirmedCommitSet(tx)
872+
}
873+
850874
var c *CommitSet
851875
err := kvdb.View(b.db, func(tx kvdb.RTx) error {
852-
scopeBucket := tx.ReadBucket(b.scopeKey[:])
853-
if scopeBucket == nil {
854-
return errScopeBucketNoExist
855-
}
856-
857-
commitSetBytes := scopeBucket.Get(commitSetKey)
858-
if commitSetBytes == nil {
859-
return errNoCommitSet
860-
}
861-
862-
commitSet, err := decodeCommitSet(bytes.NewReader(commitSetBytes))
863-
if err != nil {
864-
return err
865-
}
866-
867-
c = commitSet
868-
return nil
876+
var err error
877+
c, err = b.fetchConfirmedCommitSet(tx)
878+
return err
869879
})
870880
if err != nil {
871881
return nil, err
@@ -874,6 +884,22 @@ func (b *boltArbitratorLog) FetchConfirmedCommitSet() (*CommitSet, error) {
874884
return c, nil
875885
}
876886

887+
func (b *boltArbitratorLog) fetchConfirmedCommitSet(tx kvdb.RTx) (*CommitSet,
888+
error) {
889+
890+
scopeBucket := tx.ReadBucket(b.scopeKey[:])
891+
if scopeBucket == nil {
892+
return nil, errScopeBucketNoExist
893+
}
894+
895+
commitSetBytes := scopeBucket.Get(commitSetKey)
896+
if commitSetBytes == nil {
897+
return nil, errNoCommitSet
898+
}
899+
900+
return decodeCommitSet(bytes.NewReader(commitSetBytes))
901+
}
902+
877903
// WipeHistory is to be called ONLY once *all* contracts have been fully
878904
// resolved, and the channel closure if finalized. This method will delete all
879905
// on-disk state within the persistent log.

contractcourt/briefcase_test.go

+6-6
Original file line numberDiff line numberDiff line change
@@ -611,7 +611,7 @@ func TestStateMutation(t *testing.T) {
611611
defer cleanUp()
612612

613613
// The default state of an arbitrator should be StateDefault.
614-
arbState, err := testLog.CurrentState()
614+
arbState, err := testLog.CurrentState(nil)
615615
if err != nil {
616616
t.Fatalf("unable to read arb state: %v", err)
617617
}
@@ -625,7 +625,7 @@ func TestStateMutation(t *testing.T) {
625625
if err := testLog.CommitState(StateFullyResolved); err != nil {
626626
t.Fatalf("unable to write state: %v", err)
627627
}
628-
arbState, err = testLog.CurrentState()
628+
arbState, err = testLog.CurrentState(nil)
629629
if err != nil {
630630
t.Fatalf("unable to read arb state: %v", err)
631631
}
@@ -643,7 +643,7 @@ func TestStateMutation(t *testing.T) {
643643

644644
// If we try to query for the state again, we should get the default
645645
// state again.
646-
arbState, err = testLog.CurrentState()
646+
arbState, err = testLog.CurrentState(nil)
647647
if err != nil {
648648
t.Fatalf("unable to query current state: %v", err)
649649
}
@@ -687,11 +687,11 @@ func TestScopeIsolation(t *testing.T) {
687687

688688
// Querying each log, the states should be the prior one we set, and be
689689
// disjoint.
690-
log1State, err := testLog1.CurrentState()
690+
log1State, err := testLog1.CurrentState(nil)
691691
if err != nil {
692692
t.Fatalf("unable to read arb state: %v", err)
693693
}
694-
log2State, err := testLog2.CurrentState()
694+
log2State, err := testLog2.CurrentState(nil)
695695
if err != nil {
696696
t.Fatalf("unable to read arb state: %v", err)
697697
}
@@ -752,7 +752,7 @@ func TestCommitSetStorage(t *testing.T) {
752752
t.Fatalf("unable to write commit set: %v", err)
753753
}
754754

755-
diskCommitSet, err := testLog.FetchConfirmedCommitSet()
755+
diskCommitSet, err := testLog.FetchConfirmedCommitSet(nil)
756756
if err != nil {
757757
t.Fatalf("unable to read commit set: %v", err)
758758
}

contractcourt/chain_arbitrator.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -616,7 +616,7 @@ func (c *ChainArbitrator) Start() error {
616616
// Launch all the goroutines for each arbitrator so they can carry out
617617
// their duties.
618618
for _, arbitrator := range c.activeChannels {
619-
if err := arbitrator.Start(); err != nil {
619+
if err := arbitrator.Start(nil); err != nil {
620620
c.Stop()
621621
return err
622622
}
@@ -1036,7 +1036,7 @@ func (c *ChainArbitrator) WatchNewChannel(newChan *channeldb.OpenChannel) error
10361036
// arbitrators, then launch it.
10371037
c.activeChannels[chanPoint] = channelArb
10381038

1039-
if err := channelArb.Start(); err != nil {
1039+
if err := channelArb.Start(nil); err != nil {
10401040
return err
10411041
}
10421042

contractcourt/channel_arbitrator.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -376,7 +376,7 @@ func NewChannelArbitrator(cfg ChannelArbitratorConfig,
376376
}
377377

378378
// Start starts all the goroutines that the ChannelArbitrator needs to operate.
379-
func (c *ChannelArbitrator) Start() error {
379+
func (c *ChannelArbitrator) Start(tx kvdb.RTx) error {
380380
if !atomic.CompareAndSwapInt32(&c.started, 0, 1) {
381381
return nil
382382
}
@@ -394,7 +394,7 @@ func (c *ChannelArbitrator) Start() error {
394394

395395
// First, we'll read our last state from disk, so our internal state
396396
// machine can act accordingly.
397-
c.state, err = c.log.CurrentState()
397+
c.state, err = c.log.CurrentState(tx)
398398
if err != nil {
399399
return err
400400
}
@@ -450,7 +450,7 @@ func (c *ChannelArbitrator) Start() error {
450450
// older nodes, this won't be found at all, and will rely on the
451451
// existing written chain actions. Additionally, if this channel hasn't
452452
// logged any actions in the log, then this field won't be present.
453-
commitSet, err := c.log.FetchConfirmedCommitSet()
453+
commitSet, err := c.log.FetchConfirmedCommitSet(tx)
454454
if err != nil && err != errNoCommitSet && err != errScopeBucketNoExist {
455455
return err
456456
}

0 commit comments

Comments
 (0)