Skip to content

contractcourt: batch startup reads and block epoch notifications #4697

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Nov 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 66 additions & 40 deletions contractcourt/briefcase.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,10 @@ type ArbitratorLog interface {
// TODO(roasbeef): document on interface the errors expected to be
// returned

// CurrentState returns the current state of the ChannelArbitrator.
CurrentState() (ArbitratorState, error)
// CurrentState returns the current state of the ChannelArbitrator. It
// takes an optional database transaction, which will be used if it is
// non-nil, otherwise the lookup will be done in its own transaction.
CurrentState(tx kvdb.RTx) (ArbitratorState, error)

// CommitState persists, the current state of the chain attendant.
CommitState(ArbitratorState) error
Expand Down Expand Up @@ -96,8 +98,10 @@ type ArbitratorLog interface {
InsertConfirmedCommitSet(c *CommitSet) error

// FetchConfirmedCommitSet fetches the known confirmed active HTLC set
// from the database.
FetchConfirmedCommitSet() (*CommitSet, error)
// from the database. It takes an optional database transaction, which
// will be used if it is non-nil, otherwise the lookup will be done in
// its own transaction.
FetchConfirmedCommitSet(tx kvdb.RTx) (*CommitSet, error)

// FetchChainActions attempts to fetch the set of previously stored
// chain actions. We'll use this upon restart to properly advance our
Expand Down Expand Up @@ -412,34 +416,49 @@ func (b *boltArbitratorLog) writeResolver(contractBucket kvdb.RwBucket,
return contractBucket.Put(resKey, buf.Bytes())
}

// CurrentState returns the current state of the ChannelArbitrator.
// CurrentState returns the current state of the ChannelArbitrator. It takes an
// optional database transaction, which will be used if it is non-nil, otherwise
// the lookup will be done in its own transaction.
//
// NOTE: Part of the ContractResolver interface.
func (b *boltArbitratorLog) CurrentState() (ArbitratorState, error) {
var s ArbitratorState
err := kvdb.View(b.db, func(tx kvdb.RTx) error {
scopeBucket := tx.ReadBucket(b.scopeKey[:])
if scopeBucket == nil {
return errScopeBucketNoExist
}
func (b *boltArbitratorLog) CurrentState(tx kvdb.RTx) (ArbitratorState, error) {
var (
s ArbitratorState
err error
)

stateBytes := scopeBucket.Get(stateKey)
if stateBytes == nil {
return nil
}
if tx != nil {
s, err = b.currentState(tx)
} else {
err = kvdb.View(b.db, func(tx kvdb.RTx) error {
s, err = b.currentState(tx)
return err
}, func() {
s = 0
})
}

s = ArbitratorState(stateBytes[0])
return nil
}, func() {
s = 0
})
if err != nil && err != errScopeBucketNoExist {
return s, err
}

return s, nil
}

func (b *boltArbitratorLog) currentState(tx kvdb.RTx) (ArbitratorState, error) {
scopeBucket := tx.ReadBucket(b.scopeKey[:])
if scopeBucket == nil {
return 0, errScopeBucketNoExist
}

stateBytes := scopeBucket.Get(stateKey)
if stateBytes == nil {
return 0, nil
}

return ArbitratorState(stateBytes[0]), nil
}

// CommitState persists, the current state of the chain attendant.
//
// NOTE: Part of the ContractResolver interface.
Expand Down Expand Up @@ -851,29 +870,20 @@ func (b *boltArbitratorLog) InsertConfirmedCommitSet(c *CommitSet) error {
}

// FetchConfirmedCommitSet fetches the known confirmed active HTLC set from the
// database.
// database. It takes an optional database transaction, which will be used if it
// is non-nil, otherwise the lookup will be done in its own transaction.
//
// NOTE: Part of the ContractResolver interface.
func (b *boltArbitratorLog) FetchConfirmedCommitSet() (*CommitSet, error) {
func (b *boltArbitratorLog) FetchConfirmedCommitSet(tx kvdb.RTx) (*CommitSet, error) {
if tx != nil {
return b.fetchConfirmedCommitSet(tx)
}

var c *CommitSet
err := kvdb.View(b.db, func(tx kvdb.RTx) error {
scopeBucket := tx.ReadBucket(b.scopeKey[:])
if scopeBucket == nil {
return errScopeBucketNoExist
}

commitSetBytes := scopeBucket.Get(commitSetKey)
if commitSetBytes == nil {
return errNoCommitSet
}

commitSet, err := decodeCommitSet(bytes.NewReader(commitSetBytes))
if err != nil {
return err
}

c = commitSet
return nil
var err error
c, err = b.fetchConfirmedCommitSet(tx)
return err
}, func() {
c = nil
})
Expand All @@ -884,6 +894,22 @@ func (b *boltArbitratorLog) FetchConfirmedCommitSet() (*CommitSet, error) {
return c, nil
}

func (b *boltArbitratorLog) fetchConfirmedCommitSet(tx kvdb.RTx) (*CommitSet,
error) {

scopeBucket := tx.ReadBucket(b.scopeKey[:])
if scopeBucket == nil {
return nil, errScopeBucketNoExist
}

commitSetBytes := scopeBucket.Get(commitSetKey)
if commitSetBytes == nil {
return nil, errNoCommitSet
}

return decodeCommitSet(bytes.NewReader(commitSetBytes))
}

// WipeHistory is to be called ONLY once *all* contracts have been fully
// resolved, and the channel closure if finalized. This method will delete all
// on-disk state within the persistent log.
Expand Down
12 changes: 6 additions & 6 deletions contractcourt/briefcase_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -611,7 +611,7 @@ func TestStateMutation(t *testing.T) {
defer cleanUp()

// The default state of an arbitrator should be StateDefault.
arbState, err := testLog.CurrentState()
arbState, err := testLog.CurrentState(nil)
if err != nil {
t.Fatalf("unable to read arb state: %v", err)
}
Expand All @@ -625,7 +625,7 @@ func TestStateMutation(t *testing.T) {
if err := testLog.CommitState(StateFullyResolved); err != nil {
t.Fatalf("unable to write state: %v", err)
}
arbState, err = testLog.CurrentState()
arbState, err = testLog.CurrentState(nil)
if err != nil {
t.Fatalf("unable to read arb state: %v", err)
}
Expand All @@ -643,7 +643,7 @@ func TestStateMutation(t *testing.T) {

// If we try to query for the state again, we should get the default
// state again.
arbState, err = testLog.CurrentState()
arbState, err = testLog.CurrentState(nil)
if err != nil {
t.Fatalf("unable to query current state: %v", err)
}
Expand Down Expand Up @@ -687,11 +687,11 @@ func TestScopeIsolation(t *testing.T) {

// Querying each log, the states should be the prior one we set, and be
// disjoint.
log1State, err := testLog1.CurrentState()
log1State, err := testLog1.CurrentState(nil)
if err != nil {
t.Fatalf("unable to read arb state: %v", err)
}
log2State, err := testLog2.CurrentState()
log2State, err := testLog2.CurrentState(nil)
if err != nil {
t.Fatalf("unable to read arb state: %v", err)
}
Expand Down Expand Up @@ -752,7 +752,7 @@ func TestCommitSetStorage(t *testing.T) {
t.Fatalf("unable to write commit set: %v", err)
}

diskCommitSet, err := testLog.FetchConfirmedCommitSet()
diskCommitSet, err := testLog.FetchConfirmedCommitSet(nil)
if err != nil {
t.Fatalf("unable to read commit set: %v", err)
}
Expand Down
Loading