Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 13 additions & 7 deletions db/state/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -1418,14 +1418,20 @@ func (ht *HistoryRoTx) iterateChangedFrozen(fromTxNum, toTxNum int, asc order.By
}
g := ht.iit.dataReader(item.src.decompressor)
g.Reset(0)
wrapper := NewSegReaderWrapper(g)
if wrapper.HasNext() {
key, val, err := wrapper.Next()
if err != nil {
s.Close()
return nil, err
if g.HasNext() {
key, _ := g.Next(nil)
var val []byte
if g.HasNext() {
val, _ = g.Next(nil)
}
histFileIdx := -1
for j := range ht.files {
if ht.files[j].startTxNum == item.startTxNum && ht.files[j].endTxNum == item.endTxNum {
histFileIdx = j
break
}
}
heap.Push(&s.h, &ReconItem{g: wrapper, key: key, val: val, startTxNum: item.startTxNum, endTxNum: item.endTxNum, txNum: item.endTxNum})
heap.Push(&s.h, &ReconItem{g: g, key: key, val: val, startTxNum: item.startTxNum, endTxNum: item.endTxNum, txNum: item.endTxNum, histFileIdx: histFileIdx})
}
}
if err := s.advance(); err != nil {
Expand Down
28 changes: 15 additions & 13 deletions db/state/history_key_txnum_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,13 @@ func (ht *HistoryRoTx) iterateKeyTxNumFrozen(fromTxNum, toTxNum int, asc order.B
}
g := ht.iit.dataReader(item.src.decompressor)
g.Reset(0)
wrapper := NewSegReaderWrapper(g)
if wrapper.HasNext() {
key, val, err := wrapper.Next()
if err != nil {
s.Close()
return nil, err
if g.HasNext() {
key, _ := g.Next(nil)
var val []byte
if g.HasNext() {
val, _ = g.Next(nil)
}
heap.Push(&s.h, &ReconItem{g: wrapper, key: key, val: val, startTxNum: item.startTxNum, endTxNum: item.endTxNum, txNum: item.endTxNum})
heap.Push(&s.h, &ReconItem{g: g, key: key, val: val, startTxNum: item.startTxNum, endTxNum: item.endTxNum, txNum: item.endTxNum})
}
}
if err := s.advance(); err != nil {
Expand Down Expand Up @@ -145,16 +144,19 @@ func (hi *HistoryKeyTxNumIterFiles) advance() error {
return nil
}

top := heap.Pop(&hi.h).(*ReconItem)
top := hi.h[0]
key, idxVal := top.key, top.val

if top.g.HasNext() {
var err error
top.key, top.val, err = top.g.Next()
if err != nil {
return err
top.key, _ = top.g.Next(nil)
if top.g.HasNext() {
top.val, _ = top.g.Next(nil)
} else {
top.val = nil
}
heap.Push(&hi.h, top)
heap.Fix(&hi.h, 0)
} else {
heap.Pop(&hi.h)
}

// Clone: segment reader reuses buffers
Expand Down
77 changes: 45 additions & 32 deletions db/state/history_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,14 +81,19 @@ func (hi *HistoryRangeAsOfFiles) init(iiFiles visibleFiles) error {
}
g.Reset(offset)
if g.HasNext() {
wrapper := NewSegReaderWrapper(g)
if wrapper.HasNext() {
key, val, err := wrapper.Next()
if err != nil {
return err
key, _ := g.Next(nil)
var val []byte
if g.HasNext() {
val, _ = g.Next(nil)
}
histFileIdx := -1
for j := range hi.hc.files {
if hi.hc.files[j].startTxNum == item.startTxNum && hi.hc.files[j].endTxNum == item.endTxNum {
histFileIdx = j
break
}
heap.Push(&hi.h, &ReconItem{g: wrapper, key: key, val: val, startTxNum: item.startTxNum, endTxNum: item.endTxNum, txNum: item.endTxNum})
}
heap.Push(&hi.h, &ReconItem{g: g, key: key, val: val, startTxNum: item.startTxNum, endTxNum: item.endTxNum, txNum: item.endTxNum, histFileIdx: histFileIdx})
}
}
binary.BigEndian.PutUint64(hi.startTxKey[:], hi.startTxNum)
Expand All @@ -101,20 +106,25 @@ func (hi *HistoryRangeAsOfFiles) Trace(prefix string) *stream.TracedDuo[[]byte,

func (hi *HistoryRangeAsOfFiles) advanceInFiles() error {
for hi.h.Len() > 0 {
top := heap.Pop(&hi.h).(*ReconItem)
top := hi.h[0] // peek at minimum without removing
key := top.key
idxVal := top.val

// Get the next key-value pair for the next iteration
if top.g.HasNext() {
var err error
top.key, top.val, err = top.g.Next()
if err != nil {
return err
top.key, _ = top.g.Next(nil)
if top.g.HasNext() {
top.val, _ = top.g.Next(nil)
} else {
top.val = nil
}
if hi.toPrefix == nil || bytes.Compare(top.key, hi.toPrefix) < 0 {
heap.Push(&hi.h, top)
heap.Fix(&hi.h, 0) // sift-down only, O(log n) vs Pop+Push O(2 log n)
} else {
heap.Pop(&hi.h)
}
} else {
heap.Pop(&hi.h)
}

if hi.from != nil && bytes.Compare(key, hi.from) < 0 { //TODO: replace by seekInFiles()
Expand All @@ -131,13 +141,13 @@ func (hi *HistoryRangeAsOfFiles) advanceInFiles() error {
continue
}

if top.histFileIdx < 0 {
return fmt.Errorf("no %s file found for [%x]", hi.hc.h.FilenameBase, key)
}
historyItem := hi.hc.files[top.histFileIdx]
hi.nextKey = key
binary.BigEndian.PutUint64(hi.txnKey[:], txNum)
historyItem, ok := hi.hc.getFileDeprecated(top.startTxNum, top.endTxNum)
if !ok {
return fmt.Errorf("no %s file found for [%x]", hi.hc.h.FilenameBase, hi.nextKey)
}
reader := hi.hc.statelessIdxReader(historyItem.i)
reader := hi.hc.statelessIdxReader(top.histFileIdx)
offset, ok := reader.Lookup2(hi.txnKey[:], hi.nextKey)
if !ok {
continue
Expand All @@ -150,11 +160,11 @@ func (hi *HistoryRangeAsOfFiles) advanceInFiles() error {
}

if compressedPageValuesCount <= 1 {
g := hi.hc.statelessGetter(historyItem.i)
g := hi.hc.statelessGetter(top.histFileIdx)
g.Reset(offset)
hi.nextVal, _ = g.Next(nil)
} else {
g := seg.NewPagedReader(hi.hc.statelessGetter(historyItem.i), compressedPageValuesCount, true)
g := seg.NewPagedReader(hi.hc.statelessGetter(top.histFileIdx), compressedPageValuesCount, true)
g.Reset(offset)
for i := 0; i < compressedPageValuesCount && g.HasNext(); i++ {
k, v, _, _ := g.Next2(nil)
Expand Down Expand Up @@ -411,15 +421,18 @@ func (hi *HistoryChangesIterFiles) Close() {

func (hi *HistoryChangesIterFiles) advance() error {
for hi.h.Len() > 0 {
top := heap.Pop(&hi.h).(*ReconItem)
top := hi.h[0] // peek at minimum without removing
key, idxVal := top.key, top.val
if top.g.HasNext() {
var err error
top.key, top.val, err = top.g.Next()
if err != nil {
return err
top.key, _ = top.g.Next(nil)
if top.g.HasNext() {
top.val, _ = top.g.Next(nil)
} else {
top.val = nil
}
heap.Push(&hi.h, top)
heap.Fix(&hi.h, 0) // sift-down only, O(log n) vs Pop+Push O(2 log n)
} else {
heap.Pop(&hi.h)
}

if bytes.Equal(key, hi.nextKey) { // deduplication
Expand All @@ -435,13 +448,13 @@ func (hi *HistoryChangesIterFiles) advance() error {
continue
}

if top.histFileIdx < 0 {
return fmt.Errorf("HistoryChangesIterFiles: no %s file found for [%x]", hi.hc.h.FilenameBase, key)
}
historyItem := hi.hc.files[top.histFileIdx]
hi.nextKey = key
binary.BigEndian.PutUint64(hi.txnKey[:], txNum)
historyItem, ok := hi.hc.getFileDeprecated(top.startTxNum, top.endTxNum)
if !ok {
return fmt.Errorf("HistoryChangesIterFiles: no %s file found for [%x]", hi.hc.h.FilenameBase, hi.nextKey)
}
reader := hi.hc.statelessIdxReader(historyItem.i)
reader := hi.hc.statelessIdxReader(top.histFileIdx)
offset, ok := reader.Lookup2(hi.txnKey[:], hi.nextKey)
if !ok {
continue
Expand All @@ -454,11 +467,11 @@ func (hi *HistoryChangesIterFiles) advance() error {
}

if compressedPageValuesCount <= 1 {
g := hi.hc.statelessGetter(historyItem.i)
g := hi.hc.statelessGetter(top.histFileIdx)
g.Reset(offset)
hi.nextVal, _ = g.Next(nil)
} else {
g := seg.NewPagedReader(hi.hc.statelessGetter(historyItem.i), compressedPageValuesCount, true)
g := seg.NewPagedReader(hi.hc.statelessGetter(top.histFileIdx), compressedPageValuesCount, true)
g.Reset(offset)
for i := 0; i < compressedPageValuesCount && g.HasNext(); i++ {
k, v, _, _ := g.Next2(nil)
Expand Down
Loading
Loading