Skip to content

Commit 73bef3b

Browse files
authored
Merge pull request #465 from dinal/master
dev -> master
2 parents 4714dd2 + 712c068 commit 73bef3b

17 files changed

+2008
-325
lines changed

Diff for: go.mod

+4-5
Original file line numberDiff line numberDiff line change
@@ -6,24 +6,23 @@ require (
66
github.com/cespare/xxhash v1.1.0
77
github.com/ghodss/yaml v1.0.0
88
github.com/imdario/mergo v0.3.7
9+
github.com/inconshreveable/mousetrap v1.0.0 // indirect
10+
github.com/kr/pretty v0.2.0 // indirect
911
github.com/nuclio/logger v0.0.1
1012
github.com/nuclio/nuclio-sdk-go v0.0.0-20190205170814-3b507fbd0324
1113
github.com/nuclio/nuclio-test-go v0.0.0-20180704132150-0ce6587f8e37
1214
github.com/nuclio/zap v0.0.2
13-
github.com/pavius/impi v0.0.0-20200212064320-5db7efa5f87b // indirect
1415
github.com/pkg/errors v0.8.1
1516
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a
16-
github.com/russross/blackfriday v1.5.2+incompatible // indirect
1717
github.com/spf13/cobra v0.0.3
1818
github.com/stretchr/testify v1.4.0
1919
github.com/v3io/frames v0.6.8-v0.9.11
20-
github.com/v3io/v3io-go v0.0.5-0.20191205125653-9003ae83f0b6
20+
github.com/v3io/v3io-go v0.1.5-0.20200301152134-6880d30985de
2121
github.com/xwb1989/sqlparser v0.0.0-20180606152119-120387863bf2
2222
)
2323

2424
replace (
25-
github.com/v3io/frames => github.com/v3io/frames v0.6.9-v0.9.12.0.20200219120609-981ffb872c73
26-
github.com/v3io/v3io-go => github.com/v3io/v3io-go v0.1.5-0.20200224125003-964a745e51aa
25+
github.com/v3io/frames => github.com/v3io/frames v0.6.11-v0.9.15.0.20200301102651-c1e7acf2c501
2726
github.com/xwb1989/sqlparser => github.com/v3io/sqlparser v0.0.0-20190306105200-4d7273501871
2827
labix.org/v2/mgo => github.com/go-mgo/mgo v0.0.0-20180705113738-7446a0344b7872c067b3d6e1b7642571eafbae17
2928
launchpad.net/gocheck => github.com/go-check/check v0.0.0-20180628173108-788fd78401277ebd861206a03c884797c6ec5541

Diff for: go.sum

+9-44
Large diffs are not rendered by default.

Diff for: pkg/appender/ingest.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ func (mc *MetricsCache) metricFeed(index int) {
6262
mc.logger.Debug(`Complete update cycle - "in-flight requests"=%d; "metric queue length"=%d\n`, inFlight, length)
6363

6464
// If data was sent and the queue is empty, mark as completion
65-
if length == 0 && gotData {
65+
if length == 0 && gotData && len(mc.asyncAppendChan) == 0 {
6666
gotCompletion = true
6767
if completeChan != nil {
6868
completeChan <- 0
@@ -80,7 +80,7 @@ func (mc *MetricsCache) metricFeed(index int) {
8080
completeChan = app.resp
8181

8282
length := mc.metricQueue.Length()
83-
if gotCompletion && length == 0 {
83+
if gotCompletion && length == 0 && len(mc.asyncAppendChan) == 0 {
8484
completeChan <- 0
8585
gotCompletion = false
8686
gotData = false
@@ -209,7 +209,7 @@ func (mc *MetricsCache) metricsUpdateLoop(index int) {
209209
}
210210

211211
// Notify the metric feeder when all in-flight tasks are done
212-
if mc.updatesInFlight == 0 {
212+
if mc.updatesInFlight == 0 && len(mc.asyncAppendChan) == 0 {
213213
mc.logger.Debug("Return to feed. Metric queue length: %d", mc.metricQueue.Length())
214214
mc.updatesComplete <- 0
215215
}

Diff for: pkg/chunkenc/xor.go

+12-7
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,8 @@ type xorAppender struct {
160160

161161
leading uint8
162162
trailing uint8
163+
164+
isPreviousNewSeries bool
163165
}
164166

165167
func (a *xorAppender) Encoding() Encoding {
@@ -192,20 +194,22 @@ func (a *xorAppender) Append(t int64, vvar interface{}) {
192194
return
193195
}
194196

195-
if num == 0 {
197+
// We write time deltas as 32 bits (for compression); if the delta is too large, we'll start a new series
198+
tDelta = uint64(t - a.t)
199+
shouldStartNewSeries := num == 0 || bits.Len64(tDelta) >= 32
200+
201+
if shouldStartNewSeries {
196202
// add a signature 11111 to indicate start of cseries in case we put few in the same chunk (append to existing)
197203
a.b.writeBits(0x1f, 5)
198204
a.b.writeBits(uint64(t), 51)
199205
a.b.writeBits(math.Float64bits(v), 64)
200-
201-
} else if num == 1 {
202-
tDelta = uint64(t - a.t)
203-
206+
a.isPreviousNewSeries = true
207+
tDelta = 0 // saving time delta for the first element is redundant
208+
} else if a.isPreviousNewSeries {
204209
a.b.writeBits(tDelta, 32)
205210
a.writeVDelta(v)
206-
211+
a.isPreviousNewSeries = false
207212
} else {
208-
tDelta = uint64(t - a.t)
209213
dod := int64(tDelta - a.tDelta)
210214

211215
// Gorilla has a max resolution of seconds, Prometheus milliseconds.
@@ -228,6 +232,7 @@ func (a *xorAppender) Append(t int64, vvar interface{}) {
228232
}
229233

230234
a.writeVDelta(v)
235+
231236
}
232237

233238
a.t = t

Diff for: pkg/config/config.go

+10-8
Original file line numberDiff line numberDiff line change
@@ -62,14 +62,16 @@ const (
6262
DefaultUseServerAggregateCoefficient = 3
6363

6464
// KV attribute names
65-
MaxTimeAttrName = "_maxtime"
66-
LabelSetAttrName = "_lset"
67-
EncodingAttrName = "_enc"
68-
OutOfOrderAttrName = "_ooo"
69-
MetricNameAttrName = "_name"
70-
ObjectNameAttrName = "__name"
71-
ChunkAttrPrefix = "_v"
72-
AggregateAttrPrefix = "_v_"
65+
MaxTimeAttrName = "_maxtime"
66+
LabelSetAttrName = "_lset"
67+
EncodingAttrName = "_enc"
68+
OutOfOrderAttrName = "_ooo"
69+
MetricNameAttrName = "_name"
70+
ObjectNameAttrName = "__name"
71+
ChunkAttrPrefix = "_v"
72+
AggregateAttrPrefix = "_v_"
73+
MtimeSecsAttributeName = "__mtime_secs"
74+
MtimeNSecsAttributeName = "__mtime_nsecs"
7375

7476
PrometheusMetricNameAttribute = "__name__"
7577

Diff for: pkg/partmgr/partmgr.go

+60-2
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ func (p *PartitionManager) updateSchema() error {
195195
}
196196

197197
input := &v3io.PutItemInput{Path: schemaFilePath, Attributes: attributes}
198-
err := p.container.PutItemSync(input)
198+
_, err := p.container.PutItemSync(input)
199199

200200
if err != nil {
201201
outerError = errors.Wrap(err, "failed to update partitions table.")
@@ -235,7 +235,7 @@ func (p *PartitionManager) DeletePartitionsFromSchema(partitionsToDelete []*DBPa
235235
deletePartitionExpression.WriteString(");")
236236
}
237237
expression := deletePartitionExpression.String()
238-
err := p.container.UpdateItemSync(&v3io.UpdateItemInput{Path: p.GetSchemaFilePath(), Expression: &expression})
238+
_, err := p.container.UpdateItemSync(&v3io.UpdateItemInput{Path: p.GetSchemaFilePath(), Expression: &expression})
239239
if err != nil {
240240
return err
241241
}
@@ -589,6 +589,33 @@ func (p *DBPartition) Time2Bucket(t int64) int {
589589
return int((t - p.startTime) / p.rollupTime)
590590
}
591591

592+
// Return the start time of an aggregation bucket by id
593+
func (p *DBPartition) GetAggregationBucketStartTime(id int) int64 {
594+
return p.startTime + int64(id)*p.rollupTime
595+
}
596+
597+
// Return the end time of an aggregation bucket by id
598+
func (p *DBPartition) GetAggregationBucketEndTime(id int) int64 {
599+
return p.startTime + int64(id+1)*p.rollupTime - 1
600+
}
601+
602+
func (p *DBPartition) Times2BucketRange(start, end int64) []int {
603+
var buckets []int
604+
605+
if start > p.GetEndTime() || end < p.startTime {
606+
return buckets
607+
}
608+
609+
startingAggrBucket := p.Time2Bucket(start)
610+
endAggrBucket := p.Time2Bucket(end)
611+
612+
for bucketID := startingAggrBucket; bucketID <= endAggrBucket; bucketID++ {
613+
buckets = append(buckets, bucketID)
614+
}
615+
616+
return buckets
617+
}
618+
592619
// Return the nearest chunk start time for the specified time
593620
func (p *DBPartition) GetChunkMint(t int64) int64 {
594621
if t > p.GetEndTime() {
@@ -618,6 +645,37 @@ func (p *DBPartition) TimeToChunkID(tmilli int64) (int, error) {
618645
return -1, errors.Errorf("Time %d isn't within the range of this partition.", tmilli)
619646
}
620647

648+
// Check if a chunk (by attribute name) is in the given time range.
649+
func (p *DBPartition) IsChunkInRangeByAttr(attr string, mint, maxt int64) bool {
650+
651+
// Discard '_v' prefix
652+
chunkIDStr := attr[2:]
653+
chunkID, err := strconv.ParseInt(chunkIDStr, 10, 64)
654+
if err != nil {
655+
return false
656+
}
657+
658+
chunkStartTime := p.startTime + (chunkID-1)*p.chunkInterval
659+
chunkEndTime := chunkStartTime + p.chunkInterval - 1
660+
661+
return mint <= chunkStartTime && maxt >= chunkEndTime
662+
}
663+
664+
// Get a chunk's start time by its attribute name
665+
func (p *DBPartition) GetChunkStartTimeByAttr(attr string) (int64, error) {
666+
667+
// Discard '_v' prefix
668+
chunkIDStr := attr[2:]
669+
chunkID, err := strconv.ParseInt(chunkIDStr, 10, 64)
670+
if err != nil {
671+
return 0, err
672+
}
673+
674+
chunkStartTime := p.startTime + (chunkID-1)*p.chunkInterval
675+
676+
return chunkStartTime, nil
677+
}
678+
621679
// Check whether the specified time is within the range of this partition
622680
func (p *DBPartition) InRange(t int64) bool {
623681
if p.manager.cyclic {

Diff for: pkg/pquerier/pqueriertest/dataframe_query_integration_test.go

+14-10
Original file line numberDiff line numberDiff line change
@@ -1206,10 +1206,14 @@ func (suite *testSelectDataframeSuite) TestSparseNumericColumnsWithNotExistingMe
12061206
suite.basicQueryTime + 3*tsdbtest.MinuteInMillis,
12071207
suite.basicQueryTime + 4*tsdbtest.MinuteInMillis}
12081208
expectedColumns := map[string][]interface{}{
1209-
"cpu_0": {math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN()},
1210-
"cpu_1": {10.0, 20.0, 30.0, 40.0, 50.0, math.NaN(), 22.0, 33.0, math.NaN(), 55.0},
1211-
"cpu_2": {math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN(), 10.0, 20.0, math.NaN(), 40.0, 50.0},
1212-
"fake": {math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN()},
1209+
"cpu_0-linux": {math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN()},
1210+
"cpu_1-linux": {10.0, 20.0, 30.0, 40.0, 50.0},
1211+
"cpu_2-linux": {math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN()},
1212+
"fake-linux": {math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN()},
1213+
"cpu_0-windows": {math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN()},
1214+
"cpu_1-windows": {math.NaN(), 22.0, 33.0, math.NaN(), 55.0},
1215+
"cpu_2-windows": {10.0, 20.0, math.NaN(), 40.0, 50.0},
1216+
"fake-windows": {math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN()},
12131217
}
12141218

12151219
testParams := tsdbtest.NewTestParams(suite.T(),
@@ -1263,23 +1267,22 @@ func (suite *testSelectDataframeSuite) TestSparseNumericColumnsWithNotExistingMe
12631267
iter, err := querierV2.SelectDataFrame(params)
12641268
requireCtx.NoError(err, "failed to execute query")
12651269

1266-
rowId := -1
12671270
var seriesCount int
12681271
for iter.NextFrame() {
12691272
seriesCount++
12701273
frame, err := iter.GetFrame()
12711274
requireCtx.NoError(err)
12721275
indexCol := frame.Indices()[0]
1273-
1276+
osLabel := frame.Labels()["os"]
12741277
nullValuesMap := frame.NullValuesMap()
12751278
requireCtx.NotNil(nullValuesMap, "null value map should not be empty")
12761279

12771280
for i := 0; i < indexCol.Len(); i++ {
1278-
rowId++
12791281
t, _ := indexCol.TimeAt(i)
12801282
timeMillis := t.UnixNano() / int64(time.Millisecond)
12811283
requireCtx.Equal(expectedTimeColumn[i], timeMillis, "time column does not match at index %d", i)
12821284
for _, columnName := range frame.Names() {
1285+
key := fmt.Sprintf("%v-%v", columnName, osLabel)
12831286
var v interface{}
12841287
column, err := frame.Column(columnName)
12851288
requireCtx.NoError(err)
@@ -1288,7 +1291,8 @@ func (suite *testSelectDataframeSuite) TestSparseNumericColumnsWithNotExistingMe
12881291
if v == math.NaN() {
12891292
requireCtx.True(nullValuesMap[i].NullColumns[columnName])
12901293
}
1291-
bothNaN := math.IsNaN(expectedColumns[column.Name()][i].(float64)) && math.IsNaN(v.(float64))
1294+
1295+
bothNaN := math.IsNaN(expectedColumns[key][i].(float64)) && math.IsNaN(v.(float64))
12921296
if bothNaN {
12931297
continue
12941298
}
@@ -1301,9 +1305,9 @@ func (suite *testSelectDataframeSuite) TestSparseNumericColumnsWithNotExistingMe
13011305
suite.Fail(fmt.Sprintf("column type is not as expected: %v", column.DType()))
13021306
}
13031307

1304-
expectedValue := expectedColumns[columnName][rowId]
1308+
expectedValue := expectedColumns[key][i]
13051309
if !math.IsNaN(expectedValue.(float64)) || !math.IsNaN(v.(float64)) {
1306-
requireCtx.Equal(expectedValue, v, "column %v does not match at index %d", columnName, rowId)
1310+
requireCtx.Equal(expectedValue, v, "column %v does not match at index %d", columnName, i)
13071311
}
13081312
}
13091313
}

Diff for: pkg/pquerier/pqueriertest/variant_type_query_integration_test.go

+70
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
package pqueriertest
44

55
import (
6+
"github.com/v3io/v3io-tsdb/pkg/aggregate"
67
"testing"
78
"time"
89

@@ -151,3 +152,72 @@ func (suite *testVariantTypeSuite) TestVariantTypeQueryWithSeries() {
151152

152153
assert.Equal(suite.T(), 1, seriesCount, "series count didn't match expected")
153154
}
155+
156+
func (suite *testVariantTypeSuite) TestCountAggregationForVariantTypeQueryWithSeries() {
157+
adapter, err := tsdb.NewV3ioAdapter(suite.v3ioConfig, nil, nil)
158+
if err != nil {
159+
suite.T().Fatalf("failed to create v3io adapter. reason: %s", err)
160+
}
161+
162+
metricName := "log"
163+
labels := utils.LabelsFromStringList("os", "linux", "__name__", metricName)
164+
165+
dataToIngest := []string{"a", "b", "c", "d", "e", "f"}
166+
numberOfEvents := len(dataToIngest)
167+
var expectedTimeColumn []int64
168+
for i := 0; i < numberOfEvents; i++ {
169+
expectedTimeColumn = append(expectedTimeColumn, suite.basicQueryTime+int64(i)*tsdbtest.MinuteInMillis)
170+
}
171+
172+
expected := map[string][]tsdbtest.DataPoint{"count": {{Time: suite.basicQueryTime - 5*tsdbtest.MinuteInMillis, Value: numberOfEvents}}}
173+
174+
appender, err := adapter.Appender()
175+
if err != nil {
176+
suite.T().Fatalf("failed to create v3io appender. reason: %s", err)
177+
}
178+
179+
ref, err := appender.Add(labels, expectedTimeColumn[0], dataToIngest[0])
180+
if err != nil {
181+
suite.T().Fatalf("Failed to add data to the TSDB appender. Reason: %s", err)
182+
}
183+
for i := 1; i < numberOfEvents; i++ {
184+
appender.AddFast(labels, ref, expectedTimeColumn[i], dataToIngest[i])
185+
}
186+
187+
if _, err := appender.WaitForCompletion(0); err != nil {
188+
suite.T().Fatalf("Failed to wait for TSDB append completion. Reason: %s", err)
189+
}
190+
191+
querierV2, err := adapter.QuerierV2()
192+
if err != nil {
193+
suite.T().Fatalf("Failed to create querier v2, err: %v", err)
194+
}
195+
196+
params := &pquerier.SelectParams{
197+
From: suite.basicQueryTime - tsdbtest.DaysInMillis,
198+
To: suite.basicQueryTime + tsdbtest.DaysInMillis,
199+
Functions: "count",
200+
Step: 10 * tsdbtest.MinuteInMillis}
201+
202+
set, err := querierV2.Select(params)
203+
if err != nil {
204+
suite.T().Fatalf("Failed to execute query, err: %v", err)
205+
}
206+
207+
var seriesCount int
208+
for set.Next() {
209+
seriesCount++
210+
iter := set.At().Iterator()
211+
212+
data, err := tsdbtest.IteratorToSlice(iter)
213+
if err != nil {
214+
suite.T().Fatal(err)
215+
}
216+
labels := set.At().Labels()
217+
agg := labels.Get(aggregate.AggregateLabel)
218+
219+
suite.compareSingleMetricWithAggregator(data, expected, agg)
220+
}
221+
222+
assert.Equal(suite.T(), 1, seriesCount, "series count didn't match expected")
223+
}

0 commit comments

Comments
 (0)