Skip to content

Commit fcf76c5

Browse files
committed
[influxdb] Move keep() after ordering/pagination for query performance
Applying keep() before last()/sort()/limit() prevents InfluxDB from using time-series index pushdown optimisations. For the common restore-on-startup case (DESCENDING + pageSize=1), keep() → last() forces a full data scan before last() can run; reversing the order to last() → keep() allows the query planner to seek directly to the most recent record. The reordering also fixes a latent correctness issue: the state-value filter uses r._field, which keep() was dropping before the filter ran. Signed-off-by: Jonathan Gilbert <jpg@trillica.com>
1 parent bcf6bb1 commit fcf76c5

2 files changed

Lines changed: 29 additions & 22 deletions

File tree

bundles/org.openhab.persistence.influxdb/src/main/java/org/openhab/persistence/influxdb/internal/influx2/InfluxDB2FilterCriteriaQueryCreatorImpl.java

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -69,12 +69,9 @@ public String createQuery(FilterCriteria criteria, String retentionPolicy, @Null
6969
String name = influxDBMetadataService.getMeasurementNameOrDefault(localAlias);
7070
String measurementName = configuration.isReplaceUnderscore() ? name.replace('_', '.') : name;
7171
flux = flux.filter(measurement().equal(measurementName));
72-
if (!measurementName.equals(itemName)) {
72+
boolean needsItemTag = !measurementName.equals(itemName);
73+
if (needsItemTag) {
7374
flux = flux.filter(tag(TAG_ITEM_NAME).equal(itemName));
74-
flux = flux.keep(
75-
new String[] { FIELD_MEASUREMENT_NAME, COLUMN_TIME_NAME_V2, COLUMN_VALUE_NAME_V2, TAG_ITEM_NAME });
76-
} else {
77-
flux = flux.keep(new String[] { FIELD_MEASUREMENT_NAME, COLUMN_TIME_NAME_V2, COLUMN_VALUE_NAME_V2 });
7875
}
7976

8077
State filterState = criteria.getState();
@@ -84,8 +81,18 @@ public String createQuery(FilterCriteria criteria, String retentionPolicy, @Null
8481
flux = flux.filter(restrictions);
8582
}
8683

84+
// Apply ordering/pagination before keep() so that last()/sort()/limit() can benefit
85+
// from InfluxDB's pushdown optimisations. Applying keep() first drops _field, which
86+
// both prevents state-value filtering from working correctly and blocks last() pushdown.
8787
flux = applyOrderingAndPageSize(criteria, flux);
8888

89+
if (needsItemTag) {
90+
flux = flux.keep(
91+
new String[] { FIELD_MEASUREMENT_NAME, COLUMN_TIME_NAME_V2, COLUMN_VALUE_NAME_V2, TAG_ITEM_NAME });
92+
} else {
93+
flux = flux.keep(new String[] { FIELD_MEASUREMENT_NAME, COLUMN_TIME_NAME_V2, COLUMN_VALUE_NAME_V2 });
94+
}
95+
8996
return flux.toString();
9097
}
9198

bundles/org.openhab.persistence.influxdb/src/test/java/org/openhab/persistence/influxdb/internal/InfluxFilterCriteriaQueryCreatorImplTest.java

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -85,8 +85,8 @@ public void testSimpleItemQueryWithoutParams() {
8585
from(bucket:"origin")
8686
\t|> range(start:-100y, stop:100y)
8787
\t|> filter(fn: (r) => r["_measurement"] == "sampleItem")
88-
\t|> keep(columns:["_measurement", "_time", "_value"])
89-
\t|> sort(desc:true, columns:["_time"])"""));
88+
\t|> sort(desc:true, columns:["_time"])
89+
\t|> keep(columns:["_measurement", "_time", "_value"])"""));
9090
}
9191

9292
@Test
@@ -108,9 +108,9 @@ public void testRangeCriteria() {
108108
from(bucket:"origin")
109109
\t|> range(start:%s, stop:%s)
110110
\t|> filter(fn: (r) => r["_measurement"] == "sampleItem")
111-
\t|> keep(columns:["_measurement", "_time", "_value"])
112-
\t|> sort(desc:true, columns:["_time"])""", INFLUX2_DATE_FORMATTER.format(now.toInstant()),
113-
INFLUX2_DATE_FORMATTER.format(tomorrow.toInstant()));
111+
\t|> sort(desc:true, columns:["_time"])
112+
\t|> keep(columns:["_measurement", "_time", "_value"])""",
113+
INFLUX2_DATE_FORMATTER.format(now.toInstant()), INFLUX2_DATE_FORMATTER.format(tomorrow.toInstant()));
114114
assertThat(queryV2, equalTo(expectedQueryV2));
115115
}
116116

@@ -129,9 +129,9 @@ public void testValueOperator() {
129129
from(bucket:"origin")
130130
\t|> range(start:-100y, stop:100y)
131131
\t|> filter(fn: (r) => r["_measurement"] == "sampleItem")
132-
\t|> keep(columns:["_measurement", "_time", "_value"])
133132
\t|> filter(fn: (r) => (r["_field"] == "value" and r["_value"] <= 90))
134-
\t|> sort(desc:true, columns:["_time"])"""));
133+
\t|> sort(desc:true, columns:["_time"])
134+
\t|> keep(columns:["_measurement", "_time", "_value"])"""));
135135
}
136136

137137
@Test
@@ -149,9 +149,9 @@ public void testPagination() {
149149
from(bucket:"origin")
150150
\t|> range(start:-100y, stop:100y)
151151
\t|> filter(fn: (r) => r["_measurement"] == "sampleItem")
152-
\t|> keep(columns:["_measurement", "_time", "_value"])
153152
\t|> sort(desc:true, columns:["_time"])
154-
\t|> limit(n:10, offset:20)"""));
153+
\t|> limit(n:10, offset:20)
154+
\t|> keep(columns:["_measurement", "_time", "_value"])"""));
155155
}
156156

157157
@Test
@@ -168,8 +168,8 @@ public void testOrdering() {
168168
from(bucket:"origin")
169169
\t|> range(start:-100y, stop:100y)
170170
\t|> filter(fn: (r) => r["_measurement"] == "sampleItem")
171-
\t|> keep(columns:["_measurement", "_time", "_value"])
172-
\t|> sort(desc:false, columns:["_time"])"""));
171+
\t|> sort(desc:false, columns:["_time"])
172+
\t|> keep(columns:["_measurement", "_time", "_value"])"""));
173173
}
174174

175175
@Test
@@ -182,8 +182,8 @@ public void testPreviousState() {
182182
from(bucket:"origin")
183183
\t|> range(start:-100y, stop:100y)
184184
\t|> filter(fn: (r) => r["_measurement"] == "sampleItem")
185-
\t|> keep(columns:["_measurement", "_time", "_value"])
186-
\t|> last()"""));
185+
\t|> last()
186+
\t|> keep(columns:["_measurement", "_time", "_value"])"""));
187187
}
188188

189189
private FilterCriteria createBaseCriteria() {
@@ -210,8 +210,8 @@ public void testMeasurementNameFromMetadata() {
210210
\t|> range(start:-100y, stop:100y)
211211
\t|> filter(fn: (r) => r["_measurement"] == "measurementName")
212212
\t|> filter(fn: (r) => r["item"] == "sampleItem")
213-
\t|> keep(columns:["_measurement", "_time", "_value", "item"])
214-
\t|> sort(desc:true, columns:["_time"])"""));
213+
\t|> sort(desc:true, columns:["_time"])
214+
\t|> keep(columns:["_measurement", "_time", "_value", "item"])"""));
215215
when(metadataRegistry.get(metadataKey))
216216
.thenReturn(new Metadata(metadataKey, "", Map.of("key1", "val1", "key2", "val2")));
217217

@@ -224,7 +224,7 @@ public void testMeasurementNameFromMetadata() {
224224
from(bucket:"origin")
225225
\t|> range(start:-100y, stop:100y)
226226
\t|> filter(fn: (r) => r["_measurement"] == "sampleItem")
227-
\t|> keep(columns:["_measurement", "_time", "_value"])
228-
\t|> sort(desc:true, columns:["_time"])"""));
227+
\t|> sort(desc:true, columns:["_time"])
228+
\t|> keep(columns:["_measurement", "_time", "_value"])"""));
229229
}
230230
}

0 commit comments

Comments
 (0)