Skip to content

Commit bb2ea9b

Browse files
authored
Merge pull request #85 from v3io/development
Development --> master
2 parents b2309d1 + 5a6d0de commit bb2ea9b

File tree

12 files changed

+224
-43
lines changed

12 files changed

+224
-43
lines changed

Diff for: pkg/appender/store.go

+4-1
Original file line numberDiff line numberDiff line change
@@ -396,7 +396,10 @@ func (cs *chunkStore) appendExpression(chunk *attrAppender) string {
396396
chunk.state |= chunkStateWriting
397397

398398
expr := ""
399-
idx := chunk.partition.TimeToChunkId(chunk.chunkMint) // TODO: add DaysPerObj from part manager
399+
idx, err := chunk.partition.TimeToChunkId(chunk.chunkMint)
400+
if err != nil {
401+
return ""
402+
}
400403
attr := chunk.partition.ChunkID2Attr("v", idx)
401404

402405
val := base64.StdEncoding.EncodeToString(bytes)

Diff for: pkg/config/config.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"os"
2727
)
2828

29+
const V3ioConfigEnvironmentVariable = "V3IO_CONF"
2930
const DefaultConfigurationFileName = "v3io.yaml"
3031
const SCHEMA_CONFIG = ".schema"
3132

@@ -123,7 +124,7 @@ type MetricConfig struct {
123124

124125
func LoadConfig(path string) (*V3ioConfig, error) {
125126

126-
envpath := os.Getenv("V3IO_TSDBCFG_PATH")
127+
envpath := os.Getenv(V3ioConfigEnvironmentVariable)
127128
if envpath != "" {
128129
path = envpath
129130
}

Diff for: pkg/partmgr/partmgr.go

+30-20
Original file line numberDiff line numberDiff line change
@@ -109,8 +109,12 @@ func (p *PartitionManager) IsCyclic() bool {
109109
return p.cyclic
110110
}
111111

112-
func (p *PartitionManager) GetPartitions() []*DBPartition {
113-
return p.partitions
112+
func (p *PartitionManager) GetPartitionsPaths() []string {
113+
var paths []string
114+
for _, part := range p.partitions {
115+
paths = append(paths, part.GetTablePath())
116+
}
117+
return paths
114118
}
115119

116120
func (p *PartitionManager) GetConfig() *config.Schema {
@@ -192,7 +196,7 @@ func (p *PartitionManager) updatePartitionInSchema(partition *DBPartition) error
192196
func (p *PartitionManager) PartsForRange(mint, maxt int64) []*DBPartition {
193197
var parts []*DBPartition
194198
for _, part := range p.partitions {
195-
if part.startTime+p.currentPartitionInterval >= mint && (maxt == 0 || part.startTime <= maxt) {
199+
if part.startTime >= mint && (maxt == 0 || part.startTime < maxt) {
196200
parts = append(parts, part)
197201
}
198202
}
@@ -275,11 +279,20 @@ func (p *DBPartition) Time2Bucket(t int64) int {
275279
if t > p.GetEndTime() {
276280
return p.rollupBuckets - 1
277281
}
282+
if t < p.GetStartTime() {
283+
return 0
284+
}
278285
return int((t - p.startTime) / p.rollupTime)
279286
}
280287

281288
// get nearest chunk start
282289
func (p *DBPartition) GetChunkMint(t int64) int64 {
290+
if t > p.GetEndTime() {
291+
return p.GetEndTime() - p.chunkInterval + 1
292+
}
293+
if t < p.GetStartTime() {
294+
return p.startTime
295+
}
283296
return p.chunkInterval * (t / p.chunkInterval)
284297
}
285298

@@ -294,16 +307,20 @@ func (p *DBPartition) IsAheadOfChunk(mint, t int64) bool {
294307
}
295308

296309
// Get ID of the Chunk covering time t
297-
func (p *DBPartition) TimeToChunkId(tmilli int64) int {
298-
return int((tmilli-p.startTime)/p.chunkInterval) + 1
310+
func (p *DBPartition) TimeToChunkId(tmilli int64) (int, error) {
311+
if tmilli >= p.startTime && tmilli <= p.GetEndTime() {
312+
return int((tmilli-p.startTime)/p.chunkInterval) + 1, nil
313+
} else {
314+
return 0, errors.New("time " + string(tmilli) + " is not covered by time partition")
315+
}
299316
}
300317

301318
// is t covered by this partition
302319
func (p *DBPartition) InRange(t int64) bool {
303320
if p.manager.cyclic {
304321
return true
305322
}
306-
return (t >= p.startTime) && (t < p.startTime+p.partitionInterval)
323+
return t >= p.startTime && t < p.GetEndTime()
307324
}
308325

309326
// return the mint and maxt for this partition, may need maxt for cyclic partition
@@ -340,21 +357,14 @@ func (p *DBPartition) Range2Attrs(col string, mint, maxt int64) ([]string, []int
340357
// All the chunk IDs which match the time range
341358
func (p *DBPartition) Range2Cids(mint, maxt int64) []int {
342359
list := []int{}
343-
start := p.TimeToChunkId(mint)
344-
end := p.TimeToChunkId(maxt)
345-
chunks := p.partitionInterval / p.chunkInterval
346-
347-
if end < start {
348-
for i := start; int64(i) < chunks; i++ {
349-
list = append(list, i)
350-
}
351-
for i := 0; i <= end; i++ {
352-
list = append(list, i)
353-
}
354-
355-
return list
360+
start, err := p.TimeToChunkId(mint)
361+
if err != nil {
362+
start = 1
363+
}
364+
end, err := p.TimeToChunkId(maxt)
365+
if err != nil {
366+
end = int(p.partitionInterval / p.chunkInterval)
356367
}
357-
358368
for i := start; i <= end; i++ {
359369
list = append(list, i)
360370
}

Diff for: pkg/partmgr/partmgr_test.go

+68-2
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,13 @@ package partmgr
2525
import (
2626
"github.com/stretchr/testify/assert"
2727
"github.com/v3io/v3io-tsdb/pkg/tsdb/tsdbtest/testutils"
28-
"github.com/v3io/v3io-tsdb/pkg/utils"
2928
"testing"
3029
)
3130

3231
func TestCreateNewPartition(tst *testing.T) {
3332
schema := testutils.CreateSchema(tst, "*")
34-
interval, _ := utils.Str2duration(schema.PartitionSchemaInfo.PartitionerInterval)
3533
manager, _ := NewPartitionMngr(&schema, "/", nil)
34+
interval := manager.currentPartitionInterval
3635
startTime := interval + 1
3736
//first partition
3837
part, _ := manager.TimeToPart(startTime + interval)
@@ -51,3 +50,70 @@ func TestCreateNewPartition(tst *testing.T) {
5150
assert.Equal(tst, 4, len(manager.partitions))
5251
assert.Equal(tst, manager.partitions[0], part)
5352
}
53+
54+
func TestPartsForRange(tst *testing.T) {
55+
numPartitions := 5
56+
schema := testutils.CreateSchema(tst, "*")
57+
manager, _ := NewPartitionMngr(&schema, "/", nil)
58+
interval := manager.currentPartitionInterval
59+
for i := 1; i <= numPartitions; i++ {
60+
manager.TimeToPart(interval * int64(i))
61+
}
62+
assert.Equal(tst, numPartitions, len(manager.partitions))
63+
//get all partitions
64+
assert.Equal(tst, manager.partitions, manager.PartsForRange(0, interval*int64(numPartitions+1)))
65+
//get no partitions
66+
assert.Equal(tst, 0, len(manager.PartsForRange(0, interval-1)))
67+
//get first 2 partitions
68+
parts := manager.PartsForRange(0, interval*2+1)
69+
assert.Equal(tst, 2, len(parts))
70+
assert.Equal(tst, manager.partitions[0], parts[0])
71+
assert.Equal(tst, manager.partitions[1], parts[1])
72+
//get middle 3 partitions
73+
parts = manager.PartsForRange(interval*2, interval*4+1)
74+
assert.Equal(tst, 3, len(parts))
75+
assert.Equal(tst, manager.partitions[1], parts[0])
76+
assert.Equal(tst, manager.partitions[2], parts[1])
77+
assert.Equal(tst, manager.partitions[3], parts[2])
78+
}
79+
80+
func TestTime2Bucket(tst *testing.T) {
81+
schema := testutils.CreateSchema(tst, "*")
82+
manager, _ := NewPartitionMngr(&schema, "/", nil)
83+
part, _ := manager.TimeToPart(1000000)
84+
assert.Equal(tst, 0, part.Time2Bucket(100))
85+
assert.Equal(tst, part.rollupBuckets-1, part.Time2Bucket(part.startTime+part.partitionInterval+1))
86+
assert.Equal(tst, part.rollupBuckets/2, part.Time2Bucket((part.startTime+part.partitionInterval)/2))
87+
}
88+
89+
func TestGetChunkMint(tst *testing.T) {
90+
schema := testutils.CreateSchema(tst, "*")
91+
manager, _ := NewPartitionMngr(&schema, "/", nil)
92+
part, _ := manager.TimeToPart(manager.currentPartitionInterval)
93+
assert.Equal(tst, part.startTime, part.GetChunkMint(0))
94+
assert.Equal(tst, part.startTime, part.GetChunkMint(part.startTime+1))
95+
assert.Equal(tst, part.startTime+part.chunkInterval, part.GetChunkMint(part.startTime+part.chunkInterval+100))
96+
assert.Equal(tst, part.GetEndTime()-part.chunkInterval+1, part.GetChunkMint(part.GetEndTime()+100))
97+
}
98+
99+
func TestInRange(tst *testing.T) {
100+
schema := testutils.CreateSchema(tst, "*")
101+
manager, _ := NewPartitionMngr(&schema, "/", nil)
102+
part, _ := manager.TimeToPart(manager.currentPartitionInterval)
103+
assert.Equal(tst, false, part.InRange(part.GetStartTime()-100))
104+
assert.Equal(tst, false, part.InRange(part.GetEndTime()+100))
105+
assert.Equal(tst, true, part.InRange(part.GetStartTime()+part.partitionInterval/2))
106+
}
107+
108+
func TestRange2Cids(tst *testing.T) {
109+
schema := testutils.CreateSchema(tst, "*")
110+
manager, _ := NewPartitionMngr(&schema, "/", nil)
111+
part, _ := manager.TimeToPart(manager.currentPartitionInterval)
112+
numChunks := int(part.partitionInterval / part.chunkInterval)
113+
var cids []int
114+
for i := 1; i <= numChunks; i++ {
115+
cids = append(cids, i)
116+
}
117+
assert.Equal(tst, cids, part.Range2Cids(0, part.GetEndTime()+100))
118+
assert.Equal(tst, []int{3, 4, 5}, part.Range2Cids(part.startTime+2*part.chunkInterval, part.startTime+5*part.chunkInterval-1))
119+
}

Diff for: pkg/querier/multipart.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,8 @@ func (im *IterSortMerger) Next() bool {
7575
im.err = iter.Err()
7676
return false
7777
}
78-
completed = completed && im.done[i]
7978
}
79+
completed = completed && im.done[i]
8080
if !im.done[i] {
8181
key := iter.At().GetKey()
8282
if !keyIsSet {

Diff for: pkg/querier/multipart_test.go

+71
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package querier
2+
3+
import (
4+
"testing"
5+
6+
"github.com/stretchr/testify/suite"
7+
"github.com/v3io/v3io-tsdb/pkg/utils"
8+
)
9+
10+
type testIterSortMergerSuite struct {
11+
suite.Suite
12+
}
13+
14+
type mockSeriesSet struct {
15+
s []Series
16+
init bool
17+
}
18+
19+
func (m *mockSeriesSet) Next() bool {
20+
if !m.init {
21+
m.init = true
22+
} else if len(m.s) > 1 {
23+
m.s = m.s[1:]
24+
} else {
25+
return false
26+
}
27+
return true
28+
}
29+
30+
func (m *mockSeriesSet) At() Series {
31+
return m.s[0]
32+
}
33+
34+
func (m *mockSeriesSet) Err() error {
35+
return nil
36+
}
37+
38+
type stubSeries uint64
39+
40+
func (stubSeries) Labels() utils.Labels {
41+
panic("stub")
42+
}
43+
44+
func (stubSeries) Iterator() SeriesIterator {
45+
panic("stub")
46+
}
47+
48+
func (s stubSeries) GetKey() uint64 {
49+
return uint64(s)
50+
}
51+
52+
func (suite *testIterSortMergerSuite) TestIterSortMerger() {
53+
54+
s1 := []Series{stubSeries(0), stubSeries(1)}
55+
s2 := []Series{stubSeries(2), stubSeries(3)}
56+
iter, err := newIterSortMerger([]SeriesSet{&mockSeriesSet{s: s1}, &mockSeriesSet{s: s2}})
57+
58+
suite.Require().Nil(err)
59+
suite.Require().True(iter.Next())
60+
suite.Require().Equal(uint64(0), iter.At().GetKey())
61+
suite.Require().True(iter.Next())
62+
suite.Require().Equal(uint64(1), iter.At().GetKey())
63+
suite.Require().True(iter.Next())
64+
suite.Require().Equal(uint64(2), iter.At().GetKey())
65+
suite.Require().True(iter.Next())
66+
suite.Require().Equal(uint64(3), iter.At().GetKey())
67+
}
68+
69+
func TestAddSuite(t *testing.T) {
70+
suite.Run(t, new(testIterSortMergerSuite))
71+
}

Diff for: pkg/tsdb/tsdbtest/config.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"strings"
99
)
1010

11-
const V3ioConfigEnvironmentVariable = "V3IO_CONF"
1211
const TsdbDefaultTestConfigPath = "testdata"
1312
const relativeProjectPath = "src/github.com/v3io/v3io-tsdb"
1413

@@ -19,7 +18,7 @@ This method will try and load the configuration file from several locations by t
1918
3. $GOPATH/src/github.com/v3io/v3io-tsdb/v3io.yaml
2019
*/
2120
func GetV3ioConfigPath() (string, error) {
22-
if configurationPath := os.Getenv(V3ioConfigEnvironmentVariable); configurationPath != "" {
21+
if configurationPath := os.Getenv(config.V3ioConfigEnvironmentVariable); configurationPath != "" {
2322
return configurationPath, nil
2423
}
2524

Diff for: pkg/tsdb/tsdbtest/config_test.go

+32-6
Original file line numberDiff line numberDiff line change
@@ -29,15 +29,22 @@ func TestGetV3ioConfigPath(t *testing.T) {
2929
{description: "get config from package testdata",
3030
expectedPath: filepath.Join(TsdbDefaultTestConfigPath, config.DefaultConfigurationFileName),
3131
setup: func() func() {
32+
// Make this test agnostic to environment variables at runtime (store & recover on exit)
33+
configPathEnv := os.Getenv(config.V3ioConfigEnvironmentVariable)
34+
os.Unsetenv(config.V3ioConfigEnvironmentVariable)
35+
3236
if _, err := os.Stat(filepath.Join(TsdbDefaultTestConfigPath, config.DefaultConfigurationFileName)); !os.IsNotExist(err) {
33-
return func() {}
37+
return func() {
38+
os.Setenv(config.V3ioConfigEnvironmentVariable, configPathEnv)
39+
}
3440
} else {
3541
path := TsdbDefaultTestConfigPath
3642
if err := os.Mkdir(path, 0777); err != nil {
3743
t.Fatalf("Failed to mkdir %v", err)
3844
}
3945
createTestConfig(t, path)
4046
return func() {
47+
os.Setenv(config.V3ioConfigEnvironmentVariable, configPathEnv)
4148
os.RemoveAll(path)
4249
}
4350
}
@@ -46,24 +53,35 @@ func TestGetV3ioConfigPath(t *testing.T) {
4653
{description: "get config from project root",
4754
expectedPath: filepath.Join(projectHome, config.DefaultConfigurationFileName),
4855
setup: func() func() {
56+
// Make this test agnostic to environment variables at runtime (store & recover on exit)
57+
configPathEnv := os.Getenv(config.V3ioConfigEnvironmentVariable)
58+
os.Unsetenv(config.V3ioConfigEnvironmentVariable)
59+
4960
if _, err := os.Stat(filepath.Join(projectHome, config.DefaultConfigurationFileName)); !os.IsNotExist(err) {
50-
return func() {}
61+
return func() {
62+
os.Setenv(config.V3ioConfigEnvironmentVariable, configPathEnv)
63+
}
5164
} else {
5265
path := projectHome
5366
createTestConfig(t, path)
5467
return func() {
68+
os.Setenv(config.V3ioConfigEnvironmentVariable, configPathEnv)
5569
os.Remove(path)
5670
}
5771
}
5872
}},
5973

6074
{description: "get config from env var",
61-
expectedPath: config.DefaultConfigurationFileName,
75+
expectedPath: getConfigPathFromEnvOrDefault(),
6276
setup: func() func() {
63-
os.Setenv(V3ioConfigEnvironmentVariable, config.DefaultConfigurationFileName)
64-
return func() {
65-
os.Unsetenv(V3ioConfigEnvironmentVariable)
77+
env := os.Getenv(config.V3ioConfigEnvironmentVariable)
78+
if env == "" {
79+
os.Setenv(config.V3ioConfigEnvironmentVariable, config.DefaultConfigurationFileName)
80+
return func() {
81+
os.Unsetenv(config.V3ioConfigEnvironmentVariable)
82+
}
6683
}
84+
return func() {}
6785
}},
6886
}
6987

@@ -74,6 +92,14 @@ func TestGetV3ioConfigPath(t *testing.T) {
7492
}
7593
}
7694

95+
func getConfigPathFromEnvOrDefault() string {
96+
configPath := os.Getenv(config.V3ioConfigEnvironmentVariable)
97+
if configPath == "" {
98+
configPath = config.DefaultConfigurationFileName
99+
}
100+
return configPath
101+
}
102+
77103
func testGetV3ioConfigPathCase(t *testing.T, expected string, setup func() func()) {
78104
defer setup()()
79105
path, err := GetV3ioConfigPath()

0 commit comments

Comments
 (0)