From 6c7b027213a17aba41e7fb5b6dbaeb1e94215ec4 Mon Sep 17 00:00:00 2001
From: Uros Stojkovic
Date: Wed, 15 Oct 2025 09:15:24 +0000
Subject: [PATCH] Skip filters on _metadata struct fields during data skipping

Signed-off-by: Uros Stojkovic
---
 .../sql/delta/stats/DataSkippingReader.scala  |  6 ++++-
 .../delta/stats/DataSkippingDeltaTests.scala  | 26 ++++++++++++++++++++++++++
 2 files changed, 31 insertions(+), 1 deletion(-)

diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/stats/DataSkippingReader.scala b/spark/src/main/scala/org/apache/spark/sql/delta/stats/DataSkippingReader.scala
index 008f86d291c..3149d90a935 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/stats/DataSkippingReader.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/stats/DataSkippingReader.scala
@@ -1316,6 +1316,10 @@ trait DataSkippingReaderBase
     // For data skipping, avoid using the filters that either:
     // 1. involve subqueries.
     // 2. are non-deterministic.
+    // 3. involve file metadata (_metadata) struct fields.
     var (ineligibleFilters, eligibleFilters) = filters.partition {
-      case f => containsSubquery(f) || !f.deterministic
+      case f => containsSubquery(f) || !f.deterministic || f.exists {
+        case MetadataAttribute(_) => true
+        case _ => false
+      }
     }
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/stats/DataSkippingDeltaTests.scala b/spark/src/test/scala/org/apache/spark/sql/delta/stats/DataSkippingDeltaTests.scala
index 0c28ad210cb..956cf9317cc 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/stats/DataSkippingDeltaTests.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/stats/DataSkippingDeltaTests.scala
@@ -2091,5 +2091,31 @@ trait DataSkippingDeltaTestsBase extends DeltaExcludedBySparkVersionTestMixinShi
     }
   }
 
+  test("Data skipping handles aliasing for _metadata fields") {
+    withTable("t") {
+      // Create a table with a BIGINT data column named file_name, which
+      // collides with the _metadata.file_name field name.
+      sql("create or replace table t(file_name BIGINT) using delta")
+      sql("insert into t values (1), (2), (3)")
+      sql("insert into t values (4), (5), (6)")
+      val (fileName, fileCount) = {
+        val dataFilesDF = sql("select distinct _metadata.file_name from t")
+        (dataFilesDF.first().getString(0), dataFilesDF.count())
+      }
+      // The two inserts above should have produced two data files.
+      assert(fileCount == 2)
+      // Filter rows by _metadata.file_name.
+      val df = sql(s"select * from t where _metadata.file_name = '$fileName'")
+      // Verify the predicate is not used for data skipping.
+      val predicates = df.queryExecution.optimizedPlan.collect {
+        case Filter(condition, _) => condition
+      }.flatMap(splitConjunctivePredicates)
+      val scanResult = DeltaLog.forTable(spark, TableIdentifier("t")).update()
+        .filesForScan(predicates)
+      assert(scanResult.unusedFilters.nonEmpty,
+        "Expected predicate to be ineligible for data skipping")
+    }
+  }
+
   protected def parse(deltaLog: DeltaLog, predicate: String): Seq[Expression] =
     super.parse(spark, deltaLog, predicate)
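
A minimal standalone sketch of the eligibility split this patch introduces, for
illustration only. The object name MetadataFilterSplit and the helper
referencesFileMetadata are hypothetical, and containsSubquery is approximated
here with Catalyst's SubqueryExpression.hasSubquery rather than Delta's
internal helper of the same name:

import org.apache.spark.sql.catalyst.expressions.{Expression, MetadataAttribute, SubqueryExpression}

object MetadataFilterSplit {
  // Stand-in for Delta's containsSubquery helper (assumption: Catalyst's
  // SubqueryExpression.hasSubquery is close enough for illustration).
  private def containsSubquery(e: Expression): Boolean =
    SubqueryExpression.hasSubquery(e)

  // True if any node of the expression tree is an attribute flagged as a
  // metadata column, which is how _metadata.* fields surface in the
  // optimized plan.
  private def referencesFileMetadata(e: Expression): Boolean = e.exists {
    case MetadataAttribute(_) => true
    case _ => false
  }

  // Mirrors the partition in DataSkippingReaderBase: the left side is
  // ineligible for data skipping; only the right side is matched against
  // per-file min/max stats.
  def split(filters: Seq[Expression]): (Seq[Expression], Seq[Expression]) =
    filters.partition { f =>
      containsSubquery(f) || !f.deterministic || referencesFileMetadata(f)
    }
}

With this split, a predicate such as _metadata.file_name = '...' lands on the
ineligible side, so stats collected for a data column that happens to be named
file_name are never consulted, which is exactly the aliasing hazard the new
test exercises.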