Skip to content

Commit 3aabf9d

Browse files
authored
Merge pull request #529 from v3io/development
Development
2 parents 670117b + 380a777 commit 3aabf9d

20 files changed

+53
-224
lines changed

pkg/aggregate/aggregate.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,7 @@ func MaskToString(mask AggrType) string {
310310
return output.String()
311311
}
312312

313-
func ToAttrName(aggr AggrType) string {
313+
func ToAttrName(aggr fmt.Stringer) string {
314314
return config.AggregateAttrPrefix + aggr.String()
315315
}
316316

pkg/aggregate/iterator.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ func (as *Series) NewSetFromAttrs(
134134
}
135135
aggrArrays[aggr] = utils.AsInt64Array(attrBlob.([]byte))
136136

137-
dataArrays[aggr] = make([]float64, length, length)
137+
dataArrays[aggr] = make([]float64, length)
138138
copy(dataArrays[aggr], getOrCreateInitDataArray(aggr, length))
139139
}
140140
}
@@ -189,7 +189,7 @@ func (as *Series) NewSetFromChunks(length int) *Set {
189189

190190
for _, aggr := range rawAggregates {
191191
if aggr&as.aggrMask != 0 {
192-
dataArrays[aggr] = make([]float64, length, length) // TODO: len/capacity & reuse (pool)
192+
dataArrays[aggr] = make([]float64, length) // TODO: len/capacity & reuse (pool)
193193
initArray := getOrCreateInitDataArray(aggr, length)
194194
copy(dataArrays[aggr], initArray)
195195
}
@@ -373,7 +373,7 @@ func getOrCreateInitDataArray(aggrType AggrType, length int) []float64 {
373373

374374
func createInitDataArray(aggrType AggrType, length int) []float64 {
375375
// Prepare "clean" array for fastest reset of "uninitialized" data arrays
376-
resultArray := make([]float64, length, length)
376+
resultArray := make([]float64, length)
377377

378378
var initWith float64
379379
switch aggrType {

pkg/appender/appender.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -248,9 +248,9 @@ func (mc *MetricsCache) Add(lset utils.LabelsIfc, t int64, v interface{}) (uint6
248248
for _, preAggr := range mc.partitionMngr.GetConfig().TableSchemaInfo.PreAggregates {
249249
subLset := lset.Filter(preAggr.Labels)
250250
name, key, hash := subLset.GetKey()
251-
aggrMetric, ok := mc.getMetric(name, hash)
251+
_, ok := mc.getMetric(name, hash)
252252
if !ok {
253-
aggrMetric = &MetricState{Lset: subLset, key: key, name: name, hash: hash}
253+
aggrMetric := &MetricState{Lset: subLset, key: key, name: name, hash: hash}
254254
aggrMetric.store = newChunkStore(mc.logger, subLset.LabelNames(), true)
255255
mc.addMetric(hash, name, aggrMetric)
256256
aggrMetrics = append(aggrMetrics, aggrMetric)

pkg/appender/ingest.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ func (mc *MetricsCache) metricFeed(index int) {
5353

5454
for {
5555
select {
56-
case _ = <-mc.stopChan:
56+
case <-mc.stopChan:
5757
return
5858
case app := <-mc.asyncAppendChan:
5959
newMetrics := 0
@@ -150,9 +150,9 @@ func (mc *MetricsCache) metricsUpdateLoop(index int) {
150150
counter := 0
151151
for {
152152
select {
153-
case _ = <-mc.stopChan:
153+
case <-mc.stopChan:
154154
return
155-
case _ = <-mc.newUpdates:
155+
case <-mc.newUpdates:
156156
// Handle new metric notifications (from metricFeed)
157157
for mc.updatesInFlight < mc.cfg.Workers*2 {
158158
freeSlots := mc.cfg.Workers*2 - mc.updatesInFlight
@@ -390,7 +390,7 @@ func (mc *MetricsCache) nameUpdateRespLoop() {
390390
go func() {
391391
for {
392392
select {
393-
case _ = <-mc.stopChan:
393+
case <-mc.stopChan:
394394
return
395395
case resp := <-mc.nameUpdateChan:
396396
// Handle V3IO PutItem in names table

pkg/appender/store.go

-1
Original file line numberDiff line numberDiff line change
@@ -475,7 +475,6 @@ func (cs *chunkStore) writeChunks(mc *MetricsCache, metric *MetricState) (hasPen
475475

476476
hasPendingUpdates = true
477477
cs.performanceReporter.UpdateHistogram("WriteChunksSizeHistogram", int64(pendingSamplesCount))
478-
return
479478
})
480479

481480
return

pkg/config/config.go

+3-8
Original file line numberDiff line numberDiff line change
@@ -171,9 +171,8 @@ type V3ioConfig struct {
171171
UsePreciseAggregations bool `json:"usePreciseAggregations,omitempty"`
172172
// Coefficient to decide whether or not to use server aggregates optimization
173173
// use server aggregations if ` <requested step> / <rollup interval> > UseServerAggregateCoefficient`
174-
UseServerAggregateCoefficient int `json:"useServerAggregateCoefficient,omitempty"`
175-
LoadPartitionsFromSchemaAttr bool `json:"loadPartitionsFromSchemaAttr,omitempty"`
176-
RequestChanLength int `json:"RequestChanLength,omitempty"`
174+
UseServerAggregateCoefficient int `json:"useServerAggregateCoefficient,omitempty"`
175+
RequestChanLength int `json:"RequestChanLength,omitempty"`
177176
}
178177

179178
type MetricsReporterConfig struct {
@@ -225,8 +224,7 @@ type PartitionSchema struct {
225224
}
226225

227226
type Partition struct {
228-
StartTime int64 `json:"startTime"`
229-
SchemaInfo PartitionSchema `json:"schemaInfo"`
227+
StartTime int64 `json:"startTime"`
230228
}
231229

232230
type SchemaField struct {
@@ -260,7 +258,6 @@ func GetOrDefaultConfig() (*V3ioConfig, error) {
260258
func GetOrLoadFromFile(path string) (*V3ioConfig, error) {
261259
once.Do(func() {
262260
instance, failure = loadConfig(path)
263-
return
264261
})
265262

266263
return instance, failure
@@ -269,7 +266,6 @@ func GetOrLoadFromFile(path string) (*V3ioConfig, error) {
269266
func GetOrLoadFromData(data []byte) (*V3ioConfig, error) {
270267
once.Do(func() {
271268
instance, failure = loadFromData(data)
272-
return
273269
})
274270

275271
return instance, failure
@@ -280,7 +276,6 @@ func GetOrLoadFromStruct(cfg *V3ioConfig) (*V3ioConfig, error) {
280276
once.Do(func() {
281277
initDefaults(cfg)
282278
instance = cfg
283-
return
284279
})
285280

286281
return instance, nil

pkg/partmgr/partmgr.go

+5-92
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ import (
2525
"fmt"
2626
"math"
2727
"path"
28-
"sort"
2928
"strconv"
3029
"strings"
3130
"sync"
@@ -50,7 +49,7 @@ func NewPartitionMngr(schemaConfig *config.Schema, cont v3io.Container, v3ioConf
5049
return nil, err
5150
}
5251
newMngr := &PartitionManager{schemaConfig: schemaConfig, cyclic: false, container: cont, currentPartitionInterval: currentPartitionInterval, v3ioConfig: v3ioConfig}
53-
err = newMngr.updatePartitionsFromSchema(schemaConfig, nil)
52+
err = newMngr.updatePartitionsFromSchema(schemaConfig)
5453
if err != nil {
5554
return nil, err
5655
}
@@ -141,7 +140,7 @@ func (p *PartitionManager) createAndUpdatePartition(t int64) (*DBPartition, erro
141140
}
142141
p.currentPartitionInterval = partition.partitionInterval
143142

144-
schemaPartition := &config.Partition{StartTime: partition.startTime, SchemaInfo: p.schemaConfig.PartitionSchemaInfo}
143+
schemaPartition := &config.Partition{StartTime: partition.startTime}
145144
if p.headPartition == nil || time > p.headPartition.startTime {
146145
p.headPartition = partition
147146
p.partitions = append(p.partitions, partition)
@@ -182,24 +181,6 @@ func (p *PartitionManager) updateSchema() error {
182181
err = p.container.PutObjectSync(&v3io.PutObjectInput{Path: schemaFilePath, Body: data})
183182
if err != nil {
184183
outerError = err
185-
return
186-
}
187-
attributes := make(map[string]interface{}, len(p.partitions))
188-
for _, part := range p.partitions {
189-
marshalledPartition, err := json.Marshal(part.ToMap())
190-
if err != nil {
191-
outerError = err
192-
return
193-
}
194-
attributes[part.GetPartitionAttributeName()] = marshalledPartition
195-
}
196-
197-
input := &v3io.PutItemInput{Path: schemaFilePath, Attributes: attributes}
198-
_, err := p.container.PutItemSync(input)
199-
200-
if err != nil {
201-
outerError = errors.Wrap(err, "failed to update partitions table.")
202-
return
203184
}
204185
}
205186
})
@@ -256,7 +237,7 @@ func (p *PartitionManager) ReadAndUpdateSchema() (err error) {
256237
err = errors.Wrap(err, "Failed to create timer ReadAndUpdateSchemaTimer.")
257238
return
258239
}
259-
schemaInfoResp, err := p.container.GetItemSync(&v3io.GetItemInput{Path: schemaFilePath, AttributeNames: []string{"**"}})
240+
schemaInfoResp, err := p.container.GetItemSync(&v3io.GetItemInput{Path: schemaFilePath, AttributeNames: []string{"__mtime_secs", "__mtime_nsecs"}})
260241
if err != nil {
261242
err = errors.Wrapf(err, "Failed to read schema at path '%s'.", schemaFilePath)
262243
return
@@ -280,29 +261,13 @@ func (p *PartitionManager) ReadAndUpdateSchema() (err error) {
280261
p.schemaMtimeNanosecs = mtimeNsecs
281262

282263
metricReporter.WithTimer("ReadAndUpdateSchemaTimer", func() {
283-
err = p.updatePartitionsFromSchema(nil, schemaGetItemResponse)
284-
return
264+
err = p.updatePartitionsFromSchema(nil)
285265
})
286266
}
287267
return
288268
}
289269

290-
func (p *PartitionManager) updatePartitionsFromSchema(schemaConfig *config.Schema, schemaGetItemResponse *v3io.GetItemOutput) error {
291-
var currentSchemaVersion int
292-
if schemaConfig == nil {
293-
currentSchemaVersion = p.schemaConfig.TableSchemaInfo.Version
294-
} else {
295-
currentSchemaVersion = schemaConfig.TableSchemaInfo.Version
296-
}
297-
298-
if currentSchemaVersion == 4 && p.v3ioConfig.LoadPartitionsFromSchemaAttr {
299-
return p.newLoadPartitions(schemaGetItemResponse)
300-
}
301-
302-
return p.oldLoadPartitions(schemaConfig)
303-
}
304-
305-
func (p *PartitionManager) oldLoadPartitions(schema *config.Schema) error {
270+
func (p *PartitionManager) updatePartitionsFromSchema(schema *config.Schema) error {
306271
if schema == nil {
307272
schemaFilePath := p.GetSchemaFilePath()
308273
resp, innerError := p.container.GetObjectSync(&v3io.GetObjectInput{Path: schemaFilePath})
@@ -334,58 +299,6 @@ func (p *PartitionManager) oldLoadPartitions(schema *config.Schema) error {
334299
return nil
335300
}
336301

337-
func (p *PartitionManager) newLoadPartitions(schemaAttributesResponse *v3io.GetItemOutput) error {
338-
if p.container == nil { // Tests use case only
339-
return nil
340-
}
341-
342-
if schemaAttributesResponse == nil {
343-
schemaFilePath := p.GetSchemaFilePath()
344-
schemaInfoResp, err := p.container.GetItemSync(&v3io.GetItemInput{Path: schemaFilePath, AttributeNames: []string{"*"}})
345-
if err != nil {
346-
return errors.Wrapf(err, "Failed to read schema at path '%s'.", schemaFilePath)
347-
}
348-
349-
schemaAttributesResponse = schemaInfoResp.Output.(*v3io.GetItemOutput)
350-
}
351-
352-
p.partitions = []*DBPartition{}
353-
for partitionStartTime, partitionAttrBlob := range schemaAttributesResponse.Item {
354-
// Only process "partition" attributes
355-
if !strings.HasPrefix(partitionStartTime, partitionAttributePrefix) {
356-
continue
357-
}
358-
intStartTime, err := strconv.ParseInt(partitionStartTime[1:], 10, 64)
359-
if err != nil {
360-
return errors.Wrapf(err, "invalid partition name '%v'", partitionStartTime)
361-
}
362-
363-
partPath := path.Join(p.Path(), strconv.FormatInt(intStartTime/1000, 10)) + "/"
364-
365-
partitionAttr := make(map[string]interface{}, 5)
366-
err = json.Unmarshal(partitionAttrBlob.([]byte), &partitionAttr)
367-
if err != nil {
368-
return err
369-
}
370-
newPart, err := NewDBPartitionFromMap(p, intStartTime, partPath, partitionAttr)
371-
if err != nil {
372-
return err
373-
}
374-
p.partitions = append(p.partitions, newPart)
375-
if p.headPartition == nil {
376-
p.headPartition = newPart
377-
} else if p.headPartition.startTime < newPart.startTime {
378-
p.headPartition = newPart
379-
}
380-
}
381-
382-
sort.SliceStable(p.partitions, func(i, j int) bool {
383-
return p.partitions[i].startTime < p.partitions[j].startTime
384-
})
385-
386-
return nil
387-
}
388-
389302
//if inclusive is true then partial partitions (not fully in range) will be retrieved as well
390303
func (p *PartitionManager) PartsForRange(mint, maxt int64, inclusive bool) []*DBPartition {
391304
var parts []*DBPartition

pkg/pquerier/collector.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ func mainCollector(ctx *selectQueryContext, responseChannel chan *qryResults) {
6666

6767
for {
6868
select {
69-
case _ = <-ctx.stopChan:
69+
case <-ctx.stopChan:
7070
return
7171
case res, ok := <-responseChannel:
7272
if !ok {

pkg/pquerier/pqueriertest/raw_query_integration_test.go

-73
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
package pqueriertest
44

55
import (
6-
"errors"
76
"fmt"
87
"math"
98
"testing"
@@ -718,75 +717,3 @@ func (suite *testRawQuerySuite) TestQueryMultiMetricsInconsistentLabels() {
718717
suite.NotNil(iter.At(), "Iterator yielded a nil series")
719718
}
720719
}
721-
722-
func (suite *testRawQuerySuite) TestLoadPartitionsFromAttributes() {
723-
suite.v3ioConfig.LoadPartitionsFromSchemaAttr = true
724-
defer func() { suite.v3ioConfig.LoadPartitionsFromSchemaAttr = false }()
725-
726-
adapter, err := tsdb.NewV3ioAdapter(suite.v3ioConfig, nil, nil)
727-
if err != nil {
728-
suite.T().Fatalf("failed to create v3io adapter. reason: %s", err)
729-
}
730-
731-
labels1 := utils.LabelsFromStringList("os", "linux")
732-
labels2 := utils.LabelsFromStringList("os", "mac")
733-
numberOfEvents := 5
734-
eventsInterval := int64(tsdbtest.MinuteInMillis)
735-
expectedData := []tsdbtest.DataPoint{{suite.basicQueryTime - 7*tsdbtest.DaysInMillis, 10},
736-
{int64(suite.basicQueryTime + tsdbtest.MinuteInMillis), 20},
737-
{suite.basicQueryTime + 2*eventsInterval, 30},
738-
{suite.basicQueryTime + 3*eventsInterval, 40}}
739-
740-
testParams := tsdbtest.NewTestParams(suite.T(),
741-
tsdbtest.TestOption{
742-
Key: tsdbtest.OptTimeSeries,
743-
Value: tsdbtest.TimeSeries{tsdbtest.Metric{
744-
Name: "cpu",
745-
Labels: labels1,
746-
Data: expectedData},
747-
tsdbtest.Metric{
748-
Name: "cpu",
749-
Labels: labels2,
750-
Data: expectedData},
751-
}})
752-
753-
tsdbtest.InsertData(suite.T(), testParams)
754-
755-
querierV2, err := adapter.QuerierV2()
756-
if err != nil {
757-
suite.T().Fatalf("Failed to create querier v2, err: %v", err)
758-
}
759-
760-
params := &pquerier.SelectParams{Name: "cpu", From: suite.basicQueryTime - 8*tsdbtest.DaysInMillis, To: suite.basicQueryTime + int64(numberOfEvents)*eventsInterval}
761-
set, err := querierV2.Select(params)
762-
if err != nil {
763-
suite.T().Fatalf("Failed to exeute query, err: %v", err)
764-
}
765-
766-
var seriesCount int
767-
for set.Next() {
768-
seriesCount++
769-
iter := set.At().Iterator()
770-
data, err := tsdbtest.IteratorToSlice(iter)
771-
if err != nil {
772-
suite.T().Fatal(err)
773-
}
774-
775-
for i := 0; i < len(expectedData); i++ {
776-
assert.Equal(suite.T(), expectedData[i].Time, data[i].Time)
777-
currentExpected := expectedData[i].Value
778-
switch val := currentExpected.(type) {
779-
case float64:
780-
assert.Equal(suite.T(), val, data[i].Value)
781-
case int:
782-
assert.Equal(suite.T(), float64(val), data[i].Value)
783-
case string:
784-
assert.Equal(suite.T(), val, data[i].Value)
785-
default:
786-
assert.Error(suite.T(), errors.New("unsupported data type"))
787-
}
788-
}
789-
}
790-
791-
assert.Equal(suite.T(), 2, seriesCount, "series count didn't match expected")
792-
}

pkg/pquerier/querier.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,6 @@ func (q *V3ioQuerier) baseSelectQry(params *SelectParams, showAggregateLabel boo
187187
}
188188

189189
iter, err = selectContext.start(parts, params)
190-
return
191190
})
192191

193192
return
@@ -227,7 +226,7 @@ func (q *V3ioQuerier) getMetricNames() ([]string, error) {
227226
metricNames = append(metricNames, iter.GetField(config.ObjectNameAttrName).(string))
228227
}
229228

230-
sort.Sort(sort.StringSlice(metricNames))
229+
sort.Strings(metricNames)
231230

232231
if iter.Err() != nil {
233232
return nil, fmt.Errorf("failed to read metric names; err = %v", iter.Err().Error())

0 commit comments

Comments (0)