Skip to content

Commit f785b1a

Browse files
authored
fix: virtual columns used in CLUSTERED BY that depend on other virtual columns now correctly preserve those dependencies (#19262) (#19279)
changes: * adds `addRequiredVirtualColumns` method to `SegmentGenerationStageSpec` which resolves transitive virtual column dependencies for virtual columns used by clustering, fixing a bug where these dependent virtual columns would be lost in the shard spec and compaction state * adds `supportsRequiredRewrite` and `rewriteRequiredColumns` to `VirtualColumn` allowing a virtual column to rewrite its input references to equivalent names * adds `Expr.rewriteBindings` to rewrite identifier bindings in an `Expr` tree * `VirtualColumns.findEquivalent` is enhanced to transitively resolve dependent virtual columns across naming contexts before checking equivalence, enabling detection that e.g. `lower("v1")` ≡ `lower("v0")` when v0 and v1 are equivalent virtual columns * `FilterSegmentPruner` updated to use transitive equivalence when matching shard virtual columns to query virtual columns (with Optional-based caching to correctly handle nulls) * `Projections.matchQueryVirtualColumn` updated similarly * intern range shardspec dimension strings and virtual columns
1 parent ce84642 commit f785b1a

19 files changed

Lines changed: 739 additions & 28 deletions

File tree

embedded-tests/src/test/java/org/apache/druid/testing/embedded/compact/CompactionSupervisorTest.java

Lines changed: 182 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
import org.apache.druid.segment.metadata.IndexingStateFingerprintMapper;
6868
import org.apache.druid.segment.transform.CompactionTransformSpec;
6969
import org.apache.druid.segment.virtual.ExpressionVirtualColumn;
70+
import org.apache.druid.segment.virtual.NestedFieldVirtualColumn;
7071
import org.apache.druid.server.compaction.CompactionCandidateSearchPolicy;
7172
import org.apache.druid.server.compaction.InlineReindexingRuleProvider;
7273
import org.apache.druid.server.compaction.MostFragmentedIntervalFirstPolicy;
@@ -709,6 +710,187 @@ public void test_compaction_cluster_by_virtualcolumn_rollup()
709710
);
710711
}
711712

713+
@Test
714+
public void test_compaction_cluster_by_nested_virtualcolumn()
715+
{
716+
// Virtual Columns on nested data is only supported with MSQ compaction engine right now.
717+
CompactionEngine compactionEngine = CompactionEngine.MSQ;
718+
configureCompaction(compactionEngine, null);
719+
720+
String jsonDataWithNestedColumn =
721+
"""
722+
{"timestamp": "2023-01-01T00:00:00", "str":"a", "obj":{"a": "LL"}}
723+
{"timestamp": "2023-01-01T00:00:00", "str":"", "obj":{"a": "MM"}}
724+
{"timestamp": "2023-01-01T00:00:00", "str":"null", "obj":{"a": "NN"}}
725+
{"timestamp": "2023-01-01T00:00:00", "str":"b", "obj":{"a": "OO"}}
726+
{"timestamp": "2023-01-01T00:00:00", "str":"c", "obj":{"a": "PP"}}
727+
{"timestamp": "2023-01-01T00:00:00", "str":"d", "obj":{"a": "QQ"}}
728+
{"timestamp": "2023-01-01T00:00:00", "str":null, "obj":{"a": "RR"}}
729+
""";
730+
731+
final TaskBuilder.Index task = TaskBuilder
732+
.ofTypeIndex()
733+
.dataSource(dataSource)
734+
.jsonInputFormat()
735+
.inlineInputSourceWithData(jsonDataWithNestedColumn)
736+
.isoTimestampColumn("timestamp")
737+
.schemaDiscovery()
738+
.granularitySpec("DAY", null, false);
739+
740+
cluster.callApi().runTask(task.withId(IdUtils.getRandomId()), overlord);
741+
cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);
742+
743+
Assertions.assertEquals(7, getTotalRowCount());
744+
745+
// getClusterByVirtualColumnMappings does the order 'backwards' since it finds the column referenced by the
746+
// clustered by expression and then adds its dependencies after when collecting virtual columns. this test will
747+
// fail if that ever changes (unless we do something like make equals on VirtualColumns not care about order)
748+
VirtualColumns virtualColumns = VirtualColumns.create(
749+
new ExpressionVirtualColumn("v1", "lower(\"v0\")", ColumnType.STRING, TestExprMacroTable.INSTANCE),
750+
new NestedFieldVirtualColumn("obj", "$.a", "v0", ColumnType.STRING)
751+
);
752+
753+
InlineSchemaDataSourceCompactionConfig config =
754+
InlineSchemaDataSourceCompactionConfig
755+
.builder()
756+
.forDataSource(dataSource)
757+
.withSkipOffsetFromLatest(Period.seconds(0))
758+
.withTransformSpec(
759+
new CompactionTransformSpec(
760+
null,
761+
virtualColumns
762+
)
763+
)
764+
.withTuningConfig(
765+
UserCompactionTaskQueryTuningConfig
766+
.builder()
767+
.partitionsSpec(new DimensionRangePartitionsSpec(4, null, List.of("v1"), false))
768+
.build()
769+
)
770+
.build();
771+
772+
runCompactionWithSpec(config);
773+
waitForAllCompactionTasksToFinish();
774+
775+
cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);
776+
777+
List<DataSegment> segments = cluster.callApi().getVisibleUsedSegments(dataSource, overlord).stream().toList();
778+
Assertions.assertEquals(2, segments.size());
779+
Assertions.assertEquals(
780+
new DimensionRangeShardSpec(
781+
List.of("v1"),
782+
virtualColumns,
783+
null,
784+
StringTuple.create("oo"),
785+
0,
786+
2
787+
),
788+
segments.get(0).getShardSpec()
789+
);
790+
Assertions.assertEquals(
791+
new DimensionRangeShardSpec(
792+
List.of("v1"),
793+
virtualColumns,
794+
StringTuple.create("oo"),
795+
null,
796+
1,
797+
2
798+
),
799+
segments.get(1).getShardSpec()
800+
);
801+
}
802+
803+
@Test
804+
public void test_compaction_cluster_by_nested_virtualcolumn_rollup()
805+
{
806+
// Virtual Columns on nested data is only supported with MSQ compaction engine right now.
807+
CompactionEngine compactionEngine = CompactionEngine.MSQ;
808+
configureCompaction(compactionEngine, null);
809+
810+
String jsonDataWithNestedColumn =
811+
"""
812+
{"timestamp": "2023-01-01T00:00:00", "str":"a", "obj":{"a": "LL"}}
813+
{"timestamp": "2023-01-01T00:00:00", "str":"", "obj":{"a": "MM"}}
814+
{"timestamp": "2023-01-01T00:00:00", "str":"null", "obj":{"a": "NN"}}
815+
{"timestamp": "2023-01-01T00:00:00", "str":"b", "obj":{"a": "OO"}}
816+
{"timestamp": "2023-01-01T00:00:00", "str":"c", "obj":{"a": "PP"}}
817+
{"timestamp": "2023-01-01T00:00:00", "str":"d", "obj":{"a": "QQ"}}
818+
{"timestamp": "2023-01-01T00:00:00", "str":null, "obj":{"a": "RR"}}
819+
""";
820+
821+
final TaskBuilder.Index task = TaskBuilder
822+
.ofTypeIndex()
823+
.dataSource(dataSource)
824+
.jsonInputFormat()
825+
.inlineInputSourceWithData(jsonDataWithNestedColumn)
826+
.isoTimestampColumn("timestamp")
827+
.schemaDiscovery()
828+
.dataSchema(builder -> builder.withAggregators(new CountAggregatorFactory("count")))
829+
.granularitySpec("DAY", "MINUTE", true);
830+
831+
cluster.callApi().runTask(task.withId(IdUtils.getRandomId()), overlord);
832+
cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);
833+
834+
Assertions.assertEquals(7, getTotalRowCount());
835+
836+
// getClusterByVirtualColumnMappings does the order 'backwards' since it finds the column referenced by the
837+
// clustered by expression and then adds its dependencies after when collecting virtual columns. this test will
838+
// fail if that ever changes (unless we do something like make equals on VirtualColumns not care about order)
839+
VirtualColumns virtualColumns = VirtualColumns.create(
840+
new ExpressionVirtualColumn("v1", "lower(\"v0\")", ColumnType.STRING, TestExprMacroTable.INSTANCE),
841+
new NestedFieldVirtualColumn("obj", "$.a", "v0", ColumnType.STRING)
842+
);
843+
844+
InlineSchemaDataSourceCompactionConfig config =
845+
InlineSchemaDataSourceCompactionConfig
846+
.builder()
847+
.forDataSource(dataSource)
848+
.withSkipOffsetFromLatest(Period.seconds(0))
849+
.withTransformSpec(
850+
new CompactionTransformSpec(
851+
null,
852+
virtualColumns
853+
)
854+
)
855+
.withTuningConfig(
856+
UserCompactionTaskQueryTuningConfig
857+
.builder()
858+
.partitionsSpec(new DimensionRangePartitionsSpec(4, null, List.of("v1"), false))
859+
.build()
860+
)
861+
.build();
862+
863+
runCompactionWithSpec(config);
864+
waitForAllCompactionTasksToFinish();
865+
866+
cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);
867+
868+
List<DataSegment> segments = cluster.callApi().getVisibleUsedSegments(dataSource, overlord).stream().toList();
869+
Assertions.assertEquals(2, segments.size());
870+
Assertions.assertEquals(
871+
new DimensionRangeShardSpec(
872+
List.of("v1"),
873+
virtualColumns,
874+
null,
875+
StringTuple.create("oo"),
876+
0,
877+
2
878+
),
879+
segments.get(0).getShardSpec()
880+
);
881+
Assertions.assertEquals(
882+
new DimensionRangeShardSpec(
883+
List.of("v1"),
884+
virtualColumns,
885+
StringTuple.create("oo"),
886+
null,
887+
1,
888+
2
889+
),
890+
segments.get(1).getShardSpec()
891+
);
892+
}
893+
712894
/**
713895
* Tests that when a compaction task filters out all rows using a transform spec,
714896
* tombstones are created to properly drop the old segments. This test covers both

embedded-tests/src/test/java/org/apache/druid/testing/embedded/msq/MultiStageQueryTest.java

Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,9 @@
4646
import org.junit.jupiter.api.Test;
4747

4848
import java.io.File;
49+
import java.io.IOException;
50+
import java.nio.file.Files;
51+
import java.nio.file.Path;
4952
import java.util.ArrayList;
5053
import java.util.Arrays;
5154
import java.util.List;
@@ -267,6 +270,88 @@ CLUSTERED BY CONCAT(country, ':', city)
267270
assertClusterByVirtualColumnQueries();
268271
}
269272

273+
@Test
274+
public void testClusterByNestedVirtualColumn() throws IOException
275+
{
276+
final Path tempFile = createNestedJsonDataFile();
277+
final String sqlTemplate =
278+
"""
279+
SET rowsPerSegment = 4;
280+
SET groupByEnableMultiValueUnnesting = FALSE;
281+
REPLACE INTO %s OVERWRITE ALL
282+
WITH "ext" AS (
283+
SELECT *
284+
FROM TABLE(EXTERN('{"type":"local","files":["%s"]}', '{"type":"json"}'))
285+
EXTEND(
286+
"timestamp" VARCHAR,
287+
"str" VARCHAR,
288+
"obj" TYPE('COMPLEX<json>')
289+
)
290+
)
291+
SELECT
292+
TIME_PARSE("timestamp") AS __time,
293+
str,
294+
obj
295+
FROM "ext"
296+
PARTITIONED BY DAY
297+
CLUSTERED BY LOWER(JSON_VALUE(obj, '$.a' RETURNING VARCHAR))
298+
""";
299+
final String sql = StringUtils.format(
300+
sqlTemplate,
301+
dataSource,
302+
tempFile.toAbsolutePath()
303+
);
304+
305+
final SqlTaskStatus taskStatus = msqApis.submitTaskSql(sql);
306+
cluster.callApi().waitForTaskToSucceed(taskStatus.getTaskId(), overlord.latchableEmitter());
307+
cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);
308+
309+
assertClusterByNestedVirtualColumnSegments();
310+
assertClusterByNestedVirtualColumnQueries();
311+
}
312+
313+
@Test
314+
public void testClusterByNestedVirtualColumnRollup() throws IOException
315+
{
316+
final Path tempFile = createNestedJsonDataFile();
317+
final String sqlTemplate =
318+
"""
319+
SET rowsPerSegment = 4;
320+
SET groupByEnableMultiValueUnnesting = FALSE;
321+
REPLACE INTO %s OVERWRITE ALL
322+
WITH "ext" AS (
323+
SELECT *
324+
FROM TABLE(EXTERN('{"type":"local","files":["%s"]}', '{"type":"json"}'))
325+
EXTEND(
326+
"timestamp" VARCHAR,
327+
"str" VARCHAR,
328+
"obj" TYPE('COMPLEX<json>')
329+
)
330+
)
331+
SELECT
332+
TIME_PARSE("timestamp") AS __time,
333+
str,
334+
obj,
335+
COUNT(*) AS cnt
336+
FROM "ext"
337+
GROUP BY TIME_PARSE("timestamp"), str, obj, LOWER(JSON_VALUE(obj, '$.a' RETURNING VARCHAR))
338+
PARTITIONED BY DAY
339+
CLUSTERED BY LOWER(JSON_VALUE(obj, '$.a' RETURNING VARCHAR))
340+
""";
341+
final String sql = StringUtils.format(
342+
sqlTemplate,
343+
dataSource,
344+
tempFile.toAbsolutePath()
345+
);
346+
347+
final SqlTaskStatus taskStatus = msqApis.submitTaskSql(sql);
348+
cluster.callApi().waitForTaskToSucceed(taskStatus.getTaskId(), overlord.latchableEmitter());
349+
cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator, broker);
350+
351+
assertClusterByNestedVirtualColumnSegments();
352+
assertClusterByNestedVirtualColumnQueries();
353+
}
354+
270355
private void assertClusterByVirtualColumnSegments()
271356
{
272357
List<DataSegment> segments = cluster.callApi().getVisibleUsedSegments(dataSource, overlord).stream().toList();
@@ -331,6 +416,93 @@ private void assertClusterByVirtualColumnQueries()
331416
Assertions.assertEquals(2, getSegmentsScannedForDartQuery(queryId));
332417
}
333418

419+
420+
private Path createNestedJsonDataFile() throws IOException
421+
{
422+
final Path tempFile = Files.createTempFile("nested-data", ".json");
423+
tempFile.toFile().deleteOnExit();
424+
Files.writeString(tempFile,
425+
"""
426+
{"timestamp": "2023-01-01T00:00:00", "str":"a", "obj":{"a": "A"}}
427+
{"timestamp": "2023-01-01T00:00:01", "str":"b", "obj":{"a": "A"}}
428+
{"timestamp": "2023-01-01T00:00:02", "str":"c", "obj":{"a": "B"}}
429+
{"timestamp": "2023-01-01T00:00:03", "str":"d", "obj":{"a": "A"}}
430+
{"timestamp": "2023-01-01T00:00:04", "str":"e", "obj":{"a": "B"}}
431+
{"timestamp": "2023-01-01T00:00:05", "str":"f", "obj":{"a": "A"}}
432+
{"timestamp": "2023-01-01T00:00:06", "str":"g", "obj":{"a": "A"}}
433+
"""
434+
);
435+
return tempFile;
436+
}
437+
438+
private void assertClusterByNestedVirtualColumnSegments()
439+
{
440+
// all rows in same time chunk, max rows is 4, so we expect 2 segments with a range split on 'a' since there are
441+
// 5 rows with 'A' and 2 rows with 'B'
442+
List<DataSegment> segments = cluster.callApi().getVisibleUsedSegments(dataSource, overlord).stream().toList();
443+
Assertions.assertEquals(2, segments.size());
444+
445+
final DimensionRangeShardSpec shardSpec0 = (DimensionRangeShardSpec) segments.get(0).getShardSpec();
446+
Assertions.assertEquals(1, shardSpec0.getDimensions().size());
447+
Assertions.assertFalse(shardSpec0.getVirtualColumns().isEmpty());
448+
Assertions.assertEquals(2, shardSpec0.getVirtualColumns().getVirtualColumns().length);
449+
Assertions.assertEquals(0, shardSpec0.getPartitionNum());
450+
451+
Assertions.assertNull(shardSpec0.getStartTuple());
452+
Assertions.assertEquals(StringTuple.create("a"), shardSpec0.getEndTuple());
453+
454+
final DimensionRangeShardSpec shardSpec1 = (DimensionRangeShardSpec) segments.get(1).getShardSpec();
455+
Assertions.assertEquals(shardSpec0.getDimensions(), shardSpec1.getDimensions());
456+
Assertions.assertEquals(shardSpec0.getVirtualColumns(), shardSpec1.getVirtualColumns());
457+
Assertions.assertEquals(1, shardSpec1.getPartitionNum());
458+
459+
Assertions.assertEquals(StringTuple.create("a"), shardSpec1.getStartTuple());
460+
Assertions.assertNull(shardSpec1.getEndTuple());
461+
}
462+
463+
private void assertClusterByNestedVirtualColumnQueries()
464+
{
465+
String queryId = UUID.randomUUID().toString();
466+
cluster.callApi().verifySqlQuery(
467+
"SET engine = 'msq-dart'; SET sqlQueryId = '" + queryId + "'; SELECT str FROM %s ORDER BY __time",
468+
dataSource,
469+
"""
470+
a
471+
b
472+
c
473+
d
474+
e
475+
f
476+
g"""
477+
);
478+
Assertions.assertEquals(2, getSegmentsScannedForDartQuery(queryId));
479+
480+
queryId = UUID.randomUUID().toString();
481+
cluster.callApi().verifySqlQuery(
482+
"SET engine = 'msq-dart'; SET sqlQueryId = '" + queryId + "'; SELECT str FROM %s WHERE LOWER(JSON_VALUE(obj, '$.a' RETURNING VARCHAR)) = 'b' ORDER BY __time",
483+
dataSource,
484+
"""
485+
c
486+
e"""
487+
);
488+
Assertions.assertEquals(1, getSegmentsScannedForDartQuery(queryId));
489+
490+
queryId = UUID.randomUUID().toString();
491+
cluster.callApi().verifySqlQuery(
492+
"SET engine = 'msq-dart'; SET sqlQueryId = '" + queryId + "'; SELECT str FROM %s WHERE LOWER(JSON_VALUE(obj, '$.a' RETURNING VARCHAR)) <= 'b' ORDER BY __time",
493+
dataSource,
494+
"""
495+
a
496+
b
497+
c
498+
d
499+
e
500+
f
501+
g"""
502+
);
503+
Assertions.assertEquals(2, getSegmentsScannedForDartQuery(queryId));
504+
}
505+
334506
private long getSegmentsScannedForDartQuery(String sqlQueryId)
335507
{
336508
ChannelCounters.Snapshot segmentChannelCounters = getDartSegmentChannelCounters(sqlQueryId);

0 commit comments

Comments
 (0)