diff --git a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergCatalog.java b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergCatalog.java index 2c8de41bb386..811a0d35b8f8 100644 --- a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergCatalog.java +++ b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergCatalog.java @@ -20,6 +20,7 @@ package org.apache.druid.iceberg.input; import com.fasterxml.jackson.annotation.JsonTypeInfo; +import org.apache.druid.data.input.ColumnsFilter; import org.apache.druid.data.input.InputFormat; import org.apache.druid.error.DruidException; import org.apache.druid.iceberg.filter.IcebergFilter; @@ -28,6 +29,7 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.logger.Logger; import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.Table; import org.apache.iceberg.TableScan; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.Namespace; @@ -35,27 +37,28 @@ import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.types.Types; import org.joda.time.DateTime; +import javax.annotation.Nullable; import java.util.ArrayList; import java.util.List; +import java.util.stream.Collectors; /* * Druid wrapper for an iceberg catalog. * The configured catalog is used to load the specified iceberg table and retrieve the underlying live data files upto the latest snapshot. - * This does not perform any projections on the table yet, therefore all the underlying columns will be retrieved from the data files. + * This applies column projections to read only required columns from the data files, reducing data transfer and memory usage. */ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = InputFormat.TYPE_PROPERTY) -public abstract class IcebergCatalog -{ +public abstract class IcebergCatalog { public static final String DRUID_DYNAMIC_CONFIG_PROVIDER_KEY = "druid.dynamic.config.provider"; private static final Logger log = new Logger(IcebergCatalog.class); public abstract Catalog retrieveCatalog(); - public boolean isCaseSensitive() - { + public boolean isCaseSensitive() { return true; } @@ -68,16 +71,17 @@ public boolean isCaseSensitive() * @param snapshotTime Datetime that will be used to fetch the most recent snapshot as of this time * @param residualFilterMode Controls how residual filters are handled. When filtering on non-partition * columns, residual rows may be returned that need row-level filtering. + * @param columnsFilter Column filter used to project the table scan. If null, all columns are read. * @return a list of data file paths */ public List extractSnapshotDataFiles( - String tableNamespace, - String tableName, - IcebergFilter icebergFilter, - DateTime snapshotTime, - ResidualFilterMode residualFilterMode - ) - { + String tableNamespace, + String tableName, + IcebergFilter icebergFilter, + DateTime snapshotTime, + ResidualFilterMode residualFilterMode, + @Nullable ColumnsFilter columnsFilter + ) { Catalog catalog = retrieveCatalog(); Namespace namespace = Namespace.of(tableNamespace); String tableIdentifier = tableNamespace + "." + tableName; @@ -88,15 +92,27 @@ public List extractSnapshotDataFiles( try { Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); TableIdentifier icebergTableIdentifier = catalog.listTables(namespace).stream() - .filter(tableId -> tableId.toString().equals(tableIdentifier)) - .findFirst() - .orElseThrow(() -> new IAE( - " Couldn't retrieve table identifier for '%s'. Please verify that the table exists in the given catalog", - tableIdentifier - )); + .filter(tableId -> tableId.toString().equals(tableIdentifier)) + .findFirst() + .orElseThrow(() -> new IAE( + " Couldn't retrieve table identifier for '%s'. Please verify that the table exists in the given catalog", + tableIdentifier + )); long start = System.currentTimeMillis(); - TableScan tableScan = catalog.loadTable(icebergTableIdentifier).newScan(); + Table table = catalog.loadTable(icebergTableIdentifier); + TableScan tableScan = table.newScan(); + + if (columnsFilter != null) { + List projectedColumns = table + .schema() + .columns() + .stream() + .map(Types.NestedField::name) + .filter(columnsFilter::apply) + .collect(Collectors.toList()); + tableScan = tableScan.select(new ArrayList<>(projectedColumns)); + } if (icebergFilter != null) { tableScan = icebergFilter.filter(tableScan); @@ -124,17 +140,17 @@ public List extractSnapshotDataFiles( // Handle residual filter based on mode if (detectedResidual != null) { String message = StringUtils.format( - "Iceberg filter produced residual expression that requires row-level filtering. " - + "This typically means the filter is on a non-partition column. " - + "Residual rows may be ingested unless filtered by transformSpec. " - + "Residual filter: [%s]", - detectedResidual + "Iceberg filter produced residual expression that requires row-level filtering. " + + "This typically means the filter is on a non-partition column. " + + "Residual rows may be ingested unless filtered by transformSpec. " + + "Residual filter: [%s]", + detectedResidual ); if (residualFilterMode == ResidualFilterMode.FAIL) { throw DruidException.forPersona(DruidException.Persona.DEVELOPER) - .ofCategory(DruidException.Category.RUNTIME_FAILURE) - .build(message); + .ofCategory(DruidException.Category.RUNTIME_FAILURE) + .build(message); } log.warn(message); } diff --git a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergInputSource.java b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergInputSource.java index ccbb10af14dc..d5d3a96ae4ba 100644 --- a/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergInputSource.java +++ b/extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergInputSource.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; import org.apache.druid.common.config.Configs; +import org.apache.druid.data.input.ColumnsFilter; import org.apache.druid.data.input.InputFormat; import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.InputRowListPlusRawValues; @@ -114,7 +115,7 @@ public InputSourceReader reader( ) { if (!isLoaded) { - retrieveIcebergDatafiles(); + retrieveIcebergDatafiles(inputRowSchema); } return getDelegateInputSource().reader(inputRowSchema, inputFormat, temporaryDirectory); } @@ -126,7 +127,7 @@ public Stream>> createSplits( ) throws IOException { if (!isLoaded) { - retrieveIcebergDatafiles(); + retrieveIcebergDatafiles(null); } return getDelegateInputSource().createSplits(inputFormat, splitHintSpec); } @@ -135,7 +136,7 @@ public Stream>> createSplits( public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec) throws IOException { if (!isLoaded) { - retrieveIcebergDatafiles(); + retrieveIcebergDatafiles(null); } return getDelegateInputSource().estimateNumSplits(inputFormat, splitHintSpec); } @@ -194,14 +195,21 @@ public SplittableInputSource getDelegateInputSource() return delegateInputSource; } - protected void retrieveIcebergDatafiles() + private ColumnsFilter getColumnsFilter(@Nullable final InputRowSchema inputRowSchema) { - List snapshotDataFiles = icebergCatalog.extractSnapshotDataFiles( + return inputRowSchema == null ? null : inputRowSchema.getColumnsFilter(); + } + + protected void retrieveIcebergDatafiles(@Nullable final InputRowSchema inputRowSchema) + { + final ColumnsFilter columnsFilter = getColumnsFilter(inputRowSchema); + final List snapshotDataFiles = icebergCatalog.extractSnapshotDataFiles( getNamespace(), getTableName(), getIcebergFilter(), getSnapshotTime(), - getResidualFilterMode() + getResidualFilterMode(), + columnsFilter ); if (snapshotDataFiles.isEmpty()) { delegateInputSource = new EmptyInputSource(); diff --git a/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergInputSourceTest.java b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergInputSourceTest.java index 93d7412cc77a..2667743e5b83 100644 --- a/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergInputSourceTest.java +++ b/extensions-contrib/druid-iceberg-extensions/src/test/java/org/apache/druid/iceberg/input/IcebergInputSourceTest.java @@ -21,10 +21,16 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import org.apache.druid.data.input.ColumnsFilter; +import org.apache.druid.data.input.InputRowSchema; import org.apache.druid.data.input.InputSplit; import org.apache.druid.data.input.MaxSizeSplitHintSpec; +import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.LocalInputSource; import org.apache.druid.data.input.impl.LocalInputSourceFactory; +import org.apache.druid.data.input.impl.StringDimensionSchema; +import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.error.DruidException; import org.apache.druid.iceberg.filter.IcebergEqualsFilter; import org.apache.druid.java.util.common.DateTimes; @@ -55,6 +61,7 @@ import java.io.File; import java.io.IOException; +import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -126,6 +133,91 @@ public void testInputSource() throws IOException } } + // Reads the table without InputRowSchema and then once per column using InputRowSchema column filters. + @Test + public void testReadTableWithAndWithoutInputRowSchema() throws IOException + { + // Read the table with no InputRowSchema to preserve the legacy full-table scan behavior. + final IcebergInputSource inputSourceWithoutSchema = new IcebergInputSource( + TABLENAME, + NAMESPACE, + null, + testCatalog, + new LocalInputSourceFactory(), + null, + null + ); + + inputSourceWithoutSchema.retrieveIcebergDatafiles(null); + + final List fullScanFiles = ((LocalInputSource) inputSourceWithoutSchema.getDelegateInputSource()) + .getFiles(); + Assert.assertEquals(1, fullScanFiles.size()); + + try (final CloseableIterable datafileReader = Parquet.read(Files.localInput(fullScanFiles.get(0))) + .project(tableSchema) + .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader( + tableSchema, + fileSchema + )) + .build()) { + for (Record record : datafileReader) { + Assert.assertEquals(tableData.get("id"), record.get(0)); + Assert.assertEquals(tableData.get("name"), record.get(1)); + } + } + + // Read the table once per column using InputRowSchema column filters to verify single-column projections. + for (final Types.NestedField column : tableSchema.columns()) { + final IcebergInputSource inputSourceWithSchema = new IcebergInputSource( + TABLENAME, + NAMESPACE, + null, + testCatalog, + new LocalInputSourceFactory(), + null, + null + ); + final InputRowSchema inputRowSchema = createInputRowSchema( + ColumnsFilter.inclusionBased(ImmutableSet.of(column.name())) + ); + + inputSourceWithSchema.retrieveIcebergDatafiles(inputRowSchema); + + final List projectedFiles = ((LocalInputSource) inputSourceWithSchema.getDelegateInputSource()) + .getFiles(); + Assert.assertEquals(1, projectedFiles.size()); + + final Schema projectedSchema = new Schema(column); + try (final CloseableIterable datafileReader = Parquet.read(Files.localInput(projectedFiles.get(0))) + .project(projectedSchema) + .createReaderFunc(fileSchema -> GenericParquetReaders.buildReader( + projectedSchema, + fileSchema + )) + .build()) { + for (Record record : datafileReader) { + Assert.assertEquals(1, record.size()); + Assert.assertEquals(tableData.get(column.name()), record.get(0)); + } + } + } + } + + private InputRowSchema createInputRowSchema(final ColumnsFilter columnsFilter) + { + return new InputRowSchema( + new TimestampSpec("id", "auto", null), + new DimensionsSpec( + ImmutableList.of( + new StringDimensionSchema("id"), + new StringDimensionSchema("name") + ) + ), + columnsFilter + ); + } + @Test public void testInputSourceWithEmptySource() throws IOException {