Skip to content

Commit 8d6524e

Browse files
authored
Merge pull request #434 from v3io/development
Development -> master
2 parents 3af1d91 + f8462df commit 8d6524e

33 files changed

+760
-396
lines changed

Diff for: Makefile

+9-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,15 @@ BUILD_OPTS := -ldflags " \
3232
-X $(CONFIG_PKG).branch=$(GIT_BRANCH)" \
3333
-v -o "$(GOPATH)/bin/$(TSDBCTL_BIN_NAME)"
3434

35-
TSDB_BUILD_COMMAND ?= CGO_ENABLED=0 go build $(BUILD_OPTS) ./cmd/tsdbctl
35+
TSDB_BUILD_COMMAND ?= GO111MODULE="on" CGO_ENABLED=0 go build $(BUILD_OPTS) ./cmd/tsdbctl
36+
37+
.PHONY: fmt
38+
fmt:
39+
gofmt -l -s -w .
40+
41+
.PHONY: get
42+
get:
43+
GO111MODULE="on" go mod tidy
3644

3745
.PHONY: test
3846
test:

Diff for: go.mod

+4
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,12 @@ require (
77
github.com/cpuguy83/go-md2man v1.0.10 // indirect
88
github.com/ghodss/yaml v1.0.0
99
github.com/imdario/mergo v0.3.7
10+
github.com/kr/pretty v0.2.0 // indirect
1011
github.com/nuclio/logger v0.0.1
1112
github.com/nuclio/nuclio-sdk-go v0.0.0-20190205170814-3b507fbd0324
1213
github.com/nuclio/nuclio-test-go v0.0.0-20180704132150-0ce6587f8e37
1314
github.com/nuclio/zap v0.0.2
15+
github.com/pavius/impi v0.0.0-20180302134524-c1cbdcb8df2b // indirect
1416
github.com/pkg/errors v0.8.1
1517
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a
1618
github.com/spf13/cobra v0.0.3
@@ -21,6 +23,8 @@ require (
2123
github.com/v3io/v3io-go v0.0.5-0.20191205125653-9003ae83f0b6
2224
github.com/v3io/v3io-go-http v0.0.0-20190415143924-cc2fbcde6663 // indirect
2325
github.com/xwb1989/sqlparser v0.0.0-20180606152119-120387863bf2
26+
google.golang.org/genproto v0.0.0-20181026194446-8b5d7a19e2d9 // indirect
27+
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 // indirect
2428
)
2529

2630
replace (

Diff for: go.sum

+5-1
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ github.com/klauspost/cpuid v0.0.0-20180405133222-e7e905edc00e h1:+lIPJOWl+jSiJOc
3434
github.com/klauspost/cpuid v0.0.0-20180405133222-e7e905edc00e/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
3535
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
3636
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
37+
github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs=
38+
github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
3739
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
3840
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
3941
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
@@ -61,6 +63,8 @@ github.com/nuclio/nuclio-test-go v0.0.0-20180704132150-0ce6587f8e37/go.mod h1:aO
6163
github.com/nuclio/zap v0.0.0-20180228181516-4a2bd2f9ef28/go.mod h1:SUxPsgePvlyjx6c5MtGdB50pf0IQThtlyLwISLboeuc=
6264
github.com/nuclio/zap v0.0.2 h1:rY5PkMOl8CTkqRqIPuxziBiKK6Mq/8oEurfgRnNtqf0=
6365
github.com/nuclio/zap v0.0.2/go.mod h1:SUxPsgePvlyjx6c5MtGdB50pf0IQThtlyLwISLboeuc=
66+
github.com/pavius/impi v0.0.0-20180302134524-c1cbdcb8df2b h1:yS0+/i6mwRZCdssUd+MkFJkCn/Evh1PlUKCYe3aCtQw=
67+
github.com/pavius/impi v0.0.0-20180302134524-c1cbdcb8df2b/go.mod h1:x/hU0bfdWIhuOT1SKwiJg++yvkk6EuOtJk8WtDZqgr8=
6468
github.com/pavius/zap v0.0.0-20180228181622-8d52692529b8 h1:1N/m7VjDY1Pd30Uwv6bLttZVFQm3n8RUK9Ylf2J+4a4=
6569
github.com/pavius/zap v0.0.0-20180228181622-8d52692529b8/go.mod h1:6FWOCx06uh50GClv8S2cfk3asqTJs3qq3ZNRtLZE77I=
6670
github.com/pavius/zap v1.4.2-0.20180228181622-8d52692529b8 h1:WqLgmr/wj9TO5Sc6oYPQRAJBxuHE0NTeuVeFnT+FZVo=
@@ -76,9 +80,9 @@ github.com/prometheus/prometheus v2.5.0+incompatible h1:7QPitgO2kOFG8ecuRn9O/4L9
7680
github.com/prometheus/prometheus v2.5.0+incompatible/go.mod h1:oAIUtOny2rjMX0OWN5vPR5/q/twIROJvdqnQKDdil/s=
7781
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a h1:9ZKAASQSHhDYGoxY8uLVpewe1GDZ2vu2Tr/vTdVAkFQ=
7882
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
79-
github.com/russross/blackfriday v1.5.2+incompatible h1:/YIL6L1Deczl4O/cQ7ZVdrdKwuB6y7EWpw9LkD8xofE=
8083
github.com/russross/blackfriday v1.5.2+incompatible/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
8184
github.com/russross/blackfriday v1.5.2 h1:HyvC0ARfnZBqnXwABFeSZHpKvJHJJfPz81GNueLj0oo=
85+
github.com/russross/blackfriday v1.5.2+incompatible h1:/YIL6L1Deczl4O/cQ7ZVdrdKwuB6y7EWpw9LkD8xofE=
8286
github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
8387
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72 h1:qLC7fQah7D6K1B0ujays3HV9gkFtllcxhzImRR7ArPQ=
8488
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=

Diff for: internal/pkg/performance/metrics.go

+1-4
Original file line numberDiff line numberDiff line change
@@ -190,8 +190,5 @@ func (mr *MetricReporter) isEnabled() bool {
190190
}
191191

192192
func (mr *MetricReporter) isRunning() bool {
193-
mr.lock.Lock()
194-
defer mr.lock.Unlock()
195-
196-
return mr.running
193+
return false
197194
}

Diff for: pkg/appender/appender.go

+20-2
Original file line numberDiff line numberDiff line change
@@ -122,14 +122,18 @@ type MetricsCache struct {
122122
updatesComplete chan int
123123
newUpdates chan int
124124

125-
lastMetric uint64
125+
lastMetric uint64
126+
127+
// TODO: consider switching to synch.Map (https://golang.org/pkg/sync/#Map)
126128
cacheMetricMap map[cacheKey]*MetricState // TODO: maybe use hash as key & combine w ref
127129
cacheRefMap map[uint64]*MetricState // TODO: maybe turn to list + free list, periodically delete old matrics
128130

129131
NameLabelMap map[string]bool // temp store all lable names
130132

131133
lastError error
132134
performanceReporter *performance.MetricReporter
135+
136+
stopChan chan int
133137
}
134138

135139
func NewMetricsCache(container v3io.Container, logger logger.Logger, cfg *config.V3ioConfig,
@@ -146,6 +150,7 @@ func NewMetricsCache(container v3io.Container, logger logger.Logger, cfg *config
146150
newCache.metricQueue = NewElasticQueue()
147151
newCache.updatesComplete = make(chan int, 100)
148152
newCache.newUpdates = make(chan int, 1000)
153+
newCache.stopChan = make(chan int, 3)
149154

150155
newCache.NameLabelMap = map[string]bool{}
151156
newCache.performanceReporter = performance.ReporterInstanceFromConfig(cfg)
@@ -217,11 +222,18 @@ func (mc *MetricsCache) Add(lset utils.LabelsIfc, t int64, v interface{}) (uint6
217222

218223
isValueVariantType := false
219224
// If the value is not of Float type assume it's variant type.
220-
if _, ok := v.(float64); !ok {
225+
switch v.(type) {
226+
case int, int64, float64, float32:
227+
isValueVariantType = false
228+
default:
221229
isValueVariantType = true
222230
}
223231

224232
name, key, hash := lset.GetKey()
233+
err = utils.IsValidMetricName(name)
234+
if err != nil {
235+
return 0, err
236+
}
225237
metric, ok := mc.getMetric(name, hash)
226238

227239
var aggrMetrics []*MetricState
@@ -301,6 +313,12 @@ func verifyTimeValid(t int64) error {
301313
}
302314
return nil
303315
}
316+
func (mc *MetricsCache) Close() {
317+
//for 3 go funcs
318+
mc.stopChan <- 0
319+
mc.stopChan <- 0
320+
mc.stopChan <- 0
321+
}
304322

305323
func (mc *MetricsCache) WaitForCompletion(timeout time.Duration) (int, error) {
306324
waitChan := make(chan int, 2)

Diff for: pkg/appender/ingest.go

+36-19
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"github.com/pkg/errors"
3030
"github.com/v3io/v3io-go/pkg/dataplane"
3131
"github.com/v3io/v3io-go/pkg/errors"
32+
"github.com/v3io/v3io-tsdb/pkg/utils"
3233
)
3334

3435
// Start event loops for handling metric updates (appends and Get/Update DB responses)
@@ -53,6 +54,8 @@ func (mc *MetricsCache) metricFeed(index int) {
5354

5455
for {
5556
select {
57+
case _ = <-mc.stopChan:
58+
return
5659
case inFlight = <-mc.updatesComplete:
5760
// Handle completion notifications from the update loop
5861
length := mc.metricQueue.Length()
@@ -149,6 +152,8 @@ func (mc *MetricsCache) metricsUpdateLoop(index int) {
149152
counter := 0
150153
for {
151154
select {
155+
case _ = <-mc.stopChan:
156+
return
152157
case _ = <-mc.newUpdates:
153158
// Handle new metric notifications (from metricFeed)
154159
for mc.updatesInFlight < mc.cfg.Workers*2 { //&& newMetrics > 0{
@@ -266,7 +271,7 @@ func (mc *MetricsCache) handleResponse(metric *MetricState, resp *v3io.Response,
266271

267272
if resp.Error != nil && metric.getState() != storeStateGet {
268273
req := reqInput.(*v3io.UpdateItemInput)
269-
mc.logger.ErrorWith("I/O failure", "id", resp.ID, "err", resp.Error, "key", metric.key,
274+
mc.logger.WarnWith("I/O failure", "id", resp.ID, "err", resp.Error, "key", metric.key,
270275
"in-flight", mc.updatesInFlight, "mqueue", mc.metricQueue.Length(),
271276
"numsamples", metric.store.samplesQueueLength(), "path", req.Path, "update expression", req.Expression)
272277
} else {
@@ -300,8 +305,17 @@ func (mc *MetricsCache) handleResponse(metric *MetricState, resp *v3io.Response,
300305
// Metrics with too many update errors go into Error state
301306
metric.retryCount++
302307
if e, hasStatusCode := resp.Error.(v3ioerrors.ErrorWithStatusCode); hasStatusCode && e.StatusCode() != http.StatusServiceUnavailable {
303-
mc.logger.ErrorWith(fmt.Sprintf("Chunk update failed with status code %d.", e.StatusCode()))
304-
setError(mc, metric, errors.Wrap(resp.Error, fmt.Sprintf("Chunk update failed due to status code %d.", e.StatusCode())))
308+
// If condition was evaluated as false log this and report this error upstream.
309+
if utils.IsFalseConditionError(resp.Error) {
310+
req := reqInput.(*v3io.UpdateItemInput)
311+
// This might happen on attempt to add metric value of wrong type, i.e. float <-> string
312+
errMsg := fmt.Sprintf("trying to ingest values of incompatible data type. Metric %q has not been updated.", req.Path)
313+
mc.logger.ErrorWith(errMsg)
314+
setError(mc, metric, errors.Wrap(resp.Error, errMsg))
315+
} else {
316+
mc.logger.ErrorWith(fmt.Sprintf("Chunk update failed with status code %d.", e.StatusCode()))
317+
setError(mc, metric, errors.Wrap(resp.Error, fmt.Sprintf("Chunk update failed due to status code %d.", e.StatusCode())))
318+
}
305319
clear()
306320
return false
307321
} else if metric.retryCount == maxRetriesOnWrite {
@@ -347,24 +361,27 @@ func (mc *MetricsCache) nameUpdateRespLoop() {
347361

348362
go func() {
349363
for {
350-
resp := <-mc.nameUpdateChan
351-
// Handle V3IO PutItem in names table
352-
353-
metric, ok := resp.Context.(*MetricState)
354-
if ok {
355-
metric.Lock()
356-
if resp.Error != nil {
357-
// Count errors
358-
mc.performanceReporter.IncrementCounter("UpdateNameError", 1)
359-
360-
mc.logger.ErrorWith("Update-name process failed", "id", resp.ID, "name", metric.name)
361-
} else {
362-
mc.logger.DebugWith("Update-name process response", "id", resp.ID, "name", metric.name)
364+
select {
365+
case _ = <-mc.stopChan:
366+
return
367+
case resp := <-mc.nameUpdateChan:
368+
// Handle V3IO PutItem in names table
369+
metric, ok := resp.Context.(*MetricState)
370+
if ok {
371+
metric.Lock()
372+
if resp.Error != nil {
373+
// Count errors
374+
mc.performanceReporter.IncrementCounter("UpdateNameError", 1)
375+
376+
mc.logger.ErrorWith("Update-name process failed", "id", resp.ID, "name", metric.name)
377+
} else {
378+
mc.logger.DebugWith("Update-name process response", "id", resp.ID, "name", metric.name)
379+
}
380+
metric.Unlock()
363381
}
364-
metric.Unlock()
365-
}
366382

367-
resp.Release()
383+
resp.Release()
384+
}
368385
}
369386
}()
370387
}

Diff for: pkg/appender/store.go

+7-4
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ func (cs *chunkStore) Append(t int64, v interface{}) {
232232

233233
cs.pending = append(cs.pending, pendingData{t: t, v: v})
234234
// If the new time is older than previous times, sort the list
235-
if len(cs.pending) > 1 && cs.pending[len(cs.pending)-2].t < t {
235+
if len(cs.pending) > 1 && cs.pending[len(cs.pending)-2].t > t {
236236
sort.Sort(cs.pending)
237237
}
238238
}
@@ -418,17 +418,20 @@ func (cs *chunkStore) writeChunks(mc *MetricsCache, metric *MetricState) (hasPen
418418

419419
var encodingExpr string
420420
if !cs.isAggr() {
421-
encodingExpr = fmt.Sprintf("%v='%d'; ", config.EncodingAttrName, activeChunk.appender.Encoding())
421+
encodingExpr = fmt.Sprintf("%s='%d'; ", config.EncodingAttrName, activeChunk.appender.Encoding())
422422
}
423-
lsetExpr := fmt.Sprintf("%v='%s'; ", config.LabelSetAttrName, metric.key)
423+
lsetExpr := fmt.Sprintf("%s='%s'; ", config.LabelSetAttrName, metric.key)
424424
expr = lblexpr + encodingExpr + lsetExpr + expr
425425
}
426426

427427
// Call the V3IO async UpdateItem method
428+
conditionExpr := fmt.Sprintf("NOT exists(%s) OR (exists(%s) AND %s == '%d')",
429+
config.EncodingAttrName, config.EncodingAttrName,
430+
config.EncodingAttrName, activeChunk.appender.Encoding())
428431
expr += fmt.Sprintf("%v=%d;", config.MaxTimeAttrName, cs.maxTime) // TODO: use max() expr
429432
path := partition.GetMetricPath(metric.name, metric.hash, cs.labelNames, cs.isAggr())
430433
request, err := mc.container.UpdateItem(
431-
&v3io.UpdateItemInput{Path: path, Expression: &expr}, metric, mc.responseChan)
434+
&v3io.UpdateItemInput{Path: path, Expression: &expr, Condition: conditionExpr}, metric, mc.responseChan)
432435
if err != nil {
433436
mc.logger.ErrorWith("UpdateItem failed", "err", err)
434437
hasPendingUpdates = false

Diff for: pkg/chunkenc/vartype.go

+3-21
Original file line numberDiff line numberDiff line change
@@ -108,36 +108,18 @@ func (a *varAppender) Chunk() Chunk {
108108
}
109109

110110
func (a *varAppender) Append(t int64, v interface{}) {
111-
112111
if v == nil {
113112
a.appendNoValue(t, varTypeNil, varValueNone)
114113
return
115114
}
116115

117-
switch vType := v.(type) {
118-
case float64:
119-
val := v.(float64)
120-
if val == 0 {
121-
a.appendNoValue(t, varTypeFloat64, varValueZero)
122-
return
123-
124-
}
125-
126-
if math.IsNaN(val) {
127-
a.appendNoValue(t, varTypeFloat64, varValueNone)
128-
return
129-
}
130-
131-
a.appendWithUint(t, varTypeFloat64, math.Float64bits(val))
132-
116+
switch val := v.(type) {
133117
case string:
134-
val := []byte(v.(string))
135-
a.appendWithValue(t, varTypeString, val)
118+
a.appendWithValue(t, varTypeString, []byte(val))
136119

137120
default:
138-
a.logger.Error("unsupported type %v of value %v\n", vType, v)
121+
a.logger.Error("unsupported type %T of value %v\n", v, v)
139122
}
140-
141123
}
142124

143125
func (a *varAppender) appendNoValue(t int64, varType, varVal byte) {

Diff for: pkg/chunkenc/xor.go

+12-1
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,18 @@ func (a *xorAppender) Chunk() Chunk {
173173
func (a *xorAppender) Append(t int64, vvar interface{}) {
174174
var tDelta uint64
175175
num := *a.samples
176-
v := vvar.(float64)
176+
177+
var v float64
178+
switch typedValue := vvar.(type) {
179+
case int:
180+
v = float64(typedValue)
181+
case float64:
182+
v = typedValue
183+
default:
184+
a.logger.Warn("Discarding sample {time: %d, value: %v}, as it's value is of incompatible data type. "+
185+
"Reason: expected 'float' actual '%T'.", t, vvar, vvar)
186+
return
187+
}
177188

178189
// Do not append if sample is too old.
179190
if t < a.t {

Diff for: pkg/config/config.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ type V3ioConfig struct {
170170
// Coefficient to decide whether or not to use server aggregates optimization
171171
// use server aggregations if ` <requested step> / <rollup interval> > UseServerAggregateCoefficient`
172172
UseServerAggregateCoefficient int `json:"useServerAggregateCoefficient,omitempty"`
173-
LoadPartitionsFromSchemaFile bool `json:"loadPartitionsFromSchemaFile,omitempty"`
173+
LoadPartitionsFromSchemaAttr bool `json:"loadPartitionsFromSchemaAttr,omitempty"`
174174
}
175175

176176
type MetricsReporterConfig struct {

0 commit comments

Comments
 (0)