Skip to content

Commit 621f070

Browse files
talIguazGal Topper
authored and
Gal Topper
committed
partitions PR fixes (#45)
* update part manager API * add multi-partition query (WIP) * add multi-partition query (WIP) * partition manager impl (#31) * new schema * fix recursive imports * use chunk and partition format interval * remove prints * format * fix utils * implement getShardingBuckets * add multi-partition query - working * add multi-partition query - working (#34) * Merge Dev to Partitions @ Yaron (#1) * Make BatchSize configurable; Update BenchIngest test * Suppot `--stdin` and constant memory when reading CSV. (#27) Previously, the entire CSV file was read into memory before anything else could run. * Add validation * Refactor: move stuff to common; Add wait and validation to Nuclio bench * init configuration from testdata * address PR comments * Add tests (#28) * add aggregators test * add ingest+query+create tests * rename tests description, add negative aggregator test. * add integration target to make file + pr comments * add bug id to ignored tests * go format * add a test * format * create nuclio build tag, create test util * format * run every table test case as a sub test * more pr comments + move tests util to tsdbtest * move test models to tsdbtest * support ignoring tests * create unique DB name on test setup * format * add delete TSDB test * minor style fix * fix nuclio test * format * style changes * improve tests + use testify * add build tags to go get * fix tags on go get * add race condition check on test * add multi-partition query - working (#34) * Fix CI * partition manager fixes (#38) * code review * fix utils * cleanup * fmt * fix test * code review * invalid aggr list * tests * Fix skipping of start partition in query. (#42) * pr fixes, mostly style & readability * minor fix * edit comment * minor fix * fixed pr comments,propogate errors and other minor fixes * revert sharding bucket feature, change default to 1 bucket * revert w,M,Y support * Minor usability and code improvements. * logger -> println because logger is not initialized... * go fmt
1 parent 855b52a commit 621f070

22 files changed

+1067
-561
lines changed

pkg/aggregate/aggregate.go

+51-6
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ package aggregate
2222

2323
import (
	"fmt"
	"math"
	"sort"
	"strings"

	"github.com/v3io/v3io-tsdb/pkg/config"
)
@@ -58,18 +59,62 @@ var aggrToString = map[AggrType]string{
5859
aggrTypeStddev: "stddev", aggrTypeStdvar: "stdvar", aggrTypeAll: "*",
5960
}
6061

62+
var aggrToSchemaField = map[string]config.SchemaField{
63+
"count": {Name: "count", Type: "array", Nullable: true, Items: "double"},
64+
"sum": {Name: "sum", Type: "array", Nullable: true, Items: "double"},
65+
"sqr": {Name: "sqr", Type: "array", Nullable: true, Items: "double"},
66+
"max": {Name: "max", Type: "array", Nullable: true, Items: "double"},
67+
"min": {Name: "min", Type: "array", Nullable: true, Items: "double"},
68+
"last": {Name: "last", Type: "array", Nullable: true, Items: "double"},
69+
"avg": {Name: "avg", Type: "array", Nullable: true, Items: "double"},
70+
"rate": {Name: "rate", Type: "array", Nullable: true, Items: "double"},
71+
"stddev": {Name: "stddev", Type: "array", Nullable: true, Items: "double"},
72+
"stdvar": {Name: "stdvar", Type: "array", Nullable: true, Items: "double"},
73+
}
74+
75+
func SchemaFieldFromString(aggregators []string, col string) ([]config.SchemaField, error) {
76+
fieldList := make([]config.SchemaField, 0, len(aggregators))
77+
for _, s := range aggregators {
78+
trimmed := strings.TrimSpace(s)
79+
if trimmed != "" {
80+
if trimmed == "*" {
81+
fieldList = make([]config.SchemaField, 0, len(aggrToSchemaField))
82+
for _, val := range aggrToSchemaField {
83+
fieldList = append(fieldList, getAggrFullName(val, col))
84+
}
85+
return fieldList, nil
86+
} else {
87+
field, ok := aggrToSchemaField[trimmed]
88+
if !ok {
89+
return nil, fmt.Errorf("invalid aggragator type '%s'", trimmed)
90+
}
91+
fieldList = append(fieldList, getAggrFullName(field, col))
92+
}
93+
}
94+
}
95+
return fieldList, nil
96+
}
97+
98+
func getAggrFullName(field config.SchemaField, col string) config.SchemaField {
99+
fullName := fmt.Sprintf("_%s_%s", col, field.Name)
100+
field.Name = fullName
101+
return field
102+
}
103+
61104
// String returns the canonical textual name of the aggregator type
// (empty string for an unknown value).
func (a AggrType) String() string {
	return aggrToString[a]
}
62105

63106
// convert comma separated string to aggregator mask
64-
func AggrsFromString(list string) (AggrType, error) {
65-
split := strings.Split(list, ",")
107+
func AggrsFromString(split []string) (AggrType, error) {
66108
var aggrList AggrType
67109
for _, s := range split {
68-
aggr, ok := aggrTypeString[s]
69-
if !ok {
70-
return aggrList, fmt.Errorf("Invalid aggragator type %s", s)
110+
trimmed := strings.TrimSpace(s)
111+
if trimmed != "" {
112+
aggr, ok := aggrTypeString[trimmed]
113+
if !ok {
114+
return aggrList, fmt.Errorf("invalid aggragator type '%s'", s)
115+
}
116+
aggrList = aggrList | aggr
71117
}
72-
aggrList = aggrList | aggr
73118
}
74119
return aggrList, nil
75120
}

pkg/aggregate/aggregate_test.go

+15-1
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,20 @@ func TestAggregators(t *testing.T) {
7070
data: map[int64]float64{1: 7.5, 2: 2.5},
7171
exprCol: "v", bucket: 1,
7272
expectedUpdateExpr: "_v_count[1]=_v_count[1]+2;", expectedSetExpr: "_v_count[1]=2;", expectFail: true},
73+
74+
{desc: "Should aggregate data when specifying aggregators with sapces",
75+
aggString: "min , max ",
76+
data: map[int64]float64{1: 7.5, 2: 2.5},
77+
exprCol: "v", bucket: 1,
78+
expectedUpdateExpr: fmt.Sprintf("_v_min[1]=min(_v_min[1],%f);_v_max[1]=max(_v_max[1],%f);", 2.5, 7.5),
79+
expectedSetExpr: fmt.Sprintf("_v_min[1]=%f;_v_max[1]=%f;", 2.5, 7.5)},
80+
81+
{desc: "Should aggregate data when specifying aggregators with empty values",
82+
aggString: "min , ,max ",
83+
data: map[int64]float64{1: 7.5, 2: 2.5},
84+
exprCol: "v", bucket: 1,
85+
expectedUpdateExpr: fmt.Sprintf("_v_min[1]=min(_v_min[1],%f);_v_max[1]=max(_v_max[1],%f);", 2.5, 7.5),
86+
expectedSetExpr: fmt.Sprintf("_v_min[1]=%f;_v_max[1]=%f;", 2.5, 7.5)},
7387
}
7488

7589
for _, test := range testCases {
@@ -87,7 +101,7 @@ func TestAggregators(t *testing.T) {
87101
func testAggregatorCase(t *testing.T, aggString string, data map[int64]float64, exprCol string, bucket int,
88102
expectedUpdateExpr string, expectedSetExpr string, expectFail bool) {
89103

90-
aggregator, err := AggrsFromString(aggString)
104+
aggregator, err := AggrsFromString(strings.Split(aggString, ","))
91105
if err != nil {
92106
if !expectFail {
93107
t.Fatal(err)

pkg/appender/store.go

+14-7
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ func NewChunkStore() *chunkStore {
4444
// chunkStore store state & latest + previous chunk appenders
4545
type chunkStore struct {
4646
curChunk int
47-
lastTid int
47+
lastTid int64
4848
chunks [2]*attrAppender
4949

5050
aggrList *aggregate.AggregatorList
@@ -115,7 +115,10 @@ func (cs *chunkStore) GetChunksState(mc *MetricsCache, metric *MetricState) (boo
115115

116116
// init chunk and create aggregation list object based on partition policy
117117
t := cs.pending[0].t
118-
part := mc.partitionMngr.TimeToPart(t)
118+
part, err := mc.partitionMngr.TimeToPart(t)
119+
if err != nil {
120+
return false, err
121+
}
119122
cs.chunks[0].initialize(part, t)
120123
cs.aggrList = aggregate.NewAggregatorList(part.AggrType())
121124

@@ -182,7 +185,7 @@ func (cs *chunkStore) ProcessGetResp(mc *MetricsCache, metric *MetricState, resp
182185
}
183186

184187
// set Last TableId, indicate that there is no need to create metric object
185-
cs.lastTid = cs.chunks[0].partition.GetId()
188+
cs.lastTid = cs.chunks[0].partition.GetStartTime()
186189

187190
}
188191

@@ -217,7 +220,8 @@ func (cs *chunkStore) chunkByTime(t int64) *attrAppender {
217220
if err != nil {
218221
return nil
219222
}
220-
cur.initialize(part.NextPart(t), t) // TODO: next part
223+
nextPart, _ := part.NextPart(t)
224+
cur.initialize(nextPart, t)
221225
cur.appender = app
222226
cs.curChunk = cs.curChunk ^ 1
223227

@@ -251,10 +255,13 @@ func (cs *chunkStore) WriteChunks(mc *MetricsCache, metric *MetricState) (bool,
251255

252256
// init partition info and find if we need to init the metric headers (labels, ..) in case of new partition
253257
t0 := cs.pending[0].t
254-
partition := mc.partitionMngr.TimeToPart(t0)
255-
if partition.GetId() > cs.lastTid {
258+
partition, err := mc.partitionMngr.TimeToPart(t0)
259+
if err != nil {
260+
return false, err
261+
}
262+
if partition.GetStartTime() > cs.lastTid {
256263
notInitialized = true
257-
cs.lastTid = partition.GetId()
264+
cs.lastTid = partition.GetStartTime()
258265
}
259266

260267
// init aggregation buckets info

pkg/config/config.go

+47-36
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ import (
2727
)
2828

2929
// DefaultConfigurationFileName is the configuration file read when the
// caller does not supply an explicit path.
const DefaultConfigurationFileName = "v3io.yaml"

// SCHEMA_CONFIG is the object name under which the TSDB schema is stored.
// NOTE(review): Go convention prefers MixedCaps (SchemaConfig); renaming
// would touch all callers, so left as-is.
const SCHEMA_CONFIG = ".schema"
3031

3132
type V3ioConfig struct {
3233
// V3IO Connection details: Url, Data container, relative path for this dataset, credentials
@@ -54,42 +55,52 @@ type V3ioConfig struct {
5455
BatchSize int `json:"batchSize,omitempty"`
5556
}
5657

57-
type DBPartConfig struct {
58-
// Indicating this is a valid Partition file, Signature == 'TSDB'
59-
Signature string `json:"signature"`
60-
// Version of the config
61-
Version string `json:"version"`
62-
// Description of this TSDB
63-
Description string `json:"description,omitempty"`
64-
// Partition Key, __name__ by default
65-
ShardingKey string `json:"partitionKey,omitempty"`
66-
// Sorting Key, dimensions used for sorting per DB shard
67-
SortingKey string `json:"sortingKey,omitempty"`
68-
// indicate if it is cyclic (single partition, return to first chunk after the last)
69-
IsCyclic bool `json:"isCyclic,omitempty"`
70-
// Number of hours per chunk (1hr default)
71-
HrInChunk int `json:"hrInChunk,omitempty"`
72-
// Days per table/object (in a partition), after N days will use a new table or go to start (Cyclic partition)
73-
// this is used only for the Head configuration, per partition we look at StartTime & EndTime
74-
DaysPerObj int `json:"daysPerObj,omitempty"`
75-
// How many days to save samples
76-
DaysRetention int `json:"daysRetention,omitempty"`
77-
// Start from time/date in Unix milisec
78-
StartTime int64 `json:"startTime,omitempty"`
79-
// End by time/date in Unix milisec
80-
EndTime int64 `json:"endTime,omitempty"`
81-
// Partition name format e.g. 'dd-mm-yy'
82-
PartFormat string `json:"partFormat,omitempty"`
83-
84-
// Comma seperated list of default aggregation functions e.g. 'count,sum,avg,max'
85-
DefaultRollups string `json:"defaultRollups,omitempty"`
86-
// Number of minutes per aggregation bucket (aggregation interval)
87-
RollupMin int `json:"rollupMin,omitempty"`
88-
// If true, dont save raw samples/chunks, only aggregates
89-
DelRawSamples bool `json:"delRawSamples,omitempty"`
90-
91-
// Metric specific policy
92-
MetricsConfig map[string]MetricConfig `json:"metricsConfig,omitempty"`
58+
// Rollup describes one aggregation layer of the table schema: which
// aggregators to maintain, at what granularity, where to store them,
// and for how long.
type Rollup struct {
	// Aggregator names, e.g. "count,sum,avg,max" — presumably
	// comma-separated; verify against the schema producer.
	Aggregators            string `json:"aggregators"`
	AggregatorsGranularity string `json:"aggregatorsGranularity"`
	// ["cloud","local"] for the aggregators and sample chunks
	StorageClass string `json:"storageClass"`
	// In hours. 0 means no need to save samples
	SampleRetention int `json:"sampleRetention"`
	// Format: 1m, 7d, 3h. Possible intervals: m/d/h
	LayerRetentionTime string `json:"layerRetentionTime"`
}
68+
69+
// TableSchema holds table-wide settings: the rollup layers, the number of
// sharding buckets, and the partitioning/chunking intervals.
// NOTE(review): "Chuncker" is a typo for "Chunker" in both the field name
// and JSON tag; renaming would break the persisted schema format.
type TableSchema struct {
	Version             int      `json:"version"`
	RollupLayers        []Rollup `json:"rollupLayers"`
	ShardingBuckets     int      `json:"shardingBuckets"`
	PartitionerInterval string   `json:"partitionerInterval"`
	ChunckerInterval    string   `json:"chunckerInterval"`
}
76+
77+
// PartitionSchema is the per-partition variant of the schema settings.
// Unlike Rollup.Aggregators (a single string), aggregators here are an
// explicit list.
type PartitionSchema struct {
	Version                int      `json:"version"`
	Aggregators            []string `json:"aggregators"`
	AggregatorsGranularity string   `json:"aggregatorsGranularity"`
	StorageClass           string   `json:"storageClass"`
	SampleRetention        int      `json:"sampleRetention"`
	PartitionerInterval    string   `json:"partitionerInterval"`
	ChunckerInterval       string   `json:"chunckerInterval"`
}
86+
87+
// Partition pairs a partition's start time (Unix time — unit not visible
// here; presumably milliseconds, as in the previous schema — TODO confirm)
// with its schema settings.
type Partition struct {
	StartTime  int64           `json:"startTime"`
	SchemaInfo PartitionSchema `json:"schemaInfo"`
}
91+
92+
// SchemaField describes a single stored attribute: its name, type,
// nullability, and — for array types — the element type (e.g. "double"
// for aggregate arrays).
type SchemaField struct {
	Name     string `json:"name"`
	Type     string `json:"type"`
	Nullable bool   `json:"nullable"`
	Items    string `json:"items,omitempty"`
}
98+
99+
// Schema is the root object persisted for a TSDB table: the table-wide
// settings, the default partition settings, the list of partitions, and
// the field definitions.
type Schema struct {
	TableSchemaInfo     TableSchema     `json:"tableSchemaInfo"`
	PartitionSchemaInfo PartitionSchema `json:"partitionSchemaInfo"`
	Partitions          []Partition     `json:"partitions"`
	Fields              []SchemaField   `json:"fields"`
}
94105

95106
type MetricConfig struct {

0 commit comments

Comments
 (0)