Skip to content

Commit 282c474

Browse files
committed
Use latest FrostDB and implement latest Prometheus interfaces
1 parent 0555d45 commit 282c474

File tree

3 files changed

+330
-81
lines changed

3 files changed

+330
-81
lines changed

frostdb/frostdb.go

Lines changed: 113 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@ import (
66
"math"
77
"strings"
88

9-
"github.com/apache/arrow/go/v8/arrow"
10-
"github.com/apache/arrow/go/v8/arrow/array"
11-
"github.com/apache/arrow/go/v8/arrow/memory"
9+
"github.com/apache/arrow/go/v10/arrow"
10+
"github.com/apache/arrow/go/v10/arrow/array"
11+
"github.com/apache/arrow/go/v10/arrow/memory"
1212
"github.com/go-kit/log"
1313
"github.com/polarsignals/frostdb"
1414
frost "github.com/polarsignals/frostdb"
@@ -17,12 +17,15 @@ import (
1717
"github.com/polarsignals/frostdb/query"
1818
"github.com/polarsignals/frostdb/query/logicalplan"
1919
"github.com/prometheus/client_golang/prometheus"
20+
"github.com/segmentio/parquet-go"
21+
"github.com/thanos-io/objstore/providers/filesystem"
22+
2023
"github.com/prometheus/prometheus/model/exemplar"
24+
"github.com/prometheus/prometheus/model/histogram"
2125
"github.com/prometheus/prometheus/model/labels"
26+
"github.com/prometheus/prometheus/model/metadata"
2227
"github.com/prometheus/prometheus/storage"
2328
"github.com/prometheus/prometheus/tsdb/chunkenc"
24-
"github.com/segmentio/parquet-go"
25-
"github.com/thanos-io/objstore/providers/filesystem"
2629
)
2730

2831
type FrostDB struct {
@@ -38,6 +41,14 @@ type FrostAppender struct {
3841
tableRef *frostdb.Table
3942
}
4043

44+
func (f *FrostAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
45+
panic("histogram not supported")
46+
}
47+
48+
func (f *FrostAppender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) {
49+
panic("metadata not supported")
50+
}
51+
4152
type FrostQuerier struct {
4253
*FrostDB
4354
}
@@ -47,8 +58,8 @@ func Open(dir string, reg prometheus.Registerer, logger log.Logger) (*FrostDB, e
4758
bucket, err := filesystem.NewBucket(dir)
4859
ctx := context.Background()
4960
store, err := frost.New(
50-
logger,
51-
reg,
61+
frost.WithLogger(logger),
62+
frost.WithRegistry(reg),
5263
frost.WithWAL(),
5364
frost.WithStoragePath(dir),
5465
frost.WithBucketStorage(bucket),
@@ -67,7 +78,7 @@ func Open(dir string, reg prometheus.Registerer, logger log.Logger) (*FrostDB, e
6778
return nil, err
6879
}
6980
table, err := db.Table(
70-
"metrics",
81+
tableMetrics,
7182
frost.NewTableConfig(schema),
7283
)
7384
if err != nil {
@@ -92,10 +103,10 @@ func (f *FrostQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([]
92103
)
93104

94105
sets := map[uint64]*series{}
95-
err := engine.ScanTable("metrics").
106+
err := engine.ScanTable(tableMetrics).
96107
Filter(promMatchersToFrostDBExprs(matchers)).
97108
Distinct(logicalplan.Col("labels."+name)).
98-
Execute(context.Background(), func(ar arrow.Record) error {
109+
Execute(context.Background(), func(ctx context.Context, ar arrow.Record) error {
99110
defer ar.Release()
100111
parseRecordIntoSeriesSet(ar, sets)
101112
return nil
@@ -105,7 +116,7 @@ func (f *FrostQuerier) LabelValues(name string, matchers ...*labels.Matcher) ([]
105116
}
106117

107118
s := flattenSeriesSets(sets)
108-
names := []string{}
119+
names := make([]string, 0, len(s.sets))
109120
for _, s := range s.sets {
110121
for _, l := range s.l {
111122
names = append(names, l.Value)
@@ -139,10 +150,10 @@ func (f *FrostQuerier) LabelNames(matchers ...*labels.Matcher) ([]string, storag
139150
)
140151

141152
sets := map[string]struct{}{}
142-
err := engine.ScanTable("metrics").
143-
Project(logicalplan.DynCol("labels")).
153+
err := engine.ScanTable(tableMetrics).
154+
Project(logicalplan.DynCol(columnLabels)).
144155
Filter(promMatchersToFrostDBExprs(matchers)).
145-
Execute(context.Background(), func(ar arrow.Record) error {
156+
Execute(context.Background(), func(ctx context.Context, ar arrow.Record) error {
146157
defer ar.Release()
147158
for i := 0; i < int(ar.NumCols()); i++ {
148159
sets[ar.ColumnName(i)] = struct{}{}
@@ -172,20 +183,20 @@ func (f *FrostQuerier) Select(sortSeries bool, hints *storage.SelectHints, match
172183
)
173184

174185
sets := map[uint64]*series{}
175-
err := engine.ScanTable("metrics").
186+
err := engine.ScanTable(tableMetrics).
176187
Filter(logicalplan.And(
177188
logicalplan.And(
178-
logicalplan.Col("timestamp").Gt(logicalplan.Literal(hints.Start)),
179-
logicalplan.Col("timestamp").Lt(logicalplan.Literal(hints.End)),
189+
logicalplan.Col(columnTimestamp).Gt(logicalplan.Literal(hints.Start)),
190+
logicalplan.Col(columnTimestamp).Lt(logicalplan.Literal(hints.End)),
180191
),
181192
promMatchersToFrostDBExprs(matchers),
182193
)).
183194
Project(
184-
logicalplan.DynCol("labels"),
185-
logicalplan.Col("timestamp"),
186-
logicalplan.Col("value"),
195+
logicalplan.DynCol(columnLabels),
196+
logicalplan.Col(columnTimestamp),
197+
logicalplan.Col(columnValue),
187198
).
188-
Execute(context.Background(), func(ar arrow.Record) error {
199+
Execute(context.Background(), func(ctx context.Context, ar arrow.Record) error {
189200
defer ar.Release()
190201
parseRecordIntoSeriesSet(ar, sets)
191202
return nil
@@ -260,39 +271,48 @@ func (f *FrostDB) ChunkQuerier(ctx context.Context, mint, maxt int64) (storage.C
260271
return nil, nil
261272
}
262273

274+
const (
275+
tableMetrics = "metrics"
276+
columnLabels = "labels"
277+
columnTimestamp = "timestamp"
278+
columnValue = "value"
279+
)
280+
263281
func promSchema() (*dynparquet.Schema, error) {
264282
return dynparquet.SchemaFromDefinition(&schemapb.Schema{
265283
Name: "metrics_schema",
266284
Columns: []*schemapb.Column{{
267-
Name: "labels",
285+
Name: columnLabels,
268286
StorageLayout: &schemapb.StorageLayout{
269287
Type: schemapb.StorageLayout_TYPE_STRING,
270288
Encoding: schemapb.StorageLayout_ENCODING_RLE_DICTIONARY,
271289
Nullable: true,
272290
},
273291
Dynamic: true,
274292
}, {
275-
Name: "timestamp",
293+
Name: columnTimestamp,
276294
StorageLayout: &schemapb.StorageLayout{
277295
Type: schemapb.StorageLayout_TYPE_INT64,
278296
},
279297
Dynamic: false,
280298
}, {
281-
Name: "value",
299+
Name: columnValue,
282300
StorageLayout: &schemapb.StorageLayout{
283301
Type: schemapb.StorageLayout_TYPE_DOUBLE,
284302
},
285303
Dynamic: false,
286304
}},
287-
SortingColumns: []*schemapb.SortingColumn{{
288-
Name: "labels",
289-
NullsFirst: true,
290-
Direction: schemapb.SortingColumn_DIRECTION_ASCENDING,
291-
},
305+
SortingColumns: []*schemapb.SortingColumn{
306+
{
307+
Name: columnLabels,
308+
NullsFirst: true,
309+
Direction: schemapb.SortingColumn_DIRECTION_ASCENDING,
310+
},
292311
{
293-
Name: "timestamp",
312+
Name: columnTimestamp,
294313
Direction: schemapb.SortingColumn_DIRECTION_ASCENDING,
295-
}},
314+
},
315+
},
296316
})
297317
}
298318

@@ -306,6 +326,18 @@ type arrowSeries struct {
306326
*series
307327
}
308328

329+
func (a *arrowSeries) AtHistogram() (int64, *histogram.Histogram) {
330+
panic("histogram not supported")
331+
}
332+
333+
func (a *arrowSeries) AtFloatHistogram() (int64, *histogram.FloatHistogram) {
334+
panic("histogram not supported")
335+
}
336+
337+
func (a *arrowSeries) AtT() int64 {
338+
return a.series.ts[a.index]
339+
}
340+
309341
func (a *arrowSeriesSet) Next() bool {
310342
a.index++
311343
return a.index < len(a.sets)
@@ -322,22 +354,27 @@ func (a *arrowSeriesSet) Err() error { return nil }
322354
func (a *arrowSeriesSet) Warnings() storage.Warnings { return nil }
323355

324356
func (a *arrowSeries) Labels() labels.Labels { return a.l }
325-
func (a *arrowSeries) Iterator() chunkenc.Iterator {
357+
358+
func (a *arrowSeries) Iterator(iterator chunkenc.Iterator) chunkenc.Iterator {
359+
// TODO: Use iterator here?
326360
return a
327361
}
328362

329-
func (a *arrowSeries) Next() bool {
363+
func (a *arrowSeries) Next() chunkenc.ValueType {
330364
a.index++
331-
return a.index < len(a.ts)
365+
if a.index < len(a.ts) {
366+
return chunkenc.ValFloat
367+
}
368+
return chunkenc.ValNone
332369
}
333370

334-
func (a *arrowSeries) Seek(i int64) bool {
371+
func (a *arrowSeries) Seek(i int64) chunkenc.ValueType {
335372
for ; a.index < len(a.series.ts); a.index++ {
336373
if a.series.ts[a.index] >= i {
337-
return true
374+
return chunkenc.ValFloat
338375
}
339376
}
340-
return false
377+
return chunkenc.ValNone
341378
}
342379

343380
func (a *arrowSeries) At() (int64, float64) {
@@ -377,26 +414,34 @@ type series struct {
377414
}
378415

379416
func parseRecord(r arrow.Record) map[uint64]*series {
380-
381417
seriesset := map[uint64]*series{}
382418

383419
for i := 0; i < int(r.NumRows()); i++ {
384420
lbls := labels.Labels{}
385421
var ts int64
386422
var v float64
387423
for j := 0; j < int(r.NumCols()); j++ {
424+
columnName := r.ColumnName(j)
388425
switch {
389-
case r.ColumnName(j) == "timestamp":
426+
case columnName == columnTimestamp:
390427
ts = r.Column(j).(*array.Int64).Value(i)
391-
case r.ColumnName(j) == "value":
428+
case columnName == columnValue:
392429
v = r.Column(j).(*array.Float64).Value(i)
393430
default:
394-
name := strings.TrimPrefix(r.ColumnName(j), "labels.")
395-
value := r.Column(j).(*array.Binary).Value(i)
431+
name := strings.TrimPrefix(columnName, "labels.")
432+
nameColumn, err := DictionaryFromRecord(r, columnName)
433+
if err != nil {
434+
continue
435+
}
436+
if nameColumn.IsNull(i) {
437+
continue
438+
}
439+
440+
value := StringValueFromDictionary(nameColumn, i)
396441
if string(value) != "" {
397442
lbls = append(lbls, labels.Label{
398443
Name: name,
399-
Value: string(value),
444+
Value: value,
400445
})
401446
}
402447
}
@@ -417,6 +462,31 @@ func parseRecord(r arrow.Record) map[uint64]*series {
417462
return seriesset
418463
}
419464

465+
func DictionaryFromRecord(ar arrow.Record, name string) (*array.Dictionary, error) {
466+
indices := ar.Schema().FieldIndices(name)
467+
if len(indices) != 1 {
468+
return nil, fmt.Errorf("expected 1 column named %q, got %d", name, len(indices))
469+
}
470+
471+
col, ok := ar.Column(indices[0]).(*array.Dictionary)
472+
if !ok {
473+
return nil, fmt.Errorf("expected column %q to be a dictionary column, got %T", name, ar.Column(indices[0]))
474+
}
475+
476+
return col, nil
477+
}
478+
479+
func StringValueFromDictionary(arr *array.Dictionary, i int) string {
480+
switch dict := arr.Dictionary().(type) {
481+
case *array.Binary:
482+
return string(dict.Value(arr.GetValueIndex(i)))
483+
case *array.String:
484+
return dict.Value(arr.GetValueIndex(i))
485+
default:
486+
panic(fmt.Sprintf("unsupported dictionary type: %T", dict))
487+
}
488+
}
489+
420490
// merge's a,b into an ordered list, maintains this same order for the floats
421491
func merge(a, b []int64, af, bf []float64) ([]int64, []float64) {
422492

0 commit comments

Comments
 (0)