Skip to content

Commit dcd6656

Browse files
author
Gal Topper
authored
Merge pull request #98 from v3io/development
dev -> master
2 parents 9b020bf + ebee32d commit dcd6656

File tree

10 files changed

+111
-66
lines changed

10 files changed

+111
-66
lines changed

Diff for: pkg/aggregate/iterator.go

+12-7
Original file line numberDiff line numberDiff line change
@@ -39,13 +39,10 @@ type AggregateSeries struct {
3939

4040
func NewAggregateSeries(functions, col string, buckets int, interval, rollupTime int64, windows []int) (*AggregateSeries, error) {
4141

42-
if functions == "" || interval == 0 {
43-
return nil, nil
44-
}
45-
4642
split := strings.Split(functions, ",")
4743
var aggrMask AggrType
4844
var aggrList []AggrType
45+
4946
for _, s := range split {
5047
aggr, ok := aggrTypeString[s]
5148
if !ok {
@@ -56,8 +53,14 @@ func NewAggregateSeries(functions, col string, buckets int, interval, rollupTime
5653
}
5754

5855
newAggregateSeries := AggregateSeries{
59-
aggrMask: aggrMask, functions: aggrList, colName: col, buckets: buckets, rollupTime: rollupTime,
60-
interval: interval, overlapWindows: windows}
56+
aggrMask: aggrMask,
57+
functions: aggrList,
58+
colName: col,
59+
buckets: buckets,
60+
rollupTime: rollupTime,
61+
interval: interval,
62+
overlapWindows: windows,
63+
}
6164

6265
return &newAggregateSeries, nil
6366
}
@@ -154,7 +157,7 @@ func (as *AggregateSeries) NewSetFromAttrs(
154157
}
155158

156159
i++
157-
arrayIndex = (arrayIndex + 1) % as.buckets
160+
arrayIndex = (arrayIndex + 1) % (as.buckets + 1)
158161
}
159162

160163
return &aggrSet, nil
@@ -321,9 +324,11 @@ func (as *AggregateSet) GetCellTime(base int64, index int) int64 {
321324
if as.overlapWin == nil {
322325
return base + int64(index)*as.interval
323326
}
327+
324328
if index >= len(as.overlapWin) {
325329
return base
326330
}
331+
327332
return base - int64(as.overlapWin[index])*as.interval
328333
}
329334

Diff for: pkg/appender/appender.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -251,11 +251,10 @@ func (mc *MetricsCache) WaitForCompletion(timeout time.Duration) (int, error) {
251251
} else if timeout > 0 {
252252
maxWaitTime = timeout
253253
} else {
254-
maxWaitTime = time.Duration(mc.cfg.DefaultTimeout) * time.Second
254+
// if negative - use default value from configuration
255+
maxWaitTime = time.Duration(mc.cfg.DefaultTimeoutInSeconds) * time.Second
255256
}
256257

257-
//fmt.Printf("\nmaxWaitTime=%d\n", maxWaitTime)
258-
259258
select {
260259
case res := <-waitChan:
261260
return res, nil

Diff for: pkg/config/config.go

+19-8
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,16 @@ import (
2626
"os"
2727
)
2828

29-
const V3ioConfigEnvironmentVariable = "V3IO_CONF"
30-
const DefaultConfigurationFileName = "v3io.yaml"
31-
const SCHEMA_CONFIG = ".schema"
29+
const (
30+
V3ioConfigEnvironmentVariable = "V3IO_CONF"
31+
DefaultConfigurationFileName = "v3io.yaml"
32+
SCHEMA_CONFIG = ".schema"
33+
34+
defaultNumberOfIngestWorkers = 1
35+
defaultNumberOfQueryWorkers = 8
36+
defaultBatchSize = 64
37+
defaultTimeoutInSeconds = 24 * 60 * 60 // 24 hours
38+
)
3239

3340
type V3ioConfig struct {
3441
// V3IO Connection details: Url, Data container, relative path for this dataset, credentials
@@ -51,7 +58,7 @@ type V3ioConfig struct {
5158
// Override last chunk (by default on restart it will append from the last point if possible)
5259
OverrideOld bool `json:"overrideOld"`
5360
// Default timeout duration in seconds (if not set, a 24-hour timeout is used)
54-
DefaultTimeout int `json:"timeout,omitempty"`
61+
DefaultTimeoutInSeconds int `json:"timeout,omitempty"`
5562
// The size of batch to use during ingestion
5663
BatchSize int `json:"batchSize,omitempty"`
5764
// Size of sample in bytes for worst and best compression scenarios
@@ -153,20 +160,24 @@ func LoadFromData(data []byte) (*V3ioConfig, error) {
153160
func InitDefaults(cfg *V3ioConfig) {
154161
// Initialize default number of workers
155162
if cfg.Workers == 0 {
156-
cfg.Workers = 8
163+
cfg.Workers = defaultNumberOfIngestWorkers
157164
}
158165

159166
// init the default number of query workers, if not set, to Min(8, Workers)
160167
if cfg.QryWorkers == 0 {
161-
if cfg.Workers < 8 {
168+
if cfg.Workers < defaultNumberOfQueryWorkers {
162169
cfg.QryWorkers = cfg.Workers
163170
} else {
164-
cfg.QryWorkers = 8
171+
cfg.QryWorkers = defaultNumberOfQueryWorkers
165172
}
166173
}
167174

168175
// init default batch size
169176
if cfg.BatchSize <= 0 {
170-
cfg.BatchSize = 64
177+
cfg.BatchSize = defaultBatchSize
178+
}
179+
180+
if cfg.DefaultTimeoutInSeconds == 0 {
181+
cfg.DefaultTimeoutInSeconds = int(defaultTimeoutInSeconds)
171182
}
172183
}

Diff for: pkg/querier/querier.go

+20-10
Original file line numberDiff line numberDiff line change
@@ -116,24 +116,34 @@ func (q *V3ioQuerier) queryNumericPartition(
116116

117117
newSet := &V3ioSeriesSet{mint: mint, maxt: maxt, partition: partition, logger: q.logger}
118118

119-
if functions != "" && step == 0 && partition.RollupTime() != 0 {
120-
step = partition.RollupTime()
121-
}
119+
// if there are aggregations to be made
120+
if functions != "" {
122121

123-
newAggrSeries, err := aggregate.NewAggregateSeries(
124-
functions, "v", partition.AggrBuckets(), step, partition.RollupTime(), windows)
125-
if err != nil {
126-
return nil, err
127-
}
122+
// if step isn't passed (e.g. when using the console) - the step is the difference between max
123+
// and min times (e.g. 5 minutes)
124+
if step == 0 {
125+
step = maxt - mint
126+
}
127+
128+
newAggrSeries, err := aggregate.NewAggregateSeries(functions,
129+
"v",
130+
partition.AggrBuckets(),
131+
step,
132+
partition.RollupTime(),
133+
windows)
134+
135+
if err != nil {
136+
return nil, err
137+
}
128138

129-
if newAggrSeries != nil && step != 0 {
130139
newSet.aggrSeries = newAggrSeries
131140
newSet.interval = step
132141
newSet.aggrIdx = newAggrSeries.NumFunctions() - 1
133142
newSet.overlapWin = windows
134143
}
135144

136-
err = newSet.getItems(partition, name, filter, q.container, q.cfg.QryWorkers)
145+
err := newSet.getItems(partition, name, filter, q.container, q.cfg.QryWorkers)
146+
137147
return newSet, err
138148
}
139149

Diff for: pkg/querier/series.go

+17-5
Original file line numberDiff line numberDiff line change
@@ -213,16 +213,28 @@ func NewAggrSeries(set *V3ioSeriesSet, aggr aggregate.AggrType) *V3ioSeries {
213213
newSeries := V3ioSeries{set: set}
214214
lset := append(initLabels(set), utils.Label{Name: "Aggregator", Value: aggr.String()})
215215
newSeries.lset = lset
216+
216217
if set.nullSeries {
217218
newSeries.iter = &nullSeriesIterator{}
218219
} else {
219-
newSeries.iter = &aggrSeriesIterator{set: set, aggrType: aggr, index: -1}
220+
221+
// `set`, the thing this iterator "iterates" over is stateful - it holds a "current" set and aggrSet.
222+
// this means we need to copy all the stateful things we need into the iterator (e.g. aggrSet) so that
223+
// when it's evaluated, it'll hold the proper pointer
224+
newSeries.iter = &aggrSeriesIterator{
225+
set: set,
226+
aggrSet: set.aggrSet,
227+
aggrType: aggr,
228+
index: -1,
229+
}
220230
}
231+
221232
return &newSeries
222233
}
223234

224235
type aggrSeriesIterator struct {
225236
set *V3ioSeriesSet
237+
aggrSet *aggregate.AggregateSet
226238
aggrType aggregate.AggrType
227239
index int
228240
err error
@@ -235,7 +247,7 @@ func (s *aggrSeriesIterator) Seek(t int64) bool {
235247
return true
236248
}
237249

238-
if t > s.set.baseTime+int64(s.set.aggrSet.GetMaxCell())*s.set.interval {
250+
if t > s.set.baseTime+int64(s.aggrSet.GetMaxCell())*s.set.interval {
239251
return false
240252
}
241253

@@ -245,7 +257,7 @@ func (s *aggrSeriesIterator) Seek(t int64) bool {
245257

246258
// advance to the next time interval/bucket
247259
func (s *aggrSeriesIterator) Next() bool {
248-
if s.index >= s.set.aggrSet.GetMaxCell() {
260+
if s.index >= s.aggrSet.GetMaxCell() {
249261
return false
250262
}
251263

@@ -255,8 +267,8 @@ func (s *aggrSeriesIterator) Next() bool {
255267

256268
// return the time & value at the current bucket
257269
func (s *aggrSeriesIterator) At() (t int64, v float64) {
258-
val, _ := s.set.aggrSet.GetCellValue(s.aggrType, s.index)
259-
return s.set.aggrSet.GetCellTime(s.set.baseTime, s.index), val
270+
val, _ := s.aggrSet.GetCellValue(s.aggrType, s.index)
271+
return s.aggrSet.GetCellTime(s.set.baseTime, s.index), val
260272
}
261273

262274
func (s *aggrSeriesIterator) Err() error { return s.err }

Diff for: pkg/querier/seriesset.go

+15-3
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ func (s *V3ioSeriesSet) Next() bool {
116116
mint := s.partition.CyclicMinTime(s.mint, maxtUpdate)
117117

118118
start := s.partition.Time2Bucket(mint)
119-
end := s.partition.Time2Bucket(s.maxt + s.interval)
119+
end := s.partition.Time2Bucket(s.maxt+s.interval) + 1
120120

121121
// len of the returned array, cropped at the end in case of cyclic overlap
122122
length := int((maxtUpdate-mint)/s.interval) + 2
@@ -144,7 +144,15 @@ func (s *V3ioSeriesSet) Next() bool {
144144

145145
// create series from raw chunks
146146
s.currSeries = NewSeries(s)
147-
s.aggrSet = s.aggrSeries.NewSetFromChunks(int((s.maxt-s.mint)/s.interval) + 1)
147+
148+
// the number of cells is equal to the quotient of (maxt-mint) divided by interval. if there's a
149+
// remainder or if there are no cells (e.g. diff is smaller than interval), add a cell
150+
numCells := (s.maxt - s.mint) / s.interval
151+
if ((s.maxt-s.mint)%s.interval) != 0 || numCells == 0 {
152+
numCells++
153+
}
154+
155+
s.aggrSet = s.aggrSeries.NewSetFromChunks(int(numCells))
148156
if s.overlapWin != nil {
149157
s.chunks2WindowedAggregates()
150158
} else {
@@ -164,7 +172,11 @@ func (s *V3ioSeriesSet) chunks2IntervalAggregates() {
164172
iter := s.currSeries.Iterator()
165173
if iter.Next() {
166174
t0, _ := iter.At()
167-
s.baseTime = (t0 / s.interval) * s.interval
175+
176+
// the base time should be the first sample we receive. initially, baseTime is zero
177+
if s.baseTime == 0 {
178+
s.baseTime = t0
179+
}
168180

169181
for {
170182
t, v := iter.At()

Diff for: pkg/tsdb/testdata/v3io.yaml.template

-2
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,5 @@ verbose: "warn"
1515
# Default timeout 10 seconds
1616
timeout: 10
1717

18-
workers: 8
19-
2018
username: <user name>
2119
password: <password>

Diff for: pkg/tsdb/v3iotsdb_integration_test.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ func TestQueryData(t *testing.T) {
196196
{Time: 1532940510 + 10, Value: 100.4}},
197197
from: 1532940510, to: 1532940510 + 11,
198198
aggregators: "sum",
199-
expected: map[string][]tsdbtest.DataPoint{"sum": {{Time: 1532940000, Value: 701.0}}}},
199+
expected: map[string][]tsdbtest.DataPoint{"sum": {{Time: 1532940510, Value: 701.0}}}},
200200

201201
{desc: "Should ingest and query multiple aggregators", metricName: "cpu",
202202
labels: utils.FromStrings("os", "linux", "iguaz", "yesplease"),
@@ -205,8 +205,8 @@ func TestQueryData(t *testing.T) {
205205
{Time: 1532940510 + 10, Value: 100.4}},
206206
from: 1532940510, to: 1532940510 + 11,
207207
aggregators: "sum,count",
208-
expected: map[string][]tsdbtest.DataPoint{"sum": {{Time: 1532940000, Value: 701.0}},
209-
"count": {{Time: 1532940000, Value: 3}}}},
208+
expected: map[string][]tsdbtest.DataPoint{"sum": {{Time: 1532940510, Value: 701.0}},
209+
"count": {{Time: 1532940510, Value: 3}}}},
210210

211211
{desc: "Should ingest and query with illegal time (switch from and to)", metricName: "cpu",
212212
labels: utils.FromStrings("os", "linux", "iguaz", "yesplease"),

Diff for: pkg/tsdbctl/check.go

+9-18
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,7 @@ import (
3333
type checkCommandeer struct {
3434
cmd *cobra.Command
3535
rootCommandeer *RootCommandeer
36-
name string
37-
lset string
36+
objPath string
3837
attrs []string
3938
}
4039

@@ -50,14 +49,10 @@ func newCheckCommandeer(rootCommandeer *RootCommandeer) *checkCommandeer {
5049
RunE: func(cmd *cobra.Command, args []string) error {
5150

5251
if len(args) == 0 {
53-
return errors.New("add require metric name and/or labels")
52+
return errors.New("check requires an object path")
5453
}
5554

56-
commandeer.name = args[0]
57-
58-
if len(args) > 1 {
59-
commandeer.lset = args[1]
60-
}
55+
commandeer.objPath = args[0]
6156

6257
// initialize params
6358
return commandeer.check()
@@ -85,15 +80,11 @@ func (cc *checkCommandeer) check() error {
8580
return err
8681
}
8782

88-
if lset, err = strToLabels(cc.name, cc.lset); err != nil {
89-
return err
90-
}
91-
9283
// get metric data and metadata
93-
allAtters := append(cc.attrs, "__name", "_name", "_lset", "_maxtime")
94-
container, path := cc.rootCommandeer.adapter.GetContainer()
95-
objPath := fmt.Sprintf("%s/0/%s.%016x", path, cc.name, lset.Hash())
96-
input := v3io.GetItemInput{Path: objPath, AttributeNames: allAtters}
84+
allAttrs := append(cc.attrs, "__name", "_name", "_lset", "_maxtime")
85+
container, tablePath := cc.rootCommandeer.adapter.GetContainer()
86+
objPath := fmt.Sprintf("%s/%s", tablePath, cc.objPath)
87+
input := v3io.GetItemInput{Path: objPath, AttributeNames: allAttrs}
9788
resp, err := container.Sync.GetItem(&input)
9889
if err != nil {
9990
return errors.Wrap(err, "failed to GetItem")
@@ -108,10 +99,10 @@ func (cc *checkCommandeer) check() error {
10899
fmt.Printf("Object: %s, %s {%s} maxtime: %d\n", objName, metricName, lsetString, maxtime)
109100

110101
// decompress and print metrics
111-
for k, attr := range cc.attrs {
102+
for _, attr := range cc.attrs {
112103

113104
values := item.GetField(attr)
114-
fmt.Println("Attr:", k)
105+
fmt.Println("Attr:", attr)
115106

116107
if values != nil {
117108
bytes := values.([]byte)

Diff for: pkg/tsdbctl/create.go

+14-7
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,15 @@ import (
3030
"strconv"
3131
)
3232

33-
const schemaVersion = 0
34-
const defaultStorageClass = "local"
33+
const (
34+
schemaVersion = 0
35+
defaultStorageClass = "local"
36+
defaultIngestionRate = "1/s"
37+
defaultRollupInterval = "1h"
38+
defaultShardingBuckets = 8
39+
defaultSampleRetentionHours = 0
40+
defaultLayerRetentionTime = "1y"
41+
)
3542

3643
type createCommandeer struct {
3744
cmd *cobra.Command
@@ -62,10 +69,10 @@ func newCreateCommandeer(rootCommandeer *RootCommandeer) *createCommandeer {
6269

6370
cmd.Flags().StringVarP(&commandeer.defaultRollups, "rollups", "r", "",
6471
"Default aggregation rollups, comma seperated: count,avg,sum,min,max,stddev")
65-
cmd.Flags().StringVarP(&commandeer.rollupInterval, "rollup-interval", "i", "1h", "aggregation interval")
66-
cmd.Flags().IntVarP(&commandeer.shardingBuckets, "sharding-buckets", "b", 8, "number of buckets to split key")
67-
cmd.Flags().IntVarP(&commandeer.sampleRetention, "sample-retention", "a", 0, "sample retention in hours")
68-
cmd.Flags().StringVar(&commandeer.sampleRate, "rate", "12/m", "sample rate")
72+
cmd.Flags().StringVarP(&commandeer.rollupInterval, "rollup-interval", "i", defaultRollupInterval, "aggregation interval")
73+
cmd.Flags().IntVarP(&commandeer.shardingBuckets, "sharding-buckets", "b", defaultShardingBuckets, "number of buckets to split key")
74+
cmd.Flags().IntVarP(&commandeer.sampleRetention, "sample-retention", "a", defaultSampleRetentionHours, "sample retention in hours")
75+
cmd.Flags().StringVar(&commandeer.sampleRate, "rate", defaultIngestionRate, "sample rate")
6976

7077
commandeer.cmd = cmd
7178

@@ -103,7 +110,7 @@ func (cc *createCommandeer) create() error {
103110
AggregatorsGranularity: cc.rollupInterval,
104111
StorageClass: defaultStorageClass,
105112
SampleRetention: cc.sampleRetention,
106-
LayerRetentionTime: "1y", //TODO
113+
LayerRetentionTime: defaultLayerRetentionTime, //TODO: make configurable
107114
}
108115

109116
tableSchema := config.TableSchema{

0 commit comments

Comments
 (0)