Skip to content

Commit 18410e7

Browse files
author
Gal Topper
authored
Merge pull request #115 from v3io/development
dev -> master (for v0.0.10)
2 parents 11c6f62 + 8316a37 commit 18410e7

File tree

8 files changed

+331
-50
lines changed

8 files changed

+331
-50
lines changed

Diff for: pkg/aggregate/iterator.go

+10-4
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,9 @@ func (as *AggregateSeries) CanAggregate(partitionAggr AggrType) bool {
6969
// keep only real aggregators
7070
aggrMask := 0x7f & as.aggrMask
7171
// make sure the DB has all the aggregators we need (on bits in the mask)
72-
// and that the aggregator resolution is greater/eq to requested interval
73-
return ((aggrMask & partitionAggr) == aggrMask) && as.interval >= as.rollupTime
72+
// and that the requested interval is greater/eq to the aggregator resolution and is a whole multiple of it
73+
return ((aggrMask & partitionAggr) == aggrMask) &&
74+
as.interval >= as.rollupTime && (as.interval%as.rollupTime == 0)
7475
}
7576

7677
func (as *AggregateSeries) GetAggrMask() AggrType {
@@ -176,6 +177,11 @@ func (as *AggregateSeries) NewSetFromChunks(length int) *AggregateSet {
176177
for _, aggr := range rawAggregators {
177178
if aggr&as.aggrMask != 0 {
178179
dataArrays[aggr] = make([]float64, length, length) // TODO: len/capacity & reuse (pool)
180+
if aggr == aggrTypeMax || aggr == aggrTypeMin || aggr == aggrTypeLast {
181+
for i := 0; i < length; i++ {
182+
dataArrays[aggr][i] = math.NaN()
183+
}
184+
}
179185
}
180186
}
181187

@@ -256,11 +262,11 @@ func (as *AggregateSet) updateCell(aggr AggrType, cell int, val float64) {
256262
case aggrTypeSqr:
257263
as.dataArrays[aggr][cell] += val * val
258264
case aggrTypeMin:
259-
if val < as.dataArrays[aggr][cell] {
265+
if math.IsNaN(as.dataArrays[aggr][cell]) || val < as.dataArrays[aggr][cell] {
260266
as.dataArrays[aggr][cell] = val
261267
}
262268
case aggrTypeMax:
263-
if val > as.dataArrays[aggr][cell] {
269+
if math.IsNaN(as.dataArrays[aggr][cell]) || val > as.dataArrays[aggr][cell] {
264270
as.dataArrays[aggr][cell] = val
265271
}
266272
case aggrTypeLast:

Diff for: pkg/partmgr/partmgr.go

+1-11
Original file line numberDiff line numberDiff line change
@@ -369,21 +369,11 @@ func (p *DBPartition) InRange(t int64) bool {
369369
}
370370

371371
// return the mint and maxt for this partition, may need maxt for cyclic partition
372-
func (p *DBPartition) GetPartitionRange(maxt int64) (int64, int64) {
372+
func (p *DBPartition) GetPartitionRange() (int64, int64) {
373373
// start p.days ago, rounded to next hour
374374
return p.startTime, p.startTime + p.partitionInterval
375375
}
376376

377-
// return the valid minimum time in a cyclic partition based on max time
378-
func (p *DBPartition) CyclicMinTime(mint, maxt int64) int64 {
379-
// start p.days ago, rounded to next hour
380-
newMin, _ := p.GetPartitionRange(maxt)
381-
if mint > newMin {
382-
return mint
383-
}
384-
return newMin
385-
}
386-
387377
// Attribute name of a chunk
388378
func (p *DBPartition) ChunkID2Attr(col string, id int) string {
389379
return fmt.Sprintf("_%s%d", col, id)

Diff for: pkg/querier/querier.go

+10-4
Original file line numberDiff line numberDiff line change
@@ -106,14 +106,20 @@ func (q *V3ioQuerier) selectQry(name, functions string, step int64, windows []in
106106
func (q *V3ioQuerier) queryNumericPartition(
107107
partition *partmgr.DBPartition, name, functions string, step int64, windows []int, filter string) (SeriesSet, error) {
108108

109-
mint, maxt := partition.GetPartitionRange(q.maxt)
110-
if q.mint > mint {
111-
mint = q.mint
112-
}
109+
mint, maxt := partition.GetPartitionRange()
110+
113111
if q.maxt < maxt {
114112
maxt = q.maxt
115113
}
116114

115+
if q.mint > mint {
116+
mint = q.mint
117+
if step != 0 && step < (maxt-mint) {
118+
// temporary Aggregation fix: if mint is not aligned with steps, move it to the next step tick
119+
mint += (maxt - mint) % step
120+
}
121+
}
122+
117123
newSet := &V3ioSeriesSet{mint: mint, maxt: maxt, partition: partition, logger: q.logger}
118124

119125
// if there are aggregations to be made

Diff for: pkg/querier/seriesset.go

+10-19
Original file line numberDiff line numberDiff line change
@@ -105,31 +105,30 @@ func (s *V3ioSeriesSet) Next() bool {
105105

106106
s.nullSeries = false
107107

108-
if s.aggrSeries.CanAggregate(s.partition.AggrType()) && s.maxt-s.mint > s.interval {
108+
if s.aggrSeries.CanAggregate(s.partition.AggrType()) && s.maxt-s.mint >= s.interval {
109109

110110
// create series from aggregation arrays (in DB) if the partition stored the desired aggregates
111111
maxtUpdate := s.maxt
112112
maxTime := s.iter.GetField("_maxtime")
113113
if maxTime != nil && int64(maxTime.(int)) < s.maxt {
114114
maxtUpdate = int64(maxTime.(int))
115115
}
116-
mint := s.partition.CyclicMinTime(s.mint, maxtUpdate)
117116

118-
start := s.partition.Time2Bucket(mint)
117+
start := s.partition.Time2Bucket(s.mint)
119118
end := s.partition.Time2Bucket(s.maxt+s.interval) + 1
120119

121-
// len of the returned array, cropped at the end in case of cyclic overlap
122-
length := int((maxtUpdate-mint)/s.interval) + 2
120+
// len of the returned array, time-range / interval + 2
121+
length := int((maxtUpdate-s.mint)/s.interval) + 2
123122

124123
if s.overlapWin != nil {
125-
s.baseTime = s.maxt //- int64(s.overlapWin[0]) * s.interval
124+
s.baseTime = s.maxt
126125
} else {
127-
s.baseTime = mint
126+
s.baseTime = s.mint
128127
}
129128

130129
if length > 0 {
131130
attrs := s.iter.GetFields()
132-
aggrSet, err := s.aggrSeries.NewSetFromAttrs(length, start, end, mint, s.maxt, &attrs)
131+
aggrSet, err := s.aggrSeries.NewSetFromAttrs(length, start, end, s.mint, s.maxt, &attrs)
133132
if err != nil {
134133
s.err = err
135134
return false
@@ -145,12 +144,8 @@ func (s *V3ioSeriesSet) Next() bool {
145144
// create series from raw chunks
146145
s.currSeries = NewSeries(s)
147146

148-
// the number of cells is equal to divisor of (maxt-mint) and interval. if there's a
149-
// remainder or if there are no cells (e.g. diff is smaller than interval), add a cell
150-
numCells := (s.maxt - s.mint) / s.interval
151-
if ((s.maxt-s.mint)%s.interval) != 0 || numCells == 0 {
152-
numCells++
153-
}
147+
// the number of cells is the quotient of (maxt-mint) divided by interval, plus one.
148+
numCells := (s.maxt-s.mint)/s.interval + 1
154149

155150
s.aggrSet = s.aggrSeries.NewSetFromChunks(int(numCells))
156151
if s.overlapWin != nil {
@@ -171,12 +166,8 @@ func (s *V3ioSeriesSet) chunks2IntervalAggregates() {
171166

172167
iter := s.currSeries.Iterator()
173168
if iter.Next() {
174-
t0, _ := iter.At()
175169

176-
// the base time should be the first sample we receive. initially, baseTime is zero
177-
if s.baseTime == 0 {
178-
s.baseTime = t0
179-
}
170+
s.baseTime = s.mint
180171

181172
for {
182173
t, v := iter.At()

Diff for: pkg/tsdb/v3iotsdb_integration_test.go

+13-1
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,17 @@ func TestQueryData(t *testing.T) {
199199
aggregators: "sum",
200200
expected: map[string][]tsdbtest.DataPoint{"sum": {{Time: 1532940510, Value: 701.0}}}},
201201

202+
{desc: "Should ingest and query an aggregator EXTRA", metricName: "cpu",
203+
labels: utils.FromStrings("os", "linux", "iguaz", "yesplease"),
204+
data: []tsdbtest.DataPoint{{Time: 1532940510, Value: 300.3},
205+
{Time: 1532940510 + 60, Value: 300.3},
206+
{Time: 1532940510 + 2*60, Value: 100.4},
207+
{Time: 1532940510 + 2*60, Value: 200.0}},
208+
from: 1532940510,
209+
to: 1532940510 + 6*60,
210+
aggregators: "sum",
211+
expected: map[string][]tsdbtest.DataPoint{"sum": {{Time: 1532940510, Value: 901.0}}}},
212+
202213
{desc: "Should ingest and query multiple aggregators", metricName: "cpu",
203214
labels: utils.FromStrings("os", "linux", "iguaz", "yesplease"),
204215
data: []tsdbtest.DataPoint{{Time: 1532940510, Value: 300.3},
@@ -252,7 +263,8 @@ func testQueryDataCase(test *testing.T, v3ioConfig *config.V3ioConfig,
252263
}
253264
}
254265

255-
set, err := qry.Select(metricsName, aggregator, 1000, filter)
266+
step := int64(5 * 60 * 1000) // 5 minutes
267+
set, err := qry.Select(metricsName, aggregator, step, filter)
256268
if err != nil {
257269
test.Fatalf("Failed to run Select. reason: %v", err)
258270
}

Diff for: pkg/tsdbctl/create.go

+25-11
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,15 @@ import (
2727
"github.com/v3io/v3io-tsdb/pkg/aggregate"
2828
"github.com/v3io/v3io-tsdb/pkg/config"
2929
"github.com/v3io/v3io-tsdb/pkg/tsdb"
30+
"github.com/v3io/v3io-tsdb/pkg/utils"
3031
"strconv"
32+
"time"
3133
)
3234

3335
const (
3436
schemaVersion = 0
3537
defaultStorageClass = "local"
36-
defaultIngestionRate = "1/s"
38+
defaultIngestionRate = ""
3739
defaultRollupInterval = "1h"
3840
defaultShardingBuckets = 8
3941
defaultSampleRetentionHours = 0
@@ -86,7 +88,7 @@ func (cc *createCommandeer) create() error {
8688
return err
8789
}
8890

89-
if err := cc.validateFormat(cc.rollupInterval); err != nil {
91+
if err := cc.validateRollupInterval(); err != nil {
9092
return errors.Wrap(err, "failed to parse rollup interval")
9193
}
9294

@@ -95,6 +97,9 @@ func (cc *createCommandeer) create() error {
9597
return errors.Wrap(err, "failed to parse default rollups")
9698
}
9799

100+
if cc.sampleRate == "" {
101+
return errors.New(`sample rate not provided! Please provide sample rate with --rate flag in the format of [0-9]+/[hms]. Example: 12/m`)
102+
}
98103
rateInHours, err := rateToHours(cc.sampleRate)
99104
if err != nil {
100105
return errors.Wrap(err, "failed to parse sample rate")
@@ -147,14 +152,15 @@ func (cc *createCommandeer) create() error {
147152
return tsdb.CreateTSDB(cc.rootCommandeer.v3iocfg, &schema)
148153
}
149154

150-
func (cc *createCommandeer) validateFormat(format string) error {
151-
interval := format[0 : len(format)-1]
152-
if _, err := strconv.Atoi(interval); err != nil {
153-
return errors.New("format is incorrect, not a number")
155+
func (cc *createCommandeer) validateRollupInterval() error {
156+
dayMillis := 24 * int64(time.Hour/time.Millisecond)
157+
duration, err := utils.Str2duration(cc.rollupInterval)
158+
if err != nil {
159+
return err
154160
}
155-
unit := string(format[len(format)-1])
156-
if !(unit == "m" || unit == "d" || unit == "h") {
157-
return errors.New("format is incorrect, not part of m,d,h")
161+
162+
if dayMillis%duration != 0 && duration%dayMillis != 0 {
163+
return errors.New("rollup interval should be a divisor or a dividend of 1 day. Example: 10m, 30m, 2h, etc.")
158164
}
159165
return nil
160166
}
@@ -176,13 +182,21 @@ func (cc *createCommandeer) calculatePartitionAndChunkInterval(rateInHours int)
176182
}
177183

178184
actualCapacityOfChunk := chunkInterval * rateInHours * cc.rootCommandeer.v3iocfg.MaximumSampleSize
179-
numberOfChunksInPartition := cc.rootCommandeer.v3iocfg.MaximumPartitionSize / actualCapacityOfChunk
185+
numberOfChunksInPartition := 0
186+
187+
for (numberOfChunksInPartition+24)*actualCapacityOfChunk < cc.rootCommandeer.v3iocfg.MaximumPartitionSize {
188+
numberOfChunksInPartition += 24
189+
}
190+
if numberOfChunksInPartition == 0 {
191+
return "", "", errors.Errorf("given rate is too high, can not fit a partition in a day interval with the calculated chunk size %vh", chunkInterval)
192+
}
193+
180194
partitionInterval := numberOfChunksInPartition * chunkInterval
181195
return strconv.Itoa(chunkInterval) + "h", strconv.Itoa(partitionInterval) + "h", nil
182196
}
183197

184198
func rateToHours(sampleRate string) (int, error) {
185-
parsingError := errors.New(`not a valid rate. Accepted pattern: [0-9]+/[hms]. Examples: 12/m`)
199+
parsingError := errors.New(`not a valid rate. Accepted pattern: [0-9]+/[hms]. Example: 12/m`)
186200

187201
if len(sampleRate) < 3 {
188202
return 0, parsingError

Diff for: pkg/tsdbctl/create_test.go

+2
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ func TestRateToHour(t *testing.T) {
1616
{input: "1m", shouldFail: true},
1717
{input: "1/t", shouldFail: true},
1818
{input: "-431/t", shouldFail: true},
19+
{input: "-1", shouldFail: true},
20+
{input: "", shouldFail: true},
1921
}
2022

2123
for _, testCase := range cases {

0 commit comments

Comments
 (0)