Skip to content

Commit 17286c1

Browse files
authored
Merge pull request #488 from v3io/development
Development -> master
2 parents 6b64285 + 6a4836e commit 17286c1

File tree

3 files changed

+54
-37
lines changed

3 files changed

+54
-37
lines changed

Diff for: pkg/appender/ingest.go

+27-13
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ func (mc *MetricsCache) metricFeed(index int) {
5050
inFlight := 0
5151
gotData := false
5252
gotCompletion := false
53+
potentialCompletion := false
5354
var completeChan chan int
5455

5556
for {
@@ -62,11 +63,16 @@ func (mc *MetricsCache) metricFeed(index int) {
6263
mc.logger.Debug(`Complete update cycle - "in-flight requests"=%d; "metric queue length"=%d\n`, inFlight, length)
6364

6465
// If data was sent and the queue is empty, mark as completion
65-
if length == 0 && gotData && len(mc.asyncAppendChan) == 0 {
66-
gotCompletion = true
67-
if completeChan != nil {
68-
completeChan <- 0
69-
gotData = false
66+
if length == 0 && gotData {
67+
if len(mc.asyncAppendChan) == 0 {
68+
gotCompletion = true
69+
if completeChan != nil {
70+
completeChan <- 0
71+
gotData = false
72+
potentialCompletion = false
73+
}
74+
} else if len(mc.asyncAppendChan) == 1 {
75+
potentialCompletion = true
7076
}
7177
}
7278
case app := <-mc.asyncAppendChan:
@@ -78,14 +84,17 @@ func (mc *MetricsCache) metricFeed(index int) {
7884
if app.metric == nil {
7985
// Handle update completion requests (metric == nil)
8086
completeChan = app.resp
81-
8287
length := mc.metricQueue.Length()
83-
if gotCompletion && length == 0 && len(mc.asyncAppendChan) == 0 {
84-
completeChan <- 0
85-
gotCompletion = false
86-
gotData = false
88+
if length == 0 && len(mc.asyncAppendChan) == 0 {
89+
if gotCompletion || (potentialCompletion && gotData) {
90+
completeChan <- 0
91+
gotCompletion = false
92+
gotData = false
93+
}
8794
}
95+
potentialCompletion = false
8896
} else {
97+
potentialCompletion = false
8998
// Handle append requests (Add / AddFast)
9099
gotData = true
91100
metric := app.metric
@@ -114,7 +123,6 @@ func (mc *MetricsCache) metricFeed(index int) {
114123
}
115124
metric.Unlock()
116125
}
117-
118126
// Poll if we have more updates (accelerate the outer select)
119127
if i < mc.cfg.BatchSize {
120128
select {
@@ -250,8 +258,14 @@ func (mc *MetricsCache) postMetricUpdates(metric *MetricState) {
250258
} else if sent {
251259
metric.setState(storeStateUpdate)
252260
}
253-
if !sent && metric.store.samplesQueueLength() == 0 {
254-
metric.setState(storeStateReady)
261+
if !sent {
262+
if metric.store.samplesQueueLength() == 0 {
263+
metric.setState(storeStateReady)
264+
} else {
265+
if mc.metricQueue.length() > 0 {
266+
mc.newUpdates <- mc.metricQueue.length()
267+
}
268+
}
255269
}
256270
}
257271

Diff for: pkg/pquerier/pqueriertest/dataframe_query_integration_test.go

+22-16
Original file line numberDiff line numberDiff line change
@@ -971,9 +971,12 @@ func (suite *testSelectDataframeSuite) TestSparseNumericColumnsWithEmptyColumnsD
971971
suite.basicQueryTime + 3*tsdbtest.MinuteInMillis,
972972
suite.basicQueryTime + 4*tsdbtest.MinuteInMillis}
973973
expectedColumns := map[string][]interface{}{
974-
"cpu_0": {math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN()},
975-
"cpu_1": {10.0, 20.0, 30.0, math.NaN(), 50.0, math.NaN(), 22.0, 33.0, math.NaN(), 55.0},
976-
"cpu_2": {math.NaN(), math.NaN(), math.NaN(), 40.4, 50.5, 10.0, 20.0, math.NaN(), 40.0, 50.0},
974+
"cpu_0-linux": {math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN()},
975+
"cpu_0-windows": {math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN()},
976+
"cpu_1-linux": {10.0, 20.0, 30.0, math.NaN(), 50.0},
977+
"cpu_1-windows": {math.NaN(), 22.0, 33.0, math.NaN(), 55.0},
978+
"cpu_2-linux": {math.NaN(), math.NaN(), math.NaN(), 40.4, 50.5},
979+
"cpu_2-windows": {10.0, 20.0, math.NaN(), 40.0, 50.0},
977980
}
978981

979982
testParams := tsdbtest.NewTestParams(suite.T(),
@@ -1034,32 +1037,32 @@ func (suite *testSelectDataframeSuite) TestSparseNumericColumnsWithEmptyColumnsD
10341037
iter, err := querierV2.SelectDataFrame(params)
10351038
requireCtx.NoError(err, "failed to execute query")
10361039

1037-
rowId := -1
10381040
var seriesCount int
10391041
for iter.NextFrame() {
10401042
seriesCount++
10411043
frame, err := iter.GetFrame()
10421044
requireCtx.NoError(err)
10431045
indexCol := frame.Indices()[0]
1046+
osLabel := frame.Labels()["os"]
10441047

10451048
nullValuesMap := frame.NullValuesMap()
10461049
requireCtx.NotNil(nullValuesMap, "null value map should not be empty")
10471050

10481051
for i := 0; i < indexCol.Len(); i++ {
1049-
rowId++
10501052
t, _ := indexCol.TimeAt(i)
10511053
timeMillis := t.UnixNano() / int64(time.Millisecond)
10521054
requireCtx.Equal(expectedTimeColumn[i], timeMillis, "time column does not match at index %v", i)
10531055
for _, columnName := range frame.Names() {
10541056
var v interface{}
1057+
key := fmt.Sprintf("%v-%v", columnName, osLabel)
10551058
column, err := frame.Column(columnName)
10561059
requireCtx.NoError(err)
10571060
if column.DType() == frames.FloatType {
10581061
v, _ = column.FloatAt(i)
10591062
if v == math.NaN() {
10601063
requireCtx.True(nullValuesMap[i].NullColumns[columnName])
10611064
}
1062-
bothNaN := math.IsNaN(expectedColumns[column.Name()][i].(float64)) && math.IsNaN(v.(float64))
1065+
bothNaN := math.IsNaN(expectedColumns[key][i].(float64)) && math.IsNaN(v.(float64))
10631066
if bothNaN {
10641067
continue
10651068
}
@@ -1072,9 +1075,9 @@ func (suite *testSelectDataframeSuite) TestSparseNumericColumnsWithEmptyColumnsD
10721075
suite.Fail(fmt.Sprintf("column type is not as expected: %v", column.DType()))
10731076
}
10741077

1075-
expectedValue := expectedColumns[columnName][rowId]
1078+
expectedValue := expectedColumns[key][i]
10761079
if !math.IsNaN(expectedValue.(float64)) || !math.IsNaN(v.(float64)) {
1077-
requireCtx.Equal(expectedValue, v, "column %v does not match at index %v", columnName, rowId)
1080+
requireCtx.Equal(expectedValue, v, "column %v does not match at index %v", columnName, i)
10781081
}
10791082
}
10801083
}
@@ -1092,9 +1095,12 @@ func (suite *testSelectDataframeSuite) TestSparseNumericColumnsWithPartialLabels
10921095
suite.basicQueryTime + 3*tsdbtest.MinuteInMillis,
10931096
suite.basicQueryTime + 4*tsdbtest.MinuteInMillis}
10941097
expectedColumns := map[string][]interface{}{
1095-
"cpu_0": {math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN()},
1096-
"cpu_1": {10.0, 20.0, 30.0, 40.0, 50.0, math.NaN(), 22.0, 33.0, math.NaN(), 55.0},
1097-
"cpu_2": {math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN(), 10.0, 20.0, math.NaN(), 40.0, 50.0},
1098+
"cpu_0-linux": {math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN()},
1099+
"cpu_0-windows": {math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN()},
1100+
"cpu_1-linux": {10.0, 20.0, 30.0, 40.0, 50.0},
1101+
"cpu_1-windows": {math.NaN(), 22.0, 33.0, math.NaN(), 55.0},
1102+
"cpu_2-linux": {math.NaN(), math.NaN(), math.NaN(), math.NaN(), math.NaN()},
1103+
"cpu_2-windows": {10.0, 20.0, math.NaN(), 40.0, 50.0},
10981104
}
10991105

11001106
testParams := tsdbtest.NewTestParams(suite.T(),
@@ -1148,23 +1154,23 @@ func (suite *testSelectDataframeSuite) TestSparseNumericColumnsWithPartialLabels
11481154
iter, err := querierV2.SelectDataFrame(params)
11491155
requireCtx.NoError(err, "failed to execute query")
11501156

1151-
rowId := -1
11521157
var seriesCount int
11531158
for iter.NextFrame() {
11541159
seriesCount++
11551160
frame, err := iter.GetFrame()
11561161
requireCtx.NoError(err)
11571162
indexCol := frame.Indices()[0]
1163+
osLabel := frame.Labels()["os"]
11581164

11591165
nullValuesMap := frame.NullValuesMap()
11601166
requireCtx.NotNil(nullValuesMap, "null value map should not be empty")
11611167

11621168
for i := 0; i < indexCol.Len(); i++ {
1163-
rowId++
11641169
t, _ := indexCol.TimeAt(i)
11651170
timeMillis := t.UnixNano() / int64(time.Millisecond)
11661171
requireCtx.Equal(expectedTimeColumn[i], timeMillis, "time column does not match at index %v", i)
11671172
for _, columnName := range frame.Names() {
1173+
key := fmt.Sprintf("%v-%v", columnName, osLabel)
11681174
var v interface{}
11691175
column, err := frame.Column(columnName)
11701176
requireCtx.NoError(err)
@@ -1173,7 +1179,7 @@ func (suite *testSelectDataframeSuite) TestSparseNumericColumnsWithPartialLabels
11731179
if v == math.NaN() {
11741180
requireCtx.True(nullValuesMap[i].NullColumns[columnName])
11751181
}
1176-
bothNaN := math.IsNaN(expectedColumns[column.Name()][i].(float64)) && math.IsNaN(v.(float64))
1182+
bothNaN := math.IsNaN(expectedColumns[key][i].(float64)) && math.IsNaN(v.(float64))
11771183
if bothNaN {
11781184
continue
11791185
}
@@ -1186,9 +1192,9 @@ func (suite *testSelectDataframeSuite) TestSparseNumericColumnsWithPartialLabels
11861192
suite.Fail(fmt.Sprintf("column type is not as expected: %v", column.DType()))
11871193
}
11881194

1189-
expectedValue := expectedColumns[columnName][rowId]
1195+
expectedValue := expectedColumns[key][i]
11901196
if !math.IsNaN(expectedValue.(float64)) || !math.IsNaN(v.(float64)) {
1191-
requireCtx.Equal(expectedValue, v, "column %v does not match at index %v", columnName, rowId)
1197+
requireCtx.Equal(expectedValue, v, "column %v does not match at index %v", columnName, i)
11921198
}
11931199
}
11941200
}

Diff for: pkg/tsdbctl/delete.go

+5-8
Original file line numberDiff line numberDiff line change
@@ -61,13 +61,10 @@ func newDeleteCommandeer(rootCommandeer *RootCommandeer) *delCommandeer {
6161
the name of the data container are configured in the default configuration file (` + config.DefaultConfigurationFileName + `)
6262
instead of using the -s|--server, -u|--username, -p|--password, and -c|--container flags.
6363
- tsdbctl delete -t metrics_tsdb -a
64-
- tsdbctl delete -t dbs/perfstats -f
64+
- tsdbctl delete -t dbs/perfstats --force
6565
- tsdbctl delete -t my_tsdb -b 0 -e now-7d -i
66-
67-
Notes:
68-
- When deleting content within a specific time range (see the -b|--begin and -e|--end flags and
69-
their default values), all partitions containing data within this range are deleted, including
70-
metric items with older or newer times. Use the info command to view the partitioning interval.`,
66+
- tsdbctl delete -t my_tsdb -b 0 -e now-7d -m "metric_1,metric_2"
67+
- tsdbctl delete -t my_tsdb -b 0 -e now-7d -f 'my_label=="value1"'`,
7168
RunE: func(cmd *cobra.Command, args []string) error {
7269

7370
if len(args) > 0 {
@@ -82,13 +79,13 @@ Notes:
8279
"Delete the TSDB table, including its configuration and all content.")
8380
cmd.Flags().BoolVarP(&commandeer.ignoreErrors, "ignore-errors", "i", true,
8481
"Ignore errors - continue deleting even if some steps fail.")
85-
cmd.Flags().BoolVarP(&commandeer.force, "force", "f", false,
82+
cmd.Flags().BoolVar(&commandeer.force, "force", false,
8683
"Forceful deletion - don't display a delete-verification prompt.")
8784
cmd.Flags().StringVarP(&commandeer.toTime, "end", "e", "",
8885
"End (maximum) time for the delete operation, as a string containing an\nRFC 3339 time string, a Unix timestamp in milliseconds, or a relative\ntime of the format \"now\" or \"now-[0-9]+[mhd]\" (where 'm' = minutes,\n'h' = hours, and 'd' = days). Examples: \"2018-09-26T14:10:20Z\";\n\"1537971006000\"; \"now-3h\"; \"now-7d\". (default \"now\")")
8986
cmd.Flags().StringVarP(&commandeer.fromTime, "begin", "b", "",
9087
"Start (minimum) time for the delete operation, as a string containing\nan RFC 3339 time, a Unix timestamp in milliseconds, a relative time of\nthe format \"now\" or \"now-[0-9]+[mhd]\" (where 'm' = minutes, 'h' = hours,\nand 'd' = days), or 0 for the earliest time. Examples:\n\"2016-01-02T15:34:26Z\"; \"1451748866\"; \"now-90m\"; \"0\". (default =\n<end time> - 1h)")
91-
cmd.Flags().StringVar(&commandeer.filter, "filter", "",
88+
cmd.Flags().StringVarP(&commandeer.filter, "filter", "f", "",
9289
"Query filter, as an Iguazio Data Science Platform\nfilter expression. \nExamples: \"method=='get'\"; \"method=='get' AND os=='win'\".")
9390
cmd.Flags().StringVarP(&commandeer.metrics, "metrics", "m", "",
9491
"Comma-separated list of metric names to delete. If you don't set this argument, all metrics will be deleted according to the time range and filter specified.")

0 commit comments

Comments
 (0)