Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ packages:
filename: external/hstore.go
github.com/evstack/ev-node/block/internal/syncing:
interfaces:
daRetriever:
DaRetrieverI:
config:
dir: ./block/internal/syncing
pkgname: syncing
Expand All @@ -65,8 +65,3 @@ packages:
dir: ./block/internal/common
pkgname: common
filename: broadcaster_mock.go
p2pHandler:
config:
dir: ./block/internal/syncing
pkgname: syncing
filename: syncer_mock.go
3 changes: 3 additions & 0 deletions block/components.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,10 +211,13 @@ func NewAggregatorComponents(
// error channel for critical failures
errorCh := make(chan error, 1)

daRetriever := syncing.NewDARetriever(da, cacheManager, config, genesis, logger)

executor, err := executing.NewExecutor(
store,
exec,
sequencer,
daRetriever,
signer,
cacheManager,
metrics,
Expand Down
7 changes: 7 additions & 0 deletions block/internal/common/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,10 @@ type DAHeightEvent struct {
// Source indicates where this event originated from (DA or P2P)
Source EventSource
}

// ForcedIncluded represents a forced inclusion event for caching
type ForcedIncludedEvent struct {
Txs [][]byte
StartDaHeight uint64
EndDaHeight uint64
}
29 changes: 25 additions & 4 deletions block/internal/executing/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/evstack/ev-node/block/internal/cache"
"github.com/evstack/ev-node/block/internal/common"
"github.com/evstack/ev-node/block/internal/syncing"
coreexecutor "github.com/evstack/ev-node/core/execution"
coresequencer "github.com/evstack/ev-node/core/sequencer"
"github.com/evstack/ev-node/pkg/config"
Expand All @@ -27,10 +28,11 @@ import (
// Executor handles block production, transaction processing, and state management
type Executor struct {
// Core components
store store.Store
exec coreexecutor.Executor
sequencer coresequencer.Sequencer
signer signer.Signer
store store.Store
exec coreexecutor.Executor
sequencer coresequencer.Sequencer
signer signer.Signer
daRetriever syncing.DaRetrieverI

// Shared components
cache cache.Manager
Expand Down Expand Up @@ -71,6 +73,7 @@ func NewExecutor(
store store.Store,
exec coreexecutor.Executor,
sequencer coresequencer.Sequencer,
daRetriever syncing.DaRetrieverI,
signer signer.Signer,
cache cache.Manager,
metrics *common.Metrics,
Expand Down Expand Up @@ -99,6 +102,7 @@ func NewExecutor(
store: store,
exec: exec,
sequencer: sequencer,
daRetriever: daRetriever,
signer: signer,
cache: cache,
metrics: metrics,
Expand Down Expand Up @@ -330,6 +334,12 @@ func (e *Executor) produceBlock() error {
}
}

// fetch forced included txs
forcedIncludedTxsEvent, err := e.daRetriever.RetrieveForcedIncludedTxsFromDA(e.ctx, currentState.DAHeight)
if err != nil && !errors.Is(err, syncing.ErrForceInclusionNotConfigured) {
e.logger.Error().Err(err).Msg("failed to retrieve forced included txs")
}

var (
header *types.SignedHeader
data *types.Data
Expand All @@ -356,6 +366,12 @@ func (e *Executor) produceBlock() error {
return fmt.Errorf("failed to retrieve batch: %w", err)
}

// append forced included txs to batch data
// TODO(@julienrbrt): if the batch is at size, adding more txs isn't what we want.
// maybe we need to add a limit to retrieveBatch based on the forced included txs size.
// for the poc this is ok as is.
batchData.Transactions = append(batchData.Transactions, forcedIncludedTxsEvent.Txs...)

header, data, err = e.createBlock(e.ctx, newHeight, batchData)
if err != nil {
return fmt.Errorf("failed to create block: %w", err)
Expand All @@ -379,6 +395,11 @@ func (e *Executor) produceBlock() error {
return fmt.Errorf("failed to apply block: %w", err)
}

// update da height, based on last retrieved.
if forcedIncludedTxsEvent.EndDaHeight > newState.DAHeight {
newState.DAHeight = forcedIncludedTxsEvent.EndDaHeight
}

// signing the header is done after applying the block
// as for signing, the state of the block may be required by the signature payload provider.
signature, err := e.signHeader(header.Header)
Expand Down
3 changes: 3 additions & 0 deletions block/internal/executing/executor_lazy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/evstack/ev-node/block/internal/cache"
"github.com/evstack/ev-node/block/internal/common"
"github.com/evstack/ev-node/block/internal/syncing"
coreseq "github.com/evstack/ev-node/core/sequencer"
"github.com/evstack/ev-node/pkg/config"
"github.com/evstack/ev-node/pkg/genesis"
Expand Down Expand Up @@ -56,6 +57,7 @@ func TestLazyMode_ProduceBlockLogic(t *testing.T) {
memStore,
mockExec,
mockSeq,
syncing.NewMockDaRetrieverI(t),
signerWrapper,
cacheManager,
metrics,
Expand Down Expand Up @@ -166,6 +168,7 @@ func TestRegularMode_ProduceBlockLogic(t *testing.T) {
memStore,
mockExec,
mockSeq,
syncing.NewMockDaRetrieverI(t),
signerWrapper,
cacheManager,
metrics,
Expand Down
3 changes: 3 additions & 0 deletions block/internal/executing/executor_logic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/evstack/ev-node/block/internal/cache"
"github.com/evstack/ev-node/block/internal/common"
"github.com/evstack/ev-node/block/internal/syncing"
coreseq "github.com/evstack/ev-node/core/sequencer"
"github.com/evstack/ev-node/pkg/config"
"github.com/evstack/ev-node/pkg/genesis"
Expand Down Expand Up @@ -78,6 +79,7 @@ func TestProduceBlock_EmptyBatch_SetsEmptyDataHash(t *testing.T) {
memStore,
mockExec,
mockSeq,
syncing.NewMockDaRetrieverI(t),
signerWrapper,
cacheManager,
metrics,
Expand Down Expand Up @@ -165,6 +167,7 @@ func TestPendingLimit_SkipsProduction(t *testing.T) {
memStore,
mockExec,
mockSeq,
syncing.NewMockDaRetrieverI(t),
signerWrapper,
cacheManager,
metrics,
Expand Down
5 changes: 5 additions & 0 deletions block/internal/executing/executor_restart_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/evstack/ev-node/block/internal/cache"
"github.com/evstack/ev-node/block/internal/common"
"github.com/evstack/ev-node/block/internal/syncing"
coreseq "github.com/evstack/ev-node/core/sequencer"
"github.com/evstack/ev-node/pkg/config"
"github.com/evstack/ev-node/pkg/genesis"
Expand Down Expand Up @@ -56,6 +57,7 @@ func TestExecutor_RestartUsesPendingHeader(t *testing.T) {
memStore,
mockExec1,
mockSeq1,
syncing.NewMockDaRetrieverI(t),
signerWrapper,
cacheManager,
metrics,
Expand Down Expand Up @@ -175,6 +177,7 @@ func TestExecutor_RestartUsesPendingHeader(t *testing.T) {
memStore, // same store
mockExec2,
mockSeq2,
syncing.NewMockDaRetrieverI(t),
signerWrapper,
cacheManager,
metrics,
Expand Down Expand Up @@ -273,6 +276,7 @@ func TestExecutor_RestartNoPendingHeader(t *testing.T) {
memStore,
mockExec1,
mockSeq1,
syncing.NewMockDaRetrieverI(t),
signerWrapper,
cacheManager,
metrics,
Expand Down Expand Up @@ -325,6 +329,7 @@ func TestExecutor_RestartNoPendingHeader(t *testing.T) {
memStore,
mockExec2,
mockSeq2,
syncing.NewMockDaRetrieverI(t),
signerWrapper,
cacheManager,
metrics,
Expand Down
11 changes: 7 additions & 4 deletions block/internal/executing/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"github.com/evstack/ev-node/block/internal/cache"
"github.com/evstack/ev-node/block/internal/common"
"github.com/evstack/ev-node/block/internal/syncing"
"github.com/evstack/ev-node/pkg/config"
"github.com/evstack/ev-node/pkg/genesis"
"github.com/evstack/ev-node/pkg/store"
Expand Down Expand Up @@ -46,8 +47,9 @@ func TestExecutor_BroadcasterIntegration(t *testing.T) {
// Create executor with broadcasters
executor, err := NewExecutor(
memStore,
nil, // nil executor (we're not testing execution)
nil, // nil sequencer (we're not testing sequencing)
nil, // nil executor (we're not testing execution)
nil, // nil sequencer (we're not testing sequencing)
syncing.NewMockDaRetrieverI(t),
testSigner, // test signer (required for executor)
cacheManager,
metrics,
Expand Down Expand Up @@ -96,8 +98,9 @@ func TestExecutor_NilBroadcasters(t *testing.T) {
// Create executor with nil broadcasters (light node scenario)
executor, err := NewExecutor(
memStore,
nil, // nil executor
nil, // nil sequencer
nil, // nil executor
nil, // nil sequencer
syncing.NewMockDaRetrieverI(t),
testSigner, // test signer (required for executor)
cacheManager,
metrics,
Expand Down
1 change: 1 addition & 0 deletions block/internal/reaping/reaper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func newTestExecutor(t *testing.T) *executing.Executor {
nil, // store (unused)
nil, // core executor (unused)
nil, // sequencer (unused)
nil, // daretriever (unused)
s, // signer (required)
nil, // cache (unused)
nil, // metrics (unused)
Expand Down
72 changes: 62 additions & 10 deletions block/internal/syncing/da_retriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,12 @@ type DARetriever struct {
logger zerolog.Logger

// calculate namespaces bytes once and reuse them
namespaceBz []byte
namespaceDataBz []byte
namespaceBz []byte
namespaceDataBz []byte
namespaceForcedInclusionBz []byte

hasForcedInclusionNs bool
daEpochSize uint64

// transient cache, only full event need to be passed to the syncer
// on restart, will be refetch as da height is updated by syncer
Expand All @@ -47,15 +51,26 @@ func NewDARetriever(
genesis genesis.Genesis,
logger zerolog.Logger,
) *DARetriever {
forcedInclusionNs := config.DA.GetForcedInclusionNamespace()
hasForcedInclusionNs := forcedInclusionNs != ""

var namespaceForcedInclusionBz []byte
if hasForcedInclusionNs {
namespaceForcedInclusionBz = coreda.NamespaceFromString(forcedInclusionNs).Bytes()
}

return &DARetriever{
da: da,
cache: cache,
genesis: genesis,
logger: logger.With().Str("component", "da_retriever").Logger(),
namespaceBz: coreda.NamespaceFromString(config.DA.GetNamespace()).Bytes(),
namespaceDataBz: coreda.NamespaceFromString(config.DA.GetDataNamespace()).Bytes(),
pendingHeaders: make(map[uint64]*types.SignedHeader),
pendingData: make(map[uint64]*types.Data),
da: da,
cache: cache,
genesis: genesis,
logger: logger.With().Str("component", "da_retriever").Logger(),
namespaceBz: coreda.NamespaceFromString(config.DA.GetNamespace()).Bytes(),
namespaceDataBz: coreda.NamespaceFromString(config.DA.GetDataNamespace()).Bytes(),
namespaceForcedInclusionBz: namespaceForcedInclusionBz,
hasForcedInclusionNs: hasForcedInclusionNs,
daEpochSize: config.DA.ForcedInclusionDAEpoch,
pendingHeaders: make(map[uint64]*types.SignedHeader),
pendingData: make(map[uint64]*types.Data),
}
}

Expand All @@ -76,6 +91,43 @@ func (r *DARetriever) RetrieveFromDA(ctx context.Context, daHeight uint64) ([]co
return r.processBlobs(ctx, blobsResp.Data, daHeight), nil
}

// ErrForceInclusionNotConfigured is returned when the forced inclusion namespace is not configured.
var ErrForceInclusionNotConfigured = errors.New("forced inclusion namespace not configured")

// RetrieveForcedIncludedTxsFromDA retrieves forced inclusion transactions from the DA layer.
// It fetches from the daHeight for the da epoch range defined in the config.
func (r *DARetriever) RetrieveForcedIncludedTxsFromDA(ctx context.Context, daHeight uint64) (*common.ForcedIncludedEvent, error) {
if !r.hasForcedInclusionNs {
return nil, ErrForceInclusionNotConfigured
}

event := &common.ForcedIncludedEvent{
StartDaHeight: daHeight,
}

r.logger.Debug().Uint64("da_height", daHeight).Uint64("range", r.daEpochSize).Msg("retrieving forced included transactions from DA")

for epochHeight := daHeight + 1; epochHeight <= daHeight+r.daEpochSize; epochHeight++ {
result := types.RetrieveWithHelpers(ctx, r.da, r.logger, epochHeight, r.namespaceForcedInclusionBz, defaultDATimeout)

// quickly break if we are too ahead.
if result.Code == coreda.StatusHeightFromFuture {
break
}

if result.Code == coreda.StatusSuccess {
if err := r.validateBlobResponse(result, epochHeight); !errors.Is(err, coreda.ErrBlobNotFound) && err != nil {
return nil, err
}

event.StartDaHeight = epochHeight
event.Txs = append(event.Txs, result.Data...)
}
}

return event, nil
}

// fetchBlobs retrieves blobs from the DA layer
func (r *DARetriever) fetchBlobs(ctx context.Context, daHeight uint64) (coreda.ResultRetrieve, error) {
// Retrieve from both namespaces
Expand Down
Loading
Loading