
Commit 61898d6

Merge pull request #493 from v3io/development
Development
2 parents 0455688 + f1c1338 commit 61898d6

File tree: 5 files changed, +102 −63 lines


Diff for: Jenkinsfile

+3 −10
@@ -432,9 +432,7 @@ podTemplate(label: "${git_project}-${label}", inheritFrom: "jnlp-docker-golang")
     if (MAIN_TAG_VERSION != "unstable") {
         stage('get previous release version') {
             container('jnlp') {
-                CURRENT_VERSION = github.get_short_tag_version("tsdb-nuclio", git_project_user, GIT_TOKEN)
-                echo "$CURRENT_VERSION"
-                NEXT_VERSION = "${CURRENT_VERSION}-${MAIN_TAG_VERSION}"
+                NEXT_VERSION = github.get_next_short_tag_version("tsdb-nuclio", git_project_user, GIT_TOKEN)
                 next_versions.putAt("tsdb-nuclio", NEXT_VERSION)
             }
         }
@@ -473,9 +471,7 @@ podTemplate(label: "${git_project}-${label}", inheritFrom: "jnlp-docker-golang")
     if (MAIN_TAG_VERSION != "unstable") {
         stage('get previous release version') {
             container('jnlp') {
-                CURRENT_VERSION = github.get_short_tag_version("frames", git_project_user, GIT_TOKEN)
-                echo "$CURRENT_VERSION"
-                NEXT_VERSION = "${CURRENT_VERSION}-${MAIN_TAG_VERSION}"
+                NEXT_VERSION = github.get_next_short_tag_version("frames", git_project_user, GIT_TOKEN)
                 FRAMES_NEXT_VERSION = NEXT_VERSION
                 next_versions.putAt("frames", NEXT_VERSION)
             }
@@ -540,10 +536,7 @@ podTemplate(label: "${git_project}-${label}", inheritFrom: "jnlp-docker-golang")
     if (TAG_VERSION) {
         stage('get previous release version') {
             container('jnlp') {
-                CURRENT_VERSION = github.get_current_tag_version("prometheus", git_project_user, GIT_TOKEN)
-                echo "$CURRENT_VERSION"
-                version_list=CURRENT_VERSION.split('-')
-                NEXT_VERSION = "v${TAG_VERSION}-${version_list[1]}-${MAIN_TAG_VERSION}"
+                NEXT_VERSION = github.get_next_short_tag_version("prometheus", git_project_user, GIT_TOKEN)
                 echo "$NEXT_VERSION"
                 next_versions.putAt('prometheus', NEXT_VERSION)
             }

Diff for: go.sum

+1 −1
@@ -75,9 +75,9 @@ github.com/prometheus/prometheus v2.5.0+incompatible/go.mod h1:oAIUtOny2rjMX0OWN
 github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ=
 github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
 github.com/rs/xid v1.1.0/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ=
-github.com/russross/blackfriday v1.5.2 h1:HyvC0ARfnZBqnXwABFeSZHpKvJHJJfPz81GNueLj0oo=
 github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
 github.com/russross/blackfriday v1.5.2+incompatible/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
+github.com/russross/blackfriday v1.5.2 h1:HyvC0ARfnZBqnXwABFeSZHpKvJHJJfPz81GNueLj0oo=
 github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ=
 github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
 github.com/spf13/cobra v0.0.3 h1:ZlrZ4XsMRm04Fr5pSFxBgfND2EBVa1nLpiy1stUsX/8=

Diff for: pkg/appender/ingest.go

+4 −4
@@ -285,7 +285,7 @@ func (mc *MetricsCache) handleResponse(metric *MetricState, resp *v3io.Response,
 
     if resp.Error != nil && metric.getState() != storeStateGet {
         req := reqInput.(*v3io.UpdateItemInput)
-        mc.logger.WarnWith("I/O failure", "id", resp.ID, "err", resp.Error, "key", metric.key,
+        mc.logger.DebugWith("I/O failure", "id", resp.ID, "err", resp.Error, "key", metric.key,
             "in-flight", mc.updatesInFlight, "mqueue", mc.metricQueue.Length(),
             "numsamples", metric.store.samplesQueueLength(), "path", req.Path, "update expression", req.Expression)
     } else {
@@ -323,9 +323,9 @@ func (mc *MetricsCache) handleResponse(metric *MetricState, resp *v3io.Response,
         if utils.IsFalseConditionError(resp.Error) {
             req := reqInput.(*v3io.UpdateItemInput)
             // This might happen on attempt to add metric value of wrong type, i.e. float <-> string
-            errMsg := fmt.Sprintf("trying to ingest values of incompatible data type. Metric %q has not been updated.", req.Path)
-            mc.logger.ErrorWith(errMsg)
-            setError(mc, metric, errors.Wrap(resp.Error, errMsg))
+            errMsg := fmt.Sprintf("failed to ingest values of incompatible data type into metric %s.", req.Path)
+            mc.logger.DebugWith(errMsg)
+            setError(mc, metric, errors.New(errMsg))
         } else {
             mc.logger.ErrorWith(fmt.Sprintf("Chunk update failed with status code %d.", e.StatusCode()))
             setError(mc, metric, errors.Wrap(resp.Error, fmt.Sprintf("Chunk update failed due to status code %d.", e.StatusCode())))
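
Note on the second hunk: besides demoting the log to debug level, the change swaps errors.Wrap for errors.New, so the underlying v3io "false condition" error is no longer attached as the cause of the error recorded on the metric. A minimal, self-contained sketch (using github.com/pkg/errors, not code from this repository) of what that distinction means:

package main

import (
	"fmt"

	"github.com/pkg/errors"
)

func main() {
	// Hypothetical low-level error, standing in for the v3io response error.
	cause := fmt.Errorf("condition evaluated to false")

	// errors.Wrap keeps the original error reachable via errors.Cause (old behavior).
	wrapped := errors.Wrap(cause, "failed to ingest values of incompatible data type")
	fmt.Println(errors.Cause(wrapped) == cause) // true

	// errors.New builds a fresh error and drops the low-level cause (new behavior).
	fresh := errors.New("failed to ingest values of incompatible data type")
	fmt.Println(errors.Cause(fresh) == cause) // false
}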

Diff for: pkg/pquerier/collector.go

+40 −32
@@ -64,45 +64,53 @@ func mainCollector(ctx *selectQueryContext, responseChannel chan *qryResults) {
     lastTimePerMetric := make(map[uint64]int64, len(ctx.columnsSpecByMetric))
     lastValuePerMetric := make(map[uint64]float64, len(ctx.columnsSpecByMetric))
 
-    for res := range responseChannel {
-        if res.IsRawQuery() {
-            err := rawCollector(ctx, res)
-            if err != nil {
-                ctx.errorChannel <- err
-                return
-            }
-        } else {
-            err := res.frame.addMetricIfNotExist(res.name, ctx.getResultBucketsSize(), res.IsServerAggregates())
-            if err != nil {
-                ctx.logger.Error("problem adding new metric '%v', lset: %v, err:%v", res.name, res.frame.lset, err)
-                ctx.errorChannel <- err
+    for {
+        select {
+        case _ = <-ctx.stopChan:
+            return
+        case res, ok := <-responseChannel:
+            if !ok {
                 return
             }
-            lsetAttr, _ := res.fields[config.LabelSetAttrName].(string)
-            lset, _ := utils.LabelsFromString(lsetAttr)
-            lset = append(lset, utils.Label{Name: config.MetricNameAttrName, Value: res.name})
-            currentResultHash := lset.Hash()
-
-            // Aggregating cross series aggregates, only supported over raw data.
-            if ctx.isCrossSeriesAggregate {
-                lastTimePerMetric[currentResultHash], lastValuePerMetric[currentResultHash], _ = aggregateClientAggregatesCrossSeries(ctx, res, lastTimePerMetric[currentResultHash], lastValuePerMetric[currentResultHash])
-            } else {
-                // Aggregating over time aggregates
-                if res.IsServerAggregates() {
-                    aggregateServerAggregates(ctx, res)
-                } else if res.IsClientAggregates() {
-                    aggregateClientAggregates(ctx, res)
+            if res.IsRawQuery() {
+                err := rawCollector(ctx, res)
+                if err != nil {
+                    ctx.errorChannel <- err
+                    return
                 }
-            }
-
-            // It is possible to query an aggregate and down sample raw chunks in the same df.
-            if res.IsDownsample() {
-                lastTimePerMetric[currentResultHash], lastValuePerMetric[currentResultHash], err = downsampleRawData(ctx, res, lastTimePerMetric[currentResultHash], lastValuePerMetric[currentResultHash])
+            } else {
+                err := res.frame.addMetricIfNotExist(res.name, ctx.getResultBucketsSize(), res.IsServerAggregates())
                 if err != nil {
-                    ctx.logger.Error("problem downsampling '%v', lset: %v, err:%v", res.name, res.frame.lset, err)
+                    ctx.logger.Error("problem adding new metric '%v', lset: %v, err:%v", res.name, res.frame.lset, err)
                     ctx.errorChannel <- err
                     return
                 }
+                lsetAttr, _ := res.fields[config.LabelSetAttrName].(string)
+                lset, _ := utils.LabelsFromString(lsetAttr)
+                lset = append(lset, utils.Label{Name: config.MetricNameAttrName, Value: res.name})
+                currentResultHash := lset.Hash()
+
+                // Aggregating cross series aggregates, only supported over raw data.
+                if ctx.isCrossSeriesAggregate {
+                    lastTimePerMetric[currentResultHash], lastValuePerMetric[currentResultHash], _ = aggregateClientAggregatesCrossSeries(ctx, res, lastTimePerMetric[currentResultHash], lastValuePerMetric[currentResultHash])
+                } else {
+                    // Aggregating over time aggregates
+                    if res.IsServerAggregates() {
+                        aggregateServerAggregates(ctx, res)
+                    } else if res.IsClientAggregates() {
+                        aggregateClientAggregates(ctx, res)
+                    }
+                }
+
+                // It is possible to query an aggregate and down sample raw chunks in the same df.
+                if res.IsDownsample() {
+                    lastTimePerMetric[currentResultHash], lastValuePerMetric[currentResultHash], err = downsampleRawData(ctx, res, lastTimePerMetric[currentResultHash], lastValuePerMetric[currentResultHash])
+                    if err != nil {
+                        ctx.logger.Error("problem downsampling '%v', lset: %v, err:%v", res.name, res.frame.lset, err)
+                        ctx.errorChannel <- err
+                        return
+                    }
+                }
             }
         }
    }
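
The rewritten mainCollector follows a standard Go cancellation pattern: a for/select loop that keeps draining the response channel but returns as soon as ctx.stopChan is closed, or the response channel itself is closed. A minimal, self-contained sketch of the same shape, with illustrative names (consume, results, stop) that are not from the repository:

package main

import (
	"fmt"
	"sync"
)

// consume drains results until either the results channel or the stop
// channel is closed, mirroring the for/select structure of mainCollector.
func consume(results <-chan int, stop <-chan struct{}, wg *sync.WaitGroup) {
	defer wg.Done()
	for {
		select {
		case <-stop:
			return // another goroutine signaled an error; exit early
		case r, ok := <-results:
			if !ok {
				return // producer closed the channel; normal shutdown
			}
			fmt.Println("processing", r)
		}
	}
}

func main() {
	results := make(chan int, 4)
	stop := make(chan struct{})
	var wg sync.WaitGroup

	wg.Add(1)
	go consume(results, stop, &wg)

	for i := 0; i < 3; i++ {
		results <- i
	}
	close(results) // happy path; close(stop) would be the error path
	wg.Wait()
}

Closing a channel is observed by every receiver at once, which is why a single close of the stop channel can stop all collector goroutines without further coordination.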

Diff for: pkg/pquerier/select.go

+54 −16
@@ -44,6 +44,10 @@ type selectQueryContext struct {
     requestChannels []chan *qryResults
     errorChannel chan error
     wg sync.WaitGroup
+    createDFLock sync.Mutex
+    stopChan chan bool
+    queryWG sync.WaitGroup
+    finalErrorChan chan error
 }
 
 func (queryCtx *selectQueryContext) start(parts []*partmgr.DBPartition, params *SelectParams) (*frameIterator, error) {
@@ -77,18 +81,21 @@ func (queryCtx *selectQueryContext) start(parts []*partmgr.DBPartition, params *
         }
     }
 
+    queryCtx.stopChan = make(chan bool, 1)
+    queryCtx.finalErrorChan = make(chan error, 1)
+    queryCtx.errorChannel = make(chan error, queryCtx.workers+len(queries))
+
     err = queryCtx.startCollectors()
     if err != nil {
         return nil, err
     }
 
     for _, query := range queries {
-        err = queryCtx.processQueryResults(query)
-        if err != nil {
-            return nil, err
-        }
+        queryCtx.queryWG.Add(1)
+        go processQueryResults(queryCtx, query)
     }
 
+    queryCtx.queryWG.Wait()
     for i := 0; i < queryCtx.workers; i++ {
         close(queryCtx.requestChannels[i])
     }
@@ -98,7 +105,7 @@ func (queryCtx *selectQueryContext) start(parts []*partmgr.DBPartition, params *
     close(queryCtx.errorChannel)
 
     // return first error
-    err = <-queryCtx.errorChannel
+    err = <-queryCtx.finalErrorChan
     if err != nil {
         return nil, err
     }
@@ -260,7 +267,6 @@ func (queryCtx *selectQueryContext) parsePreAggregateLabels(partition *partmgr.D
 func (queryCtx *selectQueryContext) startCollectors() error {
 
     queryCtx.requestChannels = make([]chan *qryResults, queryCtx.workers)
-    queryCtx.errorChannel = make(chan error, queryCtx.workers)
 
     // Increment the WaitGroup counter.
     queryCtx.wg.Add(queryCtx.workers)
@@ -274,27 +280,45 @@ func (queryCtx *selectQueryContext) startCollectors() error {
         }(i)
     }
 
+    // Watch error channel, and signal all go routines to stop in case of an error
+    go func() {
+        // Signal all goroutines to stop when error received
+        err, ok := <-queryCtx.errorChannel
+        if ok && err != nil {
+            close(queryCtx.stopChan)
+            queryCtx.finalErrorChan <- err
+        }
+
+        close(queryCtx.finalErrorChan)
+        return
+    }()
+
     return nil
 }
 
-func (queryCtx *selectQueryContext) processQueryResults(query *partQuery) error {
+func processQueryResults(queryCtx *selectQueryContext, query *partQuery) {
+    defer queryCtx.queryWG.Done()
+
     for query.Next() {
 
         // read metric name
        name, ok := query.GetField(config.MetricNameAttrName).(string)
        if !ok {
-            return fmt.Errorf("could not find metric name attribute in response, res:%v", query.GetFields())
+            queryCtx.errorChannel <- fmt.Errorf("could not find metric name attribute in response, res:%v", query.GetFields())
+            return
        }
 
        // read label set
        lsetAttr, lok := query.GetField(config.LabelSetAttrName).(string)
        if !lok {
-            return fmt.Errorf("could not find label set attribute in response, res:%v", query.GetFields())
+            queryCtx.errorChannel <- fmt.Errorf("could not find label set attribute in response, res:%v", query.GetFields())
+            return
        }
 
        lset, err := utils.LabelsFromString(lsetAttr)
        if err != nil {
-            return err
+            queryCtx.errorChannel <- err
+            return
        }
 
        // read chunk encoding type
@@ -306,7 +330,8 @@ func (queryCtx *selectQueryContext) processQueryResults(query *partQuery) error
        } else {
            intEncoding, err := strconv.Atoi(encodingStr)
            if err != nil {
-                return fmt.Errorf("error parsing encoding type of chunk, got: %v, error: %v", encodingStr, err)
+                queryCtx.errorChannel <- fmt.Errorf("error parsing encoding type of chunk, got: %v, error: %v", encodingStr, err)
+                return
            }
            encoding = chunkenc.Encoding(intEncoding)
        }
@@ -324,7 +349,8 @@ func (queryCtx *selectQueryContext) processQueryResults(query *partQuery) error
            if labelValue != "" {
                newLset[i] = utils.Label{Name: trimmed, Value: labelValue}
            } else {
-                return fmt.Errorf("no label named %v found to group by", trimmed)
+                queryCtx.errorChannel <- fmt.Errorf("no label named %v found to group by", trimmed)
+                return
            }
        }
        lset = newLset
@@ -336,6 +362,7 @@ func (queryCtx *selectQueryContext) processQueryResults(query *partQuery) error
            hash = lset.Hash()
        }
 
+        queryCtx.createDFLock.Lock()
        // find or create data frame
        frame, ok := queryCtx.dataFrames[hash]
        if !ok {
@@ -349,19 +376,30 @@ func (queryCtx *selectQueryContext) processQueryResults(query *partQuery) error
                results.IsServerAggregates(),
                queryCtx.showAggregateLabel)
            if err != nil {
-                return err
+                queryCtx.errorChannel <- err
+                queryCtx.createDFLock.Unlock()
+                return
            }
            queryCtx.dataFrames[hash] = frame
            queryCtx.frameList = append(queryCtx.frameList, frame)
        }
+        queryCtx.createDFLock.Unlock()
 
        results.frame = frame
-
        workerNum := hash & uint64(queryCtx.workers-1)
-        queryCtx.requestChannels[workerNum] <- &results
+
+        // In case termination signal was received exit, Otherwise send query result to worker
+        select {
+        case _ = <-queryCtx.stopChan:
+            return
+        case queryCtx.requestChannels[workerNum] <- &results:
+        }
+
    }
 
-    return query.Err()
+    if query.Err() != nil {
+        queryCtx.errorChannel <- query.Err()
+    }
 }
 
 func (queryCtx *selectQueryContext) createColumnSpecs() ([]columnMeta, map[string][]columnMeta, error) {
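
The select.go changes add an error fan-in: processQueryResults and the collectors now report failures on a buffered errorChannel instead of returning them, a single watcher goroutine (started in startCollectors) forwards the first error to finalErrorChan and closes stopChan, and start() reads the final verdict from finalErrorChan after the wait groups finish. A minimal, self-contained sketch of that shape, with illustrative names only (not code from this repository):

package main

import (
	"errors"
	"fmt"
)

func main() {
	const workers = 3
	// Buffered so that workers never block when reporting an error.
	errorChannel := make(chan error, workers)
	finalErrorChan := make(chan error, 1)
	stopChan := make(chan struct{})
	done := make(chan struct{}, workers)

	// Watcher: forward only the first error and tell everyone else to stop.
	// (In the commit, collectors and query goroutines select on stopChan.)
	go func() {
		err, ok := <-errorChannel
		if ok && err != nil {
			close(stopChan)
			finalErrorChan <- err
		}
		close(finalErrorChan)
	}()

	// Simulated workers; worker 1 fails.
	for i := 0; i < workers; i++ {
		go func(id int) {
			defer func() { done <- struct{}{} }()
			if id == 1 {
				errorChannel <- errors.New("worker 1 failed")
			}
		}(i)
	}
	for i := 0; i < workers; i++ {
		<-done
	}
	close(errorChannel) // in the diff this happens in start() after the wait groups

	// Return the first error, or nil if the channel was closed without one.
	if err := <-finalErrorChan; err != nil {
		fmt.Println("query aborted:", err)
	}
}

The errorChannel buffer appears to be sized to queryCtx.workers+len(queries) so that every goroutine that can fail has a guaranteed slot; sends never block even though only the first error is consumed.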
