Commit fb14c2f

tmp

1 parent db18f7f commit fb14c2f

11 files changed: +797, -15 lines changed
pkg/dataobj/metastore/metastore.go

Lines changed: 2 additions & 0 deletions
```diff
@@ -11,6 +11,8 @@ type Metastore interface {
 	// Sections returns a list of SectionDescriptors, including metadata (stream IDs, start & end times, bytes), for the given matchers & predicates between [start,end]
 	Sections(ctx context.Context, start, end time.Time, matchers []*labels.Matcher, predicates []*labels.Matcher) ([]*DataobjSectionDescriptor, error)
 
+	GetIndexes(ctx context.Context, start, end time.Time) ([]string, error)
+
 	// Labels returns all possible labels from matching streams between [start,end]
 	Labels(ctx context.Context, start, end time.Time, matchers ...*labels.Matcher) ([]string, error) // Used to get possible labels for a given stream
 
```
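For orientation, here is a minimal sketch of how a caller might use the new method through the `Metastore` interface. The helper below is illustrative and not part of this commit; `GetIndexes` resolves object-store paths of index objects covering `[start,end]` (see the `ObjectMetastore` implementation in the next file).

```go
package example // illustrative only; not part of this commit

import (
	"context"
	"time"

	"github.com/grafana/loki/v3/pkg/dataobj/metastore"
)

// listRecentIndexes is a hypothetical caller that asks any Metastore
// implementation for the index object paths covering the last hour.
func listRecentIndexes(ctx context.Context, ms metastore.Metastore) ([]string, error) {
	end := time.Now()
	start := end.Add(-1 * time.Hour)
	return ms.GetIndexes(ctx, start, end)
}
```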

pkg/dataobj/metastore/object.go

Lines changed: 30 additions & 0 deletions
```diff
@@ -723,3 +723,33 @@ func dedupeAndSort(objects [][]string) []string {
 	sort.Strings(paths)
 	return paths
 }
+
+func (m *ObjectMetastore) GetIndexes(ctx context.Context, start, end time.Time) ([]string, error) {
+	ctx, region := xcap.StartRegion(ctx, "ObjectMetastore.GetIndexes")
+	defer region.End()
+
+	// Get all metastore paths for the time range
+	var tablePaths []string
+	for path := range iterTableOfContentsPaths(start, end) {
+		tablePaths = append(tablePaths, path)
+	}
+
+	// Return early if no toc files are found
+	if len(tablePaths) == 0 {
+		m.metrics.indexObjectsTotal.Observe(0)
+		m.metrics.resolvedSectionsTotal.Observe(0)
+		level.Debug(utillog.WithContext(ctx, m.logger)).Log("msg", "no sections resolved", "reason", "no toc paths")
+		return nil, nil
+	}
+
+	// List index objects from all tables concurrently
+	indexPaths, err := m.listObjectsFromTables(ctx, tablePaths, start, end)
+	if err != nil {
+		return nil, err
+	}
+
+	m.metrics.indexObjectsTotal.Observe(float64(len(indexPaths)))
+	region.Record(xcap.StatMetastoreIndexObjects.Observe(int64(len(indexPaths))))
+
+	return indexPaths, nil
+}
```
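`GetIndexes` ranges over `iterTableOfContentsPaths(start, end)`, a range-over-func iterator whose definition is not part of this diff. A rough sketch of the iterator shape it consumes is below; the path layout is invented purely for illustration and will differ from the real one.

```go
package example // sketch only; the real iterTableOfContentsPaths is not in this commit

import (
	"fmt"
	"iter"
	"time"
)

// iterTocPathsSketch yields one hypothetical table-of-contents path per hour
// window between start and end. Only the iterator shape (iter.Seq[string])
// matches how GetIndexes consumes it; the path format here is made up.
func iterTocPathsSketch(start, end time.Time) iter.Seq[string] {
	return func(yield func(string) bool) {
		for t := start.Truncate(time.Hour); !t.After(end); t = t.Add(time.Hour) {
			path := fmt.Sprintf("metastore/toc/%s.toc", t.UTC().Format("2006-01-02T15"))
			if !yield(path) {
				return
			}
		}
	}
}
```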

pkg/engine/engine.go

Lines changed: 14 additions & 13 deletions
```diff
@@ -365,19 +365,20 @@ func (e *Engine) prepareCatalog(ctx context.Context, params logql.Params, logica
 	}
 
 	resolved, err := unresolved.Resolve(func(req physical.CatalogRequest) (physical.CatalogResponse, error) {
-		switch req.Kind {
-		case physical.CatalogRequestKindResolveShardDescriptorsWithShard:
-			descriptors, err := metastoreCatalog.ResolveShardDescriptorsWithShard(req.Selector, req.Predicates, req.Shard, req.From, req.Through)
-			if err != nil {
-				return physical.CatalogResponse{}, err
-			}
-			return physical.CatalogResponse{
-				Kind:        physical.CatalogRequestKindResolveShardDescriptorsWithShard,
-				Descriptors: descriptors,
-			}, nil
-		default:
-			return physical.CatalogResponse{}, fmt.Errorf("unsupported catalog request kind: %v", req.Kind)
-		}
+		return e.queryCatalog(ctx, req)
+		//switch req.Kind {
+		//case physical.CatalogRequestKindResolveShardDescriptorsWithShard:
+		//	descriptors, err := metastoreCatalog.ResolveShardDescriptorsWithShard(req.Selector, req.Predicates, req.Shard, req.From, req.Through)
+		//	if err != nil {
+		//		return physical.CatalogResponse{}, err
+		//	}
+		//	return physical.CatalogResponse{
+		//		Kind:        physical.CatalogRequestKindResolveShardDescriptorsWithShard,
+		//		Descriptors: descriptors,
+		//	}, nil
+		//default:
+		//	return physical.CatalogResponse{}, fmt.Errorf("unsupported catalog request kind: %v", req.Kind)
+		//}
 	})
 	if err != nil {
 		return nil, fmt.Errorf("resolving catalog: %w", err)
```
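Because the old switch is only commented out, every request kind now flows straight into `queryCatalog` (added in `engine_tmp.go` below). A possible tightening of this temporary change, sketched here and not part of the commit, keeps the kind check in front of the delegation:

```go
// Sketch only, reusing identifiers from the hunk above; not in this commit.
resolved, err := unresolved.Resolve(func(req physical.CatalogRequest) (physical.CatalogResponse, error) {
	if req.Kind != physical.CatalogRequestKindResolveShardDescriptorsWithShard {
		return physical.CatalogResponse{}, fmt.Errorf("unsupported catalog request kind: %v", req.Kind)
	}
	return e.queryCatalog(ctx, req)
})
```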

pkg/engine/engine_tmp.go

Lines changed: 163 additions & 0 deletions
New file:

```go
package engine

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/go-kit/log/level"

	"github.com/grafana/loki/v3/pkg/dataobj/metastore"
	"github.com/grafana/loki/v3/pkg/dataobj/sections/pointers"
	"github.com/grafana/loki/v3/pkg/engine/internal/executor"
	"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical"
)

func (e *Engine) queryCatalog(ctx context.Context, req physical.CatalogRequest) (physical.CatalogResponse, error) {
	physicalPlan, err := physical.PlanCatalogRequest(ctx, req, e.metastore)
	if err != nil {
		return physical.CatalogResponse{}, fmt.Errorf("planning metastore request: %w", err)
	}

	wf, _, err := e.buildWorkflow(ctx, e.logger, physicalPlan)
	if err != nil {
		return physical.CatalogResponse{}, fmt.Errorf("building workflow: %w", err)
	}

	pipeline, err := wf.Run(ctx)
	if err != nil {
		return physical.CatalogResponse{}, fmt.Errorf("running workflow: %w", err)
	}

	// TODO(ivkalita): use ResultBuilder here?
	// TODO(ivkalita): switch on request type?

	objectSectionDescriptors := make(map[metastore.SectionKey]*metastore.DataobjSectionDescriptor)
	for {
		rec, err := pipeline.Read(ctx)
		if err != nil && !errors.Is(err, executor.EOF) {
			level.Warn(e.logger).Log(
				"msg", "error during execution",
				"err", err,
			)
			return physical.CatalogResponse{}, err
		}

		if rec != nil && rec.NumRows() > 0 {
			if err := addSectionDescriptors(rec, objectSectionDescriptors); err != nil {
				return physical.CatalogResponse{}, err
			}
		}

		if errors.Is(err, executor.EOF) {
			break
		}
	}

	descriptors := make([]*metastore.DataobjSectionDescriptor, 0, len(objectSectionDescriptors))
	for _, s := range objectSectionDescriptors {
		descriptors = append(descriptors, s)
	}

	filteredDescriptors, err := physical.FilterDescriptorsForShard(req.Shard, descriptors)
	if err != nil {
		return physical.CatalogResponse{}, err
	}

	return physical.CatalogResponse{
		Kind:        physical.CatalogRequestKindResolveShardDescriptorsWithShard,
		Descriptors: filteredDescriptors,
	}, nil
}

func addSectionDescriptors(rec arrow.RecordBatch, result map[metastore.SectionKey]*metastore.DataobjSectionDescriptor) error {
	numRows := int(rec.NumRows())
	buf := make([]pointers.SectionPointer, numRows)
	schema := rec.Schema()
	for fIdx := range schema.Fields() {
		field := schema.Field(fIdx)
		col := rec.Column(fIdx)
		switch field.Name {
		case "path.path.utf8":
			values := col.(*array.String)
			for rIdx := range numRows {
				if col.IsNull(rIdx) {
					continue
				}
				buf[rIdx].Path = values.Value(rIdx)
			}
		case "section.int64":
			values := col.(*array.Int64)
			for rIdx := range numRows {
				if col.IsNull(rIdx) {
					continue
				}
				buf[rIdx].Section = values.Value(rIdx)
			}
		case "stream_id.int64":
			values := col.(*array.Int64)
			for rIdx := range numRows {
				if col.IsNull(rIdx) {
					continue
				}
				buf[rIdx].StreamID = values.Value(rIdx)
			}
		case "stream_id_ref.int64":
			values := col.(*array.Int64)
			for rIdx := range numRows {
				if col.IsNull(rIdx) {
					continue
				}
				buf[rIdx].StreamIDRef = values.Value(rIdx)
			}
		case "min_timestamp.timestamp":
			values := col.(*array.Timestamp)
			for rIdx := range numRows {
				if col.IsNull(rIdx) {
					continue
				}
				buf[rIdx].StartTs = time.Unix(0, int64(values.Value(rIdx)))
			}
		case "max_timestamp.timestamp":
			values := col.(*array.Timestamp)
			for rIdx := range numRows {
				if col.IsNull(rIdx) {
					continue
				}
				buf[rIdx].EndTs = time.Unix(0, int64(values.Value(rIdx)))
			}
		case "row_count.int64":
			values := col.(*array.Int64)
			for rIdx := range numRows {
				if col.IsNull(rIdx) {
					continue
				}
				buf[rIdx].LineCount = values.Value(rIdx)
			}
		case "uncompressed_size.int64":
			values := col.(*array.Int64)
			for rIdx := range numRows {
				if col.IsNull(rIdx) {
					continue
				}
				buf[rIdx].UncompressedSize = values.Value(rIdx)
			}
		default:
			continue
		}
	}

	for _, ptr := range buf {
		key := metastore.SectionKey{ObjectPath: ptr.Path, SectionIdx: ptr.Section}
		existing, ok := result[key]
		if !ok {
			result[key] = metastore.NewSectionDescriptor(ptr)
			continue
		}
		existing.Merge(ptr)
	}
	return nil
}
```
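`addSectionDescriptors` repeats the same null-skipping loop for every int64 column. A minimal sketch of that pattern factored into a helper is below; `forEachInt64` is hypothetical and does not exist in the commit.

```go
package engine // sketch only; this helper is not part of the commit

import (
	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/array"
)

// forEachInt64 walks the non-null values of an int64 Arrow column and calls fn
// for each one, mirroring the per-field loops in addSectionDescriptors.
func forEachInt64(col arrow.Array, fn func(row int, v int64)) {
	values, ok := col.(*array.Int64)
	if !ok {
		return // unexpected column type; the commit asserts the type instead
	}
	for row := 0; row < values.Len(); row++ {
		if values.IsNull(row) {
			continue
		}
		fn(row, values.Value(row))
	}
}
```

With such a helper, the `"section.int64"` case above would reduce to `forEachInt64(col, func(r int, v int64) { buf[r].Section = v })`.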

pkg/engine/internal/executor/executor.go

Lines changed: 87 additions & 1 deletion
```diff
@@ -14,6 +14,7 @@ import (
 
 	"github.com/grafana/loki/v3/pkg/dataobj"
 	"github.com/grafana/loki/v3/pkg/dataobj/sections/logs"
+	"github.com/grafana/loki/v3/pkg/dataobj/sections/pointers"
 	"github.com/grafana/loki/v3/pkg/dataobj/sections/streams"
 	"github.com/grafana/loki/v3/pkg/engine/internal/planner/physical"
 	"github.com/grafana/loki/v3/pkg/xcap"
@@ -95,6 +96,10 @@ func (c *Context) execute(ctx context.Context, node physical.Node) Pipeline {
 			return newObservedPipeline(c.executeDataObjScan(ctx, n, nodeRegion))
 		}, inputs)
 
+	case *physical.PointersScan:
+		return newLazyPipeline(func(ctx context.Context, _ []Pipeline) Pipeline {
+			return newObservedPipeline(c.executePointersScan(ctx, n, nodeRegion))
+		}, inputs)
 	case *physical.TopK:
 		return newObservedPipeline(c.executeTopK(ctx, n, inputs, nodeRegion))
 	case *physical.Limit:
@@ -208,6 +213,73 @@ func (c *Context) executeDataObjScan(ctx context.Context, node *physical.DataObj
 	return pipeline
 }
 
+func (c *Context) executePointersScan(ctx context.Context, node *physical.PointersScan, region *xcap.Region) Pipeline {
+	if c.bucket == nil {
+		return errorPipeline(ctx, errors.New("no object store bucket configured"))
+	}
+
+	obj, err := dataobj.FromBucket(ctx, c.bucket, string(node.Location))
+	if err != nil {
+		return errorPipeline(ctx, fmt.Errorf("creating data object: %w", err))
+	}
+	region.AddEvent("opened dataobj")
+
+	var (
+		streamsSection   *streams.Section
+		pointersSections []*pointers.Section
+	)
+
+	tenant, err := user.ExtractOrgID(ctx)
+	if err != nil {
+		return errorPipeline(ctx, fmt.Errorf("missing org ID: %w", err))
+	}
+
+	for _, sec := range obj.Sections().Filter(streams.CheckSection) {
+		if sec.Tenant != tenant {
+			continue
+		}
+
+		if streamsSection != nil {
+			return errorPipeline(ctx, fmt.Errorf("multiple streams sections found in data object %q", node.Location))
+		}
+
+		var err error
+		streamsSection, err = streams.Open(ctx, sec)
+		if err != nil {
+			return errorPipeline(ctx, fmt.Errorf("opening streams section %q: %w", sec.Type, err))
+		}
+		region.AddEvent("opened streams section")
+	}
+	if streamsSection == nil {
+		return errorPipeline(ctx, fmt.Errorf("streams section not found in data object %q", node.Location))
+	}
+
+	for _, sec := range obj.Sections().Filter(pointers.CheckSection) {
+		if sec.Tenant != tenant {
+			continue
+		}
+
+		s, err := pointers.Open(ctx, sec)
+		if err != nil {
+			return errorPipeline(ctx, fmt.Errorf("pointers logs section %q: %w", sec.Type, err))
+		}
+		pointersSections = append(pointersSections, s)
+		region.AddEvent("opened pointers section")
+		break
+	}
+
+	var pipeline Pipeline = newPointersScanPipeline(pointersScanOptions{
+		StreamsSection:   streamsSection,
+		PointersSections: pointersSections,
+		Selector:         node.Selector,
+		BatchSize:        c.batchSize,
+		Start:            node.Start,
+		End:              node.End,
+	}, log.With(c.logger, "location", string(node.Location)), region)
+
+	return pipeline
+}
+
 func (c *Context) executeTopK(ctx context.Context, topK *physical.TopK, inputs []Pipeline, region *xcap.Region) Pipeline {
 	if len(inputs) == 0 {
 		return emptyPipeline()
@@ -336,7 +408,7 @@ func (c *Context) executeScanSet(ctx context.Context, set *physical.ScanSet, _ *
 	// ScanSet typically gets partitioned by the scheduler into multiple scan
 	// nodes.
 	//
-	// However, for locally testing unpartitioned pipelines, we still supprt
+	// However, for locally testing unpartitioned pipelines, we still support
 	// running a ScanSet. In this case, we treat internally execute it as a
 	// Merge on top of multiple sequential scans.
 	ctx, mergeRegion := xcap.StartRegion(ctx, physical.NodeTypeMerge.String())
@@ -356,6 +428,16 @@ func (c *Context) executeScanSet(ctx context.Context, set *physical.ScanSet, _ *
 			targets = append(targets, newLazyPipeline(func(_ context.Context, _ []Pipeline) Pipeline {
 				return newObservedPipeline(c.executeDataObjScan(nodeCtx, partition, partitionRegion))
 			}, nil))
+		case physical.ScanTypePointers:
+			// Make sure projections and predicates get passed down to the
+			// individual scan.
+			partition := target.PointersScan
+
+			nodeCtx, partitionRegion := startRegionForNode(ctx, partition)
+
+			targets = append(targets, newLazyPipeline(func(_ context.Context, _ []Pipeline) Pipeline {
+				return newObservedPipeline(c.executePointersScan(nodeCtx, partition, partitionRegion))
+			}, nil))
 		default:
 			return errorPipeline(ctx, fmt.Errorf("unrecognized ScanSet target %s", target.Type))
 		}
@@ -452,6 +534,10 @@ func startRegionForNode(ctx context.Context, n physical.Node) (context.Context,
 			attribute.Int("num_predicates", len(n.Predicates)),
 			attribute.Int("num_projections", len(n.Projections)),
 		)
+
+	case *physical.PointersScan:
+		attributes = append(attributes,
+			attribute.String("location", string(n.Location)))
 	default:
 		// do nothing.
 	}
```
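`executePointersScan` hands everything to `newPointersScanPipeline` via a `pointersScanOptions` value whose definition lives elsewhere in the package. Its approximate shape, as implied by the struct literal in the hunk above, is sketched below; the `Selector` and `BatchSize` types are assumptions, not taken from the repository.

```go
package executor // approximation only; not the definition used by the commit

import (
	"time"

	"github.com/grafana/loki/v3/pkg/dataobj/sections/pointers"
	"github.com/grafana/loki/v3/pkg/dataobj/sections/streams"
)

// pointersScanOptions, as implied by the literal in executePointersScan.
// Fields marked "assumed" are not visible in this diff and may differ.
type pointersScanOptions struct {
	StreamsSection   *streams.Section    // the single streams section for the tenant
	PointersSections []*pointers.Section // pointers sections for the tenant
	Selector         any                 // assumed: concrete type of node.Selector not shown here
	BatchSize        int64               // assumed: concrete type of c.batchSize not shown here
	Start            time.Time           // scan window bounds carried on the node
	End              time.Time
}
```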
