Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions docs/nodecore/05-upstream-config.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,28 @@ It brings together:

Together, these settings let you (1) register providers, (2) tune resiliency and polling, (3) define how nodecore scores and selects the best upstream at runtime, and (4) apply rate limiting to control request throughput.

## Chains without a finalized block tag

Some EVM-compatible networks do not expose a finalized block (Hyperledger Besu QBFT, classic Clique / PoA networks, consortium chains). Calling `eth_getBlockByNumber("finalized", ...)` against those networks returns `-39001: Unknown block`. NodeCore's upstream lifecycle normally promotes upstreams to `Available` only after both head and finalized bounds are known, so on a no-finality chain upstreams never become available and the router responds with `no available upstreams to process a request` to every call beyond `eth_chainId`.

Set `no-finality: true` in the chain's `settings` block to opt out of finalized-block expectations. NodeCore then skips the finalized-block poll loop entirely (no error logs, no wasted RPC calls) and piggybacks a `StateUpstreamEvent` on the first head publication so the chain supervisor learns the upstream exists from head data alone.

```yaml
chain-settings:
protocols:
- type: eth
chains:
- id: MyBesu
short-names: [my-besu]
chain-id: "0xbb1a"
grpcId: 60001
settings:
expected-block-time: 2s
no-finality: true
```

The flag defaults to `false`; existing chains are unaffected. Cache policies that key on `finalization-type: finalized` do not cache on no-finality chains because no finalized event ever fires; configure a TTL-based cache policy if you need caching for these chains.

## integrity

```yaml
Expand Down
14 changes: 13 additions & 1 deletion internal/upstreams/blocks/block_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ type EthLikeBlockProcessor struct {
blocks map[protocol.BlockType]protocol.Block
lifecycle *utils.BaseLifecycle
internalTimeout time.Duration
// noFinality is set when the configured chain has no finalized block tag
// (e.g. Besu QBFT). When true, the finalized-block poll loop is skipped
// entirely: the upstream still tracks head, but never tries to call
// eth_getBlockByNumber(finalized) which would log "Unknown block" forever.
noFinality bool
}

func (b *EthLikeBlockProcessor) Running() bool {
Expand All @@ -65,6 +70,7 @@ func NewEthLikeBlockProcessor(
upConfig *config.Upstream,
connector connectors.ApiConnector,
chainSpecific specific.ChainSpecific,
noFinality bool,
) *EthLikeBlockProcessor {
name := fmt.Sprintf("%s_block_processor", upConfig.Id)
return &EthLikeBlockProcessor{
Expand All @@ -77,6 +83,7 @@ func NewEthLikeBlockProcessor(
blocks: make(map[protocol.BlockType]protocol.Block),
lifecycle: utils.NewBaseLifecycle(name, ctx),
internalTimeout: upConfig.Options.InternalTimeout,
noFinality: noFinality,
}
}

Expand All @@ -95,7 +102,9 @@ func (b *EthLikeBlockProcessor) DisabledBlocks() mapset.Set[protocol.BlockType]
func (b *EthLikeBlockProcessor) Start() {
b.lifecycle.Start(func(ctx context.Context) error {
go func() {
b.poll(protocol.FinalizedBlock)
if !b.noFinality {
b.poll(protocol.FinalizedBlock)
}
for {
select {
case <-ctx.Done():
Expand All @@ -106,6 +115,9 @@ func (b *EthLikeBlockProcessor) Start() {
b.subManager.Publish(*event)
}
case <-time.After(b.upConfig.PollInterval):
if b.noFinality {
continue
}
b.poll(protocol.FinalizedBlock)
}
}
Expand Down
24 changes: 22 additions & 2 deletions internal/upstreams/blocks/block_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestEthLikeBlockProcessorGetFinalizedBlock(t *testing.T) {

connector.On("SendRequest", mock.Anything, mock.Anything).Return(response)

processor := blocks.NewEthLikeBlockProcessor(ctx, upConfig, connector, test_utils.NewEvmChainSpecific(connector))
processor := blocks.NewEthLikeBlockProcessor(ctx, upConfig, connector, test_utils.NewEvmChainSpecific(connector), false)
go processor.Start()

sub := processor.Subscribe("sub")
Expand Down Expand Up @@ -83,7 +83,7 @@ func TestEthLikeBlockProcessorDisableFinalizedBlock(t *testing.T) {

connector.On("SendRequest", mock.Anything, mock.Anything).Return(response)

processor := blocks.NewEthLikeBlockProcessor(ctx, upConfig, connector, test_utils.NewEvmChainSpecific(connector))
processor := blocks.NewEthLikeBlockProcessor(ctx, upConfig, connector, test_utils.NewEvmChainSpecific(connector), false)
go processor.Start()

sub := processor.Subscribe("sub")
Expand All @@ -97,3 +97,23 @@ func TestEthLikeBlockProcessorDisableFinalizedBlock(t *testing.T) {
assert.False(t, ok)
assert.True(t, processor.DisabledBlocks().Contains(protocol.FinalizedBlock))
}

func TestEthLikeBlockProcessorSkipsFinalizedPollWhenNoFinality(t *testing.T) {
upConfig := &config.Upstream{Id: "1", PollInterval: 50 * time.Millisecond, Options: &chains.Options{InternalTimeout: 5 * time.Second}}
ctx := context.Background()
connector := mocks.NewConnectorMock()

processor := blocks.NewEthLikeBlockProcessor(ctx, upConfig, connector, test_utils.NewEvmChainSpecific(connector), true)
go processor.Start()

sub := processor.Subscribe("sub")
go func() {
time.Sleep(150 * time.Millisecond)
sub.Unsubscribe()
}()
_, ok := <-sub.Events

connector.AssertNotCalled(t, "SendRequest", mock.Anything, mock.Anything)
assert.False(t, ok)
assert.True(t, processor.DisabledBlocks().IsEmpty())
}
17 changes: 17 additions & 0 deletions internal/upstreams/upstream_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,23 @@ import (

mapset "github.com/deckarep/golang-set/v2"
"github.com/drpcorg/nodecore/internal/protocol"
"github.com/drpcorg/nodecore/pkg/chains"
"github.com/rs/zerolog/log"
)

// update upstream state through one pipeline
func (u *BaseUpstream) processStateEvents(ctx context.Context, initialValid bool) {
bannedMethods := mapset.NewThreadUnsafeSet[string]()
validUpstream := initialValid
// noFinality chains never publish a BlockUpstreamStateEvent for the
// finalized block (there isn't one) and the health validator emits
// StatusUpstreamStateEvent{Status: Available}, which deduplicates against
// the initial state. Without anything publishing a StateUpstreamEvent the
// chain supervisor never registers the upstream and the router cannot
// route requests. Force a one-shot StateUpstreamEvent piggyback on the
// first head publication so the supervisor learns the upstream exists.
noFinality := chains.IsNoFinalityChain(u.chain)
stateBroadcast := false
for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -59,6 +69,10 @@ func (u *BaseUpstream) processStateEvents(ctx context.Context, initialValid bool
case *protocol.HeadUpstreamStateEvent:
state = stateEvent.ProcessEvent(state)
eventType = &protocol.HeadUpstreamEvent{Status: state.Status, Head: state.HeadData}
if noFinality && !stateBroadcast && validUpstream {
u.publishUpstreamEvent(state, &protocol.StateUpstreamEvent{State: &state})
stateBroadcast = true
}
default:
if stateEvent.Same(state) {
continue
Expand All @@ -68,6 +82,9 @@ func (u *BaseUpstream) processStateEvents(ctx context.Context, initialValid bool

if validUpstream {
u.publishUpstreamEvent(state, eventType)
if _, isState := eventType.(*protocol.StateUpstreamEvent); isState {
stateBroadcast = true
}
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions internal/upstreams/upstream_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,11 +173,11 @@ func createBlockProcessor(
upConfig *config.Upstream,
connector connectors.ApiConnector,
chainSpecific specific.ChainSpecific,
blockchainType chains.BlockchainType,
configuredChain *chains.ConfiguredChain,
) blocks.BlockProcessor {
switch blockchainType {
switch configuredChain.Type {
case chains.Ethereum:
return blocks.NewEthLikeBlockProcessor(ctx, upConfig, connector, chainSpecific)
return blocks.NewEthLikeBlockProcessor(ctx, upConfig, connector, chainSpecific, configuredChain.Settings.NoFinality)
default:
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion internal/upstreams/upstream_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ func CreateBlockEventProcessor(
chainSpecific specific.ChainSpecific,
configuredChain *chains.ConfiguredChain,
) event_processors.UpstreamStateEventProcessor {
blockProcessor := createBlockProcessor(ctx, conf, requestConnector, chainSpecific, configuredChain.Type)
blockProcessor := createBlockProcessor(ctx, conf, requestConnector, chainSpecific, configuredChain)
eventProcessor := event_processors.NewBaseBlockEventProcessor(ctx, conf.Id, configuredChain.Chain, blockProcessor)
if eventProcessor == nil {
return nil
Expand Down
18 changes: 18 additions & 0 deletions pkg/chains/chains.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@ type Settings struct {
MethodSpec string `yaml:"method-spec"`
Lags LagConfig `yaml:"lags"`
Options *Options `yaml:"options"`
// NoFinality marks chains whose consensus protocol does not expose a
// finalized block tag (e.g. Hyperledger Besu QBFT, classic PoA networks).
// When true, the upstream pipeline skips finalized-block polling and the
// chain supervisor promotes upstreams to Available using only head
// information. Defaults to false; opt-in per chain.
NoFinality bool `yaml:"no-finality"`
}

type ConfiguredChain struct {
Expand Down Expand Up @@ -145,6 +151,18 @@ func GetMethodSpecNameByChain(chain Chain) string {
return configuredChain.MethodSpec
}

// IsNoFinalityChain reports whether the given chain is marked as having no
// finalized block tag in its chain-settings (e.g. Besu QBFT, classic PoA).
// Callers in the upstream lifecycle use this to skip finalized-block polling
// and to promote upstreams to Available using only head information.
func IsNoFinalityChain(chain Chain) bool {
configuredChain := GetChain(chain.String())
if configuredChain == UnknownChain {
return false
}
return configuredChain.Settings.NoFinality
}

func GetMethodSpecNameByChainName(chainName string) string {
return GetChain(chainName).MethodSpec
}
Expand Down
21 changes: 21 additions & 0 deletions pkg/chains/chains_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,24 @@ func TestGetChainByGrpcIdUnknown(t *testing.T) {
unknown := GetChainByGrpcId(-1)
assert.Equal(t, UnknownChain, unknown)
}

func TestIsNoFinalityChain_DefaultsFalseForKnownChain(t *testing.T) {
ethereum := GetChain("ethereum")
assert.NotEqual(t, UnknownChain, ethereum)
assert.False(t, IsNoFinalityChain(ethereum.Chain))
}

func TestIsNoFinalityChain_UnknownChainReturnsFalse(t *testing.T) {
assert.False(t, IsNoFinalityChain(UnknownChain.Chain))
}

func TestIsNoFinalityChain_TrueWhenSettingsFlagSet(t *testing.T) {
ethereum := GetChain("ethereum")
assert.NotEqual(t, UnknownChain, ethereum)

original := ethereum.Settings.NoFinality
ethereum.Settings.NoFinality = true
t.Cleanup(func() { ethereum.Settings.NoFinality = original })

assert.True(t, IsNoFinalityChain(ethereum.Chain))
}