Skip to content

Commit af5a598

Browse files
Author: Igor Makhlin
Merge pull request #59 from v3io/development
dev->master
2 parents 64c5075 + b41882c commit af5a598

File tree

18 files changed

+99
-85
lines changed

18 files changed

+99
-85
lines changed

Diff for: pkg/aggregate/aggregate.go

+19-3
Original file line number | Diff line number | Diff line change
@@ -103,6 +103,22 @@ func getAggrFullName(field config.SchemaField, col string) config.SchemaField {
103103

104104
func (a AggrType) String() string { return aggrToString[a] }
105105

106+
func AggregatorsToStringList(aggregators string) ([]string, error) {
107+
aggrs := strings.Split(aggregators, ",")
108+
aggType, err := AggrsFromString(aggrs)
109+
if err != nil {
110+
return nil, err
111+
}
112+
var list []string
113+
for _, aggr := range rawAggregators {
114+
if aggr&aggType != 0 {
115+
list = append(list, aggrToString[aggr])
116+
}
117+
}
118+
119+
return list, nil
120+
}
121+
106122
// convert comma separated string to aggregator mask
107123
func AggrsFromString(split []string) (AggrType, error) {
108124
var aggrList AggrType
@@ -132,13 +148,13 @@ func NewAggregatorList(aggrType AggrType) *AggregatorList {
132148
list = append(list, &SqrAggregator{FloatAggregator{attr: "sqr"}})
133149
}
134150
if (aggrType & aggrTypeMin) != 0 {
135-
list = append(list, &MinAggregator{FloatAggregator{attr: "min", val: math.MaxFloat64}}) // TODO: use math.Inf(1)
151+
list = append(list, &MinAggregator{FloatAggregator{attr: "min", val: math.Inf(1)}})
136152
}
137153
if (aggrType & aggrTypeMax) != 0 {
138-
list = append(list, &MaxAggregator{FloatAggregator{attr: "max", val: -math.MaxFloat64}}) // TODO: use math.Inf(-1)
154+
list = append(list, &MaxAggregator{FloatAggregator{attr: "max", val: math.Inf(-1)}})
139155
}
140156
if (aggrType & aggrTypeLast) != 0 {
141-
list = append(list, &LastAggregator{FloatAggregator{attr: "last"}, 0})
157+
list = append(list, &LastAggregator{FloatAggregator{attr: "last", val: math.Inf(-1)}, 0})
142158
}
143159
return &list
144160
}

Diff for: pkg/aggregate/functions.go

+9-9
Original file line number | Diff line number | Diff line change
@@ -75,14 +75,14 @@ func (a *FloatAggregator) UpdateExpr(col string, bucket int) string {
7575
}
7676

7777
func (a *FloatAggregator) InitExpr(col string, buckets int) string {
78-
return fmt.Sprintf("_%s_%s=init_array(%d,'double');", col, a.attr, buckets)
78+
return fmt.Sprintf("_%s_%s=init_array(%d,'double',%f);", col, a.attr, buckets, a.val)
7979
}
8080

8181
// Sum Aggregator
8282
type SumAggregator struct{ FloatAggregator }
8383

8484
func (a *SumAggregator) Aggregate(t int64, v float64) {
85-
if !math.IsNaN(v) {
85+
if utils.IsDefined(v) {
8686
a.val += v
8787
}
8888
}
@@ -91,18 +91,18 @@ func (a *SumAggregator) Aggregate(t int64, v float64) {
9191
type SqrAggregator struct{ FloatAggregator }
9292

9393
func (a *SqrAggregator) Aggregate(t int64, v float64) {
94-
if !math.IsNaN(v) {
94+
if utils.IsDefined(v) {
9595
a.val += v * v
9696
}
9797
}
9898

9999
// Minimum Aggregator
100100
type MinAggregator struct{ FloatAggregator }
101101

102-
func (a *MinAggregator) Clear() { a.val = math.MaxFloat64 } // TODO: use math.Inf(1)
102+
func (a *MinAggregator) Clear() { a.val = math.Inf(1) }
103103

104104
func (a *MinAggregator) Aggregate(t int64, v float64) {
105-
if !math.IsNaN(v) && (math.IsNaN(a.val) || v < a.val) {
105+
if v < a.val {
106106
a.val = v
107107
}
108108
}
@@ -114,10 +114,10 @@ func (a *MinAggregator) UpdateExpr(col string, bucket int) string {
114114
// Maximum Aggregator
115115
type MaxAggregator struct{ FloatAggregator }
116116

117-
func (a *MaxAggregator) Clear() { a.val = -math.MaxFloat64 } // TODO: use math.Inf(-1)
117+
func (a *MaxAggregator) Clear() { a.val = math.Inf(-1) }
118118

119119
func (a *MaxAggregator) Aggregate(t int64, v float64) {
120-
if !math.IsNaN(v) && (math.IsNaN(a.val) || v > a.val) {
120+
if v > a.val {
121121
a.val = v
122122
}
123123
}
@@ -132,7 +132,7 @@ type LastAggregator struct {
132132
lastT int64
133133
}
134134

135-
func (a *LastAggregator) Clear() { a.val = -math.MaxFloat64 } // TODO: use math.Inf(1)
135+
func (a *LastAggregator) Clear() { a.val = math.Inf(-1) }
136136

137137
func (a *LastAggregator) Aggregate(t int64, v float64) {
138138
if t > a.lastT {
@@ -142,7 +142,7 @@ func (a *LastAggregator) Aggregate(t int64, v float64) {
142142
}
143143

144144
func (a *LastAggregator) UpdateExpr(col string, bucket int) string {
145-
if math.IsNaN(a.val) {
145+
if utils.IsUndefined(a.val) {
146146
return ""
147147
}
148148

Diff for: pkg/appender/store.go

+2-2
Original file line number | Diff line number | Diff line change
@@ -274,7 +274,7 @@ func (cs *chunkStore) WriteChunks(mc *MetricsCache, metric *MetricState) (bool,
274274
var pendingSamplesCount int
275275

276276
// loop over pending samples, add to chunks & aggregates (create required update expressions)
277-
for pendingSampleIndex < len(cs.pending) && pendingSamplesCount < mc.cfg.BatchSize && partition.InRange(cs.pending[pendingSampleIndex].t) {
277+
for {
278278
sampleTime := cs.pending[pendingSampleIndex].t
279279

280280
if sampleTime <= cs.initMaxTime && !mc.cfg.OverrideOld {
@@ -308,6 +308,7 @@ func (cs *chunkStore) WriteChunks(mc *MetricsCache, metric *MetricState) (bool,
308308
if (pendingSampleIndex == len(cs.pending)-1) || pendingSamplesCount == mc.cfg.BatchSize-1 || !partition.InRange(cs.pending[pendingSampleIndex+1].t) {
309309
expr = expr + cs.aggrList.SetOrUpdateExpr("v", bucket, isNewBucket)
310310
expr = expr + cs.appendExpression(activeChunk)
311+
cs.aggrList.Clear()
311312
pendingSampleIndex++
312313
pendingSamplesCount++
313314
break
@@ -333,7 +334,6 @@ func (cs *chunkStore) WriteChunks(mc *MetricsCache, metric *MetricState) (bool,
333334
pendingSamplesCount++
334335
}
335336

336-
cs.aggrList.Clear()
337337
if pendingSampleIndex == len(cs.pending) {
338338
cs.pending = cs.pending[:0]
339339
} else {

Diff for: pkg/config/config.go

+2-2
Original file line number | Diff line number | Diff line change
@@ -56,8 +56,8 @@ type V3ioConfig struct {
5656
}
5757

5858
type Rollup struct {
59-
Aggregators string `json:"aggregators"`
60-
AggregatorsGranularity string `json:"aggregatorsGranularity"`
59+
Aggregators []string `json:"aggregators"`
60+
AggregatorsGranularity string `json:"aggregatorsGranularity"`
6161
//["cloud","local"] for the aggregators and sample chucks
6262
StorageClass string `json:"storageClass"`
6363
//in hours. 0 means no need to save samples

Diff for: pkg/formatter/type.go

+1-2
Original file line number | Diff line number | Diff line change
@@ -43,8 +43,7 @@ func (f baseFormatter) timeString(t int64) string {
4343
if f.cfg.TimeFormat == "" {
4444
return strconv.Itoa(int(t))
4545
}
46-
47-
return time.Unix(t/1000, 0).Format(f.cfg.TimeFormat)
46+
return time.Unix(t/1000, 0).UTC().Format(f.cfg.TimeFormat)
4847
}
4948

5049
func labelsToStr(labels utils.Labels) (string, string) {

Diff for: pkg/partmgr/partmgr.go

+2-1
Original file line number | Diff line number | Diff line change
@@ -227,7 +227,8 @@ func (p *DBPartition) GetShardingKeys(name string) []string {
227227
shardingKeysNum := p.manager.cfg.TableSchemaInfo.ShardingBuckets
228228
var res = make([]string, 0, shardingKeysNum)
229229
for i := 0; i < shardingKeysNum; i++ {
230-
res = append(res, fmt.Sprintf("%s_%x", name, i))
230+
// Trailing dot for rangescan queries
231+
res = append(res, fmt.Sprintf("%s_%x.", name, i))
231232
}
232233

233234
return res

Diff for: pkg/querier/querier.go

+1-1
Original file line number | Diff line number | Diff line change
@@ -139,7 +139,7 @@ func (q *V3ioQuerier) LabelValues(name string) ([]string, error) {
139139

140140
list := []string{}
141141
input := v3io.GetItemsInput{Path: q.cfg.Path + "/names/", AttributeNames: []string{"__name"}, Filter: ""}
142-
iter, err := utils.NewAsyncItemsCursor(q.container, &input, q.cfg.QryWorkers, []string{})
142+
iter, err := utils.NewAsyncItemsCursor(q.container, &input, q.cfg.QryWorkers, []string{}, q.logger)
143143
q.logger.DebugWith("GetItems to read names", "input", input, "err", err)
144144
if err != nil {
145145
return list, err

Diff for: pkg/querier/seriesset.go

+1-1
Original file line number | Diff line number | Diff line change
@@ -73,7 +73,7 @@ func (s *V3ioSeriesSet) getItems(partition *partmgr.DBPartition, name, filter st
7373

7474
s.logger.DebugWith("Select - GetItems", "path", path, "attr", attrs, "filter", filter, "name", name)
7575
input := v3io.GetItemsInput{Path: path, AttributeNames: attrs, Filter: filter, ShardingKey: name}
76-
iter, err := utils.NewAsyncItemsCursor(container, &input, workers, shardingKeys)
76+
iter, err := utils.NewAsyncItemsCursor(container, &input, workers, shardingKeys, s.logger)
7777
if err != nil {
7878
return err
7979
}

Diff for: pkg/tsdb/tsdbtest/tsdbtest.go

+9-6
Original file line number | Diff line number | Diff line change
@@ -143,7 +143,7 @@ func ValidateCountOfSamples(t testing.TB, adapter *V3ioAdapter, metricName strin
143143
}
144144

145145
if expected != actual {
146-
t.Fatalf("Check failed: actual result is not as expected (%d != %d)", expected, actual)
146+
t.Fatalf("Check failed: actual result is not as expected [%d(actual) != %d(expected)]", actual, expected)
147147
}
148148
}
149149

@@ -155,8 +155,12 @@ func NormalizePath(path string) string {
155155
}
156156

157157
func CreateSchema(t testing.TB, agg string) config.Schema {
158+
rollups, err := aggregate.AggregatorsToStringList(agg)
159+
if err != nil {
160+
t.Fatal(err)
161+
}
158162
defaultRollup := config.Rollup{
159-
Aggregators: agg,
163+
Aggregators: rollups,
160164
AggregatorsGranularity: "1h",
161165
StorageClass: "local",
162166
SampleRetention: 0,
@@ -166,21 +170,20 @@ func CreateSchema(t testing.TB, agg string) config.Schema {
166170
tableSchema := config.TableSchema{
167171
Version: 0,
168172
RollupLayers: []config.Rollup{defaultRollup},
169-
ShardingBuckets: 1,
173+
ShardingBuckets: 8,
170174
PartitionerInterval: "2d",
171175
ChunckerInterval: "1h",
172176
}
173177

174-
aggrs := []string{"*"}
175-
fields, err := aggregate.SchemaFieldFromString(aggrs, "v")
178+
fields, err := aggregate.SchemaFieldFromString(rollups, "v")
176179
if err != nil {
177180
t.Fatal("Failed to create aggregators list", err)
178181
}
179182
fields = append(fields, config.SchemaField{Name: "_name", Type: "string", Nullable: false, Items: ""})
180183

181184
partitionSchema := config.PartitionSchema{
182185
Version: tableSchema.Version,
183-
Aggregators: aggrs,
186+
Aggregators: rollups,
184187
AggregatorsGranularity: "1h",
185188
StorageClass: "local",
186189
SampleRetention: 0,

Diff for: pkg/tsdb/v3iotsdb.go

+3-3
Original file line number | Diff line number | Diff line change
@@ -181,7 +181,7 @@ func (a *V3ioAdapter) DeleteDB(configExists bool, force bool, fromTime int64, to
181181
for _, part := range partitions {
182182
if toTime == 0 || part.GetEndTime() < toTime {
183183
a.logger.Info("Delete partition %s", part.GetTablePath())
184-
err := utils.DeleteTable(a.container, part.GetTablePath(), "", a.cfg.QryWorkers)
184+
err := utils.DeleteTable(a.logger, a.container, part.GetTablePath(), "", a.cfg.QryWorkers)
185185
if err != nil && !force {
186186
return err
187187
}
@@ -191,7 +191,7 @@ func (a *V3ioAdapter) DeleteDB(configExists bool, force bool, fromTime int64, to
191191
}
192192
path := a.cfg.Path + "/names/"
193193
a.logger.Info("Delete metric names in path %s", path)
194-
err := utils.DeleteTable(a.container, path, "", a.cfg.QryWorkers)
194+
err := utils.DeleteTable(a.logger, a.container, path, "", a.cfg.QryWorkers)
195195
if err != nil && !force {
196196
return err
197197
}
@@ -218,7 +218,7 @@ func (a *V3ioAdapter) CountMetrics(part string) (int, error) {
218218
partitions := a.partitionMngr.GetPartitions()
219219
for _, part := range partitions {
220220
input := v3io.GetItemsInput{Path: part.GetTablePath(), Filter: "", AttributeNames: []string{"__size"}}
221-
iter, err := utils.NewAsyncItemsCursor(a.container, &input, a.cfg.QryWorkers, []string{})
221+
iter, err := utils.NewAsyncItemsCursor(a.container, &input, a.cfg.QryWorkers, []string{}, a.logger)
222222
if err != nil {
223223
return 0, err
224224
}

Diff for: pkg/tsdbctl/add.go

+9-8
Original file line number | Diff line number | Diff line change
@@ -136,18 +136,19 @@ func (ac *addCommandeer) add() error {
136136
return err
137137
}
138138

139-
ac.appendMetrics(append, lset)
139+
err = ac.appendMetrics(append, lset)
140+
if err != nil {
141+
return err
142+
}
140143

141144
// make sure all writes are committed
142145
_, err = append.WaitForCompletion(0)
143-
144-
if err == nil {
145-
fmt.Println("\nDone!")
146-
} else {
147-
fmt.Printf("operation timed out. Error: %v", err)
146+
if err != nil {
147+
return errors.Wrap(err, "operation timed out")
148148
}
149149

150-
return err
150+
ac.rootCommandeer.logger.Info("Done!")
151+
return nil
151152
}
152153

153154
func (ac *addCommandeer) appendMetrics(append tsdb.Appender, lset utils.Labels) error {
@@ -159,7 +160,7 @@ func (ac *addCommandeer) appendMetrics(append tsdb.Appender, lset utils.Labels)
159160
} else {
160161
fp, err = os.Open(ac.inFile)
161162
if err != nil {
162-
return errors.Wrapf(err, "cant open/read CSV input file: %s", ac.inFile)
163+
return errors.Wrapf(err, "failed to open CSV")
163164
}
164165
}
165166
defer fp.Close()

Diff for: pkg/tsdbctl/check.go

+1-1
Original file line number | Diff line number | Diff line change
@@ -124,7 +124,7 @@ func (cc *checkCommandeer) check() error {
124124
iter := chunk.Iterator()
125125
for iter.Next() {
126126
t, v := iter.At()
127-
tstr := time.Unix(int64(t/1000), 0).Format(time.RFC3339)
127+
tstr := time.Unix(int64(t/1000), 0).UTC().Format(time.RFC3339)
128128
fmt.Printf("unix=%d, t=%s, v=%.4f \n", t, tstr, v)
129129
count++
130130
}

Diff for: pkg/tsdbctl/create.go

+13-10
Original file line number | Diff line number | Diff line change
@@ -28,7 +28,6 @@ import (
2828
"github.com/v3io/v3io-tsdb/pkg/config"
2929
"github.com/v3io/v3io-tsdb/pkg/tsdb"
3030
"strconv"
31-
"strings"
3231
)
3332

3433
const schemaVersion = 0
@@ -67,7 +66,7 @@ func newCreateCommandeer(rootCommandeer *RootCommandeer) *createCommandeer {
6766
cmd.Flags().StringVarP(&commandeer.defaultRollups, "rollups", "r", "",
6867
"Default aggregation rollups, comma seperated: count,avg,sum,min,max,stddev")
6968
cmd.Flags().StringVarP(&commandeer.rollupInterval, "rollup-interval", "i", "1h", "aggregation interval")
70-
cmd.Flags().IntVarP(&commandeer.shardingBuckets, "sharding-buckets", "b", 1, "number of buckets to split key")
69+
cmd.Flags().IntVarP(&commandeer.shardingBuckets, "sharding-buckets", "b", 8, "number of buckets to split key")
7170
cmd.Flags().IntVarP(&commandeer.sampleRetention, "sample-retention", "a", 0, "sample retention in hours")
7271

7372
commandeer.cmd = cmd
@@ -83,19 +82,24 @@ func (cc *createCommandeer) create() error {
8382
}
8483

8584
if err := cc.validateFormat(cc.partitionInterval); err != nil {
86-
return errors.Wrap(err, "Failed to parse partition interval")
85+
return errors.Wrap(err, "failed to parse partition interval")
8786
}
8887

8988
if err := cc.validateFormat(cc.chunkInterval); err != nil {
90-
return errors.Wrap(err, "Failed to parse chunk interval")
89+
return errors.Wrap(err, "failed to parse chunk interval")
9190
}
9291

9392
if err := cc.validateFormat(cc.rollupInterval); err != nil {
94-
return errors.Wrap(err, "Failed to parse rollup interval")
93+
return errors.Wrap(err, "failed to parse rollup interval")
94+
}
95+
96+
rollups, err := aggregate.AggregatorsToStringList(cc.defaultRollups)
97+
if err != nil {
98+
return errors.Wrap(err, "failed to parse default rollups")
9599
}
96100

97101
defaultRollup := config.Rollup{
98-
Aggregators: cc.defaultRollups,
102+
Aggregators: rollups,
99103
AggregatorsGranularity: cc.rollupInterval,
100104
StorageClass: defaultStorageClass,
101105
SampleRetention: cc.sampleRetention,
@@ -110,16 +114,15 @@ func (cc *createCommandeer) create() error {
110114
ChunckerInterval: cc.chunkInterval,
111115
}
112116

113-
aggrs := strings.Split(cc.defaultRollups, ",")
114-
fields, err := aggregate.SchemaFieldFromString(aggrs, "v")
117+
fields, err := aggregate.SchemaFieldFromString(rollups, "v")
115118
if err != nil {
116-
return errors.Wrap(err, "Failed to create aggregators list")
119+
return errors.Wrap(err, "failed to create aggregators list")
117120
}
118121
fields = append(fields, config.SchemaField{Name: "_name", Type: "string", Nullable: false, Items: ""})
119122

120123
partitionSchema := config.PartitionSchema{
121124
Version: tableSchema.Version,
122-
Aggregators: aggrs,
125+
Aggregators: rollups,
123126
AggregatorsGranularity: cc.rollupInterval,
124127
StorageClass: defaultStorageClass,
125128
SampleRetention: cc.sampleRetention,

Diff for: pkg/tsdbctl/time.go

+1-1
Original file line number | Diff line number | Diff line change
@@ -51,7 +51,7 @@ func newTimeCommandeer(rootCommandeer *RootCommandeer) *timeCommandeer {
5151

5252
tint, err := strconv.Atoi(args[0])
5353
if err == nil {
54-
fmt.Println(time.Unix(int64(tint), 0).Format(time.RFC3339))
54+
fmt.Println(time.Unix(int64(tint), 0).UTC().Format(time.RFC3339))
5555
return nil
5656
}
5757

0 commit comments

Comments (0)