4 changes: 4 additions & 0 deletions docs/sources/shared/configuration.md
@@ -339,6 +339,10 @@ query_engine:
# CLI flag: -query-engine.range-reads.min-range-size
[min_range_size: <int> | default = 1048576]

# Experimental: Enable ahead-of-time catalog lookups.
# CLI flag: -query-engine.ahead-of-time-catalog-lookups-enabled
[ahead_of_time_catalog_lookups_enabled: <boolean> | default = false]

# Experimental: Number of worker threads to spawn. Each worker thread runs one
# task at a time. 0 means to use GOMAXPROCS value.
# CLI flag: -query-engine.worker-threads
2 changes: 2 additions & 0 deletions pkg/dataobj/metastore/iter.go
@@ -26,6 +26,7 @@ func forEachStreamSectionPointer(
streamIDs []int64,
f func(pointers.SectionPointer),
) error {
// TODO(ivkalita): remove
targetTenant, err := user.ExtractOrgID(ctx)
if err != nil {
return fmt.Errorf("extracting org ID: %w", err)
@@ -210,6 +211,7 @@ func forEachMatchedPointerSectionKey(
matchColumnValue string,
f func(key SectionKey),
) error {
// TODO(ivkalita): move to scan_pointers? and remove from here
targetTenant, err := user.ExtractOrgID(ctx)
if err != nil {
return fmt.Errorf("extracting org ID: %w", err)
11 changes: 11 additions & 0 deletions pkg/dataobj/metastore/metastore.go
@@ -4,16 +4,27 @@ import (
"context"
"time"

"github.com/apache/arrow-go/v18/arrow"
"github.com/prometheus/prometheus/model/labels"
)

type Metastore interface {
// Sections returns a list of SectionDescriptors, including metadata (stream IDs, start & end times, bytes), for the given matchers & predicates between [start,end]
Sections(ctx context.Context, start, end time.Time, matchers []*labels.Matcher, predicates []*labels.Matcher) ([]*DataobjSectionDescriptor, error)

// GetIndexes returns the paths of index objects that overlap [start,end]
GetIndexes(ctx context.Context, start, end time.Time) ([]string, error)

// ScanPointers returns a reader over the section pointers of a single index object that match the given matchers between [start,end]
ScanPointers(ctx context.Context, indexPath string, start, end time.Time, matchers []*labels.Matcher) (ArrowRecordBatchReader, error)
// CollectScanPointersResult drains the reader and aggregates its rows into per-section descriptors
CollectScanPointersResult(ctx context.Context, reader ArrowRecordBatchReader) ([]*DataobjSectionDescriptor, error)

// Labels returns all possible labels from matching streams between [start,end]
Labels(ctx context.Context, start, end time.Time, matchers ...*labels.Matcher) ([]string, error) // Used to get possible labels for a given stream

// Values returns all possible values for the given label matchers between [start,end]
Values(ctx context.Context, start, end time.Time, matchers ...*labels.Matcher) ([]string, error) // Used to get all values for a given set of label matchers
}

// ArrowRecordBatchReader streams Arrow record batches; Read returns io.EOF once the reader is exhausted
type ArrowRecordBatchReader interface {
Read(ctx context.Context) (arrow.RecordBatch, error)
Close()
}
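
The new interface methods split section resolution into three steps: list the index objects for a time range, scan each one for matching section pointers, and fold the resulting Arrow record batches back into section descriptors. A minimal sketch of how a caller might wire these together follows; the helper name resolveSections and the sequential per-index loop are illustrative assumptions, not part of this change.

package example

import (
	"context"
	"time"

	"github.com/prometheus/prometheus/model/labels"

	"github.com/grafana/loki/v3/pkg/dataobj/metastore"
)

// resolveSections is a hypothetical caller of the ahead-of-time lookup flow.
func resolveSections(ctx context.Context, ms metastore.Metastore, start, end time.Time, matchers []*labels.Matcher) ([]*metastore.DataobjSectionDescriptor, error) {
	// Step 1: list index objects covering [start, end].
	indexPaths, err := ms.GetIndexes(ctx, start, end)
	if err != nil {
		return nil, err
	}

	var descriptors []*metastore.DataobjSectionDescriptor
	for _, path := range indexPaths {
		// Step 2: scan one index object for section pointers matching the label matchers.
		reader, err := ms.ScanPointers(ctx, path, start, end, matchers)
		if err != nil {
			return nil, err
		}

		// Step 3: drain the Arrow record batches, merging pointers that share a section key.
		ds, err := ms.CollectScanPointersResult(ctx, reader)
		reader.Close()
		if err != nil {
			return nil, err
		}
		descriptors = append(descriptors, ds...)
	}
	return descriptors, nil
}
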
34 changes: 17 additions & 17 deletions pkg/dataobj/metastore/metrics.go
@@ -108,7 +108,7 @@ func (p *tocMetrics) observeMetastoreProcessing(recordTimestamp time.Time) {
}
}

type objectMetastoreMetrics struct {
type ObjectMetastoreMetrics struct {
indexObjectsTotal prometheus.Histogram
streamFilterTotalDuration prometheus.Histogram
streamFilterSections prometheus.Histogram
@@ -122,8 +122,8 @@ type objectMetastoreMetrics struct {
resolvedSectionsRatio prometheus.Histogram
}

func newObjectMetastoreMetrics() *objectMetastoreMetrics {
metrics := &objectMetastoreMetrics{
func NewObjectMetastoreMetrics(reg prometheus.Registerer) *ObjectMetastoreMetrics {
metrics := &ObjectMetastoreMetrics{
indexObjectsTotal: prometheus.NewHistogram(prometheus.HistogramOpts{
Name: "loki_metastore_index_objects_total",
Help: "Total number of objects to be searched for a Metastore query",
@@ -214,19 +214,19 @@ func newObjectMetastoreMetrics() *objectMetastoreMetrics {
}),
}

return metrics
}
if reg != nil {
reg.MustRegister(metrics.indexObjectsTotal)
reg.MustRegister(metrics.streamFilterTotalDuration)
reg.MustRegister(metrics.streamFilterSections)
reg.MustRegister(metrics.streamFilterStreamsReadDuration)
reg.MustRegister(metrics.streamFilterPointersReadDuration)
reg.MustRegister(metrics.estimateSectionsTotalDuration)
reg.MustRegister(metrics.estimateSectionsPointerReadDuration)
reg.MustRegister(metrics.estimateSectionsSections)
reg.MustRegister(metrics.resolvedSectionsTotalDuration)
reg.MustRegister(metrics.resolvedSectionsTotal)
reg.MustRegister(metrics.resolvedSectionsRatio)
}

func (p *objectMetastoreMetrics) register(reg prometheus.Registerer) {
reg.MustRegister(p.indexObjectsTotal)
reg.MustRegister(p.streamFilterTotalDuration)
reg.MustRegister(p.streamFilterSections)
reg.MustRegister(p.streamFilterStreamsReadDuration)
reg.MustRegister(p.streamFilterPointersReadDuration)
reg.MustRegister(p.estimateSectionsTotalDuration)
reg.MustRegister(p.estimateSectionsPointerReadDuration)
reg.MustRegister(p.estimateSectionsSections)
reg.MustRegister(p.resolvedSectionsTotalDuration)
reg.MustRegister(p.resolvedSectionsTotal)
reg.MustRegister(p.resolvedSectionsRatio)
return metrics
}
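
With registration folded into the constructor, callers build the metrics once per registry and hand the same instance to every ObjectMetastore, instead of each store registering its own collectors. A rough sketch, assuming the signatures shown above; the surrounding function and bucket arguments are illustrative.

package example

import (
	"github.com/go-kit/log"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/thanos-io/objstore"

	"github.com/grafana/loki/v3/pkg/dataobj/metastore"
)

func newStores(reg prometheus.Registerer, bucketA, bucketB objstore.Bucket, logger log.Logger) (*metastore.ObjectMetastore, *metastore.ObjectMetastore) {
	// Registers all histograms once; a nil registerer skips registration (useful in tests).
	metrics := metastore.NewObjectMetastoreMetrics(reg)

	// Both stores observe into the same shared collectors.
	storeA := metastore.NewObjectMetastore(bucketA, logger, metrics)
	storeB := metastore.NewObjectMetastore(bucketB, logger, metrics)
	return storeA, storeB
}
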
235 changes: 178 additions & 57 deletions pkg/dataobj/metastore/object.go
@@ -15,6 +15,7 @@ import (
"time"

"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/array"
"github.com/apache/arrow-go/v18/arrow/scalar"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
@@ -28,9 +29,9 @@ import (
"github.com/grafana/loki/v3/pkg/dataobj"
"github.com/grafana/loki/v3/pkg/dataobj/metastore/multitenancy"
"github.com/grafana/loki/v3/pkg/dataobj/sections/indexpointers"
"github.com/grafana/loki/v3/pkg/dataobj/sections/logs"
"github.com/grafana/loki/v3/pkg/dataobj/sections/pointers"
"github.com/grafana/loki/v3/pkg/dataobj/sections/streams"
"github.com/grafana/loki/v3/pkg/storage/bucket"
utillog "github.com/grafana/loki/v3/pkg/util/log"
"github.com/grafana/loki/v3/pkg/xcap"
)
@@ -43,7 +44,7 @@ type ObjectMetastore struct {
bucket objstore.Bucket
parallelism int
logger log.Logger
metrics *objectMetastoreMetrics
metrics *ObjectMetastoreMetrics
}

type SectionKey struct {
@@ -109,15 +110,12 @@ func iterTableOfContentsPaths(start, end time.Time) iter.Seq2[string, multitenan
}
}

func NewObjectMetastore(bucket objstore.Bucket, logger log.Logger, reg prometheus.Registerer) *ObjectMetastore {
func NewObjectMetastore(b objstore.Bucket, logger log.Logger, metrics *ObjectMetastoreMetrics) *ObjectMetastore {
store := &ObjectMetastore{
bucket: bucket,
bucket: bucket.NewXCapBucket(b),
parallelism: 64,
logger: logger,
metrics: newObjectMetastoreMetrics(),
}
if reg != nil {
store.metrics.register(reg)
metrics: metrics,
}
return store
}
@@ -167,29 +165,11 @@ func (m *ObjectMetastore) Sections(ctx context.Context, start, end time.Time, ma

sectionsTimer := prometheus.NewTimer(m.metrics.resolvedSectionsTotalDuration)

// Get all metastore paths for the time range
var tablePaths []string
for path := range iterTableOfContentsPaths(start, end) {
tablePaths = append(tablePaths, path)
}

// Return early if no toc files are found
if len(tablePaths) == 0 {
m.metrics.indexObjectsTotal.Observe(0)
m.metrics.resolvedSectionsTotal.Observe(0)
level.Debug(utillog.WithContext(ctx, m.logger)).Log("msg", "no sections resolved", "reason", "no toc paths")
return nil, nil
}

// List index objects from all tables concurrently
indexPaths, err := m.listObjectsFromTables(ctx, tablePaths, start, end)
tablePaths, indexPaths, err := m.getIndexes(ctx, start, end)
if err != nil {
return nil, err
}

m.metrics.indexObjectsTotal.Observe(float64(len(indexPaths)))
region.Record(xcap.StatMetastoreIndexObjects.Observe(int64(len(indexPaths))))

// Return early if no index files are found
if len(indexPaths) == 0 {
m.metrics.resolvedSectionsTotal.Observe(0)
@@ -462,36 +442,6 @@ func (m *ObjectMetastore) listStreamsFromObjects(ctx context.Context, paths []st
return streamsSlice, nil
}

func (m *ObjectMetastore) listStreamIDsFromLogObjects(ctx context.Context, objectPaths []string, predicate streams.RowPredicate) ([][]int64, []int, error) {
streamIDs := make([][]int64, len(objectPaths))
sections := make([]int, len(objectPaths))

g, ctx := errgroup.WithContext(ctx)
g.SetLimit(m.parallelism)

for idx, objectPath := range objectPaths {
g.Go(func() error {
object, err := dataobj.FromBucket(ctx, m.bucket, objectPath)
if err != nil {
return fmt.Errorf("getting object from bucket: %w", err)
}

sections[idx] = object.Sections().Count(logs.CheckSection)
streamIDs[idx] = make([]int64, 0, 8)

return forEachStream(ctx, object, predicate, func(stream streams.Stream) {
streamIDs[idx] = append(streamIDs[idx], stream.ID)
})
})
}

if err := g.Wait(); err != nil {
return nil, nil, err
}

return streamIDs, sections, nil
}

// getSectionsForStreams reads the section data from matching streams and aggregates them into section descriptors.
// This is an exact lookup and includes metadata from the streams in each section: the stream IDs, the min-max timestamps, the number of bytes & number of lines.
func (m *ObjectMetastore) getSectionsForStreams(ctx context.Context, indexObjects []*dataobj.Object, streamPredicate streams.RowPredicate, start, end time.Time) ([]*DataobjSectionDescriptor, error) {
@@ -723,3 +673,174 @@ func dedupeAndSort(objects [][]string) []string {
sort.Strings(paths)
return paths
}

func (m *ObjectMetastore) GetIndexes(ctx context.Context, start, end time.Time) ([]string, error) {
_, indexPaths, err := m.getIndexes(ctx, start, end)
return indexPaths, err
}

func (m *ObjectMetastore) getIndexes(ctx context.Context, start, end time.Time) ([]string, []string, error) {
ctx, region := xcap.StartRegion(ctx, "ObjectMetastore.GetIndexes")
defer region.End()

// Get all metastore paths for the time range
var tablePaths []string
for path := range iterTableOfContentsPaths(start, end) {
tablePaths = append(tablePaths, path)
}

// Return early if no toc files are found
if len(tablePaths) == 0 {
m.metrics.indexObjectsTotal.Observe(0)
m.metrics.resolvedSectionsTotal.Observe(0)
level.Debug(utillog.WithContext(ctx, m.logger)).Log("msg", "no sections resolved", "reason", "no toc paths")
return nil, nil, nil
}

// List index objects from all tables concurrently
indexPaths, err := m.listObjectsFromTables(ctx, tablePaths, start, end)
if err != nil {
return tablePaths, nil, err
}

m.metrics.indexObjectsTotal.Observe(float64(len(indexPaths)))
region.Record(xcap.StatMetastoreIndexObjects.Observe(int64(len(indexPaths))))

return tablePaths, indexPaths, nil
}

func (m *ObjectMetastore) ScanPointers(ctx context.Context, indexPath string, start, end time.Time, matchers []*labels.Matcher) (ArrowRecordBatchReader, error) {
idxObj, err := dataobj.FromBucket(ctx, m.bucket, indexPath)
if err != nil {
return nil, fmt.Errorf("prepare obj %s: %w", indexPath, err)
}

return &scanPointers{
obj: idxObj,
// TODO(ivkalita): this conversion happens for every index file, can be improved
streamPredicate: streamPredicateFromMatchers(start, end, matchers...),
sStart: scalar.NewTimestampScalar(arrow.Timestamp(start.UnixNano()), arrow.FixedWidthTypes.Timestamp_ns),
sEnd: scalar.NewTimestampScalar(arrow.Timestamp(end.UnixNano()), arrow.FixedWidthTypes.Timestamp_ns),
}, nil
}

func (m *ObjectMetastore) CollectScanPointersResult(ctx context.Context, reader ArrowRecordBatchReader) ([]*DataobjSectionDescriptor, error) {
objectSectionDescriptors := make(map[SectionKey]*DataobjSectionDescriptor)
for {
rec, err := reader.Read(ctx)
if err != nil && !errors.Is(err, io.EOF) {
level.Warn(m.logger).Log(
"msg", "error during execution",
"err", err,
)
return nil, err
}

if rec != nil && rec.NumRows() > 0 {
if err := addSectionDescriptors(rec, objectSectionDescriptors); err != nil {
return nil, err
}
}

if errors.Is(err, io.EOF) {
break
}
}

descriptors := make([]*DataobjSectionDescriptor, 0, len(objectSectionDescriptors))
for _, s := range objectSectionDescriptors {
descriptors = append(descriptors, s)
}

return descriptors, nil
}

func addSectionDescriptors(rec arrow.RecordBatch, result map[SectionKey]*DataobjSectionDescriptor) error {
numRows := int(rec.NumRows())
buf := make([]pointers.SectionPointer, numRows)
schema := rec.Schema()
for fIdx := range schema.Fields() {
field := schema.Field(fIdx)
col := rec.Column(fIdx)
switch field.Name {
case "path.path.utf8":
values := col.(*array.String)
for rIdx := range numRows {
if col.IsNull(rIdx) {
continue
}
buf[rIdx].Path = values.Value(rIdx)
}
case "section.int64":
values := col.(*array.Int64)
for rIdx := range numRows {
if col.IsNull(rIdx) {
continue
}
buf[rIdx].Section = values.Value(rIdx)
}
case "stream_id.int64":
values := col.(*array.Int64)
for rIdx := range numRows {
if col.IsNull(rIdx) {
continue
}
buf[rIdx].StreamID = values.Value(rIdx)
}
case "stream_id_ref.int64":
values := col.(*array.Int64)
for rIdx := range numRows {
if col.IsNull(rIdx) {
continue
}
buf[rIdx].StreamIDRef = values.Value(rIdx)
}
case "min_timestamp.timestamp":
values := col.(*array.Timestamp)
for rIdx := range numRows {
if col.IsNull(rIdx) {
continue
}
buf[rIdx].StartTs = time.Unix(0, int64(values.Value(rIdx)))
}
case "max_timestamp.timestamp":
values := col.(*array.Timestamp)
for rIdx := range numRows {
if col.IsNull(rIdx) {
continue
}
buf[rIdx].EndTs = time.Unix(0, int64(values.Value(rIdx)))
}
case "row_count.int64":
values := col.(*array.Int64)
for rIdx := range numRows {
if col.IsNull(rIdx) {
continue
}
buf[rIdx].LineCount = values.Value(rIdx)
}
case "uncompressed_size.int64":
values := col.(*array.Int64)
for rIdx := range numRows {
if col.IsNull(rIdx) {
continue
}
buf[rIdx].UncompressedSize = values.Value(rIdx)
}
default:
continue
}
}

for _, ptr := range buf {
key := SectionKey{ObjectPath: ptr.Path, SectionIdx: ptr.Section}
existing, ok := result[key]
if !ok {
result[key] = NewSectionDescriptor(ptr)
continue
}
existing.Merge(ptr)
}
return nil
}