Skip to content

Commit 9cdedcd

Browse files
authored
feat: partial load for clustered segments (apache#19620)
1 parent c010e42 commit 9cdedcd

12 files changed

Lines changed: 1331 additions & 132 deletions

processing/src/main/java/org/apache/druid/segment/IndexMergerV10.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -418,9 +418,8 @@ private File makeClusteredIndexFiles(
418418

419419
for (Map.Entry<List<Integer>, List<AdapterAndGroup>> groupEntry : merged.groupSources.entrySet()) {
420420
final List<Integer> mergedTuple = groupEntry.getKey();
421-
final String groupPrefix = Projections.getClusterGroupSegmentInternalFilePrefix(mergedTuple);
422-
// file-bundle name is the prefix without the trailing slash
423-
final String groupName = groupPrefix.substring(0, groupPrefix.length() - 1);
421+
final String groupName = Projections.getClusterGroupBundleName(mergedTuple);
422+
final String groupPrefix = groupName + "/";
424423

425424
final List<IndexableAdapter> groupAdapters = Lists.newArrayListWithCapacity(groupEntry.getValue().size());
426425
for (AdapterAndGroup source : groupEntry.getValue()) {

processing/src/main/java/org/apache/druid/segment/PartialQueryableIndex.java

Lines changed: 175 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@
3434
import org.apache.druid.segment.column.ColumnConfig;
3535
import org.apache.druid.segment.column.ColumnDescriptor;
3636
import org.apache.druid.segment.column.ColumnHolder;
37+
import org.apache.druid.segment.column.ColumnType;
38+
import org.apache.druid.segment.column.ValueType;
3739
import org.apache.druid.segment.data.Indexed;
3840
import org.apache.druid.segment.data.ListIndexed;
3941
import org.apache.druid.segment.file.PartialSegmentFileMapperV10;
@@ -46,6 +48,7 @@
4648
import org.apache.druid.segment.projections.ProjectionMetadata;
4749
import org.apache.druid.segment.projections.Projections;
4850
import org.apache.druid.segment.projections.QueryableProjection;
51+
import org.apache.druid.segment.projections.TableClusterGroupSpec;
4952
import org.joda.time.Interval;
5053

5154
import javax.annotation.Nullable;
@@ -58,6 +61,7 @@
5861
import java.util.Map;
5962
import java.util.SortedSet;
6063
import java.util.concurrent.ConcurrentHashMap;
64+
import java.util.function.Function;
6165
import java.util.stream.Collectors;
6266

6367
/**
@@ -103,6 +107,16 @@ public class PartialQueryableIndex implements QueryableIndex
103107
private final ConcurrentHashMap<String, Map<String, Supplier<BaseColumnHolder>>> projectionColumnsByName =
104108
new ConcurrentHashMap<>();
105109

110+
// clustered base summary when this segment is a clustered base table, else null. when non-null, the base table has
111+
// no top-level columns
112+
@Nullable
113+
private final ClusteredValueGroupsBaseTableSchema clusteredBaseSummary;
114+
115+
// per-cluster-group column suppliers, keyed by group index (into the summary's group list). built on demand like
116+
// projectionColumnsByName; each supplier defers both mapFile() and deserialization until the column is read.
117+
private final ConcurrentHashMap<Integer, Map<String, Supplier<BaseColumnHolder>>> clusterGroupColumnsByIndex =
118+
new ConcurrentHashMap<>();
119+
106120
// lazy dimension handlers
107121
private final Supplier<Map<String, DimensionHandler>> dimensionHandlers;
108122

@@ -125,14 +139,14 @@ public PartialQueryableIndex(
125139
baseProjection.getSchema().getName()
126140
);
127141
final BaseTableProjectionSchema baseSchema = (BaseTableProjectionSchema) baseProjection.getSchema();
128-
// Clustered V10 segments keep their data in per-cluster-group bundles, matched from metadata much like
129-
// projections, but partial loading does not wire that up yet (and the write side isn't available to build one for
130-
// testing). Fail loudly here rather than mis-treating the clustered base summary as a plain base table, whose
131-
// top-level columns are empty. Both acquire modes route partial-eligible segments through this index, so this
132-
// guards the full path too. Remove once partial loading supports clustered segments.
133-
if (baseSchema instanceof ClusteredValueGroupsBaseTableSchema) {
142+
this.clusteredBaseSummary = baseSchema instanceof ClusteredValueGroupsBaseTableSchema
143+
? (ClusteredValueGroupsBaseTableSchema) baseSchema
144+
: null;
145+
if (clusteredBaseSummary != null && !clusteredBaseSummary.getSharedColumns().isEmpty()) {
146+
// Shared base columns aren't wired into partial loading yet
134147
throw DruidException.defensive(
135-
"Clustered V10 segments are not yet supported for partial loading (interval[%s])",
148+
"Clustered V10 segments with shared columns%s are not yet supported for partial loading (interval[%s])",
149+
clusteredBaseSummary.getSharedColumns(),
136150
metadata.getInterval()
137151
);
138152
}
@@ -141,16 +155,24 @@ public PartialQueryableIndex(
141155
this.baseProjectionPrefix = Projections.getProjectionSegmentInternalFilePrefix(baseSchema);
142156
this.dataInterval = Intervals.of(metadata.getInterval());
143157
this.bitmapFactory = metadata.getBitmapEncoding().getBitmapFactory();
144-
this.availableDimensions = new ListIndexed<>(baseSchema.getDimensionNames());
145158

146-
// build column names (dimensions first, then other columns, excluding __time)
147-
final LinkedHashSet<String> dimsFirst = new LinkedHashSet<>(baseSchema.getDimensionNames());
148-
for (String columnName : baseSchema.getColumnNames()) {
149-
if (!ColumnHolder.TIME_COLUMN_NAME.equals(columnName)) {
150-
dimsFirst.add(columnName);
159+
// A clustered base table has no top-level columns (its data lives in per-cluster-group bundles), so its available
160+
// dimensions / column names are empty; logical columns are resolved via getColumnCapabilities + cluster-group
161+
// dispatch (matching the eager SimpleQueryableIndex). A regular base table lists dimensions first, then the
162+
// remaining columns (excluding __time).
163+
if (clusteredBaseSummary != null) {
164+
this.availableDimensions = new ListIndexed<>(List.of());
165+
this.columnNames = List.of();
166+
} else {
167+
this.availableDimensions = new ListIndexed<>(baseSchema.getDimensionNames());
168+
final LinkedHashSet<String> dimsFirst = new LinkedHashSet<>(baseSchema.getDimensionNames());
169+
for (String columnName : baseSchema.getColumnNames()) {
170+
if (!ColumnHolder.TIME_COLUMN_NAME.equals(columnName)) {
171+
dimsFirst.add(columnName);
172+
}
151173
}
174+
this.columnNames = List.copyOf(dimsFirst);
152175
}
153-
this.columnNames = List.copyOf(dimsFirst);
154176

155177
// build aggregate projection metadata for matching
156178
final List<AggregateProjectionMetadata> aggProjections = new ArrayList<>();
@@ -193,8 +215,11 @@ public PartialQueryableIndex(
193215
}
194216

195217
// build per-column suppliers for the base table. each supplier is memoized and defers both mapFile() and
196-
// deserialization until the column is accessed.
197-
this.baseColumns = buildProjectionColumnSuppliers(baseProjection, Map.of());
218+
// deserialization until the column is accessed. A clustered base has no top-level columns (its data lives in
219+
// per-cluster-group bundles), so its base column map is empty.
220+
this.baseColumns = clusteredBaseSummary != null
221+
? Map.of()
222+
: buildProjectionColumnSuppliers(baseProjection, Map.of());
198223

199224
this.dimensionHandlers = Suppliers.memoize(this::initDimensionHandlers);
200225
}
@@ -281,9 +306,42 @@ public BaseColumnHolder getColumnHolder(String columnName)
281306
@Override
282307
public ColumnCapabilities getColumnCapabilities(String column)
283308
{
309+
if (clusteredBaseSummary != null) {
310+
return getClusteredColumnCapabilities(column);
311+
}
284312
// look up the column in the base table projection's namespace
285313
final String smooshName = baseProjectionPrefix + column;
286-
final ColumnDescriptor descriptor = metadata.getColumnDescriptors().get(smooshName);
314+
return capabilitiesFromDescriptor(metadata.getColumnDescriptors().get(smooshName));
315+
}
316+
317+
/**
318+
* Column capabilities for a clustered base table, answered from metadata only (no downloads): clustering columns
319+
* resolve from the summary's typed clustering signature; data columns (and {@code __time}) resolve from the first
320+
* cluster group's {@link ColumnDescriptor} (all groups share the same per-group shape). Mirrors the eager
321+
* {@link SimpleQueryableIndex} clustered branch, but reads the descriptor directly rather than routing through a
322+
* group sub-index's {@code getColumnHolder} (which would trigger a download).
323+
*/
324+
@Nullable
325+
private ColumnCapabilities getClusteredColumnCapabilities(String column)
326+
{
327+
final ColumnType clusteringType = clusteredBaseSummary.getClusteringColumns().getColumnType(column).orElse(null);
328+
if (clusteringType != null) {
329+
return clusteringType.is(ValueType.STRING)
330+
? ColumnCapabilitiesImpl.createSimpleSingleValueStringColumnCapabilities()
331+
: ColumnCapabilitiesImpl.createSimpleNumericColumnCapabilities(clusteringType);
332+
}
333+
final List<TableClusterGroupSpec> groups = clusteredBaseSummary.getClusterGroups();
334+
if (groups.isEmpty()) {
335+
return null;
336+
}
337+
final String smooshName =
338+
Projections.getClusterGroupSegmentInternalFileName(groups.getFirst().getClusteringValueIds(), column);
339+
return capabilitiesFromDescriptor(metadata.getColumnDescriptors().get(smooshName));
340+
}
341+
342+
@Nullable
343+
private static ColumnCapabilities capabilitiesFromDescriptor(@Nullable ColumnDescriptor descriptor)
344+
{
287345
if (descriptor == null) {
288346
return null;
289347
}
@@ -378,6 +436,74 @@ public List<OrderBy> getOrdering()
378436
};
379437
}
380438

439+
@Nullable
440+
@Override
441+
public ClusteredValueGroupsBaseTableSchema getClusteredBaseSummary()
442+
{
443+
return clusteredBaseSummary;
444+
}
445+
446+
@Override
447+
public List<TableClusterGroupSpec> getClusterGroupSchemas()
448+
{
449+
return clusteredBaseSummary == null ? List.of() : clusteredBaseSummary.getClusterGroups();
450+
}
451+
452+
/**
453+
* Returns a {@link QueryableIndex} sub-view scoped to a single cluster group's column data. The per-group column
454+
* suppliers are memoized by group index so a pre-fetch (async path) and the later cursor build share the same
455+
* already-downloaded columns. Clustering columns are NOT present in the returned index; they are injected at the
456+
* cursor-factory level via {@code ClusteringColumnSelectorFactory}.
457+
*/
458+
@Override
459+
public QueryableIndex getClusterGroupQueryableIndex(TableClusterGroupSpec groupSpec)
460+
{
461+
if (clusteredBaseSummary == null) {
462+
throw DruidException.defensive("getClusterGroupQueryableIndex called on a non-clustered segment");
463+
}
464+
final List<TableClusterGroupSpec> groups = clusteredBaseSummary.getClusterGroups();
465+
final int groupIndex = groups.indexOf(groupSpec);
466+
if (groupIndex < 0) {
467+
throw DruidException.defensive("Cluster group spec is not part of this segment");
468+
}
469+
final Map<String, Supplier<BaseColumnHolder>> groupColumns = clusterGroupColumnsByIndex.computeIfAbsent(
470+
groupIndex,
471+
i -> buildColumnSuppliers(
472+
clusteredBaseSummary.getTimeColumnName(),
473+
groupSpec.getNumRows(),
474+
clusteredBaseSummary.getGroupColumnNames(),
475+
column -> Projections.getClusterGroupSegmentInternalFileName(groupSpec.getClusteringValueIds(), column),
476+
Map.of()
477+
)
478+
);
479+
final Metadata groupMetadata = new Metadata(
480+
null,
481+
null,
482+
null,
483+
clusteredBaseSummary.getEffectiveGranularity(),
484+
false,
485+
clusteredBaseSummary.getGroupOrdering(),
486+
null,
487+
null
488+
);
489+
return new SimpleQueryableIndex(
490+
dataInterval,
491+
new ListIndexed<>(clusteredBaseSummary.getGroupDimensionNames()),
492+
bitmapFactory,
493+
groupColumns,
494+
fileMapper,
495+
groupMetadata,
496+
null
497+
)
498+
{
499+
@Override
500+
public Metadata getMetadata()
501+
{
502+
return groupMetadata;
503+
}
504+
};
505+
}
506+
381507
@Override
382508
public void close()
383509
{
@@ -406,12 +532,34 @@ private Map<String, Supplier<BaseColumnHolder>> buildProjectionColumnSuppliers(
406532
Map<String, Supplier<BaseColumnHolder>> parentColumns
407533
)
408534
{
409-
final String timeColumnName = projectionSpec.getSchema().getTimeColumnName();
535+
return buildColumnSuppliers(
536+
projectionSpec.getSchema().getTimeColumnName(),
537+
projectionSpec.getNumRows(),
538+
projectionSpec.getSchema().getColumnNames(),
539+
column -> Projections.getProjectionSegmentInternalFileName(projectionSpec.getSchema(), column),
540+
parentColumns
541+
);
542+
}
543+
544+
/**
545+
* Shared builder for lazy per-column suppliers. Each supplier is memoized and defers both
546+
* {@link SegmentFileMapper#mapFile} and {@link ColumnDescriptor#read} until the column is actually accessed, so
547+
* queries only trigger downloads for the specific columns they use. {@code fileNameFn} maps a logical column name to
548+
* its segment-internal (smoosh) file name in the right bundle namespace.
549+
*/
550+
private Map<String, Supplier<BaseColumnHolder>> buildColumnSuppliers(
551+
@Nullable String timeColumnName,
552+
int numRows,
553+
List<String> columnNames,
554+
Function<String, String> fileNameFn,
555+
Map<String, Supplier<BaseColumnHolder>> parentColumns
556+
)
557+
{
410558
final boolean renameTime = !ColumnHolder.TIME_COLUMN_NAME.equals(timeColumnName);
411-
final Map<String, Supplier<BaseColumnHolder>> projectionColumns = new LinkedHashMap<>();
559+
final Map<String, Supplier<BaseColumnHolder>> columns = new LinkedHashMap<>();
412560

413-
for (String column : projectionSpec.getSchema().getColumnNames()) {
414-
final String smooshName = Projections.getProjectionSegmentInternalFileName(projectionSpec.getSchema(), column);
561+
for (String column : columnNames) {
562+
final String smooshName = fileNameFn.apply(column);
415563
final ColumnDescriptor columnDescriptor = metadata.getColumnDescriptors().get(smooshName);
416564
if (columnDescriptor == null) {
417565
continue;
@@ -430,21 +578,21 @@ private Map<String, Supplier<BaseColumnHolder>> buildProjectionColumnSuppliers(
430578
}
431579
});
432580

433-
projectionColumns.put(internedColumnName, columnSupplier);
581+
columns.put(internedColumnName, columnSupplier);
434582

435583
if (column.equals(timeColumnName) && renameTime) {
436-
projectionColumns.put(ColumnHolder.TIME_COLUMN_NAME, projectionColumns.get(column));
437-
projectionColumns.remove(column);
584+
columns.put(ColumnHolder.TIME_COLUMN_NAME, columns.get(column));
585+
columns.remove(column);
438586
}
439587
}
440588

441589
if (timeColumnName == null) {
442-
projectionColumns.put(
590+
columns.put(
443591
ColumnHolder.TIME_COLUMN_NAME,
444-
ConstantTimeColumn.makeConstantTimeSupplier(projectionSpec.getNumRows(), dataInterval.getStartMillis())
592+
ConstantTimeColumn.makeConstantTimeSupplier(numRows, dataInterval.getStartMillis())
445593
);
446594
}
447595

448-
return projectionColumns;
596+
return columns;
449597
}
450598
}

0 commit comments

Comments
 (0)