26 changes: 5 additions & 21 deletions go/store/nbs/archive_chunk_source.go
@@ -16,7 +16,6 @@ package nbs

import (
"context"
"encoding/binary"
"io"
"path/filepath"

@@ -234,25 +233,10 @@ func (acs archiveChunkSource) getManyCompressed(ctx context.Context, eg *errgrou
}

func (acs archiveChunkSource) iterateAllChunks(ctx context.Context, cb func(chunks.Chunk), stats *Stats) error {
addrCount := uint64(len(acs.aRdr.prefixes))
for i := uint64(0); i < addrCount; i++ {
var h hash.Hash
suffix := acs.aRdr.getSuffixByID(i)

// Reconstruct the hash from the prefix and suffix.
binary.BigEndian.PutUint64(h[:uint64Size], acs.aRdr.prefixes[i])
copy(h[uint64Size:], suffix[:])

if ctx.Err() != nil {
return ctx.Err()
}

data, err := acs.aRdr.get(ctx, h, stats)
if err != nil {
return err
}

cb(chunks.NewChunkWithHash(h, data))
errCb := func(c chunks.Chunk) error {
cb(c)
return nil
}
return nil

return acs.aRdr.iterate(ctx, errCb, stats)
}
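
Note: the rewritten iterateAllChunks above is now a thin adapter; it wraps the no-error callback in an error-returning one and hands iteration off to archiveReader.iterate. A minimal, self-contained sketch of that adapter pattern (the names and the string item type are illustrative, not from the dolt codebase):

package main

import "fmt"

// iterate stands in for an iterator that expects a callback which can fail.
func iterate(items []string, cb func(string) error) error {
    for _, it := range items {
        if err := cb(it); err != nil {
            return err
        }
    }
    return nil
}

// iterateAll adapts a callback that cannot fail to the error-returning
// signature required by iterate, mirroring the errCb wrapper above.
func iterateAll(items []string, cb func(string)) error {
    errCb := func(s string) error {
        cb(s)
        return nil
    }
    return iterate(items, errCb)
}

func main() {
    _ = iterateAll([]string{"a", "b"}, func(s string) { fmt.Println(s) })
}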
120 changes: 108 additions & 12 deletions go/store/nbs/archive_reader.go
@@ -15,6 +15,7 @@
package nbs

import (
"bufio"
"context"
"crypto/sha512"
"encoding/binary"
@@ -508,6 +509,14 @@ func (ar archiveReader) getSuffixByID(id uint64) suffix {
return suffix(ar.suffixes[start : start+hash.SuffixLen])
}

func (ar archiveReader) getHashByID(id uint64) hash.Hash {
var ret hash.Hash
binary.BigEndian.PutUint64(ret[:uint64Size], ar.prefixes[id])
suffix := ar.getSuffixByID(id)
copy(ret[uint64Size:], suffix[:])
return ret
}

func (ar archiveReader) getMetadata(ctx context.Context, stats *Stats) ([]byte, error) {
return ar.readByteSpan(ctx, ar.footer.metadataSpan(), stats)
}
@@ -528,26 +537,113 @@ func (ar archiveReader) verifyMetaCheckSum(ctx context.Context, stats *Stats) er
return verifyCheckSum(ctx, ar.reader, ar.footer.metadataSpan(), ar.footer.metaCheckSum, stats)
}

type bridgeReaderAt struct {
rdr ReaderAtWithStats
ctx context.Context
stats *Stats
}

func (r *bridgeReaderAt) ReadAt(p []byte, off int64) (n int, err error) {
return r.rdr.ReadAtWithStats(r.ctx, p, off, r.stats)
}

func (ar archiveReader) iterate(ctx context.Context, cb func(chunks.Chunk) error, stats *Stats) error {
for i := uint32(0); i < ar.footer.chunkCount; i++ {
var hasBytes [hash.ByteLen]byte
// Build reverse indexes for dictionary and data ByteSpans
// dictReverseIndex: Dictionary ByteSpan ID -> struct{} - indicates that we expect that span to be a dictionary.
// dataReverseIndex: Data ByteSpan ID -> chunk ref index - indicates that we expect that span to be a data chunk,
// and the value is the index into the chunkRefs slice where the chunk reference is stored.
dictReverseIndex := make(map[uint32]struct{})
dataReverseIndex := make(map[uint32]uint32)

for chunkRefIdx := uint32(0); chunkRefIdx < ar.footer.chunkCount; chunkRefIdx++ {
dictId, dataId := ar.getChunkRef(int(chunkRefIdx))
if dictId != 0 {
dictReverseIndex[dictId] = struct{}{}
}
dataReverseIndex[dataId] = chunkRefIdx
}

binary.BigEndian.PutUint64(hasBytes[:uint64Size], ar.prefixes[i])
suf := ar.getSuffixByID(uint64(i))
copy(hasBytes[hash.ByteLen-hash.SuffixLen:], suf[:])
h := hash.New(hasBytes[:])
dataSpan := ar.footer.dataSpan()
dataReader := io.NewSectionReader(&bridgeReaderAt{
rdr: ar.reader,
ctx: ctx,
stats: stats,
}, int64(dataSpan.offset), int64(dataSpan.length))
bufReader := bufio.NewReader(dataReader)
byteSpanCounter := uint32(1)

data, err := ar.get(ctx, h, stats)
if err != nil {
return err
buf := make([]byte, 4*1024*1024)
loadedDictionaries := make(map[uint32]*gozstd.DDict)

for byteSpanCounter <= ar.footer.byteSpanCount {
if ctx.Err() != nil {
return context.Cause(ctx)
}

chk := chunks.NewChunkWithHash(h, data)
err = cb(chk)
span := ar.getByteSpanByID(byteSpanCounter)
for cap(buf) < int(span.length) {
buf = append(buf, make([]byte, cap(buf))...)
}

_, err := io.ReadFull(bufReader, buf[:span.length])
if err != nil {
return err
return fmt.Errorf("error reading archive file: %w", err)
}
spanData := buf[:span.length]

if _, exists := dictReverseIndex[byteSpanCounter]; exists {
dict, err := NewDecompBundle(spanData)
if err != nil {
return fmt.Errorf("Failure creating dictionary from bytes: %w", err)
}
loadedDictionaries[byteSpanCounter] = dict.dDict
} else if chunkId, exists := dataReverseIndex[byteSpanCounter]; exists {
dictId, dataId := ar.getChunkRef(int(chunkId))
if byteSpanCounter != dataId {
panic("Reverse Index incorrect: ByteSpan ID does not match data ID in chunk reference")
}

h := ar.getHashByID(uint64(chunkId))

var chunkData []byte
if dictId == 0 {
// Snappy compression (no dictionary)
if ar.footer.formatVersion >= archiveVersionSnappySupport {
cc, err := NewCompressedChunk(h, spanData)
if err != nil {
return err
}
chk, err := cc.ToChunk()
if err != nil {
return err
}
chunkData = chk.Data()
} else {
return errors.New("runtime error: no dictionary for old format version")
}
} else {
dict, ok := loadedDictionaries[dictId]
if !ok {
panic("Reverse Index incomplete: Dictionary ID not found in loaded dictionaries")
}

chunkData, err = gozstd.DecompressDict(nil, spanData, dict)
if err != nil {
return fmt.Errorf("error decompressing span: %d, %v, %w", byteSpanCounter, span, err)
}
}

chk := chunks.NewChunkWithHash(h, chunkData)
err = cb(chk)
if err != nil {
return err
}
} else {
panic("Reverse Index incomplete: ByteSpan ID not found in either dictionary or data reverse index")
}
byteSpanCounter++
}

return nil
}

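The bridgeReaderAt type above exists so that a context- and stats-aware reader can be handed to io.NewSectionReader, which only accepts a plain io.ReaderAt; the section reader is then wrapped in a bufio.Reader so byte spans are streamed sequentially rather than fetched with one random read each. A minimal sketch of the same bridge, using a hypothetical ReadAtCtx interface in place of ReaderAtWithStats:

package main

import (
    "bufio"
    "context"
    "fmt"
    "io"
    "strings"
)

// readAtWithCtx is a hypothetical stand-in for a reader that needs a context.
type readAtWithCtx interface {
    ReadAtCtx(ctx context.Context, p []byte, off int64) (int, error)
}

// ctxBridge satisfies io.ReaderAt by capturing the context, mirroring bridgeReaderAt.
type ctxBridge struct {
    ctx context.Context
    rdr readAtWithCtx
}

func (b *ctxBridge) ReadAt(p []byte, off int64) (int, error) {
    return b.rdr.ReadAtCtx(b.ctx, p, off)
}

// stringReaderAt adapts strings.Reader to the hypothetical interface for the demo.
type stringReaderAt struct{ r *strings.Reader }

func (s stringReaderAt) ReadAtCtx(ctx context.Context, p []byte, off int64) (int, error) {
    if err := ctx.Err(); err != nil {
        return 0, err
    }
    return s.r.ReadAt(p, off)
}

func main() {
    src := stringReaderAt{strings.NewReader("hello, sequential world")}
    bridge := &ctxBridge{ctx: context.Background(), rdr: src}
    // A section reader plus bufio gives buffered, sequential reads over the span.
    section := io.NewSectionReader(bridge, 7, 10)
    buf := bufio.NewReader(section)
    data, _ := io.ReadAll(buf)
    fmt.Println(string(data)) // prints "sequential"
}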
8 changes: 5 additions & 3 deletions go/store/nbs/table.go
@@ -265,11 +265,13 @@ type chunkSource interface {
// currentSize returns the current total physical size of the chunkSource.
currentSize() uint64

// scanAllChunks will call the provided function for each chunk in the chunkSource. This is currently used
// iterateAllChunks will call the provided function for each chunk in the chunkSource. This is currently used
// to perform integrity checks, and the chunk passed in will have the address from the index, and the content
// loaded. This iterator doesn't have a way to stop the iteration other than the context being canceled.
// loaded. Note that there may be duplicate chunks in the chunkSource, so the callback may be called multiple times
// with the same chunk id and content.
//
// If there is a failure reading the chunk, the error will be returned - note that this can happen in the middle of
// This iterator doesn't have a way to stop the iteration other than the context being canceled. If there is a failure
// reading the chunk, the error will be returned - note that this can happen in the middle of
// the scan, and will likely mean that the scan didn't complete. Note that errors returned by this method are not
// related to the callback - if the callback discovers an error, it must manage that out of band.
iterateAllChunks(context.Context, func(chunk chunks.Chunk), *Stats) error
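For context, a sketch of how a caller might use iterateAllChunks for the integrity check the comment describes: recompute each chunk's content hash, compare it to the address taken from the index, and collect mismatches out of band. The helper below is an assumption for illustration only, not dolt's actual verification code:

// verifyChunks is an illustrative integrity check built on iterateAllChunks.
// It recomputes each chunk's content hash and compares it to the stored address.
func verifyChunks(ctx context.Context, cs chunkSource, stats *Stats) ([]hash.Hash, error) {
    var corrupt []hash.Hash
    err := cs.iterateAllChunks(ctx, func(chk chunks.Chunk) {
        // chk carries the address from the index; recompute from the content.
        if chunks.NewChunk(chk.Data()).Hash() != chk.Hash() {
            corrupt = append(corrupt, chk.Hash())
        }
    }, stats)
    // Errors returned by iterateAllChunks are read failures; callback findings
    // are reported out of band via the corrupt slice, as the doc comment requires.
    return corrupt, err
}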
60 changes: 48 additions & 12 deletions go/store/nbs/table_reader.go
@@ -22,6 +22,7 @@
package nbs

import (
"bufio"
"context"
"encoding/binary"
"errors"
@@ -765,27 +766,58 @@ func (tr tableReader) clone() (tableReader, error) {

func (tr tableReader) iterateAllChunks(ctx context.Context, cb func(chunk chunks.Chunk), stats *Stats) error {
count := tr.idx.chunkCount()
for i := uint32(0); i < count; i++ {
if ctx.Err() != nil {
return ctx.Err()
}
if count == 0 {
return nil
}

// Collect all chunk info, then sort it by offset.
// The index is sorted by prefix, but we need to process chunkRecs in storage order (by offset).
type chunkRecord struct {
offset uint64
length uint32
hash hash.Hash
}
chunkRecs := make([]chunkRecord, 0, count)
for i := uint32(0); i < count; i++ {
var h hash.Hash
ie, err := tr.idx.indexEntry(i, &h)
if err != nil {
return err
}

res := make([]byte, ie.Length())
n, err := tr.r.ReadAtWithStats(ctx, res, int64(ie.Offset()), stats)
if err != nil {
return err
}
if uint32(n) != ie.Length() {
return errors.New("failed to read all data")
chunkRecs = append(chunkRecs, chunkRecord{
offset: ie.Offset(),
length: ie.Length(),
hash: h,
})
}
sort.Slice(chunkRecs, func(i, j int) bool {
return chunkRecs[i].offset < chunkRecs[j].offset
})

lastChunk := chunkRecs[len(chunkRecs)-1]
totalDataSize := lastChunk.offset + uint64(lastChunk.length)

dataReader := io.NewSectionReader(&bridgeReaderAt{
rdr: tr.r,
ctx: ctx,
stats: stats,
}, int64(0), int64(totalDataSize))
bufReader := bufio.NewReader(dataReader)

chunkIndex := 0
buf := make([]byte, 4*1024*1024)

for chunkIndex < len(chunkRecs) {
if ctx.Err() != nil {
return context.Cause(ctx)
}

cchk, err := NewCompressedChunk(h, res)
chunk := chunkRecs[chunkIndex]
_, err := io.ReadFull(bufReader, buf[:chunk.length])
if err != nil {
return err
}
chunkData := buf[:chunk.length]

cchk, err := NewCompressedChunk(chunk.hash, chunkData)
if err != nil {
return err
}
@@ -794,7 +826,11 @@ func (tr tableReader) iterateAllChunks(ctx context.Context, cb func(chunk chunks
return err
}

// Process the chunk
cb(chk)

chunkIndex++
}

return nil
}
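
The tableReader change applies the same idea as the archive reader: gather the index entries, sort them by physical offset, and make one buffered sequential pass over the data instead of a random ReadAt per chunk. A stripped-down, self-contained sketch of that access pattern, assuming records are stored contiguously from offset 0 as they are in a table file (the record layout and names are illustrative):

package main

import (
    "bufio"
    "bytes"
    "fmt"
    "io"
    "sort"
)

// rec describes where a blob lives in the backing file, as an index entry would.
type rec struct {
    offset uint64
    length uint32
    name   string
}

func main() {
    // Backing "file": three blobs stored back to back.
    file := []byte("aaaabbbbbbcc")
    // Index entries arrive in hash order, not storage order.
    recs := []rec{
        {offset: 10, length: 2, name: "c"},
        {offset: 0, length: 4, name: "a"},
        {offset: 4, length: 6, name: "b"},
    }

    // Sort by offset so a single sequential scan covers every record.
    sort.Slice(recs, func(i, j int) bool { return recs[i].offset < recs[j].offset })

    last := recs[len(recs)-1]
    total := int64(last.offset) + int64(last.length)
    section := io.NewSectionReader(bytes.NewReader(file), 0, total)
    buf := bufio.NewReader(section)

    scratch := make([]byte, 16)
    for _, r := range recs {
        if _, err := io.ReadFull(buf, scratch[:r.length]); err != nil {
            panic(err)
        }
        fmt.Printf("%s => %q\n", r.name, scratch[:r.length])
    }
}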