Skip to content

Commit 2474f95

Browse files
authored
Merge pull request #122 from v3io/development
Development --> Master (v0.0.11)
2 parents 18410e7 + ec8d435 commit 2474f95

17 files changed

+372
-132
lines changed

Diff for: Makefile

+4
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@ test: get
2525
integration: get
2626
go test -race -tags integration -p 1 -count 1 $(TOPLEVEL_DIRS) # p=1 to force Go to run pkg tests serially.
2727

28+
.PHONY: bench
29+
bench: get
30+
go test -run=XXX -bench='^BenchmarkIngest$$' -benchtime 10s -timeout 1m ./test/benchmark/...
31+
2832
.PHONY: build
2933
build: get
3034
go build -v -o "$(GOPATH)/bin/$(TSDBCTL_BIN_NAME)" ./cmd/tsdbctl

Diff for: pkg/querier/querier.go

+83-11
Original file line numberDiff line numberDiff line change
@@ -154,27 +154,99 @@ func (q *V3ioQuerier) queryNumericPartition(
154154
}
155155

156156
// return the current metric names
157-
func (q *V3ioQuerier) LabelValues(name string) ([]string, error) {
157+
func (q *V3ioQuerier) LabelValues(labelKey string) ([]string, error) {
158+
if labelKey == "__name__" {
159+
return q.getMetricNames()
160+
} else {
161+
return q.getLabelValues(labelKey)
162+
}
163+
}
164+
165+
func (q *V3ioQuerier) Close() error {
166+
return nil
167+
}
168+
169+
func (q *V3ioQuerier) getMetricNames() ([]string, error) {
170+
input := v3io.GetItemsInput{
171+
Path: q.cfg.Path + "/names/",
172+
AttributeNames: []string{"__name"},
173+
}
158174

159-
list := []string{}
160-
input := v3io.GetItemsInput{Path: q.cfg.Path + "/names/", AttributeNames: []string{"__name"}, Filter: ""}
161175
iter, err := utils.NewAsyncItemsCursor(q.container, &input, q.cfg.QryWorkers, []string{}, q.logger)
162-
q.logger.DebugWith("GetItems to read names", "input", input, "err", err)
163176
if err != nil {
164-
return list, err
177+
return nil, err
165178
}
166179

180+
var metricNames []string
181+
167182
for iter.Next() {
168-
name := iter.GetField("__name").(string)
169-
list = append(list, name)
183+
metricNames = append(metricNames, iter.GetField("__name").(string))
170184
}
171185

172186
if iter.Err() != nil {
173-
q.logger.InfoWith("Failed to read names, assume empty list", "err", iter.Err().Error())
187+
q.logger.InfoWith("Failed to read metric names, returning empty list", "err", iter.Err().Error())
174188
}
175-
return list, nil
189+
190+
return metricNames, nil
176191
}
177192

178-
func (q *V3ioQuerier) Close() error {
179-
return nil
193+
func (q *V3ioQuerier) getLabelValues(labelKey string) ([]string, error) {
194+
195+
// sync partition manager (hack)
196+
err := q.partitionMngr.ReadAndUpdateSchema()
197+
if err != nil {
198+
return nil, err
199+
}
200+
201+
partitionPaths := q.partitionMngr.GetPartitionsPaths()
202+
203+
// if no partitions yet - there are no labels
204+
if len(partitionPaths) == 0 {
205+
return nil, nil
206+
}
207+
208+
labelValuesMap := map[string]struct{}{}
209+
210+
// get all labelsets
211+
input := v3io.GetItemsInput{
212+
Path: partitionPaths[0],
213+
AttributeNames: []string{"_lset"},
214+
}
215+
216+
iter, err := utils.NewAsyncItemsCursor(q.container, &input, q.cfg.QryWorkers, []string{}, q.logger)
217+
if err != nil {
218+
return nil, err
219+
}
220+
221+
// iterate over the results
222+
for iter.Next() {
223+
labelSet := iter.GetField("_lset").(string)
224+
225+
// the labelSet will be k1=v1,k2=v2, k2=v3. Assuming labelKey is "k2", we want to convert
226+
// that to [v2, v3]
227+
228+
// split at "," to get k=v pairs
229+
for _, label := range strings.Split(labelSet, ",") {
230+
231+
// split at "=" to get label key and label value
232+
splitLabel := strings.SplitN(label, "=", 2)
233+
234+
// if we got two elements and the first element (the key) is equal to what we're looking
235+
// for, save the label value in the map. use a map to prevent duplications
236+
if len(splitLabel) == 2 && splitLabel[0] == labelKey {
237+
labelValuesMap[splitLabel[1]] = struct{}{}
238+
}
239+
}
240+
}
241+
242+
if iter.Err() != nil {
243+
q.logger.InfoWith("Failed to read label values, returning empty list", "err", iter.Err().Error())
244+
}
245+
246+
var labelValues []string
247+
for labelValue, _ := range labelValuesMap {
248+
labelValues = append(labelValues, labelValue)
249+
}
250+
251+
return labelValues, nil
180252
}

Diff for: pkg/querier/seriesset.go

-5
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,6 @@ import (
2828
"github.com/v3io/v3io-tsdb/pkg/utils"
2929
)
3030

31-
func newSeriesSet(partition *partmgr.DBPartition, mint, maxt int64) *V3ioSeriesSet {
32-
33-
return &V3ioSeriesSet{mint: mint, maxt: maxt, partition: partition}
34-
}
35-
3631
// holds the query result set
3732
type V3ioSeriesSet struct {
3833
err error

Diff for: pkg/tsdb/tsdbtest/tsdbtest.go

+15-7
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ func CreateTestTSDB(t testing.TB, v3ioConfig *config.V3ioConfig) {
4646
schema := testutils.CreateSchema(t, "*")
4747
if err := CreateTSDB(v3ioConfig, &schema); err != nil {
4848
v3ioConfigAsJson, _ := json2.MarshalIndent(v3ioConfig, "", " ")
49-
t.Fatalf("Failed to create TSDB. Reason: %s\nConfiguration:\n%s", err, string(v3ioConfigAsJson))
49+
t.Fatalf("Failed to create TSDB. Reason: %v\nConfiguration:\n%s", err, string(v3ioConfigAsJson))
5050
}
5151
}
5252

@@ -112,14 +112,22 @@ func InsertData(t *testing.T, v3ioConfig *config.V3ioConfig, metricName string,
112112
return adapter
113113
}
114114

115-
func ValidateCountOfSamples(t testing.TB, adapter *V3ioAdapter, metricName string, expected int, startTimeMs, endTimeMs int64) {
116-
qry, err := adapter.Querier(nil, startTimeMs, endTimeMs)
117-
if err != nil {
118-
t.Fatal(err, "failed to create Querier instance.")
115+
func ValidateCountOfSamples(t testing.TB, adapter *V3ioAdapter, metricName string, expected int, startTimeMs, endTimeMs int64, queryAggStep int64) {
116+
117+
var stepSize int64
118+
if queryAggStep <= 0 {
119+
var err error
120+
stepSize, err = utils.Str2duration("1h")
121+
if err != nil {
122+
t.Fatal(err, "failed to create step")
123+
}
124+
} else {
125+
stepSize = queryAggStep
119126
}
120-
stepSize, err := utils.Str2duration("1h")
127+
128+
qry, err := adapter.Querier(nil, startTimeMs-stepSize, endTimeMs)
121129
if err != nil {
122-
t.Fatal(err, "failed to create step")
130+
t.Fatal(err, "failed to create Querier instance.")
123131
}
124132

125133
set, err := qry.Select("", "count", stepSize, fmt.Sprintf("starts(__name__, '%v')", metricName))

Diff for: pkg/tsdb/v3iotsdb.go

+6-3
Original file line numberDiff line numberDiff line change
@@ -59,14 +59,17 @@ func CreateTSDB(v3iocfg *config.V3ioConfig, schema *config.Schema) error {
5959
return errors.Wrap(err, "Failed to Marshal schema file")
6060
}
6161

62+
path := pathUtil.Join(v3iocfg.Path, config.SCHEMA_CONFIG)
6263
// check if the config file already exist, abort if it does
63-
_, err = container.Sync.GetObject(&v3io.GetObjectInput{Path: pathUtil.Join(v3iocfg.Path, config.SCHEMA_CONFIG)})
64+
_, err = container.Sync.GetObject(&v3io.GetObjectInput{Path: path})
6465
if err == nil {
6566
return fmt.Errorf("TSDB already exist in path: " + v3iocfg.Path)
6667
}
6768

68-
err = container.Sync.PutObject(&v3io.PutObjectInput{Path: pathUtil.Join(v3iocfg.Path, config.SCHEMA_CONFIG), Body: data})
69-
69+
err = container.Sync.PutObject(&v3io.PutObjectInput{Path: path, Body: data})
70+
if err != nil {
71+
return errors.Wrap(err, "Failed create schema at path "+pathUtil.Join(v3iocfg.V3ioUrl, v3iocfg.Container, path))
72+
}
7073
return err
7174
}
7275

Diff for: pkg/tsdb/v3iotsdb_integration_test.go

+53-27
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ import (
3737
"testing"
3838
)
3939

40+
const defaultStepMs = 5 * 60 * 1000 // 5 minutes
41+
4042
func TestIngestData(t *testing.T) {
4143
v3ioConfig, err := tsdbtest.LoadV3ioConfig()
4244
if err != nil {
@@ -115,7 +117,7 @@ func testIngestDataCase(t *testing.T, v3ioConfig *config.V3ioConfig,
115117
t.Fatalf("Failed to wait for appender completion. reason: %s", err)
116118
}
117119

118-
tsdbtest.ValidateCountOfSamples(t, adapter, metricsName, len(data), from, to)
120+
tsdbtest.ValidateCountOfSamples(t, adapter, metricsName, len(data), from, to, -1)
119121
}
120122

121123
func TestQueryData(t *testing.T) {
@@ -133,52 +135,65 @@ func TestQueryData(t *testing.T) {
133135
aggregators string
134136
from int64
135137
to int64
138+
step int64
136139
expected map[string][]tsdbtest.DataPoint
137140
ignoreReason string
138141
expectFail bool
139142
}{
140143
{desc: "Should ingest and query one data point", metricName: "cpu",
141-
labels: utils.FromStrings("testLabel", "balbala"),
142-
data: []tsdbtest.DataPoint{{Time: 1532940510, Value: 314.3}},
143-
from: 0, to: 1532940510 + 1,
144+
labels: utils.FromStrings("testLabel", "balbala"),
145+
data: []tsdbtest.DataPoint{{Time: 1532940510, Value: 314.3}},
146+
from: 0,
147+
to: 1532940510 + 1,
148+
step: defaultStepMs,
144149
expected: map[string][]tsdbtest.DataPoint{"": {{Time: 1532940510, Value: 314.3}}}},
145150

146151
{desc: "Should ingest and query multiple data points", metricName: "cpu",
147152
labels: utils.FromStrings("os", "linux", "iguaz", "yesplease"),
148153
data: []tsdbtest.DataPoint{{Time: 1532940510 - 10, Value: 314.3},
149154
{Time: 1532940510 - 5, Value: 300.3},
150155
{Time: 1532940510, Value: 3234.6}},
151-
from: 0, to: 1532940510 + 1,
156+
from: 0,
157+
to: 1532940510 + 1,
158+
step: defaultStepMs,
152159
expected: map[string][]tsdbtest.DataPoint{"": {{Time: 1532940510 - 10, Value: 314.3},
153160
{Time: 1532940510 - 5, Value: 300.3},
154161
{Time: 1532940510, Value: 3234.6}}}},
155162

156163
{desc: "Should query with filter on metric name", metricName: "cpu",
157-
labels: utils.FromStrings("os", "linux", "iguaz", "yesplease"),
158-
data: []tsdbtest.DataPoint{{Time: 1532940510, Value: 33.3}},
159-
filter: "_name=='cpu'",
160-
from: 0, to: 1532940510 + 1,
164+
labels: utils.FromStrings("os", "linux", "iguaz", "yesplease"),
165+
data: []tsdbtest.DataPoint{{Time: 1532940510, Value: 33.3}},
166+
filter: "_name=='cpu'",
167+
from: 0,
168+
to: 1532940510 + 1,
169+
step: defaultStepMs,
161170
expected: map[string][]tsdbtest.DataPoint{"": {{Time: 1532940510, Value: 33.3}}}},
162171

163172
{desc: "Should query with filter on label name", metricName: "cpu",
164-
labels: utils.FromStrings("os", "linux", "iguaz", "yesplease"),
165-
data: []tsdbtest.DataPoint{{Time: 1532940510, Value: 31.3}},
166-
filter: "os=='linux'",
167-
from: 0, to: 1532940510 + 1,
173+
labels: utils.FromStrings("os", "linux", "iguaz", "yesplease"),
174+
data: []tsdbtest.DataPoint{{Time: 1532940510, Value: 31.3}},
175+
filter: "os=='linux'",
176+
from: 0,
177+
to: 1532940510 + 1,
178+
step: defaultStepMs,
168179
expected: map[string][]tsdbtest.DataPoint{"": {{Time: 1532940510, Value: 31.3}}}},
169180

170181
{desc: "Should ingest and query data with '-' in the metric name (IG-8585)", metricName: "cool-cpu",
171-
labels: utils.FromStrings("testLabel", "balbala"),
172-
data: []tsdbtest.DataPoint{{Time: 1532940510, Value: 314.3}},
173-
from: 0, to: 1532940510 + 1,
182+
labels: utils.FromStrings("testLabel", "balbala"),
183+
data: []tsdbtest.DataPoint{{Time: 1532940510, Value: 314.3}},
184+
from: 0,
185+
to: 1532940510 + 1,
186+
step: defaultStepMs,
174187
expected: map[string][]tsdbtest.DataPoint{"": {{Time: 1532940510, Value: 314.3}}}},
175188

176189
{desc: "Should ingest and query by time", metricName: "cpu",
177190
labels: utils.FromStrings("os", "linux", "iguaz", "yesplease"),
178191
data: []tsdbtest.DataPoint{{Time: 1532940510, Value: 314.3},
179192
{Time: 1532940510 + 5, Value: 300.3},
180193
{Time: 1532940510 + 10, Value: 3234.6}},
181-
from: 1532940510 + 2, to: 1532940510 + 12,
194+
from: 1532940510 + 2,
195+
to: 1532940510 + 12,
196+
step: defaultStepMs,
182197
expected: map[string][]tsdbtest.DataPoint{"": {{Time: 1532940510 + 5, Value: 300.3},
183198
{Time: 1532940510 + 10, Value: 3234.6}}}},
184199

@@ -187,26 +202,31 @@ func TestQueryData(t *testing.T) {
187202
data: []tsdbtest.DataPoint{{Time: 1532940510, Value: 314.3},
188203
{Time: 1532940510 + 5, Value: 300.3},
189204
{Time: 1532940510 + 10, Value: 3234.6}},
190-
from: 1532940510 + 1, to: 1532940510 + 4,
205+
from: 1532940510 + 1,
206+
to: 1532940510 + 4,
207+
step: defaultStepMs,
191208
expected: map[string][]tsdbtest.DataPoint{}},
192209

193210
{desc: "Should ingest and query an aggregator", metricName: "cpu",
194211
labels: utils.FromStrings("os", "linux", "iguaz", "yesplease"),
195212
data: []tsdbtest.DataPoint{{Time: 1532940510, Value: 300.3},
196213
{Time: 1532940510 + 5, Value: 300.3},
197214
{Time: 1532940510 + 10, Value: 100.4}},
198-
from: 1532940510, to: 1532940510 + 11,
215+
from: 1532940510,
216+
to: 1532940510 + 11,
217+
step: defaultStepMs,
199218
aggregators: "sum",
200219
expected: map[string][]tsdbtest.DataPoint{"sum": {{Time: 1532940510, Value: 701.0}}}},
201220

202-
{desc: "Should ingest and query an aggregator EXTRA", metricName: "cpu",
221+
{desc: "Should ingest and query an aggregator with interval greater than step size", metricName: "cpu",
203222
labels: utils.FromStrings("os", "linux", "iguaz", "yesplease"),
204223
data: []tsdbtest.DataPoint{{Time: 1532940510, Value: 300.3},
205224
{Time: 1532940510 + 60, Value: 300.3},
206225
{Time: 1532940510 + 2*60, Value: 100.4},
207-
{Time: 1532940510 + 2*60, Value: 200.0}},
226+
{Time: 1532940510 + 5*60, Value: 200.0}},
208227
from: 1532940510,
209228
to: 1532940510 + 6*60,
229+
step: defaultStepMs,
210230
aggregators: "sum",
211231
expected: map[string][]tsdbtest.DataPoint{"sum": {{Time: 1532940510, Value: 901.0}}}},
212232

@@ -215,7 +235,9 @@ func TestQueryData(t *testing.T) {
215235
data: []tsdbtest.DataPoint{{Time: 1532940510, Value: 300.3},
216236
{Time: 1532940510 + 5, Value: 300.3},
217237
{Time: 1532940510 + 10, Value: 100.4}},
218-
from: 1532940510, to: 1532940510 + 11,
238+
from: 1532940510,
239+
to: 1532940510 + 11,
240+
step: defaultStepMs,
219241
aggregators: "sum,count",
220242
expected: map[string][]tsdbtest.DataPoint{"sum": {{Time: 1532940510, Value: 701.0}},
221243
"count": {{Time: 1532940510, Value: 3}}}},
@@ -225,15 +247,19 @@ func TestQueryData(t *testing.T) {
225247
data: []tsdbtest.DataPoint{{Time: 1532940510, Value: 314.3},
226248
{Time: 1532940510 + 5, Value: 300.3},
227249
{Time: 1532940510 + 10, Value: 3234.6}},
228-
from: 1532940510 + 1, to: 0,
250+
from: 1532940510 + 1,
251+
to: 0,
252+
step: defaultStepMs,
229253
expectFail: true,
230254
},
231255

232256
{desc: "Should query with filter on not existing metric name", metricName: "cpu",
233-
labels: utils.FromStrings("os", "linux", "iguaz", "yesplease"),
234-
data: []tsdbtest.DataPoint{{Time: 1532940510, Value: 33.3}},
235-
filter: "_name=='hahaha'",
236-
from: 0, to: 1532940510 + 1,
257+
labels: utils.FromStrings("os", "linux", "iguaz", "yesplease"),
258+
data: []tsdbtest.DataPoint{{Time: 1532940510, Value: 33.3}},
259+
filter: "_name=='hahaha'",
260+
from: 0,
261+
to: 1532940510 + 1,
262+
step: defaultStepMs,
237263
expected: map[string][]tsdbtest.DataPoint{}},
238264
}
239265

Diff for: pkg/tsdbctl/info.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ func (ic *infoCommandeer) info() error {
8484
}
8585

8686
// get all metric names
87-
names, err := qry.LabelValues("")
87+
names, err := qry.LabelValues("__name__")
8888
if err != nil {
8989
return errors.Wrap(err, "Failed to get labels")
9090
}

0 commit comments

Comments
 (0)