Skip to content

Commit 617036a

Browse files
author
Gal Topper
authored
Merge pull request #69 from v3io/development
Merge: development -> master
2 parents af5a598 + 20da8a8 commit 617036a

File tree

16 files changed

+448
-169
lines changed

16 files changed

+448
-169
lines changed

.travis.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ go:
44
- "1.10.x"
55

66
script:
7+
- make get
78
- make test
89
- make lint
910

Makefile

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,15 +18,15 @@ get:
1818
go get -v -t -tags "unit integration" $(TOPLEVEL_DIRS)
1919

2020
.PHONY: test
21-
test: get
21+
test:
2222
go test -race -tags unit -count 1 $(TOPLEVEL_DIRS)
2323

2424
.PHONY: integration
25-
integration: get
25+
integration:
2626
go test -race -tags integration -p 1 -count 1 $(TOPLEVEL_DIRS) # p=1 to force Go to run pkg tests serially.
2727

2828
.PHONY: build
29-
build: get
29+
build:
3030
go build -v -o "$(GOPATH)/bin/$(TSDBCTL_BIN_NAME)" ./cmd/tsdbctl
3131

3232
.PHONY: lint

pkg/appender/store.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,7 @@ func (cs *chunkStore) WriteChunks(mc *MetricsCache, metric *MetricState) (bool,
274274
var pendingSamplesCount int
275275

276276
// loop over pending samples, add to chunks & aggregates (create required update expressions)
277-
for {
277+
for pendingSampleIndex < len(cs.pending) && pendingSamplesCount < mc.cfg.BatchSize && partition.InRange(cs.pending[pendingSampleIndex].t) {
278278
sampleTime := cs.pending[pendingSampleIndex].t
279279

280280
if sampleTime <= cs.initMaxTime && !mc.cfg.OverrideOld {
@@ -308,7 +308,6 @@ func (cs *chunkStore) WriteChunks(mc *MetricsCache, metric *MetricState) (bool,
308308
if (pendingSampleIndex == len(cs.pending)-1) || pendingSamplesCount == mc.cfg.BatchSize-1 || !partition.InRange(cs.pending[pendingSampleIndex+1].t) {
309309
expr = expr + cs.aggrList.SetOrUpdateExpr("v", bucket, isNewBucket)
310310
expr = expr + cs.appendExpression(activeChunk)
311-
cs.aggrList.Clear()
312311
pendingSampleIndex++
313312
pendingSamplesCount++
314313
break
@@ -334,6 +333,7 @@ func (cs *chunkStore) WriteChunks(mc *MetricsCache, metric *MetricState) (bool,
334333
pendingSamplesCount++
335334
}
336335

336+
cs.aggrList.Clear()
337337
if pendingSampleIndex == len(cs.pending) {
338338
cs.pending = cs.pending[:0]
339339
} else {

pkg/partmgr/partmgr.go

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -124,12 +124,12 @@ func (p *PartitionManager) Init() error {
124124
func (p *PartitionManager) TimeToPart(t int64) (*DBPartition, error) {
125125
if p.headPartition == nil {
126126
// Rounding t to the nearest PartitionInterval multiple
127-
_, err := p.createNewPartition(p.currentPartitionInterval * (t / p.currentPartitionInterval))
127+
_, err := p.createAndUpdatePartition(p.currentPartitionInterval * (t / p.currentPartitionInterval))
128128
return p.headPartition, err
129129
} else {
130130
if t >= p.headPartition.startTime {
131-
if (t - p.headPartition.startTime) > p.currentPartitionInterval {
132-
_, err := p.createNewPartition(p.headPartition.startTime + p.currentPartitionInterval)
131+
if (t - p.headPartition.startTime) >= p.currentPartitionInterval {
132+
_, err := p.createAndUpdatePartition(p.headPartition.startTime + p.currentPartitionInterval)
133133
if err != nil {
134134
return nil, err
135135
}
@@ -144,21 +144,35 @@ func (p *PartitionManager) TimeToPart(t int64) (*DBPartition, error) {
144144
return p.partitions[i], nil
145145
}
146146
}
147+
head := p.headPartition
148+
part, _ := p.createAndUpdatePartition(p.currentPartitionInterval * (t / p.currentPartitionInterval))
149+
p.headPartition = head
150+
return part, nil
147151
}
148152
}
149-
return p.headPartition, nil
150153
}
151154

152-
func (p *PartitionManager) createNewPartition(t int64) (*DBPartition, error) {
155+
func (p *PartitionManager) createAndUpdatePartition(t int64) (*DBPartition, error) {
153156
time := t & 0x7FFFFFFFFFFFFFF0
154157
partPath := path.Join(p.path, strconv.FormatInt(time/1000, 10)) + "/"
155158
partition, err := NewDBPartition(p, time, partPath)
156159
if err != nil {
157160
return nil, err
158161
}
159162
p.currentPartitionInterval = partition.partitionInterval
160-
p.headPartition = partition
161-
p.partitions = append(p.partitions, partition)
163+
if p.headPartition == nil || time > p.headPartition.startTime {
164+
p.headPartition = partition
165+
p.partitions = append(p.partitions, partition)
166+
} else {
167+
for i, part := range p.partitions {
168+
if part.startTime > time {
169+
p.partitions = append(p.partitions, nil)
170+
copy(p.partitions[i+1:], p.partitions[i:])
171+
p.partitions[i] = partition
172+
break
173+
}
174+
}
175+
}
162176
err = p.updatePartitionInSchema(partition)
163177
return partition, err
164178
}
@@ -169,7 +183,9 @@ func (p *PartitionManager) updatePartitionInSchema(partition *DBPartition) error
169183
if err != nil {
170184
return errors.Wrap(err, "Failed to update new partition in schema file")
171185
}
172-
err = p.container.Sync.PutObject(&v3io.PutObjectInput{Path: path.Join(p.path, config.SCHEMA_CONFIG), Body: data})
186+
if p.container != nil { //tests use case only
187+
err = p.container.Sync.PutObject(&v3io.PutObjectInput{Path: path.Join(p.path, config.SCHEMA_CONFIG), Body: data})
188+
}
173189
return err
174190
}
175191

pkg/partmgr/partmgr_test.go

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
// +build unit
2+
3+
/*
4+
Copyright 2018 Iguazio Systems Ltd.
5+
6+
Licensed under the Apache License, Version 2.0 (the "License") with
7+
an addition restriction as set forth herein. You may not use this
8+
file except in compliance with the License. You may obtain a copy of
9+
the License at http://www.apache.org/licenses/LICENSE-2.0.
10+
11+
Unless required by applicable law or agreed to in writing, software
12+
distributed under the License is distributed on an "AS IS" BASIS,
13+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
14+
implied. See the License for the specific language governing
15+
permissions and limitations under the License.
16+
17+
In addition, you may not use the software for any purposes that are
18+
illegal under applicable law, and the grant of the foregoing license
19+
under the Apache 2.0 license is conditioned upon your compliance with
20+
such restriction.
21+
*/
22+
23+
package partmgr
24+
25+
import (
26+
"github.com/stretchr/testify/assert"
27+
"github.com/v3io/v3io-tsdb/pkg/tsdb/tsdbtest/testutils"
28+
"github.com/v3io/v3io-tsdb/pkg/utils"
29+
"testing"
30+
)
31+
32+
func TestCreateNewPartition(tst *testing.T) {
33+
schema := testutils.CreateSchema(tst, "*")
34+
interval, _ := utils.Str2duration(schema.PartitionSchemaInfo.PartitionerInterval)
35+
manager, _ := NewPartitionMngr(&schema, "/", nil)
36+
startTime := interval + 1
37+
//first partition
38+
part, _ := manager.TimeToPart(startTime + interval)
39+
assert.Equal(tst, 1, len(manager.partitions))
40+
assert.Equal(tst, manager.headPartition, part)
41+
//new head
42+
part, _ = manager.TimeToPart(startTime + (interval * 3))
43+
assert.Equal(tst, 3, len(manager.partitions))
44+
assert.Equal(tst, manager.headPartition, part)
45+
//add in the middle
46+
part, _ = manager.TimeToPart(startTime + (interval * 2))
47+
assert.Equal(tst, 3, len(manager.partitions))
48+
assert.Equal(tst, manager.partitions[1], part)
49+
//add first
50+
part, _ = manager.TimeToPart(startTime)
51+
assert.Equal(tst, 4, len(manager.partitions))
52+
assert.Equal(tst, manager.partitions[0], part)
53+
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package testutils
2+
3+
import (
4+
"github.com/v3io/v3io-tsdb/pkg/aggregate"
5+
"github.com/v3io/v3io-tsdb/pkg/config"
6+
"testing"
7+
)
8+
9+
func CreateSchema(t testing.TB, agg string) config.Schema {
10+
rollups, err := aggregate.AggregatorsToStringList(agg)
11+
if err != nil {
12+
t.Fatal(err)
13+
}
14+
defaultRollup := config.Rollup{
15+
Aggregators: rollups,
16+
AggregatorsGranularity: "1h",
17+
StorageClass: "local",
18+
SampleRetention: 0,
19+
LayerRetentionTime: "1y",
20+
}
21+
22+
tableSchema := config.TableSchema{
23+
Version: 0,
24+
RollupLayers: []config.Rollup{defaultRollup},
25+
ShardingBuckets: 8,
26+
PartitionerInterval: "2d",
27+
ChunckerInterval: "1h",
28+
}
29+
30+
fields, err := aggregate.SchemaFieldFromString(rollups, "v")
31+
if err != nil {
32+
t.Fatal("Failed to create aggregators list", err)
33+
}
34+
fields = append(fields, config.SchemaField{Name: "_name", Type: "string", Nullable: false, Items: ""})
35+
36+
partitionSchema := config.PartitionSchema{
37+
Version: tableSchema.Version,
38+
Aggregators: rollups,
39+
AggregatorsGranularity: "1h",
40+
StorageClass: "local",
41+
SampleRetention: 0,
42+
ChunckerInterval: tableSchema.ChunckerInterval,
43+
PartitionerInterval: tableSchema.PartitionerInterval,
44+
}
45+
46+
schema := config.Schema{
47+
TableSchemaInfo: tableSchema,
48+
PartitionSchemaInfo: partitionSchema,
49+
Partitions: []config.Partition{},
50+
Fields: fields,
51+
}
52+
return schema
53+
}

pkg/tsdb/tsdbtest/tsdbtest.go

Lines changed: 2 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@ package tsdbtest
22

33
import (
44
"fmt"
5-
"github.com/v3io/v3io-tsdb/pkg/aggregate"
65
"github.com/v3io/v3io-tsdb/pkg/config"
76
. "github.com/v3io/v3io-tsdb/pkg/tsdb"
7+
"github.com/v3io/v3io-tsdb/pkg/tsdb/tsdbtest/testutils"
88
"github.com/v3io/v3io-tsdb/pkg/utils"
99
"regexp"
1010
"strings"
@@ -42,7 +42,7 @@ func DeleteTSDB(t testing.TB, v3ioConfig *config.V3ioConfig) {
4242
}
4343

4444
func CreateTestTSDB(t testing.TB, v3ioConfig *config.V3ioConfig) {
45-
schema := CreateSchema(t, "*")
45+
schema := testutils.CreateSchema(t, "*")
4646
if err := CreateTSDB(v3ioConfig, &schema); err != nil {
4747
t.Fatalf("Failed to create TSDB. reason: %s", err)
4848
}
@@ -153,49 +153,3 @@ func NormalizePath(path string) string {
153153
re := regexp.MustCompile("[" + r + "]+")
154154
return re.ReplaceAllString(path, "_")
155155
}
156-
157-
func CreateSchema(t testing.TB, agg string) config.Schema {
158-
rollups, err := aggregate.AggregatorsToStringList(agg)
159-
if err != nil {
160-
t.Fatal(err)
161-
}
162-
defaultRollup := config.Rollup{
163-
Aggregators: rollups,
164-
AggregatorsGranularity: "1h",
165-
StorageClass: "local",
166-
SampleRetention: 0,
167-
LayerRetentionTime: "1y",
168-
}
169-
170-
tableSchema := config.TableSchema{
171-
Version: 0,
172-
RollupLayers: []config.Rollup{defaultRollup},
173-
ShardingBuckets: 8,
174-
PartitionerInterval: "2d",
175-
ChunckerInterval: "1h",
176-
}
177-
178-
fields, err := aggregate.SchemaFieldFromString(rollups, "v")
179-
if err != nil {
180-
t.Fatal("Failed to create aggregators list", err)
181-
}
182-
fields = append(fields, config.SchemaField{Name: "_name", Type: "string", Nullable: false, Items: ""})
183-
184-
partitionSchema := config.PartitionSchema{
185-
Version: tableSchema.Version,
186-
Aggregators: rollups,
187-
AggregatorsGranularity: "1h",
188-
StorageClass: "local",
189-
SampleRetention: 0,
190-
ChunckerInterval: tableSchema.ChunckerInterval,
191-
PartitionerInterval: tableSchema.PartitionerInterval,
192-
}
193-
194-
schema := config.Schema{
195-
TableSchemaInfo: tableSchema,
196-
PartitionSchemaInfo: partitionSchema,
197-
Partitions: []config.Partition{},
198-
Fields: fields,
199-
}
200-
return schema
201-
}

pkg/tsdbctl/add.go

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ func (ac *addCommandeer) add() error {
107107
return err
108108
}
109109

110-
append, err := ac.rootCommandeer.adapter.Appender()
110+
appender, err := ac.rootCommandeer.adapter.Appender()
111111
if err != nil {
112112
return errors.Wrap(err, "failed to create Appender")
113113
}
@@ -127,22 +127,22 @@ func (ac *addCommandeer) add() error {
127127
return err
128128
}
129129

130-
_, err = ac.appendMetric(append, lset, tarray, varray, true)
130+
_, err = ac.appendMetric(appender, lset, tarray, varray, true)
131131
if err != nil {
132132
return err
133133
}
134134

135-
_, err = append.WaitForCompletion(0)
135+
_, err = appender.WaitForCompletion(0)
136136
return err
137137
}
138138

139-
err = ac.appendMetrics(append, lset)
139+
err = ac.appendMetrics(appender, lset)
140140
if err != nil {
141141
return err
142142
}
143143

144144
// make sure all writes are committed
145-
_, err = append.WaitForCompletion(0)
145+
_, err = appender.WaitForCompletion(0)
146146
if err != nil {
147147
return errors.Wrap(err, "operation timed out")
148148
}
@@ -239,6 +239,10 @@ func (ac *addCommandeer) appendMetric(
239239

240240
func strToLabels(name, lbls string) (utils.Labels, error) {
241241

242+
if err := utils.IsValidMetricName(name); err != nil {
243+
return nil, errors.Wrap(err, fmt.Sprintf("illegal metric name: '%s'", name))
244+
}
245+
242246
lset := utils.Labels{utils.Label{Name: "__name__", Value: name}}
243247

244248
if lbls != "" {
@@ -248,6 +252,10 @@ func strToLabels(name, lbls string) (utils.Labels, error) {
248252
if len(splitLbl) != 2 {
249253
return nil, errors.New("labels must be in the form: key1=label1,key2=label2,...")
250254
}
255+
256+
if err := utils.IsValidLabelName(splitLbl[0]); err != nil {
257+
return nil, errors.Wrap(err, fmt.Sprintf("illegal label name: '%s'", splitLbl[0]))
258+
}
251259
lset = append(lset, utils.Label{Name: splitLbl[0], Value: splitLbl[1]})
252260
}
253261
}

pkg/tsdbctl/add_test.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,30 @@ func (suite *testAddSuite) TestStrToTVInvalidTime() {
6969
suite.Require().Error(err)
7070
}
7171

72+
func (suite *testAddSuite) TestStrToLabelsWithEmptyName() {
73+
74+
labels, err := strToLabels("", "a=A,b=B")
75+
76+
suite.Require().Nil(labels)
77+
suite.Require().Error(err)
78+
}
79+
80+
func (suite *testAddSuite) TestStrToLabelsWithIllegalName() {
81+
82+
labels, err := strToLabels("illegal-name", "a=A,b=B")
83+
84+
suite.Require().Nil(labels)
85+
suite.Require().Error(err)
86+
}
87+
88+
func (suite *testAddSuite) TestStrToLabelsWithIllegalLabel() {
89+
90+
labels, err := strToLabels("valid_name", "a=A,b-b=B")
91+
92+
suite.Require().Nil(labels)
93+
suite.Require().Error(err)
94+
}
95+
7296
func TestAddSuite(t *testing.T) {
7397
suite.Run(t, new(testAddSuite))
7498
}

0 commit comments

Comments
 (0)