diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/DataSegmentWithLocation.java b/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/DataSegmentWithLocation.java index 72e765d620dc..860a9d656d4b 100644 --- a/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/DataSegmentWithLocation.java +++ b/multi-stage-query/src/main/java/org/apache/druid/msq/input/table/DataSegmentWithLocation.java @@ -27,6 +27,7 @@ import org.apache.druid.jackson.CommaListJoinDeserializer; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.server.coordination.DruidServerMetadata; +import org.apache.druid.timeline.ClusterGroupTuples; import org.apache.druid.timeline.CompactionState; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.ShardSpec; @@ -54,8 +55,8 @@ private DataSegmentWithLocation( @JsonProperty("dimensions") @JsonDeserialize(using = CommaListJoinDeserializer.class) @Nullable List dimensions, @JsonProperty("metrics") @JsonDeserialize(using = CommaListJoinDeserializer.class) @Nullable List metrics, - @JsonProperty("projections") @JsonDeserialize(using = CommaListJoinDeserializer.class) @Nullable - List projections, + @JsonProperty("projections") @JsonDeserialize(using = CommaListJoinDeserializer.class) @Nullable List projections, + @JsonProperty("clusterGroups") @Nullable ClusterGroupTuples clusterGroups, @JsonProperty("shardSpec") @Nullable ShardSpec shardSpec, @JsonProperty("lastCompactionState") @Nullable CompactionState lastCompactionState, @JsonProperty("binaryVersion") Integer binaryVersion, @@ -74,6 +75,7 @@ private DataSegmentWithLocation( dimensions, metrics, projections, + clusterGroups, shardSpec, lastCompactionState, binaryVersion, @@ -98,6 +100,7 @@ public DataSegmentWithLocation( dataSegment.getDimensions(), dataSegment.getMetrics(), dataSegment.getProjections(), + dataSegment.getClusterGroups(), dataSegment.getShardSpec(), null, dataSegment.getBinaryVersion(), diff --git a/processing/src/main/java/org/apache/druid/segment/loading/PartialClusterGroupLoadSpec.java b/processing/src/main/java/org/apache/druid/segment/loading/PartialClusterGroupLoadSpec.java new file mode 100644 index 000000000000..86e562010c00 --- /dev/null +++ b/processing/src/main/java/org/apache/druid/segment/loading/PartialClusterGroupLoadSpec.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.loading; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Preconditions; +import org.apache.druid.utils.CollectionUtils; + +import java.util.List; +import java.util.Map; +import java.util.Objects; + +/** + * A {@link PartialLoadSpec} that requests partial loading of a clustered segment's cluster groups. The base class + * carries the common {@code fingerprint} and {@code delegate} wire fields; this subtype adds the resolved + * {@code clusterGroupIndices} (positions into {@link org.apache.druid.timeline.ClusterGroupTuples#getTuples()}) that + * the historical should range-read into the local segment. + */ +@JsonTypeName(PartialClusterGroupLoadSpec.TYPE) +public class PartialClusterGroupLoadSpec extends PartialLoadSpec +{ + public static final String TYPE = "partialClusterGroup"; + + /** + * Builds the raw wire-form {@link Map} representation of a {@link PartialClusterGroupLoadSpec} request. Used by the + * coordinator-side matcher (which doesn't instantiate the typed class because doing so would require plumbing an + * {@link ObjectMapper} through every matcher just to satisfy the constructor's lazy-delegate supplier). + */ + public static Map wireForm( + Map delegate, + List clusterGroupIndices, + String fingerprint + ) + { + return Map.of( + "type", TYPE, + "delegate", delegate, + "clusterGroupIndices", clusterGroupIndices, + "fingerprint", fingerprint + ); + } + + private final List clusterGroupIndices; + + @JsonCreator + public PartialClusterGroupLoadSpec( + @JsonProperty("delegate") Map delegate, + @JsonProperty("clusterGroupIndices") List clusterGroupIndices, + @JsonProperty("fingerprint") String fingerprint, + @JacksonInject ObjectMapper jsonMapper + ) + { + super(delegate, fingerprint, jsonMapper); + Preconditions.checkArgument( + !CollectionUtils.isNullOrEmpty(clusterGroupIndices), + "clusterGroupIndices must not be null or empty" + ); + this.clusterGroupIndices = List.copyOf(clusterGroupIndices); + } + + @JsonProperty + public List getClusterGroupIndices() + { + return clusterGroupIndices; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + PartialClusterGroupLoadSpec that = (PartialClusterGroupLoadSpec) o; + return Objects.equals(getDelegate(), that.getDelegate()) + && Objects.equals(clusterGroupIndices, that.clusterGroupIndices) + && Objects.equals(getFingerprint(), that.getFingerprint()); + } + + @Override + public int hashCode() + { + return Objects.hash(getDelegate(), clusterGroupIndices, getFingerprint()); + } + + @Override + public String toString() + { + return "PartialClusterGroupLoadSpec{" + + "delegate=" + getDelegate() + + ", clusterGroupIndices=" + clusterGroupIndices + + ", fingerprint=" + getFingerprint() + + '}'; + } +} diff --git a/processing/src/main/java/org/apache/druid/segment/loading/PartialProjectionLoadSpec.java b/processing/src/main/java/org/apache/druid/segment/loading/PartialProjectionLoadSpec.java index 07c642a28879..737a10655898 100644 --- a/processing/src/main/java/org/apache/druid/segment/loading/PartialProjectionLoadSpec.java +++ b/processing/src/main/java/org/apache/druid/segment/loading/PartialProjectionLoadSpec.java @@ -35,11 +35,6 @@ * A {@link PartialLoadSpec} that requests partial loading of a segment's projections. The base class carries the * common {@code fingerprint} and {@code delegate} wire fields; this subtype adds the resolved projection names that * the historical should range-read into the local segment. - *

- * The historical-side partial-load path inspects this wrapper at mount time. Until that path exists, the base - * class's default {@link #loadSegment} performs a full download via the inner delegate, and the announcement layer - * stamps the fingerprint + full size on the response so the coordinator's reconciler counts the replica as a - * satisfying full-fallback rather than re-queuing the load. */ @JsonTypeName(PartialProjectionLoadSpec.TYPE) public class PartialProjectionLoadSpec extends PartialLoadSpec diff --git a/processing/src/main/java/org/apache/druid/timeline/ClusterGroupTuples.java b/processing/src/main/java/org/apache/druid/timeline/ClusterGroupTuples.java new file mode 100644 index 000000000000..7b16295d3a1a --- /dev/null +++ b/processing/src/main/java/org/apache/druid/timeline/ClusterGroupTuples.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.timeline; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.error.DruidException; +import org.apache.druid.error.InvalidInput; +import org.apache.druid.segment.VirtualColumns; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Objects; + +/** + * Typed clustering tuples carried on {@link DataSegment#getClusterGroups()} for clustered base-table segments. Each + * entry in {@link #tuples()} is one cluster group's clustering-column values, in the order declared by + * {@link #clusteringColumns()}. Optionally carries the clustering {@link VirtualColumns} when the segment was + * clustered on a virtual-column expression, so that matching for things like partial load rules and query time + * segment pruning can make use of this information. + *

+ * The compact constructor validates {@code clusteringColumns}, interns the virtual columns through + * {@link DataSegment#virtualColumnInterner()}, and canonicalizes every tuple value to its declared + * {@link ColumnType} via {@link #coerceValue} so {@link Object#equals} works across the JSON/programmatic boundary. + */ +public record ClusterGroupTuples( + @JsonProperty("clusteringColumns") RowSignature clusteringColumns, + @JsonProperty("virtualColumns") @JsonInclude(JsonInclude.Include.NON_EMPTY) VirtualColumns virtualColumns, + @JsonProperty("tuples") List> tuples +) +{ + @JsonCreator + public ClusterGroupTuples + { + if (clusteringColumns == null || clusteringColumns.size() == 0) { + throw InvalidInput.exception("clusteringColumns must not be null or empty"); + } + virtualColumns = internVirtualColumns(virtualColumns); + tuples = canonicalizeTuples(clusteringColumns, tuples); + } + + /** + * Convenience constructor for callers that don't carry clustering virtual columns. Equivalent to passing + * {@code null} for the virtual columns argument. + */ + public ClusterGroupTuples(RowSignature clusteringColumns, @Nullable List> tuples) + { + this(clusteringColumns, null, tuples); + } + + /** + * Canonicalize {@code raw} for the declared clustering column {@code type}. This is intentionally narrow: its job + * is to unbreak Jackson's number-type narrowing (e.g., an Integer arriving for a LONG column gets normalized to a + * Long), not to do general value coercion. Rules: + *

    + *
  • {@code null} → {@code null}.
  • + *
  • STRING: {@link Objects#toString} on any non-null value (stringifying numeric operator input is benign).
  • + *
  • LONG / DOUBLE / FLOAT: require {@link Number}; return via the matching primitive accessor. Strings, + * Booleans, etc. are rejected — typed rule authoring should produce typed JSON, and silently parsing + * strings risks accepting operator typos that change the matched set.
  • + *
+ * Unsupported column types (anything that isn't STRING/LONG/DOUBLE/FLOAT) are rejected. + *

+ * Used by: + *

    + *
  • {@link ClusterGroupTuples}'s compact constructor to canonicalize segment-side tuples (strict).
  • + *
  • Operator-supplied rule tuples in future cluster-group partial-load matchers, which can catch the + * exception and treat it as "no match for this segment" rather than a hard failure.
  • + *
+ */ + @Nullable + public static Object coerceValue(String columnName, ColumnType type, @Nullable Object raw) + { + if (raw == null) { + return null; + } + if (ColumnType.STRING.equals(type)) { + return raw instanceof String ? raw : Objects.toString(raw); + } + if (ColumnType.LONG.equals(type)) { + if (raw instanceof Number) { + return ((Number) raw).longValue(); + } + throw cannotCoerce(raw, columnName, "LONG"); + } + if (ColumnType.DOUBLE.equals(type)) { + if (raw instanceof Number) { + return ((Number) raw).doubleValue(); + } + throw cannotCoerce(raw, columnName, "DOUBLE"); + } + if (ColumnType.FLOAT.equals(type)) { + if (raw instanceof Number) { + return ((Number) raw).floatValue(); + } + throw cannotCoerce(raw, columnName, "FLOAT"); + } + throw InvalidInput.exception( + "Unsupported clustering column type [%s] for column [%s]; supported types are STRING, LONG, DOUBLE, FLOAT", + type, + columnName + ); + } + + private static VirtualColumns internVirtualColumns(@Nullable VirtualColumns virtualColumns) + { + if (virtualColumns == null || virtualColumns.isEmpty()) { + return VirtualColumns.EMPTY; + } + return VirtualColumns.create( + Arrays.stream(virtualColumns.getVirtualColumns()) + .map(DataSegment.virtualColumnInterner()::intern) + .toList() + ); + } + + private static List> canonicalizeTuples( + RowSignature clusteringColumns, + @Nullable List> tuples + ) + { + final List> source = tuples == null ? Collections.emptyList() : tuples; + final int numCols = clusteringColumns.size(); + final List> coerced = new ArrayList<>(source.size()); + for (int t = 0; t < source.size(); t++) { + final List tuple = source.get(t); + if (tuple == null || tuple.size() != numCols) { + throw InvalidInput.exception( + "tuple[%s] has size [%s] but clusteringColumns size is [%s]", + t, + tuple == null ? "null" : tuple.size(), + numCols + ); + } + final Object[] out = new Object[numCols]; + for (int i = 0; i < numCols; i++) { + final String name = clusteringColumns.getColumnName(i); + final ColumnType type = clusteringColumns.getColumnType(i).orElseThrow( + () -> InvalidInput.exception("clusteringColumn[%s] has no declared type", name) + ); + out[i] = coerceValue(name, type, tuple.get(i)); + } + coerced.add(Collections.unmodifiableList(Arrays.asList(out))); + } + return Collections.unmodifiableList(coerced); + } + + private static DruidException cannotCoerce(Object raw, String columnName, String targetType) + { + return InvalidInput.exception( + "Cannot coerce value [%s] of type [%s] for column [%s] to %s", + raw, + raw.getClass().getName(), + columnName, + targetType + ); + } +} diff --git a/processing/src/main/java/org/apache/druid/timeline/DataSegment.java b/processing/src/main/java/org/apache/druid/timeline/DataSegment.java index 601ea7613c53..f9375db5ae95 100644 --- a/processing/src/main/java/org/apache/druid/timeline/DataSegment.java +++ b/processing/src/main/java/org/apache/druid/timeline/DataSegment.java @@ -39,6 +39,7 @@ import org.apache.druid.jackson.CommaListJoinSerializer; import org.apache.druid.java.util.common.Intervals; import org.apache.druid.query.SegmentDescriptor; +import org.apache.druid.segment.VirtualColumn; import org.apache.druid.timeline.partition.NumberedShardSpec; import org.apache.druid.timeline.partition.ShardSpec; import org.joda.time.Interval; @@ -65,6 +66,18 @@ public static Interner stringInterner() return STRING_INTERNER; } + /** + * Shared canonical interner for {@link org.apache.druid.segment.VirtualColumn} instances embedded in segment-side + * metadata that lives in broker/coordinator memory. Used by callers that carry virtual columns through + * {@link DataSegment}'s wire form (clustering virtual columns on {@link ClusterGroupTuples}, + * {@link org.apache.druid.timeline.partition.BaseDimensionRangeShardSpec}'s range-clustering virtual columns) so + * identical VC definitions across segments collapse to a single instance in memory. + */ + public static Interner virtualColumnInterner() + { + return VIRTUAL_COLUMN_INTERNER; + } + /* * The difference between this class and org.apache.druid.segment.Segment is that this class contains the segment * metadata only, while org.apache.druid.segment.Segment represents the actual body of segment data, queryable. @@ -93,6 +106,8 @@ public static class PruneSpecsHolder private static final Interner> DIMENSIONS_INTERNER = Interners.newWeakInterner(); private static final Interner> METRICS_INTERNER = Interners.newWeakInterner(); private static final Interner> PROJECTIONS_INTERNER = Interners.newWeakInterner(); + private static final Interner CLUSTER_GROUPS_INTERNER = Interners.newWeakInterner(); + private static final Interner VIRTUAL_COLUMN_INTERNER = Interners.newWeakInterner(); private static final Interner COMPACTION_STATE_INTERNER = Interners.newWeakInterner(); private static final Map PRUNED_LOAD_SPEC = ImmutableMap.of( "load spec is pruned, because it's not needed on Brokers, but eats a lot of heap space", @@ -106,6 +121,13 @@ public static class PruneSpecsHolder private final List dimensions; private final List metrics; private final List projections; + /** + * Typed clustering tuples for clustered base-table segments, or {@code null} for non-clustered segments. Each tuple + * is one cluster group's clustering-column values in the order declared by + * {@link ClusterGroupTuples#getClusteringColumns()}. Consumed by cluster-group partial-load matchers. + */ + @Nullable + private final ClusterGroupTuples clusterGroups; private final ShardSpec shardSpec; /** @@ -154,6 +176,7 @@ public DataSegment( dimensions, metrics, null, + null, shardSpec, null, binaryVersion, @@ -189,6 +212,7 @@ public DataSegment( dimensions, metrics, null, + null, shardSpec, lastCompactionState, binaryVersion, @@ -199,6 +223,46 @@ public DataSegment( ); } + /** + * @deprecated use {@link #builder(SegmentId)} or {@link #builder(DataSegment)} instead. + */ + @Deprecated + public DataSegment( + String dataSource, + Interval interval, + String version, + @Nullable Map loadSpec, + @Nullable List dimensions, + @Nullable List metrics, + @Nullable List projections, + @Nullable ShardSpec shardSpec, + @Nullable CompactionState lastCompactionState, + Integer binaryVersion, + long size, + Integer totalRows, + String indexingStateFingerprint, + PruneSpecsHolder pruneSpecsHolder + ) + { + this( + dataSource, + interval, + version, + loadSpec, + dimensions, + metrics, + projections, + null, + shardSpec, + lastCompactionState, + binaryVersion, + size, + totalRows, + indexingStateFingerprint, + pruneSpecsHolder + ); + } + @JsonCreator private DataSegment( @JsonProperty("dataSource") String dataSource, @@ -212,6 +276,7 @@ private DataSegment( @JsonProperty("metrics") @JsonDeserialize(using = CommaListJoinDeserializer.class) @Nullable List metrics, @JsonProperty("projections") @JsonDeserialize(using = CommaListJoinDeserializer.class) @Nullable List projections, + @JsonProperty("clusterGroups") @Nullable ClusterGroupTuples clusterGroups, @JsonProperty("shardSpec") @Nullable ShardSpec shardSpec, @JsonProperty("lastCompactionState") @Nullable CompactionState lastCompactionState, @JsonProperty("binaryVersion") Integer binaryVersion, @@ -229,6 +294,7 @@ private DataSegment( dimensions, metrics, projections, + clusterGroups, shardSpec, lastCompactionState, binaryVersion, @@ -247,6 +313,7 @@ public DataSegment( @Nullable List dimensions, @Nullable List metrics, @Nullable List projections, + @Nullable ClusterGroupTuples clusterGroups, @Nullable ShardSpec shardSpec, @Nullable CompactionState lastCompactionState, Integer binaryVersion, @@ -264,6 +331,7 @@ public DataSegment( // A null value for projections means that this segment is not aware of projections (launched in druid 32). // An empty list means that this segment is projection-aware, but has no projections. this.projections = projections == null ? null : prepareWithInterner(projections, PROJECTIONS_INTERNER); + this.clusterGroups = prepareClusterGroups(clusterGroups); this.shardSpec = (shardSpec == null) ? new NumberedShardSpec(0, 1) : shardSpec; this.lastCompactionState = pruneSpecsHolder.pruneLastCompactionState ? null @@ -331,6 +399,14 @@ public List getProjections() return projections; } + @Nullable + @JsonProperty + @JsonInclude(JsonInclude.Include.NON_NULL) + public ClusterGroupTuples getClusterGroups() + { + return clusterGroups; + } + @JsonProperty public ShardSpec getShardSpec() { @@ -461,6 +537,11 @@ public DataSegment withProjections(List projections) return builder(this).projections(projections).build(); } + public DataSegment withClusterGroups(@Nullable ClusterGroupTuples clusterGroups) + { + return builder(this).clusterGroups(clusterGroups).build(); + } + public DataSegment withShardSpec(ShardSpec newSpec) { return builder(this).shardSpec(newSpec).build(); @@ -527,6 +608,7 @@ public String toString() ", dimensions=" + dimensions + ", metrics=" + metrics + ", projections=" + projections + + ", clusterGroups=" + clusterGroups + ", shardSpec=" + shardSpec + ", lastCompactionState=" + lastCompactionState + ", size=" + size + @@ -559,6 +641,15 @@ private static CompactionState prepareCompactionState(@Nullable CompactionState return COMPACTION_STATE_INTERNER.intern(lastCompactionState); } + @Nullable + private static ClusterGroupTuples prepareClusterGroups(@Nullable ClusterGroupTuples clusterGroups) + { + if (clusterGroups == null) { + return null; + } + return CLUSTER_GROUPS_INTERNER.intern(clusterGroups); + } + /** * Returns a list of strings with all empty strings removed and all strings interned. *

@@ -606,6 +697,8 @@ public static class Builder private List dimensions; private List metrics; private List projections; + @Nullable + private ClusterGroupTuples clusterGroups; private ShardSpec shardSpec; private CompactionState lastCompactionState; private Integer binaryVersion; @@ -651,6 +744,7 @@ private Builder(DataSegment segment) this.dimensions = segment.getDimensions(); this.metrics = segment.getMetrics(); this.projections = segment.getProjections(); + this.clusterGroups = segment.getClusterGroups(); this.shardSpec = segment.getShardSpec(); this.lastCompactionState = segment.getLastCompactionState(); this.binaryVersion = segment.getBinaryVersion(); @@ -668,6 +762,7 @@ private Builder(DataSegment.Builder segmentBuilder) this.dimensions = segmentBuilder.dimensions; this.metrics = segmentBuilder.metrics; this.projections = segmentBuilder.projections; + this.clusterGroups = segmentBuilder.clusterGroups; this.shardSpec = segmentBuilder.shardSpec; this.lastCompactionState = segmentBuilder.lastCompactionState; this.binaryVersion = segmentBuilder.binaryVersion; @@ -718,6 +813,12 @@ public Builder projections(List projections) return this; } + public Builder clusterGroups(@Nullable ClusterGroupTuples clusterGroups) + { + this.clusterGroups = clusterGroups; + return this; + } + public Builder shardSpec(ShardSpec shardSpec) { this.shardSpec = shardSpec; @@ -770,6 +871,7 @@ public DataSegment build() dimensions, metrics, projections, + clusterGroups, shardSpec, lastCompactionState, binaryVersion, diff --git a/processing/src/main/java/org/apache/druid/timeline/partition/BaseDimensionRangeShardSpec.java b/processing/src/main/java/org/apache/druid/timeline/partition/BaseDimensionRangeShardSpec.java index d7a9d2018b08..36a761a0955b 100644 --- a/processing/src/main/java/org/apache/druid/timeline/partition/BaseDimensionRangeShardSpec.java +++ b/processing/src/main/java/org/apache/druid/timeline/partition/BaseDimensionRangeShardSpec.java @@ -19,14 +19,11 @@ package org.apache.druid.timeline.partition; -import com.google.common.collect.Interner; -import com.google.common.collect.Interners; import com.google.common.collect.Ordering; import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.StringTuple; import org.apache.druid.java.util.common.ISE; import org.apache.druid.java.util.common.guava.Comparators; -import org.apache.druid.segment.VirtualColumn; import org.apache.druid.segment.VirtualColumns; import org.apache.druid.timeline.DataSegment; @@ -37,8 +34,6 @@ public abstract class BaseDimensionRangeShardSpec implements ShardSpec { - private static final Interner VIRTUAL_COLUMN_INTERNER = Interners.newWeakInterner(); - protected final List dimensions; protected final VirtualColumns virtualColumns; @Nullable @@ -57,7 +52,7 @@ protected BaseDimensionRangeShardSpec( this.virtualColumns = virtualColumns == null ? VirtualColumns.EMPTY : VirtualColumns.create(Arrays.stream(virtualColumns.getVirtualColumns()) - .map(VIRTUAL_COLUMN_INTERNER::intern) + .map(DataSegment.virtualColumnInterner()::intern) .toList()); this.start = start; this.end = end; diff --git a/processing/src/test/java/org/apache/druid/segment/loading/PartialClusterGroupLoadSpecTest.java b/processing/src/test/java/org/apache/druid/segment/loading/PartialClusterGroupLoadSpecTest.java new file mode 100644 index 000000000000..5474617ea323 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/segment/loading/PartialClusterGroupLoadSpecTest.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.segment.loading; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.InjectableValues; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.module.SimpleModule; +import com.google.common.collect.ImmutableMap; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import javax.annotation.Nullable; +import java.io.ByteArrayInputStream; +import java.io.File; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +class PartialClusterGroupLoadSpecTest +{ + private static final Map DELEGATE = ImmutableMap.of( + "type", "stub", + "path", "/var/druid/segments/foo" + ); + private static final String FINGERPRINT = "v1:abcdef0123456789"; + + private static ObjectMapper configuredMapper() + { + final ObjectMapper m = new DefaultObjectMapper(); + final SimpleModule module = new SimpleModule(); + module.registerSubtypes(PartialClusterGroupLoadSpec.class, StubLoadSpec.class); + m.registerModule(module); + m.setInjectableValues(new InjectableValues.Std().addValue(ObjectMapper.class, m)); + return m; + } + + private final ObjectMapper jsonMapper = configuredMapper(); + + @Test + void testJsonRoundTrip() throws Exception + { + PartialClusterGroupLoadSpec spec = new PartialClusterGroupLoadSpec( + DELEGATE, + List.of(0, 2, 5), + FINGERPRINT, + jsonMapper + ); + String json = jsonMapper.writeValueAsString(spec); + LoadSpec reread = jsonMapper.readValue(json, LoadSpec.class); + Assertions.assertInstanceOf(PartialClusterGroupLoadSpec.class, reread); + Assertions.assertEquals(spec, reread); + } + + @Test + void testWireFormHasPartialClusterGroupType() throws Exception + { + PartialClusterGroupLoadSpec spec = new PartialClusterGroupLoadSpec( + DELEGATE, + List.of(0, 1), + FINGERPRINT, + jsonMapper + ); + Map wireForm = jsonMapper.readValue( + jsonMapper.writeValueAsString(spec), + new TypeReference<>() + { + } + ); + Assertions.assertEquals("partialClusterGroup", wireForm.get("type")); + Assertions.assertEquals(DELEGATE, wireForm.get("delegate")); + Assertions.assertEquals(List.of(0, 1), wireForm.get("clusterGroupIndices")); + Assertions.assertEquals(FINGERPRINT, wireForm.get("fingerprint")); + } + + @Test + void testLoadSegmentDelegatesToInner() throws Exception + { + PartialClusterGroupLoadSpec spec = new PartialClusterGroupLoadSpec( + DELEGATE, + List.of(0), + FINGERPRINT, + jsonMapper + ); + StubLoadSpec.LOAD_CALLS.set(0); + LoadSpec.LoadSpecResult result = spec.loadSegment(new File("/tmp/dest")); + Assertions.assertEquals(1, StubLoadSpec.LOAD_CALLS.get()); + Assertions.assertEquals(42L, result.getSize()); + } + + @Test + void testOpenRangeReaderDelegatesToInner() throws Exception + { + PartialClusterGroupLoadSpec spec = new PartialClusterGroupLoadSpec( + DELEGATE, + List.of(0), + FINGERPRINT, + jsonMapper + ); + StubLoadSpec.RANGE_CALLS.set(0); + SegmentRangeReader reader = spec.openRangeReader(); + Assertions.assertNotNull(reader); + Assertions.assertEquals(1, StubLoadSpec.RANGE_CALLS.get()); + } + + @Test + void testOpenRangeReaderReturnsNullWhenInnerDoesNotSupport() throws Exception + { + PartialClusterGroupLoadSpec spec = new PartialClusterGroupLoadSpec( + ImmutableMap.of("type", "stub", "path", "/", "supportsRange", false), + List.of(0), + FINGERPRINT, + jsonMapper + ); + Assertions.assertNull(spec.openRangeReader()); + } + + @Test + void testRejectsNullDelegate() + { + Assertions.assertThrows( + NullPointerException.class, + () -> new PartialClusterGroupLoadSpec(null, List.of(0), "v1:x", jsonMapper) + ); + } + + @Test + void testRejectsNullOrEmptyClusterGroupIndices() + { + Assertions.assertThrows( + IllegalArgumentException.class, + () -> new PartialClusterGroupLoadSpec(DELEGATE, null, "v1:x", jsonMapper) + ); + Assertions.assertThrows( + IllegalArgumentException.class, + () -> new PartialClusterGroupLoadSpec(DELEGATE, List.of(), "v1:x", jsonMapper) + ); + } + + @Test + void testRejectsNullFingerprint() + { + Assertions.assertThrows( + NullPointerException.class, + () -> new PartialClusterGroupLoadSpec(DELEGATE, List.of(0), null, jsonMapper) + ); + } + + /** + * Stub LoadSpec used to verify delegation. Uses the same JSON "type"=="stub" key as the test {@link #DELEGATE}. + */ + @JsonTypeName("stub") + public static class StubLoadSpec implements LoadSpec + { + static final AtomicInteger LOAD_CALLS = new AtomicInteger(0); + static final AtomicInteger RANGE_CALLS = new AtomicInteger(0); + + private final String path; + private final boolean supportsRange; + + @JsonCreator + public StubLoadSpec( + @JsonProperty("path") String path, + @JsonProperty("supportsRange") @Nullable Boolean supportsRange + ) + { + this.path = path; + this.supportsRange = supportsRange == null || supportsRange; + } + + @JsonProperty + public String getPath() + { + return path; + } + + @JsonProperty + public boolean isSupportsRange() + { + return supportsRange; + } + + @Override + public LoadSpecResult loadSegment(File destDir) + { + LOAD_CALLS.incrementAndGet(); + return new LoadSpecResult(42L); + } + + @Override + @Nullable + public SegmentRangeReader openRangeReader() + { + if (!supportsRange) { + return null; + } + RANGE_CALLS.incrementAndGet(); + return (filename, offset, length) -> new ByteArrayInputStream(new byte[0]); + } + } +} diff --git a/processing/src/test/java/org/apache/druid/timeline/ClusterGroupTuplesTest.java b/processing/src/test/java/org/apache/druid/timeline/ClusterGroupTuplesTest.java new file mode 100644 index 000000000000..674ad78c502e --- /dev/null +++ b/processing/src/test/java/org/apache/druid/timeline/ClusterGroupTuplesTest.java @@ -0,0 +1,360 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.timeline; + +import com.fasterxml.jackson.databind.InjectableValues; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.error.DruidException; +import org.apache.druid.error.DruidExceptionMatcher; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.math.expr.ExprMacroTable; +import org.apache.druid.query.expression.TestExprMacroTable; +import org.apache.druid.segment.VirtualColumns; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.segment.virtual.ExpressionVirtualColumn; +import org.hamcrest.MatcherAssert; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.Arrays; +import java.util.List; + +class ClusterGroupTuplesTest +{ + private static final ObjectMapper MAPPER = new DefaultObjectMapper(); + private static final VirtualColumns VIRTUAL_COLUMNS = VirtualColumns.create( + new ExpressionVirtualColumn( + "tenant_lower", + "lower(tenant)", + ColumnType.STRING, + TestExprMacroTable.INSTANCE + ) + ); + + private static RowSignature tenantRegion() + { + return RowSignature.builder() + .add("tenant", ColumnType.STRING) + .add("region", ColumnType.STRING) + .build(); + } + + private static RowSignature tenantPriority() + { + return RowSignature.builder() + .add("tenant", ColumnType.STRING) + .add("priority", ColumnType.LONG) + .build(); + } + + @Test + void testConstructorRejectsNullClusteringColumns() + { + MatcherAssert.assertThat( + Assertions.assertThrows( + DruidException.class, + () -> new ClusterGroupTuples(null, List.of(List.of("acme", "us-east-1"))) + ), + DruidExceptionMatcher.invalidInput().expectMessageContains("clusteringColumns must not be null or empty") + ); + } + + @Test + void testConstructorRejectsEmptyClusteringColumns() + { + MatcherAssert.assertThat( + Assertions.assertThrows( + DruidException.class, + () -> new ClusterGroupTuples(RowSignature.empty(), List.of()) + ), + DruidExceptionMatcher.invalidInput().expectMessageContains("clusteringColumns must not be null or empty") + ); + } + + @Test + void testConstructorAllowsEmptyTuples() + { + final ClusterGroupTuples groups = new ClusterGroupTuples(tenantRegion(), List.of()); + Assertions.assertTrue(groups.tuples().isEmpty()); + } + + @Test + void testConstructorAllowsNullTuplesList() + { + final ClusterGroupTuples groups = new ClusterGroupTuples(tenantRegion(), null); + Assertions.assertTrue(groups.tuples().isEmpty()); + } + + @Test + void testConstructorRejectsTupleLengthMismatch() + { + MatcherAssert.assertThat( + Assertions.assertThrows( + DruidException.class, + () -> new ClusterGroupTuples(tenantRegion(), List.of(List.of("acme"))) + ), + DruidExceptionMatcher.invalidInput() + .expectMessageContains("tuple[0] has size [1] but clusteringColumns size is [2]") + ); + } + + @Test + void testConstructorRejectsNullTuple() + { + MatcherAssert.assertThat( + Assertions.assertThrows( + DruidException.class, + () -> new ClusterGroupTuples(tenantRegion(), Arrays.asList(Arrays.asList("acme", "us-east-1"), null)) + ), + DruidExceptionMatcher.invalidInput() + .expectMessageContains("tuple[1] has size [null] but clusteringColumns size is [2]") + ); + } + + @Test + void testConstructorRejectsUntypedClusteringColumn() + { + final RowSignature untyped = RowSignature.builder().add("tenant", null).build(); + MatcherAssert.assertThat( + Assertions.assertThrows( + DruidException.class, + () -> new ClusterGroupTuples(untyped, List.of(List.of("acme"))) + ), + DruidExceptionMatcher.invalidInput().expectMessageContains("clusteringColumn[tenant] has no declared type") + ); + } + + @Test + void testConstructorRejectsUnsupportedColumnType() + { + final RowSignature arraySig = RowSignature.builder().add("arr", ColumnType.STRING_ARRAY).build(); + MatcherAssert.assertThat( + Assertions.assertThrows( + DruidException.class, + () -> new ClusterGroupTuples(arraySig, List.of(List.of(List.of("a")))) + ), + DruidExceptionMatcher.invalidInput().expectMessageContains("Unsupported clustering column type") + ); + } + + @Test + void testNullsAllowedAtAnyTuplePosition() + { + final ClusterGroupTuples groups = new ClusterGroupTuples( + tenantRegion(), + Arrays.asList( + Arrays.asList(null, "us-east-1"), + Arrays.asList("acme", null), + Arrays.asList(null, null) + ) + ); + Assertions.assertEquals(3, groups.tuples().size()); + Assertions.assertNull(groups.tuples().get(0).get(0)); + Assertions.assertNull(groups.tuples().get(1).get(1)); + Assertions.assertNull(groups.tuples().get(2).get(0)); + Assertions.assertNull(groups.tuples().get(2).get(1)); + } + + @Test + void testEqualsAndHashCodeNullSafe() + { + final ClusterGroupTuples a = new ClusterGroupTuples( + tenantRegion(), + Arrays.asList(Arrays.asList("acme", null), Arrays.asList(null, "us-east-1")) + ); + final ClusterGroupTuples b = new ClusterGroupTuples( + tenantRegion(), + Arrays.asList(Arrays.asList("acme", null), Arrays.asList(null, "us-east-1")) + ); + Assertions.assertEquals(a, b); + Assertions.assertEquals(a.hashCode(), b.hashCode()); + } + + @Test + void testCoercionIntegerToLong() + { + final ClusterGroupTuples groups = new ClusterGroupTuples( + tenantPriority(), + List.of(List.of("acme", Integer.valueOf(5))) + ); + Assertions.assertEquals(Long.class, groups.tuples().get(0).get(1).getClass()); + Assertions.assertEquals(5L, groups.tuples().get(0).get(1)); + } + + @Test + void testCoercionStringRejectedForLong() + { + // Coercion is intentionally narrow: only Number-family inputs are normalized. A String numeric value is rejected + // rather than parsed, so operator typos don't silently broaden the matched set in future rule consumers. + MatcherAssert.assertThat( + Assertions.assertThrows( + DruidException.class, + () -> new ClusterGroupTuples(tenantPriority(), List.of(List.of("acme", "42"))) + ), + DruidExceptionMatcher.invalidInput() + .expectMessageContains("Cannot coerce value [42] of type [java.lang.String] for column [priority] to LONG") + ); + } + + @Test + void testCoercionDoubleToFloat() + { + final RowSignature sig = RowSignature.builder().add("temp", ColumnType.FLOAT).build(); + final ClusterGroupTuples groups = new ClusterGroupTuples(sig, List.of(List.of((Object) Double.valueOf(98.6)))); + Assertions.assertEquals(Float.class, groups.tuples().get(0).get(0).getClass()); + Assertions.assertEquals(98.6f, (Float) groups.tuples().get(0).get(0), 0.0001f); + } + + @Test + void testCoercionStringRejectedForDouble() + { + final RowSignature sig = RowSignature.builder().add("v", ColumnType.DOUBLE).build(); + MatcherAssert.assertThat( + Assertions.assertThrows( + DruidException.class, + () -> new ClusterGroupTuples(sig, List.of(List.of("3.14"))) + ), + DruidExceptionMatcher.invalidInput() + .expectMessageContains("Cannot coerce value [3.14] of type [java.lang.String] for column [v] to DOUBLE") + ); + } + + @Test + void testCoercionBooleanRejectedForLong() + { + MatcherAssert.assertThat( + Assertions.assertThrows( + DruidException.class, + () -> new ClusterGroupTuples(tenantPriority(), List.of(List.of("acme", (Object) Boolean.TRUE))) + ), + DruidExceptionMatcher.invalidInput() + .expectMessageContains("Cannot coerce value [true] of type [java.lang.Boolean] for column [priority] to LONG") + ); + } + + @Test + void testCoercionAcceptsAnyTypeForString() + { + final RowSignature sig = RowSignature.builder().add("v", ColumnType.STRING).build(); + final ClusterGroupTuples groups = new ClusterGroupTuples(sig, List.of(List.of((Object) Long.valueOf(7)))); + Assertions.assertEquals("7", groups.tuples().get(0).get(0)); + } + + @Test + void testJsonRoundTripPreservesCoercedTypes() throws Exception + { + // Both small Integer (Jackson default) and large Long pass through coercion to canonical Long. + final ClusterGroupTuples groups = new ClusterGroupTuples( + tenantPriority(), + List.of(List.of("acme", (Object) 5), List.of("globex", (Object) 5_000_000_000L)) + ); + final String json = MAPPER.writeValueAsString(groups); + final ClusterGroupTuples back = MAPPER.readValue(json, ClusterGroupTuples.class); + Assertions.assertEquals(groups, back); + // Round-tripped tuples must end up with the same canonical types as the in-memory original. + Assertions.assertEquals(Long.class, back.tuples().get(0).get(1).getClass()); + Assertions.assertEquals(Long.class, back.tuples().get(1).get(1).getClass()); + } + + @Test + void testTuplesAreImmutable() + { + final ClusterGroupTuples groups = new ClusterGroupTuples( + tenantRegion(), + List.of(List.of("acme", "us-east-1")) + ); + Assertions.assertThrows( + UnsupportedOperationException.class, + () -> groups.tuples().add(List.of("globex", "us-east-1")) + ); + Assertions.assertThrows( + UnsupportedOperationException.class, + () -> groups.tuples().get(0).set(0, "hijacked") + ); + } + + @Test + void testVirtualColumnsDefaultEmpty() + { + final ClusterGroupTuples groups = new ClusterGroupTuples(tenantRegion(), List.of()); + Assertions.assertSame(VirtualColumns.EMPTY, groups.virtualColumns()); + } + + @Test + void testVirtualColumnsAreStored() + { + final ClusterGroupTuples groups = new ClusterGroupTuples( + RowSignature.builder().add("tenant_lower", ColumnType.STRING).build(), + VIRTUAL_COLUMNS, + List.of(List.of("acme")) + ); + Assertions.assertNotNull(groups.virtualColumns().getVirtualColumn("tenant_lower")); + } + + @Test + void testVirtualColumnsJsonRoundTrip() throws Exception + { + final ClusterGroupTuples original = new ClusterGroupTuples( + RowSignature.builder().add("tenant_lower", ColumnType.STRING).build(), + VIRTUAL_COLUMNS, + List.of(List.of("acme")) + ); + // Round-trip needs an injectable ExprMacroTable for ExpressionVirtualColumn deserialization. + final ObjectMapper mapper = new DefaultObjectMapper(); + mapper.setInjectableValues( + new InjectableValues.Std() + .addValue(ExprMacroTable.class, TestExprMacroTable.INSTANCE) + ); + final String json = mapper.writeValueAsString(original); + Assertions.assertTrue(json.contains("\"virtualColumns\""), () -> "expected virtualColumns in JSON: " + json); + final ClusterGroupTuples back = mapper.readValue(json, ClusterGroupTuples.class); + Assertions.assertEquals(original, back); + } + + @Test + void testVirtualColumnsOmittedFromJsonWhenEmpty() throws Exception + { + final ClusterGroupTuples groups = new ClusterGroupTuples(tenantRegion(), List.of(List.of("acme", "us-east-1"))); + final String json = MAPPER.writeValueAsString(groups); + Assertions.assertFalse(json.contains("virtualColumns"), () -> "did not expect virtualColumns in JSON: " + json); + } + + @Test + void testVirtualColumnInternerSharesAcrossInstances() + { + // Two ClusterGroupTuples built from independent (but equal) VC inputs should share their VC instances via the + // shared interner on DataSegment, so identical clustering VCs dedupe across segments held in memory. + final ClusterGroupTuples a = new ClusterGroupTuples( + RowSignature.builder().add("tenant_lower", ColumnType.STRING).build(), + VIRTUAL_COLUMNS, + List.of(List.of("acme")) + ); + final ClusterGroupTuples b = new ClusterGroupTuples( + RowSignature.builder().add("tenant_lower", ColumnType.STRING).build(), + VIRTUAL_COLUMNS, + List.of(List.of("globex")) + ); + Assertions.assertSame( + a.virtualColumns().getVirtualColumns()[0], + b.virtualColumns().getVirtualColumns()[0] + ); + } +} diff --git a/processing/src/test/java/org/apache/druid/timeline/DataSegmentClusterGroupsTest.java b/processing/src/test/java/org/apache/druid/timeline/DataSegmentClusterGroupsTest.java new file mode 100644 index 000000000000..a0f54db18656 --- /dev/null +++ b/processing/src/test/java/org/apache/druid/timeline/DataSegmentClusterGroupsTest.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.timeline; + +import com.fasterxml.jackson.databind.InjectableValues; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.timeline.partition.NumberedShardSpec; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.List; + +/** + * Coverage for {@link DataSegment#getClusterGroups()} and the associated builder / interner plumbing. + */ +class DataSegmentClusterGroupsTest +{ + private final ObjectMapper mapper = new DefaultObjectMapper(); + + @BeforeEach + void setUp() + { + final InjectableValues.Std injectables = new InjectableValues.Std(); + injectables.addValue(DataSegment.PruneSpecsHolder.class, DataSegment.PruneSpecsHolder.DEFAULT); + mapper.setInjectableValues(injectables); + } + + private static RowSignature tenantRegion() + { + return RowSignature.builder() + .add("tenant", ColumnType.STRING) + .add("region", ColumnType.STRING) + .build(); + } + + private static ClusterGroupTuples sampleGroups() + { + return new ClusterGroupTuples( + tenantRegion(), + List.of(List.of("acme", "us-east-1"), List.of("globex", "us-east-1")) + ); + } + + private DataSegment.Builder baseBuilder() + { + return DataSegment.builder(SegmentId.of( + "ds", + Intervals.of("2026-01-01/2026-01-02"), + "v", + new NumberedShardSpec(0, 1) + )).size(1L); + } + + @Test + void testNullByDefault() + { + final DataSegment segment = baseBuilder().build(); + Assertions.assertNull(segment.getClusterGroups()); + } + + @Test + void testBuilderAttachesClusterGroups() + { + final ClusterGroupTuples groups = sampleGroups(); + final DataSegment segment = baseBuilder().clusterGroups(groups).build(); + Assertions.assertEquals(groups, segment.getClusterGroups()); + } + + @Test + void testBuilderCopyPreservesClusterGroups() + { + final ClusterGroupTuples groups = sampleGroups(); + final DataSegment original = baseBuilder().clusterGroups(groups).build(); + final DataSegment copy = DataSegment.builder(original).build(); + Assertions.assertSame(original.getClusterGroups(), copy.getClusterGroups()); + } + + @Test + void testWithClusterGroupsCanClearField() + { + final DataSegment original = baseBuilder().clusterGroups(sampleGroups()).build(); + final DataSegment cleared = original.withClusterGroups(null); + Assertions.assertNull(cleared.getClusterGroups()); + } + + @Test + void testJsonRoundTripWithClusterGroups() throws Exception + { + final DataSegment segment = baseBuilder().clusterGroups(sampleGroups()).build(); + final String json = mapper.writeValueAsString(segment); + Assertions.assertTrue(json.contains("\"clusterGroups\""), () -> "expected clusterGroups in JSON: " + json); + final DataSegment back = mapper.readValue(json, DataSegment.class); + Assertions.assertEquals(segment.getClusterGroups(), back.getClusterGroups()); + } + + @Test + void testJsonRoundTripOmitsNullClusterGroups() throws Exception + { + final DataSegment segment = baseBuilder().build(); + final String json = mapper.writeValueAsString(segment); + Assertions.assertFalse(json.contains("clusterGroups"), () -> "did not expect clusterGroups in JSON: " + json); + final DataSegment back = mapper.readValue(json, DataSegment.class); + Assertions.assertNull(back.getClusterGroups()); + } + + @Test + void testOlderJsonWithoutFieldDeserializes() throws Exception + { + // A representative pre-clusterGroups JSON shape; the field is simply absent. Older readers (and segments published + // before this PR) don't carry it. + final String legacyJson = + "{\"dataSource\":\"ds\"," + + "\"interval\":\"2026-01-01T00:00:00.000Z/2026-01-02T00:00:00.000Z\"," + + "\"version\":\"v\"," + + "\"loadSpec\":{}," + + "\"dimensions\":\"\",\"metrics\":\"\"," + + "\"shardSpec\":{\"type\":\"numbered\",\"partitionNum\":0,\"partitions\":1}," + + "\"binaryVersion\":0,\"size\":1}"; + final DataSegment back = mapper.readValue(legacyJson, DataSegment.class); + Assertions.assertNull(back.getClusterGroups()); + } + + @Test + void testInternerReturnsSameInstanceAcrossSegments() + { + final DataSegment a = baseBuilder().clusterGroups(sampleGroups()).build(); + // sampleGroups() builds a fresh instance each call; the interner should dedupe identical content. + final DataSegment b = DataSegment.builder(SegmentId.of( + "ds-other", + Intervals.of("2026-01-01/2026-01-02"), + "v", + new NumberedShardSpec(0, 1) + )).size(1L).clusterGroups(sampleGroups()).build(); + Assertions.assertSame(a.getClusterGroups(), b.getClusterGroups()); + } + + @Test + void testInternerDistinguishesDifferentContent() + { + final DataSegment a = baseBuilder().clusterGroups(sampleGroups()).build(); + final ClusterGroupTuples otherGroups = new ClusterGroupTuples( + tenantRegion(), + List.of(List.of("globex", "us-west-2")) + ); + final DataSegment b = DataSegment.builder(SegmentId.of( + "ds-other", + Intervals.of("2026-01-01/2026-01-02"), + "v", + new NumberedShardSpec(0, 1) + )).size(1L).clusterGroups(otherGroups).build(); + Assertions.assertNotSame(a.getClusterGroups(), b.getClusterGroups()); + } +} diff --git a/server/src/main/java/org/apache/druid/guice/PartialLoadSpecModule.java b/server/src/main/java/org/apache/druid/guice/PartialLoadSpecModule.java index d1bc533269cd..d60ca44c12fc 100644 --- a/server/src/main/java/org/apache/druid/guice/PartialLoadSpecModule.java +++ b/server/src/main/java/org/apache/druid/guice/PartialLoadSpecModule.java @@ -24,13 +24,15 @@ import com.google.inject.Binder; import org.apache.druid.initialization.DruidModule; import org.apache.druid.segment.loading.LoadSpec; +import org.apache.druid.segment.loading.PartialClusterGroupLoadSpec; import org.apache.druid.segment.loading.PartialProjectionLoadSpec; import java.util.List; /** - * Registers {@link PartialProjectionLoadSpec} as a {@link LoadSpec} subtype for serde of partial load rules. This - * module is added to the always-loaded core list so it is available alongside any other deep-storage load spec modules. + * Registers {@link PartialProjectionLoadSpec} and {@link PartialClusterGroupLoadSpec} as {@link LoadSpec} subtypes + * for serde of partial load rules. This module is added to the always-loaded core list so they are available + * alongside any other deep-storage load spec modules. */ public class PartialLoadSpecModule implements DruidModule { @@ -43,6 +45,11 @@ public void configure(Binder binder) @Override public List getJacksonModules() { - return List.of(new SimpleModule().registerSubtypes(PartialProjectionLoadSpec.class)); + return List.of( + new SimpleModule().registerSubtypes( + PartialProjectionLoadSpec.class, + PartialClusterGroupLoadSpec.class + ) + ); } } diff --git a/server/src/main/java/org/apache/druid/server/coordination/LoadableDataSegment.java b/server/src/main/java/org/apache/druid/server/coordination/LoadableDataSegment.java index 427c670cf4cf..d853c675890a 100644 --- a/server/src/main/java/org/apache/druid/server/coordination/LoadableDataSegment.java +++ b/server/src/main/java/org/apache/druid/server/coordination/LoadableDataSegment.java @@ -24,6 +24,7 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize; import org.apache.druid.jackson.CommaListJoinDeserializer; import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.timeline.ClusterGroupTuples; import org.apache.druid.timeline.CompactionState; import org.apache.druid.timeline.DataSegment; import org.apache.druid.timeline.partition.ShardSpec; @@ -50,6 +51,7 @@ private LoadableDataSegment( @JsonProperty("dimensions") @JsonDeserialize(using = CommaListJoinDeserializer.class) @Nullable List dimensions, @JsonProperty("metrics") @JsonDeserialize(using = CommaListJoinDeserializer.class) @Nullable List metrics, @JsonProperty("projections") @JsonDeserialize(using = CommaListJoinDeserializer.class) @Nullable List projections, + @JsonProperty("clusterGroups") @Nullable ClusterGroupTuples clusterGroups, @JsonProperty("shardSpec") @Nullable ShardSpec shardSpec, @JsonProperty("lastCompactionState") @Nullable CompactionState lastCompactionState, @JsonProperty("binaryVersion") Integer binaryVersion, @@ -66,6 +68,7 @@ private LoadableDataSegment( dimensions, metrics, projections, + clusterGroups, shardSpec, lastCompactionState, binaryVersion, diff --git a/server/src/main/java/org/apache/druid/server/coordinator/rules/ClusterGroupPartialLoadMatcher.java b/server/src/main/java/org/apache/druid/server/coordinator/rules/ClusterGroupPartialLoadMatcher.java new file mode 100644 index 000000000000..2dd50818def8 --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/coordinator/rules/ClusterGroupPartialLoadMatcher.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator.rules; + +import com.google.common.hash.Hasher; +import com.google.common.hash.Hashing; +import com.google.common.io.BaseEncoding; +import org.apache.druid.segment.loading.PartialClusterGroupLoadSpec; +import org.apache.druid.timeline.DataSegment; + +import javax.annotation.Nullable; +import java.util.List; +import java.util.Locale; +import java.util.Map; + +/** + * Base for {@link PartialLoadMatcher} implementations that decide which of a clustered segment's cluster groups to + * partially load. Subclasses supply the resolution policy via {@link #resolveClusterGroupIndices(DataSegment)}; the + * sorted, deduped indices into {@code segment.getClusterGroups().getTuples()}, and this base handles fingerprint + * computation and wraps the result into the {@code partialClusterGroup} load-spec wire form consumed by the + * historical-side partial loader. + *

+ * The fingerprint is a hash of the resolved indices for a segment; the data node includes this value in the segment + * announcement so the coordinator can detect rule changes between runs and reconcile loaded replicas. + */ +public abstract class ClusterGroupPartialLoadMatcher implements PartialLoadMatcher +{ + static final String FINGERPRINT_VERSION = "v1"; + + /** + * Returns the sorted, deduped list of indices into {@code segment.getClusterGroups().getTuples()} selected by this + * matcher. Returns an empty list when nothing matches (the segment is not clustered, or no configured pattern / + * tuple intersects what the segment has). + */ + protected abstract List resolveClusterGroupIndices(DataSegment segment); + + @Override + @Nullable + public MatchResult match(DataSegment segment, Map baseLoadSpec) + { + if (segment.getClusterGroups() == null) { + return null; + } + final List resolved = resolveClusterGroupIndices(segment); + if (resolved.isEmpty()) { + return null; + } + final String fingerprint = computeFingerprint(resolved); + return new MatchResult(PartialClusterGroupLoadSpec.wireForm(baseLoadSpec, resolved, fingerprint), fingerprint); + } + + static String computeFingerprint(List sortedDedupedIndices) + { + final Hasher hasher = Hashing.sha256().newHasher(); + for (Integer idx : sortedDedupedIndices) { + hasher.putInt(idx); + } + final String hex = BaseEncoding.base16().encode(hasher.hash().asBytes()).toLowerCase(Locale.ROOT); + // should be good enough without dragging the whole thing around for every segment + return FINGERPRINT_VERSION + ":" + hex.substring(0, 16); + } +} diff --git a/server/src/main/java/org/apache/druid/server/coordinator/rules/Globs.java b/server/src/main/java/org/apache/druid/server/coordinator/rules/Globs.java new file mode 100644 index 000000000000..b0f81b98a563 --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/coordinator/rules/Globs.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator.rules; + +import org.apache.druid.error.DruidException; +import org.apache.druid.error.InvalidInput; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Pattern; + +/** + * Shared glob compilation + matching helpers used by partial-load matchers that match operator-supplied glob patterns + * against strings (projection names, stringified cluster-group values, etc.). Supported metacharacters: + *

    + *
  • {@code *} — any sequence of characters (including empty)
  • + *
  • {@code ?} — any single character
  • + *
  • {@code \} — escapes the following character so it is treated literally; use {@code \*}, {@code \?}, or + * {@code \\} to match a literal {@code *}, {@code ?}, or {@code \}. A trailing unescaped {@code \} is + * rejected.
  • + *
+ * Other characters are literal; regex metacharacters in literal positions are escaped automatically. + */ +public final class Globs +{ + private Globs() + { + // no instantiation + } + + /** + * Translates a glob pattern into a regex pattern that matches the entire input string. + * + * @throws org.apache.druid.error.DruidException if {@code glob} ends with an unescaped backslash + */ + public static String globToRegex(String glob) + { + final StringBuilder sb = new StringBuilder(glob.length() + 4); + boolean escaping = false; + for (int i = 0; i < glob.length(); i++) { + final char c = glob.charAt(i); + if (escaping) { + appendLiteral(sb, c); + escaping = false; + continue; + } + switch (c) { + case '\\': + escaping = true; + break; + case '*': + sb.append(".*"); + break; + case '?': + sb.append('.'); + break; + default: + appendLiteral(sb, c); + } + } + if (escaping) { + throw InvalidInput.exception("Glob pattern [%s] ends with an unescaped backslash", glob); + } + return sb.toString(); + } + + public static List compileAll(List globs) + { + if (globs.isEmpty()) { + return List.of(); + } + final List compiled = new ArrayList<>(globs.size()); + for (String glob : globs) { + compiled.add(Pattern.compile(globToRegex(glob))); + } + return List.copyOf(compiled); + } + + public static boolean matchesAny(String name, List patterns) + { + for (Pattern pattern : patterns) { + if (pattern.matcher(name).matches()) { + return true; + } + } + return false; + } + + /** + * Compile a glob, special-casing the literal {@code "*"} to a {@link CompiledGlob#matchAny} marker that matches any + * value including null + */ + public static CompiledGlob compile(String glob) + { + if ("*".equals(glob)) { + return CompiledGlob.MATCH_ANY; + } + return new CompiledGlob(false, Pattern.compile(globToRegex(glob))); + } + + private static void appendLiteral(StringBuilder sb, char c) + { + switch (c) { + case '.': + case '(': + case ')': + case '[': + case ']': + case '{': + case '}': + case '+': + case '|': + case '^': + case '$': + case '\\': + case '*': + case '?': + sb.append('\\').append(c); + break; + default: + sb.append(c); + } + } + + /** + * Compiled, per-column glob. The literal {@code "*"} short-circuits to a "match any value, including null" flag; + * any other glob compiles to a regex matched against the rendered tuple value (and never matches null). + */ + public static final class CompiledGlob + { + public static final CompiledGlob MATCH_ANY = new CompiledGlob(true, null); + + final boolean matchAny; + @Nullable + final Pattern pattern; + + private CompiledGlob(boolean matchAny, @Nullable Pattern pattern) + { + if (matchAny) { + DruidException.conditionalDefensive(pattern == null, "matchAny=true must not carry a pattern"); + } else { + DruidException.conditionalDefensive(pattern != null, "pattern must be non-null when matchAny=false"); + } + this.matchAny = matchAny; + this.pattern = pattern; + } + + public boolean matches(@Nullable String value) + { + if (matchAny) { + return true; + } + if (value == null) { + return false; + } + return pattern.matcher(value).matches(); + } + + public boolean isMatchAny() + { + return matchAny; + } + } +} diff --git a/server/src/main/java/org/apache/druid/server/coordinator/rules/PartialLoadMatcher.java b/server/src/main/java/org/apache/druid/server/coordinator/rules/PartialLoadMatcher.java index b1c9c78ace11..65c69113b627 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/rules/PartialLoadMatcher.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/rules/PartialLoadMatcher.java @@ -37,7 +37,8 @@ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = UnknownPartialLoadMatcher.class) @JsonSubTypes({ @JsonSubTypes.Type(name = ExactProjectionPartialLoadMatcher.TYPE, value = ExactProjectionPartialLoadMatcher.class), - @JsonSubTypes.Type(name = WildcardProjectionPartialLoadMatcher.TYPE, value = WildcardProjectionPartialLoadMatcher.class) + @JsonSubTypes.Type(name = WildcardProjectionPartialLoadMatcher.TYPE, value = WildcardProjectionPartialLoadMatcher.class), + @JsonSubTypes.Type(name = WildcardClusterGroupPartialLoadMatcher.TYPE, value = WildcardClusterGroupPartialLoadMatcher.class) }) public interface PartialLoadMatcher { diff --git a/server/src/main/java/org/apache/druid/server/coordinator/rules/WildcardClusterGroupPartialLoadMatcher.java b/server/src/main/java/org/apache/druid/server/coordinator/rules/WildcardClusterGroupPartialLoadMatcher.java new file mode 100644 index 000000000000..bf88d2952d0c --- /dev/null +++ b/server/src/main/java/org/apache/druid/server/coordinator/rules/WildcardClusterGroupPartialLoadMatcher.java @@ -0,0 +1,339 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator.rules; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.druid.error.InvalidInput; +import org.apache.druid.segment.VirtualColumn; +import org.apache.druid.segment.VirtualColumns; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.timeline.ClusterGroupTuples; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.utils.CollectionUtils; + +import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.TreeSet; + +/** + * Selects cluster groups whose clustering tuples match any of the configured per-column glob patterns, minus any + * groups matched by an entry in {@code excludePatterns}. Each pattern is a {@code Map} where keys are + * clustering column names (or operator-side virtual column names, see below) and values are glob patterns matched + * against the rendered tuple value at that column. Columns omitted from a pattern are treated as wildcards. Glob + * syntax (including escape semantics) is shared with {@link WildcardProjectionPartialLoadMatcher} via {@link Globs}. + *

+ * If the operator supplies {@link #getVirtualColumns()}, a pattern key may also reference one of those virtual + * columns. At match time, the matcher resolves such a key to a clustering column on the segment via + * {@link VirtualColumns#findEquivalent(VirtualColumns.Node)} between the matchers VCs and the segment's clustering + * VCs (carried on {@link ClusterGroupTuples#virtualColumns()}). This lets operators author portable rules, they + * write their preferred VC name and expression, and the matcher resolves to whatever name the segment happens to use + * for the equivalent clustering VC. + *

+ * If the operator-side VC for a pattern key has no equivalent clustering VC on the segment, the pattern is treated as + * non-matching for that segment (defensive against typos or schema drift). The operator-VC-first ordering also + * disambiguates the shadowing case where an operator-VC and a clustering column share a name: the operator-VC + * interpretation wins, and a pattern is only matchable when the VCs are actually equivalent. + *

+ * Null clustering values are matched only by omitting the column entirely or using the literal {@code "*"} glob; both + * have explicit "match-anything-including-null" semantics. Any other glob (including {@code "null"}) does not match a + * null clustering value. Matching specifically by null clustering values (e.g., load only the null-keyed group) is + * not supported by this matcher and is left for a future typed-tuple matcher variant. + */ +public class WildcardClusterGroupPartialLoadMatcher extends ClusterGroupPartialLoadMatcher +{ + public static final String TYPE = "globClusterGroup"; + + private final List> patterns; + private final List> excludePatterns; + private final VirtualColumns virtualColumns; + private final List> compiledPatterns; + private final List> compiledExcludePatterns; + + @JsonCreator + public WildcardClusterGroupPartialLoadMatcher( + @JsonProperty("patterns") List> patterns, + @JsonProperty("excludePatterns") @Nullable List> excludePatterns, + @JsonProperty("virtualColumns") @Nullable VirtualColumns virtualColumns + ) + { + if (patterns == null || patterns.isEmpty()) { + throw InvalidInput.exception("patterns must not be null or empty for globClusterGroup matcher"); + } + this.patterns = copyAndValidatePatterns(patterns, "patterns"); + this.excludePatterns = excludePatterns == null + ? List.of() + : copyAndValidatePatterns(excludePatterns, "excludePatterns"); + this.virtualColumns = internVirtualColumns(virtualColumns); + this.compiledPatterns = compileAll(this.patterns); + this.compiledExcludePatterns = compileAll(this.excludePatterns); + } + + /** + * Convenience constructor for callers that don't carry operator-side virtual columns. Equivalent to passing + * {@code null} for the virtual columns argument. + */ + public WildcardClusterGroupPartialLoadMatcher( + List> patterns, + @Nullable List> excludePatterns + ) + { + this(patterns, excludePatterns, null); + } + + @JsonProperty + public List> getPatterns() + { + return patterns; + } + + @JsonProperty + @JsonInclude(JsonInclude.Include.NON_EMPTY) + public List> getExcludePatterns() + { + return excludePatterns; + } + + @JsonProperty + @JsonInclude(JsonInclude.Include.NON_EMPTY) + public VirtualColumns getVirtualColumns() + { + return virtualColumns; + } + + @Override + protected List resolveClusterGroupIndices(DataSegment segment) + { + final ClusterGroupTuples clusterGroups = segment.getClusterGroups(); + if (clusterGroups == null) { + return Collections.emptyList(); + } + final RowSignature clusteringColumns = clusterGroups.clusteringColumns(); + final VirtualColumns segmentVcs = clusterGroups.virtualColumns(); + final List> tuples = clusterGroups.tuples(); + + // Per-pattern resolution: which clustering column does each pattern key map to? Resolution is segment-scoped + // (depends only on the segment's clustering signature + VCs), so compute it once up front and reuse it across + // tuples. A null entry in the resolved list marks the pattern as non-matching for this segment. + final List> resolvedPatterns = resolveAll(compiledPatterns, clusteringColumns, segmentVcs); + final List> resolvedExcludes = resolveAll(compiledExcludePatterns, clusteringColumns, segmentVcs); + + final TreeSet matched = new TreeSet<>(); + for (int i = 0; i < tuples.size(); i++) { + final List tuple = tuples.get(i); + if (!matchesAnyPattern(tuple, clusteringColumns, compiledPatterns, resolvedPatterns)) { + continue; + } + if (matchesAnyPattern(tuple, clusteringColumns, compiledExcludePatterns, resolvedExcludes)) { + continue; + } + matched.add(i); + } + return new ArrayList<>(matched); + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + WildcardClusterGroupPartialLoadMatcher that = (WildcardClusterGroupPartialLoadMatcher) o; + return Objects.equals(patterns, that.patterns) + && Objects.equals(excludePatterns, that.excludePatterns) + && Objects.equals(virtualColumns, that.virtualColumns); + } + + @Override + public int hashCode() + { + return Objects.hash(patterns, excludePatterns, virtualColumns); + } + + @Override + public String toString() + { + return "WildcardClusterGroupPartialLoadMatcher{patterns=" + patterns + + ", excludePatterns=" + excludePatterns + + ", virtualColumns=" + virtualColumns + "}"; + } + + /** + * For each compiled pattern, compute the {@code patternKey -> segment clustering column name} map. A {@code null} + * entry in the returned list marks a pattern as non-matching for this segment (some pattern key couldn't be + * resolved to a clustering column via either direct name or operator-VC equivalence). + */ + private List> resolveAll( + List> compiled, + RowSignature clusteringColumns, + VirtualColumns segmentVcs + ) + { + if (compiled.isEmpty()) { + return List.of(); + } + final List> out = new ArrayList<>(compiled.size()); + for (Map pattern : compiled) { + out.add(resolvePattern(pattern.keySet(), clusteringColumns, segmentVcs)); + } + return out; + } + + @Nullable + private Map resolvePattern( + Set patternKeys, + RowSignature clusteringColumns, + VirtualColumns segmentVcs + ) + { + final Map resolved = CollectionUtils.newLinkedHashMapWithExpectedSize(patternKeys.size()); + for (String key : patternKeys) { + final String target = resolveKey(key, clusteringColumns, segmentVcs); + if (target == null) { + return null; // pattern unresolvable for this segment + } + resolved.put(key, target); + } + return resolved; + } + + /** + * Resolve a pattern key to a clustering column name on the segment. Three cases: + *
    + *
  1. The matcher carries an virtual column by this name, resolve via + * {@link VirtualColumns#findEquivalent(VirtualColumns.Node)} against the segment's clustering VCs. The + * matcher VC interpretation wins regardless of any same-name clustering column (shadowing).
  2. + *
  3. Otherwise, if the key is directly a clustering column name -> identity.
  4. + *
  5. Otherwise -> unresolvable (returns {@code null}).
  6. + *
+ */ + @Nullable + private String resolveKey(String key, RowSignature clusteringColumns, VirtualColumns segmentVcs) + { + final VirtualColumns.Node operatorVcNode = virtualColumns.getNode(key); + if (operatorVcNode != null) { + final VirtualColumn equivalent = segmentVcs.findEquivalent(operatorVcNode); + if (equivalent == null) { + return null; + } + final String equivalentName = equivalent.getOutputName(); + return clusteringColumns.contains(equivalentName) ? equivalentName : null; + } + return clusteringColumns.contains(key) ? key : null; + } + + private static boolean matchesAnyPattern( + List tuple, + RowSignature clusteringColumns, + List> compiledPatterns, + List> resolvedPatterns + ) + { + for (int p = 0; p < compiledPatterns.size(); p++) { + final Map resolved = resolvedPatterns.get(p); + if (resolved == null) { + continue; + } + if (matchesPattern(tuple, clusteringColumns, compiledPatterns.get(p), resolved)) { + return true; + } + } + return false; + } + + private static boolean matchesPattern( + List tuple, + RowSignature clusteringColumns, + Map pattern, + Map resolved + ) + { + for (Map.Entry entry : pattern.entrySet()) { + final String resolvedColumn = resolved.get(entry.getKey()); + final int idx = clusteringColumns.indexOf(resolvedColumn); + // resolved is guaranteed to map every patternKey to a real clustering column (else the pattern was skipped). + final Globs.CompiledGlob glob = entry.getValue(); + if (glob.isMatchAny()) { + // Literal "*" matches every value, including null. + continue; + } + final Object value = tuple.get(idx); + if (value == null) { + // Any non-"*" glob cannot match a null clustering value. + return false; + } + if (!glob.matches(value.toString())) { + return false; + } + } + return true; + } + + private static List> copyAndValidatePatterns(List> input, String fieldName) + { + final List> out = new ArrayList<>(input.size()); + for (int i = 0; i < input.size(); i++) { + final Map pattern = input.get(i); + if (pattern == null || pattern.isEmpty()) { + throw InvalidInput.exception("%s[%s] must not be null or empty", fieldName, i); + } + out.add(Map.copyOf(pattern)); + } + return List.copyOf(out); + } + + private static List> compileAll(List> patterns) + { + if (patterns.isEmpty()) { + return List.of(); + } + final List> out = new ArrayList<>(patterns.size()); + for (Map pattern : patterns) { + final Map compiled = CollectionUtils.newLinkedHashMapWithExpectedSize(pattern.size()); + for (Map.Entry entry : pattern.entrySet()) { + compiled.put(entry.getKey(), Globs.compile(entry.getValue())); + } + out.add(Collections.unmodifiableMap(compiled)); + } + return List.copyOf(out); + } + + private static VirtualColumns internVirtualColumns(@Nullable VirtualColumns virtualColumns) + { + if (virtualColumns == null || virtualColumns.isEmpty()) { + return VirtualColumns.EMPTY; + } + return VirtualColumns.create( + Arrays.stream(virtualColumns.getVirtualColumns()) + .map(DataSegment.virtualColumnInterner()::intern) + .toList() + ); + } +} diff --git a/server/src/main/java/org/apache/druid/server/coordinator/rules/WildcardProjectionPartialLoadMatcher.java b/server/src/main/java/org/apache/druid/server/coordinator/rules/WildcardProjectionPartialLoadMatcher.java index 934eaf6f9589..5adf3c09642f 100644 --- a/server/src/main/java/org/apache/druid/server/coordinator/rules/WildcardProjectionPartialLoadMatcher.java +++ b/server/src/main/java/org/apache/druid/server/coordinator/rules/WildcardProjectionPartialLoadMatcher.java @@ -72,8 +72,8 @@ public WildcardProjectionPartialLoadMatcher( } this.patterns = List.copyOf(patterns); this.excludePatterns = excludePatterns == null ? List.of() : List.copyOf(excludePatterns); - this.compiledPatterns = compileAll(this.patterns); - this.compiledExcludePatterns = compileAll(this.excludePatterns); + this.compiledPatterns = Globs.compileAll(this.patterns); + this.compiledExcludePatterns = Globs.compileAll(this.excludePatterns); } @JsonProperty @@ -98,99 +98,16 @@ protected List resolveProjectionNames(DataSegment segment) } final TreeSet matched = new TreeSet<>(); for (String name : segmentProjections) { - if (matchesAny(name, compiledExcludePatterns)) { + if (Globs.matchesAny(name, compiledExcludePatterns)) { continue; } - if (matchesAny(name, compiledPatterns)) { + if (Globs.matchesAny(name, compiledPatterns)) { matched.add(name); } } return new ArrayList<>(matched); } - private static List compileAll(List globs) - { - if (globs.isEmpty()) { - return List.of(); - } - final List compiled = new ArrayList<>(globs.size()); - for (String glob : globs) { - compiled.add(Pattern.compile(globToRegex(glob))); - } - return List.copyOf(compiled); - } - - private static boolean matchesAny(String name, List patterns) - { - for (Pattern pattern : patterns) { - if (pattern.matcher(name).matches()) { - return true; - } - } - return false; - } - - /** - * Translates a glob pattern with {@code *}, {@code ?}, and {@code \} escape semantics into an equivalent regex - * pattern that matches the entire input string. Regex metacharacters in literal positions are escaped. - * - * @throws org.apache.druid.error.DruidException if {@code glob} ends with an unescaped backslash - */ - static String globToRegex(String glob) - { - final StringBuilder sb = new StringBuilder(glob.length() + 4); - boolean escaping = false; - for (int i = 0; i < glob.length(); i++) { - final char c = glob.charAt(i); - if (escaping) { - appendLiteral(sb, c); - escaping = false; - continue; - } - switch (c) { - case '\\': - escaping = true; - break; - case '*': - sb.append(".*"); - break; - case '?': - sb.append('.'); - break; - default: - appendLiteral(sb, c); - } - } - if (escaping) { - throw InvalidInput.exception("Glob pattern [%s] ends with an unescaped backslash", glob); - } - return sb.toString(); - } - - private static void appendLiteral(StringBuilder sb, char c) - { - switch (c) { - case '.': - case '(': - case ')': - case '[': - case ']': - case '{': - case '}': - case '+': - case '|': - case '^': - case '$': - case '\\': - case '*': - case '?': - sb.append('\\').append(c); - break; - default: - sb.append(c); - } - } - @Override public boolean equals(Object o) { diff --git a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java index fa5971e96570..4f76b7b52c17 100644 --- a/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java +++ b/server/src/test/java/org/apache/druid/client/CachingClusteredClientTest.java @@ -2761,6 +2761,7 @@ private MyDataSegment() null, null, null, + null, NoneShardSpec.instance(), null, -1, diff --git a/server/src/test/java/org/apache/druid/server/coordinator/CreateDataSegments.java b/server/src/test/java/org/apache/druid/server/coordinator/CreateDataSegments.java index 1546eb76cdfb..afee36e530b0 100644 --- a/server/src/test/java/org/apache/druid/server/coordinator/CreateDataSegments.java +++ b/server/src/test/java/org/apache/druid/server/coordinator/CreateDataSegments.java @@ -248,6 +248,7 @@ private NumberedDataSegment( Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), + null, shardSpec, compactionState, IndexIO.CURRENT_VERSION_ID, diff --git a/server/src/test/java/org/apache/druid/server/coordinator/rules/GlobsTest.java b/server/src/test/java/org/apache/druid/server/coordinator/rules/GlobsTest.java new file mode 100644 index 000000000000..b8ad7aa9180a --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/coordinator/rules/GlobsTest.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator.rules; + +import org.apache.druid.error.DruidException; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.util.List; +import java.util.regex.Pattern; + +/** + * Focused unit coverage for the shared {@link Globs} helpers. Integration semantics (e.g., how patterns interact with + * excludes inside a matcher) live with the matcher tests; this file pins the translation rules so future refactors + * don't drift them silently. + */ +class GlobsTest +{ + @Test + void testLiteralGlobMatchesItself() + { + Assertions.assertEquals("user_daily", Globs.globToRegex("user_daily")); + } + + @Test + void testStarTranslatesToDotStar() + { + Assertions.assertEquals("user_.*", Globs.globToRegex("user_*")); + } + + @Test + void testQuestionMarkTranslatesToDot() + { + Assertions.assertEquals("user_.", Globs.globToRegex("user_?")); + } + + @Test + void testRegexMetacharactersInLiteralPositionsAreEscaped() + { + // Each of these has special regex meaning; the translator must escape them so they match literally. + Assertions.assertEquals("user\\.daily", Globs.globToRegex("user.daily")); + Assertions.assertEquals("a\\+b", Globs.globToRegex("a+b")); + Assertions.assertEquals("\\(foo\\)", Globs.globToRegex("(foo)")); + Assertions.assertEquals("a\\|b", Globs.globToRegex("a|b")); + Assertions.assertEquals("\\^start", Globs.globToRegex("^start")); + Assertions.assertEquals("end\\$", Globs.globToRegex("end$")); + } + + @Test + void testBackslashEscapesGlobMetacharacters() + { + // Escaped glob metacharacters become literal regex characters (themselves escaped). + Assertions.assertEquals("a\\*b", Globs.globToRegex("a\\*b")); + Assertions.assertEquals("a\\?b", Globs.globToRegex("a\\?b")); + Assertions.assertEquals("a\\\\b", Globs.globToRegex("a\\\\b")); + } + + @Test + void testTrailingUnescapedBackslashIsRejected() + { + Assertions.assertThrows(DruidException.class, () -> Globs.globToRegex("foo\\")); + } + + @Test + void testCompileAllReturnsEmptyListForEmptyInput() + { + Assertions.assertTrue(Globs.compileAll(List.of()).isEmpty()); + } + + @Test + void testCompileAllCompilesEachPattern() + { + final List compiled = Globs.compileAll(List.of("user_*", "report_?")); + Assertions.assertEquals(2, compiled.size()); + Assertions.assertTrue(compiled.get(0).matcher("user_daily").matches()); + Assertions.assertTrue(compiled.get(1).matcher("report_x").matches()); + Assertions.assertFalse(compiled.get(1).matcher("report_xy").matches()); + } + + @Test + void testMatchesAnyReturnsTrueOnFirstHit() + { + final List compiled = Globs.compileAll(List.of("user_*", "report_*")); + Assertions.assertTrue(Globs.matchesAny("user_daily", compiled)); + Assertions.assertTrue(Globs.matchesAny("report_hourly", compiled)); + Assertions.assertFalse(Globs.matchesAny("other", compiled)); + } + + @Test + void testMatchesAnyEmptyPatternListNeverMatches() + { + Assertions.assertFalse(Globs.matchesAny("anything", List.of())); + } + + @Test + void testCompileLiteralStarReturnsMatchAny() + { + // The literal "*" is special-cased to MATCH_ANY so callers can short-circuit a "matches any value, including + // null" branch without paying a regex match. Any other glob takes the regex path. + final Globs.CompiledGlob compiled = Globs.compile("*"); + Assertions.assertSame(Globs.CompiledGlob.MATCH_ANY, compiled); + Assertions.assertTrue(compiled.matchAny); + Assertions.assertNull(compiled.pattern); + } + + @Test + void testCompileGlobReturnsCompiledRegex() + { + final Globs.CompiledGlob compiled = Globs.compile("us-*"); + Assertions.assertFalse(compiled.matchAny); + Assertions.assertNotNull(compiled.pattern); + Assertions.assertTrue(compiled.matches("us-east-1")); + Assertions.assertFalse(compiled.matches("eu-west")); + } + + @Test + void testCompileLiteralNullStringHasNoSpecialTreatment() + { + // The string "null" goes through the regex path like any other literal glob, the helper does not give the + // literal-string "null" any special meaning + final Globs.CompiledGlob compiled = Globs.compile("null"); + Assertions.assertFalse(compiled.matchAny); + Assertions.assertNotNull(compiled.pattern); + Assertions.assertTrue(compiled.matches("null")); + Assertions.assertFalse(compiled.matches("nullx")); + } +} diff --git a/server/src/test/java/org/apache/druid/server/coordinator/rules/WildcardClusterGroupPartialLoadMatcherTest.java b/server/src/test/java/org/apache/druid/server/coordinator/rules/WildcardClusterGroupPartialLoadMatcherTest.java new file mode 100644 index 000000000000..0ff1bdc804ed --- /dev/null +++ b/server/src/test/java/org/apache/druid/server/coordinator/rules/WildcardClusterGroupPartialLoadMatcherTest.java @@ -0,0 +1,449 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.server.coordinator.rules; + +import com.fasterxml.jackson.databind.InjectableValues; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.druid.error.DruidException; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.math.expr.ExprMacroTable; +import org.apache.druid.query.expression.TestExprMacroTable; +import org.apache.druid.segment.VirtualColumns; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.segment.virtual.ExpressionVirtualColumn; +import org.apache.druid.timeline.ClusterGroupTuples; +import org.apache.druid.timeline.DataSegment; +import org.apache.druid.timeline.SegmentId; +import org.apache.druid.timeline.partition.NumberedShardSpec; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +class WildcardClusterGroupPartialLoadMatcherTest +{ + private static final Map BASE_LOAD_SPEC = Map.of("type", "local", "path", "/seg"); + + private final ObjectMapper mapper = new DefaultObjectMapper(); + + @BeforeEach + void setUp() + { + final InjectableValues.Std injectables = new InjectableValues.Std(); + injectables.addValue(DataSegment.PruneSpecsHolder.class, DataSegment.PruneSpecsHolder.DEFAULT); + mapper.setInjectableValues(injectables); + } + + private static RowSignature tenantRegion() + { + return RowSignature.builder() + .add("tenant", ColumnType.STRING) + .add("region", ColumnType.STRING) + .build(); + } + + /** A 3-group fixture: (acme, us-east-1), (acme, us-west-2), (globex, us-east-1). */ + private static DataSegment threeGroupSegment() + { + return segmentWithGroups(new ClusterGroupTuples( + tenantRegion(), + List.of( + List.of("acme", "us-east-1"), + List.of("acme", "us-west-2"), + List.of("globex", "us-east-1") + ) + )); + } + + private static DataSegment segmentWithGroups(ClusterGroupTuples groups) + { + return DataSegment.builder(SegmentId.of( + "ds", + Intervals.of("2026-01-01/2026-01-02"), + "v", + new NumberedShardSpec(0, 1) + )).loadSpec(BASE_LOAD_SPEC).size(0).clusterGroups(groups).build(); + } + + @Test + void testConstructorRejectsNullPatterns() + { + Assertions.assertThrows(DruidException.class, () -> new WildcardClusterGroupPartialLoadMatcher(null, null)); + } + + @Test + void testConstructorRejectsEmptyPatterns() + { + Assertions.assertThrows(DruidException.class, () -> new WildcardClusterGroupPartialLoadMatcher(List.of(), null)); + } + + @Test + void testConstructorRejectsEmptyPatternMap() + { + Assertions.assertThrows( + DruidException.class, + () -> new WildcardClusterGroupPartialLoadMatcher(List.of(Map.of()), null) + ); + } + + @Test + void testConstructorRejectsEmptyExcludePatternMap() + { + Assertions.assertThrows( + DruidException.class, + () -> new WildcardClusterGroupPartialLoadMatcher(List.of(Map.of("tenant", "*")), List.of(Map.of())) + ); + } + + @Test + void testNonClusteredSegmentReturnsNull() + { + final WildcardClusterGroupPartialLoadMatcher matcher = new WildcardClusterGroupPartialLoadMatcher( + List.of(Map.of("tenant", "acme")), + null + ); + final DataSegment segment = DataSegment.builder(SegmentId.of( + "ds", + Intervals.of("2026-01-01/2026-01-02"), + "v", + new NumberedShardSpec(0, 1) + )).loadSpec(BASE_LOAD_SPEC).size(0).build(); + Assertions.assertNull(matcher.match(segment, BASE_LOAD_SPEC)); + } + + @Test + void testSingleColumnExactMatch() + { + final WildcardClusterGroupPartialLoadMatcher matcher = new WildcardClusterGroupPartialLoadMatcher( + List.of(Map.of("tenant", "acme")), + null + ); + final PartialLoadMatcher.MatchResult result = matcher.match(threeGroupSegment(), BASE_LOAD_SPEC); + Assertions.assertNotNull(result); + Assertions.assertEquals(List.of(0, 1), result.wrappedLoadSpec().get("clusterGroupIndices")); + } + + @Test + void testSingleColumnGlob() + { + final WildcardClusterGroupPartialLoadMatcher matcher = new WildcardClusterGroupPartialLoadMatcher( + List.of(Map.of("region", "us-east-*")), + null + ); + final PartialLoadMatcher.MatchResult result = matcher.match(threeGroupSegment(), BASE_LOAD_SPEC); + Assertions.assertNotNull(result); + Assertions.assertEquals(List.of(0, 2), result.wrappedLoadSpec().get("clusterGroupIndices")); + } + + @Test + void testMultiColumnPatternWithExactAndGlob() + { + final WildcardClusterGroupPartialLoadMatcher matcher = new WildcardClusterGroupPartialLoadMatcher( + List.of(Map.of("tenant", "globex", "region", "us-east-*")), + null + ); + final PartialLoadMatcher.MatchResult result = matcher.match(threeGroupSegment(), BASE_LOAD_SPEC); + Assertions.assertEquals(List.of(2), result.wrappedLoadSpec().get("clusterGroupIndices")); + } + + @Test + void testOmittedColumnIsWildcard() + { + // No region constraint -> all acme rows match regardless of region. + final WildcardClusterGroupPartialLoadMatcher matcher = new WildcardClusterGroupPartialLoadMatcher( + List.of(Map.of("tenant", "acme")), + null + ); + final PartialLoadMatcher.MatchResult result = matcher.match(threeGroupSegment(), BASE_LOAD_SPEC); + Assertions.assertEquals(List.of(0, 1), result.wrappedLoadSpec().get("clusterGroupIndices")); + } + + @Test + void testExcludePatternRemovesGroups() + { + final WildcardClusterGroupPartialLoadMatcher matcher = new WildcardClusterGroupPartialLoadMatcher( + List.of(Map.of("tenant", "acme")), + List.of(Map.of("region", "us-west-2")) + ); + final PartialLoadMatcher.MatchResult result = matcher.match(threeGroupSegment(), BASE_LOAD_SPEC); + Assertions.assertEquals(List.of(0), result.wrappedLoadSpec().get("clusterGroupIndices")); + } + + @Test + void testStarMatchesAllIncludingNullValuesAtThatColumn() + { + final DataSegment segment = segmentWithGroups(new ClusterGroupTuples( + tenantRegion(), + Arrays.asList( + Arrays.asList("acme", "us-east-1"), + Arrays.asList("acme", null), + Arrays.asList(null, "us-east-1") + ) + )); + final WildcardClusterGroupPartialLoadMatcher matcher = new WildcardClusterGroupPartialLoadMatcher( + List.of(Map.of("region", "*")), + null + ); + final PartialLoadMatcher.MatchResult result = matcher.match(segment, BASE_LOAD_SPEC); + // The "*" glob matches every value including null at the region position. + Assertions.assertEquals(List.of(0, 1, 2), result.wrappedLoadSpec().get("clusterGroupIndices")); + } + + @Test + void testOmittedColumnMatchesNullAtThatPosition() + { + final DataSegment segment = segmentWithGroups(new ClusterGroupTuples( + tenantRegion(), + Arrays.asList( + Arrays.asList("acme", "us-east-1"), + Arrays.asList("acme", null) + ) + )); + // No constraint on region; the null-region tuple is still picked up. + final WildcardClusterGroupPartialLoadMatcher matcher = new WildcardClusterGroupPartialLoadMatcher( + List.of(Map.of("tenant", "acme")), + null + ); + final PartialLoadMatcher.MatchResult result = matcher.match(segment, BASE_LOAD_SPEC); + Assertions.assertEquals(List.of(0, 1), result.wrappedLoadSpec().get("clusterGroupIndices")); + } + + @Test + void testNonStarGlobDoesNotMatchNull() + { + final DataSegment segment = segmentWithGroups(new ClusterGroupTuples( + tenantRegion(), + Arrays.asList( + Arrays.asList("acme", "us-east-1"), + Arrays.asList("acme", null) + ) + )); + // "us-*" must not match a null region (the only way to match null is "*" or omission). + final WildcardClusterGroupPartialLoadMatcher matcher = new WildcardClusterGroupPartialLoadMatcher( + List.of(Map.of("region", "us-*")), + null + ); + final PartialLoadMatcher.MatchResult result = matcher.match(segment, BASE_LOAD_SPEC); + Assertions.assertEquals(List.of(0), result.wrappedLoadSpec().get("clusterGroupIndices")); + } + + @Test + void testLiteralStringNullDoesNotMatchNullClusteringValue() + { + final DataSegment segment = segmentWithGroups(new ClusterGroupTuples( + tenantRegion(), + Arrays.asList(Arrays.asList("acme", null)) + )); + final WildcardClusterGroupPartialLoadMatcher matcher = new WildcardClusterGroupPartialLoadMatcher( + List.of(Map.of("region", "null")), + null + ); + Assertions.assertNull(matcher.match(segment, BASE_LOAD_SPEC)); + } + + @Test + void testUnknownColumnInPatternNoMatch() + { + final WildcardClusterGroupPartialLoadMatcher matcher = new WildcardClusterGroupPartialLoadMatcher( + List.of(Map.of("not_a_clustering_column", "anything")), + null + ); + Assertions.assertNull(matcher.match(threeGroupSegment(), BASE_LOAD_SPEC)); + } + + @Test + void testNoMatchReturnsNull() + { + final WildcardClusterGroupPartialLoadMatcher matcher = new WildcardClusterGroupPartialLoadMatcher( + List.of(Map.of("tenant", "nobody")), + null + ); + Assertions.assertNull(matcher.match(threeGroupSegment(), BASE_LOAD_SPEC)); + } + + @Test + void testIndicesAreSortedAndDeduped() + { + // Two patterns that both match the same group should not duplicate the index in the output. + final WildcardClusterGroupPartialLoadMatcher matcher = new WildcardClusterGroupPartialLoadMatcher( + List.of(Map.of("tenant", "acme"), Map.of("region", "us-east-1")), + null + ); + final PartialLoadMatcher.MatchResult result = matcher.match(threeGroupSegment(), BASE_LOAD_SPEC); + // acme matches 0, 1; us-east-1 matches 0, 2 — union sorted = [0, 1, 2]. + Assertions.assertEquals(List.of(0, 1, 2), result.wrappedLoadSpec().get("clusterGroupIndices")); + } + + @Test + void testFingerprintStableAcrossPatternOrder() + { + final WildcardClusterGroupPartialLoadMatcher a = new WildcardClusterGroupPartialLoadMatcher( + List.of(Map.of("tenant", "acme"), Map.of("region", "us-east-1")), + null + ); + final WildcardClusterGroupPartialLoadMatcher b = new WildcardClusterGroupPartialLoadMatcher( + List.of(Map.of("region", "us-east-1"), Map.of("tenant", "acme")), + null + ); + final PartialLoadMatcher.MatchResult ra = a.match(threeGroupSegment(), BASE_LOAD_SPEC); + final PartialLoadMatcher.MatchResult rb = b.match(threeGroupSegment(), BASE_LOAD_SPEC); + Assertions.assertEquals(ra.fingerprint(), rb.fingerprint()); + } + + @Test + void testJsonRoundTrip() throws Exception + { + final WildcardClusterGroupPartialLoadMatcher original = new WildcardClusterGroupPartialLoadMatcher( + List.of(Map.of("tenant", "acme"), Map.of("tenant", "globex", "region", "us-east-*")), + List.of(Map.of("region", "us-west-2")) + ); + final String json = mapper.writeValueAsString(original); + final PartialLoadMatcher back = mapper.readValue(json, PartialLoadMatcher.class); + Assertions.assertEquals(original, back); + } + + @Test + void testOperatorVcResolvesToClusteringVcByEquivalence() + { + // Operator names their VC "queryLower" with lower(tenant); the segment's clustering VC is "tenant_lower" with + // the same expression. The matcher should find equivalence and match the segment's tuples. + final WildcardClusterGroupPartialLoadMatcher matcher = new WildcardClusterGroupPartialLoadMatcher( + List.of(Map.of("queryLower", "acme")), + null, + lowerTenantVcs("queryLower") + ); + final DataSegment segment = vcClusteredSegment("acme", "globex"); + final PartialLoadMatcher.MatchResult result = matcher.match(segment, BASE_LOAD_SPEC); + Assertions.assertNotNull(result); + Assertions.assertEquals(List.of(0), result.wrappedLoadSpec().get("clusterGroupIndices")); + } + + @Test + void testOperatorVcWithoutEquivalenceIsNonMatching() + { + // Operator-VC computes upper(tenant); segment's clustering VC computes lower(tenant). Not equivalent → the + // pattern is non-matching for this segment. + final VirtualColumns operatorVcs = VirtualColumns.create(new ExpressionVirtualColumn( + "queryUpper", + "upper(tenant)", + ColumnType.STRING, + TestExprMacroTable.INSTANCE + )); + final WildcardClusterGroupPartialLoadMatcher matcher = new WildcardClusterGroupPartialLoadMatcher( + List.of(Map.of("queryUpper", "ACME")), + null, + operatorVcs + ); + Assertions.assertNull(matcher.match(vcClusteredSegment("acme"), BASE_LOAD_SPEC)); + } + + @Test + void testOperatorVcShadowsClusteringColumnName() + { + // Operator declares a VC named "tenant" with an unrelated expression. The pattern key "tenant" resolves through + // the operator's VC (operator-VC interpretation wins). Without an equivalent on the segment side, the pattern + // is non-matching even though "tenant" happens to also be a clustering column name on a different segment. + final VirtualColumns operatorVcs = VirtualColumns.create(new ExpressionVirtualColumn( + "tenant", + "lower(otherCol)", + ColumnType.STRING, + TestExprMacroTable.INSTANCE + )); + final WildcardClusterGroupPartialLoadMatcher matcher = new WildcardClusterGroupPartialLoadMatcher( + List.of(Map.of("tenant", "acme")), + null, + operatorVcs + ); + // Segment that clusters directly on physical "tenant" (no segment VCs). The operator-VC shadowing rule means + // we don't silently treat the pattern's "tenant" as the clustering column "tenant" — it's interpreted as the + // operator-VC, which has no equivalent on the segment → non-matching. + Assertions.assertNull(matcher.match(threeGroupSegment(), BASE_LOAD_SPEC)); + } + + @Test + void testNoOperatorVcsKeepsDirectNameMatching() + { + // Sanity check: when no operator VCs are configured, the pattern key path falls through to direct clustering + // column name resolution, unchanged from the pre-VC behavior. + final WildcardClusterGroupPartialLoadMatcher matcher = new WildcardClusterGroupPartialLoadMatcher( + List.of(Map.of("tenant", "acme")), + null + ); + Assertions.assertNotNull(matcher.match(threeGroupSegment(), BASE_LOAD_SPEC)); + } + + @Test + void testOperatorVcJsonRoundTrip() throws Exception + { + // Round-trip needs an injectable ExprMacroTable for ExpressionVirtualColumn deserialization. + mapper.setInjectableValues( + new InjectableValues.Std() + .addValue(DataSegment.PruneSpecsHolder.class, DataSegment.PruneSpecsHolder.DEFAULT) + .addValue(ExprMacroTable.class, TestExprMacroTable.INSTANCE) + ); + final WildcardClusterGroupPartialLoadMatcher original = new WildcardClusterGroupPartialLoadMatcher( + List.of(Map.of("queryLower", "acme")), + null, + lowerTenantVcs("queryLower") + ); + final String json = mapper.writeValueAsString(original); + Assertions.assertTrue(json.contains("\"virtualColumns\""), () -> "expected virtualColumns in JSON: " + json); + final PartialLoadMatcher back = mapper.readValue(json, PartialLoadMatcher.class); + Assertions.assertEquals(original, back); + } + + @Test + void testVirtualColumnsOmittedFromJsonWhenEmpty() throws Exception + { + final WildcardClusterGroupPartialLoadMatcher matcher = new WildcardClusterGroupPartialLoadMatcher( + List.of(Map.of("tenant", "acme")), + null + ); + final String json = mapper.writeValueAsString(matcher); + Assertions.assertFalse(json.contains("virtualColumns"), () -> "did not expect virtualColumns in JSON: " + json); + } + + private static VirtualColumns lowerTenantVcs(String outputName) + { + return VirtualColumns.create(new ExpressionVirtualColumn( + outputName, + "lower(tenant)", + ColumnType.STRING, + TestExprMacroTable.INSTANCE + )); + } + + private static DataSegment vcClusteredSegment(String... lowerTenants) + { + final RowSignature clusteringColumns = RowSignature.builder().add("tenant_lower", ColumnType.STRING).build(); + final List> tuples = new ArrayList<>(lowerTenants.length); + for (String t : lowerTenants) { + tuples.add(Collections.singletonList(t)); + } + return segmentWithGroups(new ClusterGroupTuples(clusteringColumns, lowerTenantVcs("tenant_lower"), tuples)); + } +}