Merged
Changes from 1 commit
1 change: 1 addition & 0 deletions block/components.go
@@ -236,6 +236,7 @@ func NewAggregatorComponents(
genesis,
logger,
executor,
cacheManager,
reaping.DefaultInterval,
)
if err != nil {
2 changes: 1 addition & 1 deletion block/internal/cache/bench_test.go
@@ -28,7 +28,7 @@ ok github.com/evstack/ev-node/block/internal/cache 25.834s
*/

func benchSetupStore(b *testing.B, n int, txsPer int, chainID string) store.Store {
ds, err := store.NewDefaultInMemoryKVStore()
ds, err := store.NewTestInMemoryKVStore()
if err != nil {
b.Fatal(err)
}
28 changes: 28 additions & 0 deletions block/internal/cache/manager.go
@@ -21,6 +21,7 @@ var (
headerCacheDir = filepath.Join(cacheDir, "header")
dataCacheDir = filepath.Join(cacheDir, "data")
pendingEventsCacheDir = filepath.Join(cacheDir, "pending_da_events")
txCacheDir = filepath.Join(cacheDir, "tx")
)

// gobRegisterOnce ensures gob type registration happens exactly once process-wide.
@@ -51,6 +52,10 @@ type Manager interface {
GetDataDAIncluded(hash string) (uint64, bool)
SetDataDAIncluded(hash string, daHeight uint64, blockHeight uint64)

// Transaction operations
IsTxSeen(hash string) bool
SetTxSeen(hash string)

// Pending operations
GetPendingHeaders(ctx context.Context) ([]*types.SignedHeader, error)
GetPendingData(ctx context.Context) ([]*types.SignedData, error)
@@ -78,6 +83,7 @@ var _ Manager = (*implementation)(nil)
type implementation struct {
headerCache *Cache[types.SignedHeader]
dataCache *Cache[types.Data]
txCache *Cache[struct{}]
pendingEventsCache *Cache[common.DAHeightEvent]
pendingHeaders *PendingHeaders
pendingData *PendingData
@@ -90,6 +96,7 @@ func NewManager(cfg config.Config, store store.Store, logger zerolog.Logger) (Ma
// Initialize caches
headerCache := NewCache[types.SignedHeader]()
dataCache := NewCache[types.Data]()
txCache := NewCache[struct{}]()
pendingEventsCache := NewCache[common.DAHeightEvent]()

// Initialize pending managers
@@ -107,6 +114,7 @@
impl := &implementation{
headerCache: headerCache,
dataCache: dataCache,
txCache: txCache,
pendingEventsCache: pendingEventsCache,
pendingHeaders: pendingHeaders,
pendingData: pendingData,
@@ -167,12 +175,24 @@ func (m *implementation) SetDataDAIncluded(hash string, daHeight uint64, blockHe
m.dataCache.setDAIncluded(hash, daHeight, blockHeight)
}

// Transaction operations
func (m *implementation) IsTxSeen(hash string) bool {
return m.txCache.isSeen(hash)
}

func (m *implementation) SetTxSeen(hash string) {
// Use 0 as height since transactions don't have a block height yet
m.txCache.setSeen(hash, 0)
}

// DeleteHeight removes from all caches the given height.
// This can be done when a height has been da included.
func (m *implementation) DeleteHeight(blockHeight uint64) {
m.headerCache.deleteAllForHeight(blockHeight)
m.dataCache.deleteAllForHeight(blockHeight)
m.pendingEventsCache.deleteAllForHeight(blockHeight)

// the tx cache is not deleted here because it is not height-dependent
}

// Pending operations
Expand Down Expand Up @@ -246,6 +266,10 @@ func (m *implementation) SaveToDisk() error {
return fmt.Errorf("failed to save data cache to disk: %w", err)
}

if err := m.txCache.SaveToDisk(filepath.Join(cfgDir, txCacheDir)); err != nil {
return fmt.Errorf("failed to save tx cache to disk: %w", err)
}

if err := m.pendingEventsCache.SaveToDisk(filepath.Join(cfgDir, pendingEventsCacheDir)); err != nil {
return fmt.Errorf("failed to save pending events cache to disk: %w", err)
}
@@ -267,6 +291,10 @@ func (m *implementation) LoadFromDisk() error {
return fmt.Errorf("failed to load data cache from disk: %w", err)
}

if err := m.txCache.LoadFromDisk(filepath.Join(cfgDir, txCacheDir)); err != nil {
return fmt.Errorf("failed to load tx cache from disk: %w", err)
}

if err := m.pendingEventsCache.LoadFromDisk(filepath.Join(cfgDir, pendingEventsCacheDir)); err != nil {
return fmt.Errorf("failed to load pending events cache from disk: %w", err)
}
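For reference, a minimal sketch (not part of the diff) of how code inside block/internal could use the new transaction-seen API on the cache Manager. The markIfNew helper and its package are hypothetical; IsTxSeen/SetTxSeen and the sha256/hex hashing mirror what this PR adds to the manager and reaper:

package example

import (
    "crypto/sha256"
    "encoding/hex"

    "github.com/evstack/ev-node/block/internal/cache"
)

// markIfNew reports whether tx has not been seen before and records it as seen.
// It mirrors the dedup step the reaper now performs via the shared cache.
func markIfNew(m cache.Manager, tx []byte) bool {
    h := sha256.Sum256(tx)
    key := hex.EncodeToString(h[:])
    if m.IsTxSeen(key) {
        return false // already recorded, e.g. previously forwarded to the sequencer
    }
    m.SetTxSeen(key) // stored with height 0; the tx cache is not height-scoped
    return true
}

Note that, unlike header and data entries, seen transactions are not dropped by DeleteHeight, and they are persisted by SaveToDisk/LoadFromDisk under the new tx cache directory.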
2 changes: 1 addition & 1 deletion block/internal/cache/manager_test.go
@@ -25,7 +25,7 @@ func tempConfig(t *testing.T) config.Config {

// helper to make an in-memory store
func memStore(t *testing.T) store.Store {
ds, err := store.NewDefaultInMemoryKVStore()
ds, err := store.NewTestInMemoryKVStore()
require.NoError(t, err)
return store.New(ds)
}
4 changes: 2 additions & 2 deletions block/internal/cache/pending_base_test.go
@@ -15,7 +15,7 @@ import (
func TestPendingBase_ErrorConditions(t *testing.T) {
t.Parallel()
ctx := context.Background()
dsKV, err := store.NewDefaultInMemoryKVStore()
dsKV, err := store.NewTestInMemoryKVStore()
require.NoError(t, err)
st := store.New(dsKV)
logger := zerolog.Nop()
@@ -49,7 +49,7 @@ func TestPendingBase_ErrorConditions(t *testing.T) {
func TestPendingBase_PersistLastSubmitted(t *testing.T) {
t.Parallel()
ctx := context.Background()
dsKV, err := store.NewDefaultInMemoryKVStore()
dsKV, err := store.NewTestInMemoryKVStore()
require.NoError(t, err)
st := store.New(dsKV)
logger := zerolog.Nop()
2 changes: 1 addition & 1 deletion block/internal/cache/pending_data_test.go
@@ -62,7 +62,7 @@ func TestPendingData_BasicFlow(t *testing.T) {
func TestPendingData_InitFromMetadata(t *testing.T) {
t.Parallel()
ctx := context.Background()
ds, err := store.NewDefaultInMemoryKVStore()
ds, err := store.NewTestInMemoryKVStore()
require.NoError(t, err)
store := store.New(ds)

43 changes: 10 additions & 33 deletions block/internal/reaping/reaper.go
@@ -9,14 +9,13 @@ import (
"sync"
"time"

ds "github.com/ipfs/go-datastore"
"github.com/rs/zerolog"

"github.com/evstack/ev-node/block/internal/cache"
"github.com/evstack/ev-node/block/internal/executing"
coreexecutor "github.com/evstack/ev-node/core/execution"
coresequencer "github.com/evstack/ev-node/core/sequencer"
"github.com/evstack/ev-node/pkg/genesis"
"github.com/evstack/ev-node/pkg/store"
)

const (
@@ -35,7 +34,7 @@ type Reaper struct {
sequencer coresequencer.Sequencer
chainID string
interval time.Duration
seenStore ds.Batching
cache cache.Manager
executor *executing.Executor

// shared components
@@ -47,33 +46,33 @@
wg sync.WaitGroup
}

// NewReaper creates a new Reaper instance with persistent seenTx storage.
// NewReaper creates a new Reaper instance.
func NewReaper(
exec coreexecutor.Executor,
sequencer coresequencer.Sequencer,
genesis genesis.Genesis,
logger zerolog.Logger,
executor *executing.Executor,
cache cache.Manager,
scrapeInterval time.Duration,
) (*Reaper, error) {
if executor == nil {
return nil, errors.New("executor cannot be nil")
}
if cache == nil {
return nil, errors.New("cache cannot be nil")
}
if scrapeInterval == 0 {
return nil, errors.New("scrape interval cannot be empty")
}
store, err := store.NewDefaultInMemoryKVStore()
if err != nil {
return nil, fmt.Errorf("failed to create reaper store: %w", err)
}

return &Reaper{
exec: exec,
sequencer: sequencer,
chainID: genesis.ChainID,
interval: scrapeInterval,
logger: logger.With().Str("component", "reaper").Logger(),
seenStore: store,
cache: cache,
executor: executor,
}, nil
}
@@ -155,26 +154,13 @@ func (r *Reaper) SubmitTxs() error {
}

var newTxs [][]byte
var seenStoreErrors int
for _, tx := range txs {
txHash := hashTx(tx)
key := ds.NewKey(txHash)
has, err := r.seenStore.Has(r.ctx, key)
if err != nil {
r.logger.Error().Err(err).Msg("failed to check seenStore")
seenStoreErrors++
continue
}
if !has {
if !r.cache.IsTxSeen(txHash) {
newTxs = append(newTxs, tx)
}
}

// If all transactions failed seenStore check, return error
if seenStoreErrors > 0 && len(newTxs) == 0 {
return fmt.Errorf("failed to check seenStore for all %d transactions", seenStoreErrors)
}

if len(newTxs) == 0 {
r.logger.Debug().Msg("no new txs to submit")
return nil
@@ -192,11 +178,7 @@

for _, tx := range newTxs {
txHash := hashTx(tx)
key := ds.NewKey(txHash)
if err := r.seenStore.Put(r.ctx, key, []byte{1}); err != nil {
// Log but don't fail on persistence errors
r.logger.Error().Err(err).Str("txHash", txHash).Msg("failed to persist seen tx")
}
r.cache.SetTxSeen(txHash)
}

// Notify the executor that new transactions are available
@@ -209,11 +191,6 @@
return nil
}

// SeenStore returns the datastore used to track seen transactions.
func (r *Reaper) SeenStore() ds.Datastore {
return r.seenStore
}

func hashTx(tx []byte) string {
hash := sha256.Sum256(tx)
return hex.EncodeToString(hash[:])
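A hedged sketch of how the updated constructor is wired after this change. The wireReaper helper and its package are illustrative assumptions; the NewReaper signature, the import paths, and reaping.DefaultInterval come from this diff and the NewAggregatorComponents call above:

package example

import (
    "github.com/rs/zerolog"

    "github.com/evstack/ev-node/block/internal/cache"
    "github.com/evstack/ev-node/block/internal/executing"
    "github.com/evstack/ev-node/block/internal/reaping"
    coreexecutor "github.com/evstack/ev-node/core/execution"
    coresequencer "github.com/evstack/ev-node/core/sequencer"
    "github.com/evstack/ev-node/pkg/genesis"
)

// wireReaper shows the new constructor shape: the reaper no longer creates its
// own in-memory seen-tx datastore and instead shares the node's cache.Manager,
// so its dedup state is persisted together with the other caches.
func wireReaper(
    exec coreexecutor.Executor,
    seq coresequencer.Sequencer,
    gen genesis.Genesis,
    logger zerolog.Logger,
    executor *executing.Executor,
    cacheManager cache.Manager,
) (*reaping.Reaper, error) {
    return reaping.NewReaper(
        exec,
        seq,
        gen,
        logger,
        executor,
        cacheManager, // replaces the internal seenStore; nil is rejected
        reaping.DefaultInterval,
    )
}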