Skip to content

Commit bb7a7d1

Browse files
author
Gal Topper
authored
Merge pull request #78 from v3io/development
dev->master
2 parents 7ef6fbf + cb63bc0 commit bb7a7d1

File tree

11 files changed

+195
-66
lines changed

11 files changed

+195
-66
lines changed

Diff for: pkg/aggregate/iterator.go

+37-25
Original file line numberDiff line numberDiff line change
@@ -45,11 +45,11 @@ func NewAggregateSeries(functions, col string, buckets int, interval, rollupTime
4545

4646
split := strings.Split(functions, ",")
4747
var aggrMask AggrType
48-
aggrList := []AggrType{}
48+
var aggrList []AggrType
4949
for _, s := range split {
5050
aggr, ok := aggrTypeString[s]
5151
if !ok {
52-
return nil, fmt.Errorf("Invalid aggragator type %s", s)
52+
return nil, fmt.Errorf("invalid aggragator type %s", s)
5353
}
5454
aggrMask = aggrMask | aggr
5555
aggrList = append(aggrList, aggr)
@@ -87,7 +87,7 @@ func (as *AggregateSeries) toAttrName(aggr AggrType) string {
8787
}
8888

8989
func (as *AggregateSeries) GetAttrNames() []string {
90-
names := []string{}
90+
var names []string
9191

9292
for _, aggr := range rawAggregators {
9393
if aggr&as.aggrMask != 0 {
@@ -115,7 +115,7 @@ func (as *AggregateSeries) NewSetFromAttrs(
115115
if aggr&as.aggrMask != 0 {
116116
attrBlob, ok := (*attrs)[as.toAttrName(aggr)]
117117
if !ok {
118-
return nil, fmt.Errorf("Aggregation Attribute %s was not found", as.toAttrName(aggr))
118+
return nil, fmt.Errorf("aggregation attribute %s was not found", as.toAttrName(aggr))
119119
}
120120
aggrArrays[aggr] = utils.AsInt64Array(attrBlob.([]byte))
121121
dataArrays[aggr] = make([]float64, length, length)
@@ -124,7 +124,6 @@ func (as *AggregateSeries) NewSetFromAttrs(
124124

125125
aggrSet := AggregateSet{length: length, interval: as.interval, overlapWin: as.overlapWindows}
126126
aggrSet.dataArrays = dataArrays
127-
aggrSet.validCells = make([]bool, length, length)
128127

129128
arrayIndex := start
130129
i := 0
@@ -138,7 +137,6 @@ func (as *AggregateSeries) NewSetFromAttrs(
138137
for aggr, array := range aggrArrays {
139138
aggrSet.mergeArrayCell(aggr, cellIndex, array[arrayIndex])
140139
}
141-
aggrSet.setValid(i)
142140
} else {
143141

144142
// overlapping time windows (last 1hr, 6hr, ..)
@@ -149,7 +147,6 @@ func (as *AggregateSeries) NewSetFromAttrs(
149147
for aggr, array := range aggrArrays {
150148
aggrSet.mergeArrayCell(aggr, i, array[arrayIndex])
151149
}
152-
aggrSet.setValid(i)
153150
}
154151
}
155152
}
@@ -180,13 +177,11 @@ func (as *AggregateSeries) NewSetFromChunks(length int) *AggregateSet {
180177
}
181178

182179
newAggregateSet.dataArrays = dataArrays
183-
newAggregateSet.validCells = make([]bool, length, length)
184180
return &newAggregateSet
185181
}
186182

187183
type AggregateSet struct {
188184
dataArrays map[AggrType][]float64
189-
validCells []bool
190185
length int
191186
maxCell int
192187
baseTime int64
@@ -198,27 +193,20 @@ func (as *AggregateSet) GetMaxCell() int {
198193
return as.maxCell
199194
}
200195

201-
func (as *AggregateSet) setValid(cell int) {
202-
if cell < as.length {
203-
as.validCells[cell] = true
204-
}
205-
}
206-
207196
// append the value to a cell in all relevant aggregation arrays
208197
func (as *AggregateSet) AppendAllCells(cell int, val float64) {
209198

210-
if cell >= as.length {
199+
if !isValidCell(cell, as) {
211200
return
212201
}
213202

214203
if cell > as.maxCell {
215204
as.maxCell = cell
216205
}
217206

218-
for aggr, _ := range as.dataArrays {
207+
for aggr := range as.dataArrays {
219208
as.updateCell(aggr, cell, val)
220209
}
221-
as.validCells[cell] = true
222210
}
223211

224212
// append/merge (v3io) aggregation values into aggregation per requested interval/step
@@ -241,9 +229,22 @@ func (as *AggregateSet) mergeArrayCell(aggr AggrType, cell int, val uint64) {
241229
}
242230
}
243231

232+
func isValidValue(v float64) bool {
233+
return !(math.IsNaN(v) || math.IsInf(v, 1) || math.IsInf(v, -1))
234+
}
235+
236+
func isValidCell(cellIndex int, aSet *AggregateSet) bool {
237+
return cellIndex >= 0 &&
238+
cellIndex < aSet.length
239+
}
240+
244241
// function specific aggregation
245242
func (as *AggregateSet) updateCell(aggr AggrType, cell int, val float64) {
246243

244+
if !isValidCell(cell, as) {
245+
return
246+
}
247+
247248
switch aggr {
248249
case aggrTypeCount:
249250
as.dataArrays[aggr][cell] += 1
@@ -252,11 +253,11 @@ func (as *AggregateSet) updateCell(aggr AggrType, cell int, val float64) {
252253
case aggrTypeSqr:
253254
as.dataArrays[aggr][cell] += val * val
254255
case aggrTypeMin:
255-
if val < as.dataArrays[aggr][cell] || !as.validCells[cell] {
256+
if val < as.dataArrays[aggr][cell] {
256257
as.dataArrays[aggr][cell] = val
257258
}
258259
case aggrTypeMax:
259-
if val > as.dataArrays[aggr][cell] || !as.validCells[cell] {
260+
if val > as.dataArrays[aggr][cell] {
260261
as.dataArrays[aggr][cell] = val
261262
}
262263
case aggrTypeLast:
@@ -267,13 +268,24 @@ func (as *AggregateSet) updateCell(aggr AggrType, cell int, val float64) {
267268
// return the value per aggregate or complex function
268269
func (as *AggregateSet) GetCellValue(aggr AggrType, cell int) (float64, bool) {
269270

270-
if cell > as.maxCell || cell >= as.length || !as.validCells[cell] { // TODO: should >Len return NaN or Zero ?
271+
if !isValidCell(cell, as) {
272+
return math.NaN(), false
273+
}
274+
275+
dependsOnSumAndCount := aggr == aggrTypeStddev || aggr == aggrTypeStdvar || aggr == aggrTypeAvg
276+
dependsOnSqr := aggr == aggrTypeStddev || aggr == aggrTypeStdvar
277+
dependsOnLast := aggr == aggrTypeLast || aggr == aggrTypeRate
278+
279+
// return an undefined result if one of the dependent fields is missing
280+
if (dependsOnSumAndCount && !(isValidValue(as.dataArrays[aggrTypeSum][cell]) && isValidValue(as.dataArrays[aggrTypeCount][cell]))) ||
281+
(dependsOnSqr && !isValidValue(as.dataArrays[aggrTypeSqr][cell])) ||
282+
(dependsOnLast && !isValidValue(as.dataArrays[aggrTypeLast][cell])) {
271283
return math.NaN(), false
272284
}
273285

274286
// if no samples in this bucket the result is undefined
275287
var cnt float64
276-
if aggr == aggrTypeAvg || aggr == aggrTypeStddev || aggr == aggrTypeStdvar {
288+
if dependsOnSumAndCount {
277289
cnt = as.dataArrays[aggrTypeCount][cell]
278290
if cnt == 0 {
279291
return math.NaN(), false
@@ -295,13 +307,13 @@ func (as *AggregateSet) GetCellValue(aggr AggrType, cell int) (float64, bool) {
295307
if cell == 0 {
296308
return math.NaN(), false
297309
}
310+
// TODO: need to clarify the meaning of this type of aggregation. IMHO, rate has meaning for monotonic counters only
298311
last := as.dataArrays[aggrTypeLast][cell-1]
299312
this := as.dataArrays[aggrTypeLast][cell]
300-
return (this - last) / float64(as.interval/1000), true // clac rate per sec
313+
return (this - last) / float64(as.interval/1000), true // rate per sec
301314
default:
302315
return as.dataArrays[aggr][cell], true
303316
}
304-
305317
}
306318

307319
// get the time per aggregate cell
@@ -317,7 +329,7 @@ func (as *AggregateSet) GetCellTime(base int64, index int) int64 {
317329

318330
func (as *AggregateSet) Clear() {
319331
as.maxCell = 0
320-
for aggr, _ := range as.dataArrays {
332+
for aggr := range as.dataArrays {
321333
as.dataArrays[aggr] = as.dataArrays[aggr][:0]
322334
}
323335
}

Diff for: pkg/appender/ingest.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ package appender
2323
import (
2424
"github.com/pkg/errors"
2525
"github.com/v3io/v3io-go-http"
26+
"net/http"
2627
"reflect"
2728
"time"
2829
)
@@ -274,7 +275,7 @@ func (mc *MetricsCache) handleResponse(metric *MetricState, resp *v3io.Response,
274275
} else {
275276
// Metrics with too many update errors go into Error state
276277
metric.retryCount++
277-
if metric.retryCount == maxRetriesOnWrite {
278+
if e, hasStatusCode := resp.Error.(v3io.ErrorWithStatusCode); metric.retryCount == maxRetriesOnWrite || hasStatusCode && e.StatusCode() != http.StatusServiceUnavailable {
278279
mc.logger.ErrorWith("Metric error, exceeded retry", "metric", metric.Lset)
279280
metric.setError(errors.Wrap(resp.Error, "chunk update failed after few retries"))
280281
return false

Diff for: pkg/config/config.go

+8
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,14 @@ type V3ioConfig struct {
5353
DefaultTimeout int `json:"timeout,omitempty"`
5454
// The size of batch to use during ingestion
5555
BatchSize int `json:"batchSize,omitempty"`
56+
// Size of sample in bytes for worst and best compression scenarios
57+
MinimumSampleSize int `json:"minimumSampleSize,omitempty"`
58+
MaximumSampleSize int `json:"maximumSampleSize,omitempty"`
59+
// Max size of a partition object
60+
MaximumPartitionSize int `json:"maximumPartitionSize,omitempty"`
61+
// Size of chunk in bytes for worst and best compression scenarios
62+
MinimumChunkSize int `json:"minimumChunkSize,omitempty"`
63+
MaximumChunkSize int `json:"maximumChunkSize,omitempty"`
5664
}
5765

5866
type Rollup struct {

Diff for: pkg/partmgr/partmgr.go

+5-2
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,7 @@ func (p *DBPartition) GetStartTime() int64 {
230230
}
231231

232232
func (p *DBPartition) GetEndTime() int64 {
233-
return p.startTime + p.partitionInterval
233+
return p.startTime + p.partitionInterval - 1
234234
}
235235

236236
// return path to metrics table
@@ -272,7 +272,10 @@ func (p *DBPartition) Time2Bucket(t int64) int {
272272
if p.rollupTime == 0 {
273273
return 0
274274
}
275-
return int((t-p.startTime)/p.rollupTime) % p.rollupBuckets
275+
if t > p.GetEndTime() {
276+
return p.rollupBuckets - 1
277+
}
278+
return int((t - p.startTime) / p.rollupTime)
276279
}
277280

278281
// get nearest chunk start

Diff for: pkg/tsdb/tsdbtest/testutils/schema.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@ func CreateSchema(t testing.TB, agg string) config.Schema {
2323
Version: 0,
2424
RollupLayers: []config.Rollup{defaultRollup},
2525
ShardingBuckets: 8,
26-
PartitionerInterval: "2d",
27-
ChunckerInterval: "1h",
26+
PartitionerInterval: "340h",
27+
ChunckerInterval: "10h",
2828
}
2929

3030
fields, err := aggregate.SchemaFieldFromString(rollups, "v")

Diff for: pkg/tsdb/v3iotsdb_integration_test.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"github.com/v3io/v3io-tsdb/pkg/config"
3030
. "github.com/v3io/v3io-tsdb/pkg/tsdb"
3131
"github.com/v3io/v3io-tsdb/pkg/tsdb/tsdbtest"
32+
"github.com/v3io/v3io-tsdb/pkg/tsdb/tsdbtest/testutils"
3233
"github.com/v3io/v3io-tsdb/pkg/utils"
3334
"path/filepath"
3435
"sort"
@@ -408,9 +409,9 @@ func TestCreateTSDB(t *testing.T) {
408409
conf config.Schema
409410
ignoreReason string
410411
}{
411-
{desc: "Should create TSDB with standard configuration", conf: tsdbtest.CreateSchema(t, "sum,count")},
412+
{desc: "Should create TSDB with standard configuration", conf: testutils.CreateSchema(t, "sum,count")},
412413

413-
{desc: "Should create TSDB with wildcard aggregations", conf: tsdbtest.CreateSchema(t, "*")},
414+
{desc: "Should create TSDB with wildcard aggregations", conf: testutils.CreateSchema(t, "*")},
414415
}
415416

416417
for _, test := range testCases {
@@ -442,7 +443,7 @@ func TestDeleteTSDB(t *testing.T) {
442443
t.Fatalf("Failed to load test configuration. reason: %s", err)
443444
}
444445

445-
schema := tsdbtest.CreateSchema(t, "count,sum")
446+
schema := testutils.CreateSchema(t, "count,sum")
446447
v3ioConfig.Path = t.Name()
447448
if err := CreateTSDB(v3ioConfig, &schema); err != nil {
448449
t.Fatalf("Failed to create TSDB. reason: %s", err)

0 commit comments

Comments
 (0)