@@ -13,6 +13,7 @@ import (
13
13
"github.com/polarsignals/frostdb"
14
14
frost "github.com/polarsignals/frostdb"
15
15
"github.com/polarsignals/frostdb/dynparquet"
16
+ schemapb "github.com/polarsignals/frostdb/gen/proto/go/frostdb/schema/v1alpha1"
16
17
"github.com/polarsignals/frostdb/query"
17
18
"github.com/polarsignals/frostdb/query/logicalplan"
18
19
"github.com/prometheus/client_golang/prometheus"
@@ -41,18 +42,30 @@ type FrostQuerier struct {
41
42
}
42
43
43
44
// Open a new frostDB
44
- func Open (reg prometheus.Registerer , logger log.Logger ) (* FrostDB , error ) {
45
- store := frost .New (
45
+ func Open (dir string , reg prometheus.Registerer , logger log.Logger ) (* FrostDB , error ) {
46
+ ctx := context .Background ()
47
+ store , err := frost .New (
48
+ logger ,
46
49
reg ,
47
- 8192 ,
48
- 10 * 1024 * 1024 * 1024 ,
50
+ frost . WithWAL () ,
51
+ frost . WithStoragePath ( dir ) ,
49
52
)
50
- db , _ := store .DB ("prometheus" )
51
- schema := promSchema ()
53
+ if err != nil {
54
+ return nil , err
55
+ }
56
+
57
+ err = store .ReplayWALs (ctx )
58
+ if err != nil {
59
+ return nil , err
60
+ }
61
+ db , _ := store .DB (ctx , "prometheus" )
62
+ schema , err := promSchema ()
63
+ if err != nil {
64
+ return nil , err
65
+ }
52
66
table , err := db .Table (
53
67
"metrics" ,
54
68
frost .NewTableConfig (schema ),
55
- logger ,
56
69
)
57
70
if err != nil {
58
71
return nil , err
@@ -157,8 +170,8 @@ func (f *FrostQuerier) Select(sortSeries bool, hints *storage.SelectHints, match
157
170
err := engine .ScanTable ("metrics" ).
158
171
Filter (logicalplan .And (
159
172
logicalplan .And (
160
- logicalplan .Col ("timestamp" ).GT (logicalplan .Literal (hints .Start )),
161
- logicalplan .Col ("timestamp" ).LT (logicalplan .Literal (hints .End )),
173
+ logicalplan .Col ("timestamp" ).Gt (logicalplan .Literal (hints .Start )),
174
+ logicalplan .Col ("timestamp" ).Lt (logicalplan .Literal (hints .End )),
162
175
),
163
176
promMatchersToFrostDBExprs (matchers ),
164
177
)).
@@ -242,27 +255,40 @@ func (f *FrostDB) ChunkQuerier(ctx context.Context, mint, maxt int64) (storage.C
242
255
return nil , nil
243
256
}
244
257
245
- func promSchema () * dynparquet.Schema {
246
- return dynparquet .NewSchema (
247
- "metrics_schema" ,
248
- []dynparquet.ColumnDefinition {{
249
- Name : "labels" ,
250
- StorageLayout : parquet .Encoded (parquet .String (), & parquet .RLEDictionary ),
251
- Dynamic : true ,
258
+ func promSchema () (* dynparquet.Schema , error ) {
259
+ return dynparquet .SchemaFromDefinition (& schemapb.Schema {
260
+ Name : "metrics_schema" ,
261
+ Columns : []* schemapb.Column {{
262
+ Name : "labels" ,
263
+ StorageLayout : & schemapb.StorageLayout {
264
+ Type : schemapb .StorageLayout_TYPE_STRING ,
265
+ Encoding : schemapb .StorageLayout_ENCODING_RLE_DICTIONARY ,
266
+ Nullable : true ,
267
+ },
268
+ Dynamic : true ,
252
269
}, {
253
- Name : "timestamp" ,
254
- StorageLayout : parquet .Int (64 ),
255
- Dynamic : false ,
270
+ Name : "timestamp" ,
271
+ StorageLayout : & schemapb.StorageLayout {
272
+ Type : schemapb .StorageLayout_TYPE_INT64 ,
273
+ },
274
+ Dynamic : false ,
256
275
}, {
257
- Name : "value" ,
258
- StorageLayout : parquet .Leaf (parquet .DoubleType ),
259
- Dynamic : false ,
276
+ Name : "value" ,
277
+ StorageLayout : & schemapb.StorageLayout {
278
+ Type : schemapb .StorageLayout_TYPE_DOUBLE ,
279
+ },
280
+ Dynamic : false ,
260
281
}},
261
- []dynparquet.SortingColumn {
262
- dynparquet .NullsFirst (dynparquet .Ascending ("labels" )),
263
- dynparquet .Ascending ("timestamp" ),
282
+ SortingColumns : []* schemapb.SortingColumn {{
283
+ Name : "labels" ,
284
+ NullsFirst : true ,
285
+ Direction : schemapb .SortingColumn_DIRECTION_ASCENDING ,
264
286
},
265
- )
287
+ {
288
+ Name : "timestamp" ,
289
+ Direction : schemapb .SortingColumn_DIRECTION_ASCENDING ,
290
+ }},
291
+ })
266
292
}
267
293
268
294
type arrowSeriesSet struct {
0 commit comments