Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/integration/commands/stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -888,6 +888,7 @@ func stageExec(db kv.TemporalRwDB, ctx context.Context, logger log.Logger) error
if err != nil {
return err
}
doms.SetInMemHistoryReads(false)

if chainTipMode {
//if chainTip = true, forced noCommit = false
Expand Down
2 changes: 2 additions & 0 deletions cmd/integration/commands/state_stages.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ func syncBySmallSteps(db kv.TemporalRwDB, builderConfig buildercfg.BuilderConfig
return err
}
defer sd.Close()
sd.SetInMemHistoryReads(false)

var batchSize datasize.ByteSize
must(batchSize.UnmarshalText([]byte(batchSizeStr)))
Expand Down Expand Up @@ -398,6 +399,7 @@ func loopExec(db kv.TemporalRwDB, ctx context.Context, unwind uint64, logger log
return err
}
defer sd.Close()
sd.SetInMemHistoryReads(false)
_ = sync.SetCurrentStage(stages.Execution)
t := time.Now()
if _, err = sync.Run(sd, tx, initialCycle, false); err != nil {
Expand Down
1 change: 1 addition & 0 deletions db/kv/kv_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -495,6 +495,7 @@ type TemporalMemBatch interface {
DiscardWrites(domain Domain)
Unwind(txNumUnwindTo uint64, changeset *[DomainLen][]DomainEntryDiff)
GetAsOf(domain Domain, key []byte, ts uint64) (v []byte, ok bool, err error)
SetInMemHistoryReads(v bool)
}

type WithFreezeInfo interface {
Expand Down
1 change: 1 addition & 0 deletions db/state/execctx/domain_shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ func (sd *SharedDomains) CommitmentCapture() bool {
}

func (sd *SharedDomains) GetMemBatch() kv.TemporalMemBatch { return sd.mem }
func (sd *SharedDomains) SetInMemHistoryReads(v bool) { sd.mem.SetInMemHistoryReads(v) }
func (sd *SharedDomains) GetCommitmentCtx() *commitmentdb.SharedDomainsCommitmentContext {
return sd.sdCtx
}
Expand Down
35 changes: 29 additions & 6 deletions db/state/temporal_mem_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package state
import (
"context"
"encoding/binary"
"errors"
"fmt"
"maps"
"sort"
Expand Down Expand Up @@ -52,6 +53,12 @@ type TemporalMemBatch struct {

getCacheSize int

// inMemHistoryReads: accumulate all writes with txNums so GetAsOf can answer time-travel
// queries from in-flight state (needed for RPC reads during live chain-tip execution).
// Distinct from DomainBufferedWriter which also holds history in mem but only for writing.
// Disable for offline commands (stage_exec etc.) — history reads come from disk anyway.
inMemHistoryReads bool

latestStateLock sync.RWMutex
domains [kv.DomainLen]map[string][]dataWithTxNum
storage *btree2.Map[string, []dataWithTxNum] // TODO: replace hardcoded domain name to per-config configuration of available Guarantees/AccessMethods (range vs get)
Expand All @@ -75,8 +82,9 @@ type TemporalMemBatch struct {

func NewTemporalMemBatch(tx kv.TemporalTx, ioMetrics any) *TemporalMemBatch {
sd := &TemporalMemBatch{
storage: btree2.NewMap[string, []dataWithTxNum](128),
metrics: ioMetrics.(*changeset.DomainMetrics),
storage: btree2.NewMap[string, []dataWithTxNum](128),
metrics: ioMetrics.(*changeset.DomainMetrics),
inMemHistoryReads: true,
}
aggTx := AggTx(tx)
sd.stepSize = aggTx.StepSize()
Expand All @@ -100,6 +108,8 @@ func NewTemporalMemBatch(tx kv.TemporalTx, ioMetrics any) *TemporalMemBatch {
return sd
}

func (sd *TemporalMemBatch) SetInMemHistoryReads(v bool) { sd.inMemHistoryReads = v }

func (sd *TemporalMemBatch) DomainPut(domain kv.Domain, k string, v []byte, txNum uint64, preval []byte) error {
sd.putLatest(domain, k, v, txNum)
return sd.putHistory(domain, toBytesZeroCopy(k), v, txNum, preval)
Expand Down Expand Up @@ -148,8 +158,13 @@ func (sd *TemporalMemBatch) putLatest(domain kv.Domain, key string, val []byte,
putValueSize := 0
if domain == kv.StorageDomain {
if old, ok := sd.storage.Get(key); ok {
sd.storage.Set(key, append(old, valWithStep))
putValueSize += len(val)
if sd.inMemHistoryReads {
sd.storage.Set(key, append(old, valWithStep))
putValueSize += len(val)
} else {
putValueSize += len(val) - len(old[len(old)-1].data)
sd.storage.Set(key, []dataWithTxNum{valWithStep})
}
} else {
sd.storage.Set(key, []dataWithTxNum{valWithStep})
putKeySize += len(key)
Expand All @@ -161,8 +176,13 @@ func (sd *TemporalMemBatch) putLatest(domain kv.Domain, key string, val []byte,
}

if old, ok := sd.domains[domain][key]; ok {
putValueSize += len(val)
sd.domains[domain][key] = append(old, valWithStep)
if sd.inMemHistoryReads {
sd.domains[domain][key] = append(old, valWithStep)
putValueSize += len(val)
} else {
putValueSize += len(val) - len(old[len(old)-1].data)
sd.domains[domain][key] = []dataWithTxNum{valWithStep}
}
} else {
sd.domains[domain][key] = []dataWithTxNum{valWithStep}
putKeySize += len(key)
Expand Down Expand Up @@ -222,6 +242,9 @@ func (sd *TemporalMemBatch) getLatest(domain kv.Domain, key []byte) (v []byte, s
}

func (sd *TemporalMemBatch) GetAsOf(domain kv.Domain, key []byte, ts uint64) (v []byte, ok bool, err error) {
if !sd.inMemHistoryReads {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • I guess GetAsOf used by parallel exec @mh0lt ?
  • also if you drop old updates i guess you will get invalid state after flush SD to db? (i don't know)

Copy link
Member Author

@sudeepdino008 sudeepdino008 Feb 24, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think it's only used for rpc (#18575)

`also if you drop old updates i guess you will get invalid state after flush SD to db?``

  • no we only use this data for rpc (GEtAsOf -- to provide read since now we collect updates temporalmembatch rather than commit tx). Flush SD to db ignores these old updates.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You only need the in memory reads during exec and rpc processing. These are cleared on flush, but they are also in the ETL queue so after flush the DB will reflect the previous in memory state.

return nil, false, errors.New("GetAsOf called on TemporalMemBatch with inMemHistoryReads disabled")
}
sd.latestStateLock.RLock()
defer sd.latestStateLock.RUnlock()

Expand Down
Loading