pkg/storage/indexheader/bucket_binary_reader.go (7 changes: 5 additions & 2 deletions)
@@ -80,8 +80,7 @@ func NewBucketBinaryReader(
 	indexPath := filepath.Join(blockID.String(), block.IndexFilename)

 	r := &BucketBinaryReader{
-		bkt: bkt,
-
+		bkt:     bkt,
 		factory: streamencoding.NewBucketDecbufFactory(ctx, bkt, indexPath),
 	}
@@ -297,6 +296,10 @@ func (r *BucketBinaryReader) fetchRange(ctx context.Context, objectPath string,
 	return data, nil
 }

+func (r *BucketBinaryReader) BufReaderStats() *streamencoding.BufReaderStats {
+	return r.postingsOffsetTable.BufReaderStats()
+}
+
 // Close implements Reader.
 func (r *BucketBinaryReader) Close() error {
 	r.factory.Stop()
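The new accessor surfaces the postings offset table's buffered-reader statistics to the index-header reader's callers. A minimal sketch of the shape this implies, assuming streamencoding.BufReaderStats is a plain counter struct with an atomic BytesDiscarded field (as the Skip change below suggests); the caller code here is hypothetical, not part of the PR:

package main

import (
	"fmt"
	"sync/atomic"
)

// Assumed shape of streamencoding.BufReaderStats, inferred from the
// r.stats.BytesDiscarded.Add(uint64(n)) call added in Skip below.
type BufReaderStats struct {
	BytesDiscarded atomic.Uint64
}

func main() {
	// Hypothetical caller: after a lookup, read the counter to see how
	// many buffered bytes Skip threw away while seeking within the stream.
	stats := &BufReaderStats{}
	stats.BytesDiscarded.Add(4096)
	fmt.Printf("bytes discarded: %d\n", stats.BytesDiscarded.Load())
}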
pkg/storage/indexheader/encoding/bucket_factory.go (59 changes: 33 additions & 26 deletions)
@@ -118,6 +118,7 @@ type streamReader struct {
 	length int

 	seekReader func(off int) error
+	stats      *BufReaderStats
 }

 var netbufPool = sync.Pool{
@@ -133,26 +134,27 @@ func newStreamReader(rc io.ReadCloser, pos, length int) *streamReader {
 		buf:    netbufPool.Get().(*bufio.Reader),
 		pos:    pos,
 		length: length,
+		stats:  &BufReaderStats{},
 	}
 	r.buf.Reset(r.rc)
 	return r
 }

-// resetAt moves the cursor position to the given offset in the data segment.
-// Attempting to resetAt to the end of the file segment is valid. Attempting to resetAt _beyond_ the end of the file
+// ResetAt moves the cursor position to the given offset in the data segment.
+// Attempting to ResetAt to the end of the file segment is valid. Attempting to ResetAt _beyond_ the end of the file
 // segment will return an error.
-func (r *streamReader) resetAt(off int) error {
+func (r *streamReader) ResetAt(off int) error {
 	if off > r.length {
 		return ErrInvalidSize
 	}

-	if dist := off - r.pos; dist > 0 && dist < r.buffered() {
+	if dist := off - r.pos; dist > 0 && dist < r.Buffered() {
 		// skip ahead by discarding the distance bytes
-		return r.skip(dist)
+		return r.Skip(dist)
 	}

 	// Objstore hides the io.ReadSeekCloser, that the underlying bucket clients implement.
-	// So we reimplement it ourselves: close the r.rc, re-read the object from new offset, reset the r.buf and the rest of the state.
+	// So we reimplement it ourselves: Close the r.rc, re-read the object from new offset, reset the r.buf and the rest of the state.
 	if err := r.seekReader(off); err != nil {
 		return err
 	}
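The fast path in ResetAt is the heart of this change: when the target offset is ahead of the cursor but still inside the bufio buffer, it discards bytes instead of closing and re-opening the object. A standalone sketch of that decision over a plain bufio.Reader (names and sizes are illustrative, not the PR's code):

package main

import (
	"bufio"
	"fmt"
	"strings"
)

func main() {
	r := bufio.NewReaderSize(strings.NewReader("0123456789abcdef"), 16)
	_, _ = r.Peek(16) // fill the buffer
	pos, target := 0, 10

	// The same decision ResetAt makes: discard buffered bytes when the
	// target is close enough, otherwise fall back to re-reading the object.
	if dist := target - pos; dist > 0 && dist < r.Buffered() {
		n, _ := r.Discard(dist)
		pos += n
		fmt.Println("skipped ahead to", pos) // skipped ahead to 10
	} else {
		fmt.Println("would re-open the object at offset", target)
	}
}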
@@ -163,23 +165,24 @@ func (r *streamReader) resetAt(off int) error {
 	return nil
 }

-// skip advances the cursor position by the given number of bytes.
-func (r *streamReader) skip(l int) error {
-	if l > r.len() {
+// Skip advances the cursor position by the given number of bytes.
+func (r *streamReader) Skip(l int) error {
+	if l > r.Len() {
 		return ErrInvalidSize
 	}

 	// TODO(v): how to make sure we don't trash the cache when skipping
 	n, err := r.buf.Discard(l)
 	if n > 0 {
 		r.pos += n
+		r.stats.BytesDiscarded.Add(uint64(n))
 	}

 	return err
 }

-// peek returns at most the given number of bytes without consuming them.
-func (r *streamReader) peek(n int) ([]byte, error) {
+// Peek returns at most the given number of bytes without consuming them.
+func (r *streamReader) Peek(n int) ([]byte, error) {
 	b, err := r.buf.Peek(n)
 	if err != nil && !errors.Is(err, io.EOF) {
 		return nil, err
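Peek's error handling above relies on a bufio detail: bufio.Reader.Peek can return fewer than n bytes together with io.EOF, which this reader treats as "at most n bytes" rather than a failure. A small standalone sketch of that behavior:

package main

import (
	"bufio"
	"errors"
	"fmt"
	"io"
	"strings"
)

func main() {
	r := bufio.NewReaderSize(strings.NewReader("abc"), 16)

	// Asking for more than remains returns the available bytes plus io.EOF;
	// only non-EOF errors are real failures, as in Peek above.
	b, err := r.Peek(8)
	fmt.Printf("got %q, err=%v\n", b, err) // got "abc", err=EOF
	if err != nil && !errors.Is(err, io.EOF) {
		fmt.Println("unexpected error:", err)
	}
}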
@@ -192,20 +195,20 @@ func (r *streamReader) peek(n int) ([]byte, error) {
 	return nil, nil
 }

-// read returns the given number of bytes, consuming them.
-func (r *streamReader) read(n int) ([]byte, error) {
+// Read returns the given number of bytes, consuming them.
+func (r *streamReader) Read(n int) ([]byte, error) {
 	b := make([]byte, n)

-	err := r.readInto(b)
+	err := r.ReadInto(b)
 	if err != nil {
 		return nil, err
 	}

 	return b, nil
 }

-// readInto reads len(b) bytes into b, consuming them.
-func (r *streamReader) readInto(b []byte) error {
+// ReadInto reads len(b) bytes into b, consuming them.
+func (r *streamReader) ReadInto(b []byte) error {
 	n, err := io.ReadFull(r.buf, b)
 	if n > 0 {
 		r.pos += n
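ReadInto delegates to io.ReadFull, which only succeeds when it fills b completely; a short read surfaces as io.ErrUnexpectedEOF while still reporting how many bytes landed, which is why the position is advanced before the error check. A minimal sketch of those semantics:

package main

import (
	"fmt"
	"io"
	"strings"
)

func main() {
	buf := make([]byte, 8)

	// Only 3 of the requested 8 bytes are available: ReadFull returns
	// n=3 and io.ErrUnexpectedEOF, so n must be accounted for even on error.
	n, err := io.ReadFull(strings.NewReader("abc"), buf)
	fmt.Println(n, err) // 3 unexpected EOF
}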
@@ -220,28 +223,32 @@ func (r *streamReader) readInto(b []byte) error {
 	return nil
 }

-// size returns the length of the underlying buffer in bytes.
-func (r *streamReader) size() int {
+// Size returns the length of the underlying buffer in bytes.
+func (r *streamReader) Size() int {
 	return r.buf.Size()
 }

-// len returns the remaining number of bytes in the stream.
-func (r *streamReader) len() int {
+// Len returns the remaining number of bytes in the stream.
+func (r *streamReader) Len() int {
 	return r.length - r.pos
 }

-// position returns the current position.
-func (r *streamReader) position() int {
+// Position returns the current position.
+func (r *streamReader) Position() int {
 	return r.pos
 }

-// buffered returns the number of bytes that can be read without I/O.
-func (r *streamReader) buffered() int {
+// Buffered returns the number of bytes that can be read without I/O.
+func (r *streamReader) Buffered() int {
 	return r.buf.Buffered()
 }

-// close cleans up the underlying resources.
-func (r *streamReader) close() error {
+func (r *streamReader) Stats() *BufReaderStats {
+	return r.stats
+}
+
+// Close cleans up the underlying resources.
+func (r *streamReader) Close() error {
 	err := r.rc.Close()
 	netbufPool.Put(r.buf)
 	return err
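newStreamReader and Close bracket a pooled bufio.Reader: Get and Reset on open, Put after closing the underlying ReadCloser, so the large read buffer is reused across streams instead of reallocated. A minimal sketch of the pattern, with an illustrative buffer size (the real pool's New function is outside this diff):

package main

import (
	"bufio"
	"fmt"
	"strings"
	"sync"
)

// Illustrative stand-in for netbufPool; the actual buffer size and New
// func are not shown in this diff.
var pool = sync.Pool{
	New: func() any { return bufio.NewReaderSize(nil, 32*1024) },
}

func main() {
	buf := pool.Get().(*bufio.Reader)
	buf.Reset(strings.NewReader("hello")) // as newStreamReader does with r.rc

	b, _ := buf.Peek(5)
	fmt.Printf("%s\n", b)

	pool.Put(buf) // as Close does, after closing the underlying ReadCloser
}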