Skip to content

Commit 9ea6450

Browse files
committed
go/worker/storage/committee: Fix stuck finalization
The maximum number of non-finalized versions is now exposed as part of the NodeDB API.
1 parent 72c2cd1 commit 9ea6450

4 files changed

Lines changed: 29 additions & 28 deletions

File tree

.changelog/6239.bugfix.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
go/worker/storage/committee: Fix stuck storage finalization

go/storage/mkvs/db/api/api.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,14 @@ import (
1010
"github.com/oasisprotocol/oasis-core/go/storage/mkvs/writelog"
1111
)
1212

13-
// ModuleName is the module name.
14-
const ModuleName = "storage/mkvs/db"
13+
const (
14+
// ModuleName is the module name.
15+
ModuleName = "storage/mkvs/db"
16+
17+
// MaxPendingVersions is the maximum number of allowed non-finalized versions.
18+
// Setting this too high can cause the metadata to grow excessively.
19+
MaxPendingVersions = 5000
20+
)
1521

1622
var (
1723
// ErrNodeNotFound indicates that a node with the specified hash couldn't be found

go/storage/mkvs/db/pathbadger/metadata.go

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,6 @@ import (
1212
"github.com/oasisprotocol/oasis-core/go/storage/mkvs/db/api"
1313
)
1414

15-
// maxPendingVersions is the maximum number of allowed non-finalized versions. Increasing this too
16-
// much can result in the metadata growing too much.
17-
const maxPendingVersions = 5000
18-
1915
// serializedMetadata is the on-disk serialized metadata.
2016
type serializedMetadata struct {
2117
// Version is the database schema version.
@@ -123,7 +119,7 @@ func (m *metadata) reserveRootSeqNo(version uint64, rootType uint8) (uint16, err
123119
m.Lock()
124120
defer m.Unlock()
125121

126-
if len(m.value.NextPendingRootSeq) >= maxPendingVersions {
122+
if len(m.value.NextPendingRootSeq) > api.MaxPendingVersions {
127123
return math.MaxUint16, fmt.Errorf("mkvs/pathbadger: too many non-finalized versions")
128124
}
129125

@@ -146,7 +142,7 @@ func (m *metadata) setPendingRootSeqNo(version uint64, rootHash api.TypedHash, s
146142
m.Lock()
147143
defer m.Unlock()
148144

149-
if len(m.value.PendingRootSeqs) >= maxPendingVersions {
145+
if len(m.value.PendingRootSeqs) > api.MaxPendingVersions {
150146
return fmt.Errorf("mkvs/pathbadger: too many non-finalized versions")
151147
}
152148

go/worker/storage/committee/node.go

Lines changed: 18 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"github.com/oasisprotocol/oasis-core/go/runtime/host"
2727
storageApi "github.com/oasisprotocol/oasis-core/go/storage/api"
2828
"github.com/oasisprotocol/oasis-core/go/storage/mkvs/checkpoint"
29+
dbApi "github.com/oasisprotocol/oasis-core/go/storage/mkvs/db/api"
2930
mkvsDB "github.com/oasisprotocol/oasis-core/go/storage/mkvs/db/api"
3031
workerCommon "github.com/oasisprotocol/oasis-core/go/worker/common"
3132
"github.com/oasisprotocol/oasis-core/go/worker/common/committee"
@@ -1101,22 +1102,21 @@ func (n *Node) worker() { // nolint: gocyclo
11011102
pendingApply := &minRoundQueue{}
11021103
pendingFinalize := &minRoundQueue{}
11031104

1104-
// Main processing loop. When a new block comes in, its state and io roots are inspected and their
1105-
// writelogs fetched from peers in case we don't have them locally yet. Fetches are
1106-
// asynchronous and, once complete, trigger local Apply operations. These are serialized
1107-
// per round (all applies for a given round have to be complete before applying anything for following
1108-
// rounds) using the fetched diffs and pending finalization priority queue. Once a round has all its write
1109-
// logs applied, a Finalize for it is triggered, again serialized by round but otherwise asynchronous
1110-
// (pendingFinalization and cachedLastRound).
1105+
// Main processing loop. When a new block arrives, its state and I/O roots are inspected.
1106+
// If missing locally, diffs are fetched from peers, possibly for many rounds in parallel,
1107+
// including all missing rounds since the last fully applied one. Fetched diffs are then applied
1108+
// in round order, ensuring no gaps. Once a round has all its roots applied, background finalization
1109+
// for that round is triggered asynchronously, not blocking concurrent fetching and diff application.
11111110
mainLoop:
11121111
for {
1113-
// Drain the Apply and Finalize queues first, before waiting for new events in the select
1114-
// below. Applies are drained first, followed by finalizations (which are asynchronous
1115-
// but serialized, i.e. only one Finalize can be in progress at a time).
1116-
1117-
// Apply any writelogs that came in through fetchDiff, but only if they are for the round
1118-
// after the last fully applied one (lastFullyAppliedRound).
1119-
if len(*pendingApply) > 0 && lastFullyAppliedRound+1 == (*pendingApply)[0].GetRound() {
1112+
// Drain the Apply and Finalize queues first, before waiting for new events in the select below.
1113+
1114+
// Apply fetched writelogs, but only if they are for the round after the last fully applied one
1115+
// and the current number of pending roots to be finalized is smaller than the maximum allowed.
1116+
applyNext := pendingApply.Len() > 0 &&
1117+
lastFullyAppliedRound+1 == (*pendingApply)[0].GetRound() &&
1118+
pendingFinalize.Len() < dbApi.MaxPendingVersions-1 // -1 since one may be already finalizing.
1119+
if applyNext {
11201120
lastDiff := heap.Pop(pendingApply).(*fetchedDiff)
11211121
// Apply the write log if one exists.
11221122
err = nil
@@ -1172,15 +1172,13 @@ mainLoop:
11721172
continue
11731173
}
11741174

1175-
// Check if any new rounds were fully applied and need to be finalized. Only finalize
1176-
// if it's the round after the one that was finalized last (cachedLastRound).
1177-
// The finalization happens asynchronously with respect to this worker loop and any
1178-
// applies that happen for subsequent rounds (which can proceed while earlier rounds are
1179-
// still finalizing).
1175+
// Check if any new rounds were fully applied and need to be finalized.
1176+
// Only finalize if it's the round after the one that was finalized last.
1177+
// As a consequence, at most one finalization can be in progress at a time.
11801178
if len(*pendingFinalize) > 0 && cachedLastRound+1 == (*pendingFinalize)[0].GetRound() {
11811179
lastSummary := heap.Pop(pendingFinalize).(*blockSummary)
11821180
wg.Add(1)
1183-
go func() {
1181+
go func() { // Don't block fetching and applying remaining rounds.
11841182
defer wg.Done()
11851183
n.finalize(lastSummary)
11861184
}()

0 commit comments

Comments
 (0)