Skip to content

Commit d789650

Browse files
authored
node: stream reconciliation considers miniblock candidates (#4572)
# Fix stream reconciliation to handle stuck miniblock candidates ## Problem Stream reconciliation could fail in edge cases where miniblock candidates were registered on-chain but never promoted locally. This occurred in two scenarios: 1. Non-replicated streams (replication factor = 1): When a node sent the miniblock candidate registration transaction but missed the stream updated event, the candidate remained unpromoted. With no remotes to reconcile from, the stream became permanently stuck. 2. All replicas missing events: When all replicas of a stream missed the stream update event, none promoted their candidates. Even though remotes existed, reconciliation would fetch data unnecessarily instead of using the locally available candidate. If none of the replicas promoted the candidate the stream became permanently stuck. ## Solution Added tryPromoteLocalCandidate() helper method that: - Attempts to read and promote local miniblock candidates - Properly distinguishes between NOT_FOUND (expected) and critical errors (database failures) - Returns clear success/failure states via (bool, error) tuple - Prevents unnecessary network operations when local data is available Integrated candidate promotion checks at two key points: 1. Before failing with no remotes (stream_reconciler.go:179-196) - Tries to promote local candidate before giving up 2. When 1 miniblock behind (stream_reconciler.go:218-232) - Opportunistically promotes candidate before fetching from remotes ## Changes - stream_reconciler.go: - Added tryPromoteLocalCandidate() method with proper error handling - Updated two reconciliation paths to check for local candidates - Fixed silent error handling bug (was using _ to ignore all errors) - Improved error messages and comments - stream_reconciler_test.go: - Added TestReconciler_NoRemotesNoCandidateFails to verify error handling - All existing tests continue to pass (15 total)
1 parent 0f1de29 commit d789650

File tree

2 files changed

+111
-1
lines changed

2 files changed

+111
-1
lines changed

core/node/events/stream_reconciler.go

Lines changed: 65 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import (
88

99
"github.com/ethereum/go-ethereum/common"
1010

11+
"github.com/towns-protocol/towns/core/node/shared"
12+
1113
"github.com/towns-protocol/towns/core/contracts/river"
1214
. "github.com/towns-protocol/towns/core/node/base"
1315
. "github.com/towns-protocol/towns/core/node/protocol"
@@ -175,7 +177,23 @@ func (sr *streamReconciler) reconcile() error {
175177
sr.stream.mu.RUnlock()
176178

177179
if len(remotes) == 0 {
178-
return RiverError(Err_UNAVAILABLE, "Stream has no remotes", "stream", sr.stream.streamId)
180+
// For non-replicated streams it is possible that the node missed the stream updated event
181+
// and never promoted the candidate in its DB and can't reconcile the stream from other nodes.
182+
// Try to promote the local candidate to finish reconciliation.
183+
lastMBHash := sr.streamRecord.LastMbHash()
184+
lastMbNum := sr.streamRecord.LastMbNum()
185+
186+
promoted, err := sr.tryPromoteLocalCandidate(lastMbNum, lastMBHash)
187+
if err != nil {
188+
return err
189+
}
190+
if promoted {
191+
return nil
192+
}
193+
194+
// Stream stuck: can't reconcile from other nodes nor make the local stream
195+
// in line with the stream registry.
196+
return RiverError(Err_UNAVAILABLE, "Stream stuck, no remotes", "stream", sr.stream.streamId)
179197
}
180198

181199
sr.remotes = newRemoteTracker(remote, remotes)
@@ -197,6 +215,22 @@ func (sr *streamReconciler) reconcile() error {
197215
return err
198216
}
199217

218+
// If stream is 1 miniblock behind the stream canonical chain, and has the candidate available,
219+
// promote it instead of asking remotes. Otherwise it is possible that all replicas missed the
220+
// stream update event and none of them promoted the candidate, causing the stream to get stuck.
221+
if sr.localLastMbInclusive+1 == sr.streamRecord.LastMbNum() {
222+
lastMBHash := sr.streamRecord.LastMbHash()
223+
lastMbNum := sr.streamRecord.LastMbNum()
224+
225+
promoted, err := sr.tryPromoteLocalCandidate(lastMbNum, lastMBHash)
226+
if err != nil {
227+
return err
228+
}
229+
if promoted {
230+
return nil
231+
}
232+
}
233+
200234
if sr.expectedLastMbInclusive <= sr.localLastMbInclusive {
201235
// Stream is up to date with the expected last miniblock, but it's possible that there are gaps in the middle.
202236
if enableBackwardReconciliation {
@@ -234,6 +268,36 @@ func (sr *streamReconciler) reconcile() error {
234268
return sr.backfillGaps()
235269
}
236270

271+
// tryPromoteLocalCandidate attempts to promote a local miniblock candidate if one exists.
272+
// Returns (true, nil) if a candidate was successfully promoted.
273+
// Returns (false, nil) if no candidate exists (NOT_FOUND error).
274+
// Returns (false, err) for any other error (database errors, promotion failures, etc.).
275+
func (sr *streamReconciler) tryPromoteLocalCandidate(mbNum int64, mbHash common.Hash) (bool, error) {
276+
candidate, err := sr.cache.params.Storage.ReadMiniblockCandidate(
277+
sr.ctx, sr.stream.StreamId(), mbHash, mbNum)
278+
if err != nil {
279+
if IsRiverErrorCode(err, Err_NOT_FOUND) {
280+
// No candidate available - this is expected in many cases
281+
return false, nil
282+
}
283+
// Database or other error - propagate it
284+
return false, err
285+
}
286+
287+
if candidate == nil {
288+
// Shouldn't happen if err is nil, but handle defensively
289+
return false, nil
290+
}
291+
292+
// Promote the candidate
293+
err = sr.stream.promoteCandidate(sr.ctx, &shared.MiniblockRef{Hash: mbHash, Num: mbNum})
294+
if err != nil {
295+
return false, err
296+
}
297+
298+
return true, nil
299+
}
300+
237301
// reconcileFromRegistryGenesisBlock attempts to load the genesis miniblock from the stream registry
238302
// and import it locally. Used when the stream registry indicates miniblock 0 and peers may not have
239303
// the genesis due to original node leaving the network.

core/node/events/stream_reconciler_test.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -946,3 +946,49 @@ func TestReconciler_ReconcileAndTrimEndToEnd(t *testing.T) {
946946
}
947947
assert.Equal(t, expectedSeqs, seqs)
948948
}
949+
950+
// TestReconciler_NoRemotesNoCandidateFails verifies that when a non-replicated stream
951+
// has no remotes and no local candidate, reconciliation fails gracefully.
952+
// This test ensures the error handling path in tryPromoteLocalCandidate works correctly.
953+
func TestReconciler_NoRemotesNoCandidateFails(t *testing.T) {
954+
cfg := config.GetDefaultConfig()
955+
cfg.StreamReconciliation.InitialWorkerPoolSize = 0
956+
cfg.StreamReconciliation.OnlineWorkerPoolSize = 0
957+
958+
ctx, tc := makeCacheTestContext(
959+
t,
960+
testParams{
961+
config: cfg,
962+
replFactor: 1, // Non-replicated stream
963+
numInstances: 1,
964+
disableStreamCacheCallbacks: true,
965+
recencyConstraintsGenerations: 5,
966+
backwardsReconciliationThreshold: ptrUint64(20),
967+
},
968+
)
969+
require := tc.require
970+
971+
tc.initCache(0, &MiniblockProducerOpts{TestDisableMbProdcutionOnBlock: true})
972+
streamId, _, _ := tc.allocateStream()
973+
974+
inst := tc.instances[0]
975+
976+
// Get stream record
977+
blockNum, err := inst.cache.params.Registry.Blockchain.GetBlockNumber(ctx)
978+
require.NoError(err)
979+
recordNoId, err := inst.cache.params.Registry.StreamRegistry.GetStreamOnBlock(ctx, streamId, blockNum)
980+
require.NoError(err)
981+
record := river.NewStreamWithId(streamId, recordNoId)
982+
983+
stream, err := inst.cache.getStreamImpl(ctx, streamId, true)
984+
require.NoError(err)
985+
986+
// Reconciliation should fail - this is the same scenario as TestReconciler_NoRemotes
987+
// but explicitly tests the new tryPromoteLocalCandidate code path
988+
reconciler := newStreamReconciler(inst.cache, stream, record)
989+
err = reconciler.reconcile()
990+
require.Error(err)
991+
require.True(IsRiverErrorCode(err, Err_UNAVAILABLE))
992+
993+
testfmt.Logf(t, "Correctly failed reconciliation with no remotes and no candidate")
994+
}

0 commit comments

Comments
 (0)