diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/read/IndexedFile.java b/kernel-spark/src/main/java/io/delta/kernel/spark/read/IndexedFile.java
index 49471d196ba..1f7e21b04ad 100644
--- a/kernel-spark/src/main/java/io/delta/kernel/spark/read/IndexedFile.java
+++ b/kernel-spark/src/main/java/io/delta/kernel/spark/read/IndexedFile.java
@@ -16,6 +16,7 @@
 package io.delta.kernel.spark.read;
 
 import io.delta.kernel.internal.actions.AddFile;
+import org.apache.spark.sql.delta.sources.AdmittableFile;
 
 /**
  * Java version of IndexedFile.scala that uses Kernel's action classes.
@@ -24,7 +25,7 @@
  *
  * <p>Indexed: refers to the index in DeltaSourceOffset, assigned by the streaming engine.
  */
-public class IndexedFile {
+public class IndexedFile implements AdmittableFile {
   private final long version;
   private final long index;
   private final AddFile addFile;
@@ -47,6 +48,16 @@ public AddFile getAddFile() {
     return addFile;
   }
 
+  @Override
+  public boolean hasFileAction() {
+    return addFile != null;
+  }
+
+  @Override
+  public long getFileSize() {
+    return addFile.getSize();
+  }
+
   @Override
   public String toString() {
     StringBuilder sb = new StringBuilder();
diff --git a/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkMicroBatchStream.java b/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkMicroBatchStream.java
index a8b85db29d1..83fbd6d8b54 100644
--- a/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkMicroBatchStream.java
+++ b/kernel-spark/src/main/java/io/delta/kernel/spark/read/SparkMicroBatchStream.java
@@ -33,6 +33,7 @@
 import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
 import org.apache.spark.sql.connector.read.streaming.Offset;
 import org.apache.spark.sql.delta.DeltaErrors;
+import org.apache.spark.sql.delta.sources.DeltaSource;
 import org.apache.spark.sql.delta.sources.DeltaSourceOffset;
 import scala.Option;
 
@@ -101,6 +102,37 @@ public void stop() {
   // getFileChanges //
   ////////////////////
 
+  /**
+   * Get file changes with rate limiting applied. Mimics DeltaSource.getFileChangesWithRateLimit.
+   *
+   * @param fromVersion The starting version (exclusive with fromIndex)
+   * @param fromIndex The starting index within fromVersion (exclusive)
+   * @param isInitialSnapshot Whether this is the initial snapshot
+   * @param limits Rate limits to apply (Option.empty for no limits)
+   * @return An iterator of IndexedFile with rate limiting applied
+   */
+  CloseableIterator<IndexedFile> getFileChangesWithRateLimit(
+      long fromVersion,
+      long fromIndex,
+      boolean isInitialSnapshot,
+      Option<DeltaSource.AdmissionLimits> limits) {
+    // TODO(#5319): getFileChangesForCDC if CDC is enabled.
+
+    CloseableIterator<IndexedFile> changes =
+        getFileChanges(fromVersion, fromIndex, isInitialSnapshot, /*endOffset=*/ Option.empty());
+
+    // Take each change until we've seen the configured number of addFiles. Some changes don't
+    // represent file additions; we retain them for offset tracking, but they don't count toward
+    // the maxFilesPerTrigger conf.
+    if (limits.isDefined()) {
+      DeltaSource.AdmissionLimits admissionControl = limits.get();
+      changes = changes.takeWhile(admissionControl::admit);
+    }
+
+    // TODO(#5318): Stop at schema change barriers
+    return changes;
+  }
+
   /**
    * Get file changes between fromVersion/fromIndex and endOffset. This is the Kernel-based
    * implementation of DeltaSource.getFileChanges.
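The new method above pulls changes and then applies a stateful admission check via takeWhile. As a standalone illustration of that pattern (not code from this patch), the sketch below uses a hypothetical FileBudget in place of DeltaSource.AdmissionLimits and a plain java.util.Iterator in place of Kernel's CloseableIterator; only the shape of the admit-and-debit logic is taken from the diff.

import java.util.Arrays;
import java.util.Iterator;

// Illustrative sketch only: FileBudget is a hypothetical stand-in for
// DeltaSource.AdmissionLimits, and the while-loop plays the role of
// changes.takeWhile(admissionControl::admit) in the change above.
final class AdmissionSketch {

  // Stateful budget: admits entries until either the file count or the byte budget runs out.
  static final class FileBudget {
    private long filesToTake;
    private long bytesToTake;

    FileBudget(long maxFiles, long maxBytes) {
      this.filesToTake = maxFiles;
      this.bytesToTake = maxBytes;
    }

    // Mirrors the shape of AdmissionLimits.admit: placeholder entries with no file action are
    // always admitted; a real file is admitted while capacity remains and is then debited, so
    // one file may be admitted even if it overshoots the byte budget.
    boolean admit(boolean hasFileAction, long fileSize) {
      if (!hasFileAction) {
        return true;
      }
      boolean hasCapacity = filesToTake > 0 && bytesToTake > 0;
      filesToTake -= 1;
      bytesToTake -= fileSize;
      return hasCapacity;
    }
  }

  public static void main(String[] args) {
    FileBudget budget = new FileBudget(/* maxFiles= */ 2, /* maxBytes= */ 1_000L);
    Iterator<Long> fileSizes = Arrays.asList(400L, 800L, 100L).iterator();

    // takeWhile semantics: stop at the first rejected element.
    while (fileSizes.hasNext()) {
      long size = fileSizes.next();
      if (!budget.admit(/* hasFileAction= */ true, size)) {
        break;
      }
      System.out.println("admitted file of size " + size);
    }
    // Prints sizes 400 and 800: the second file is admitted even though it exceeds the
    // remaining byte budget, matching the "admit at least the current file" behavior.
  }
}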
diff --git a/kernel-spark/src/test/java/io/delta/kernel/spark/read/SparkMicroBatchStreamTest.java b/kernel-spark/src/test/java/io/delta/kernel/spark/read/SparkMicroBatchStreamTest.java
index 6cc21158141..cd2d5570d35 100644
--- a/kernel-spark/src/test/java/io/delta/kernel/spark/read/SparkMicroBatchStreamTest.java
+++ b/kernel-spark/src/test/java/io/delta/kernel/spark/read/SparkMicroBatchStreamTest.java
@@ -31,6 +31,7 @@
 import org.apache.spark.sql.connector.read.streaming.Offset;
 import org.apache.spark.sql.delta.DeltaLog;
 import org.apache.spark.sql.delta.DeltaOptions;
+import org.apache.spark.sql.delta.sources.DeltaSource;
 import org.apache.spark.sql.delta.sources.DeltaSourceOffset;
 import org.apache.spark.sql.delta.storage.ClosableIterator;
 import org.junit.jupiter.api.BeforeEach;
@@ -138,21 +139,11 @@ public void testGetFileChanges(
 
     // Create 5 versions of data (versions 1-5, version 0 is the CREATE TABLE)
     // Insert 100 rows per commit to potentially trigger multiple batches
-    for (int i = 0; i < 5; i++) {
-      StringBuilder insertValues = new StringBuilder();
-      for (int j = 0; j < 100; j++) {
-        if (j > 0) insertValues.append(", ");
-        int id = i * 100 + j;
-        insertValues.append(String.format("(%d, 'User%d')", id, id));
-      }
-      sql("INSERT INTO %s VALUES %s", testTableName, insertValues.toString());
-    }
-    SparkMicroBatchStream stream = new SparkMicroBatchStream(testTablePath, new Configuration());
+    insertVersions(testTableName, /* numVersions= */ 5, /* rowsPerVersion= */ 100);
 
     // dsv1 DeltaSource
     DeltaLog deltaLog = DeltaLog.forTable(spark, new Path(testTablePath));
-    org.apache.spark.sql.delta.sources.DeltaSource deltaSource =
-        createDeltaSource(deltaLog, testTablePath);
+    DeltaSource deltaSource = createDeltaSource(deltaLog, testTablePath);
 
     scala.Option scalaEndOffset = scala.Option.empty();
     if (endVersion.isPresent()) {
@@ -176,6 +167,7 @@ public void testGetFileChanges(
     deltaChanges.close();
 
     // dsv2 SparkMicroBatchStream
+    SparkMicroBatchStream stream = new SparkMicroBatchStream(testTablePath, new Configuration());
     Option endOffsetOption = scalaEndOffset;
     try (CloseableIterator kernelChanges =
         stream.getFileChanges(fromVersion, fromIndex, isInitialSnapshot, endOffsetOption)) {
@@ -237,6 +229,90 @@ private static Stream getFileChangesParameters() {
             2L, 50L, notInitialSnapshot, Optional.of(2L), Optional.of(40L), "Empty Range"));
   }
 
+  // ================================================================================================
+  // Tests for getFileChangesWithRateLimit parity between DSv1 and DSv2
+  // ================================================================================================
+
+  /**
+   * Test that verifies parity between DSv1 DeltaSource.getFileChangesWithRateLimit and DSv2
+   * SparkMicroBatchStream.getFileChangesWithRateLimit.
+   */
+  @ParameterizedTest
+  @MethodSource("getFileChangesWithRateLimitParameters")
+  public void testGetFileChangesWithRateLimit(
+      Optional maxFiles,
+      Optional maxBytes,
+      String testDescription,
+      @TempDir File tempDir)
+      throws Exception {
+    String testTablePath = tempDir.getAbsolutePath();
+    String testTableName =
+        "test_rate_limit_" + Math.abs(testDescription.hashCode()) + "_" + System.nanoTime();
+    createEmptyTestTable(testTablePath, testTableName);
+
+    // Create 5 versions with 10 rows each (versions 1-5)
+    insertVersions(testTableName, /* numVersions= */ 5, /* rowsPerVersion= */ 10);
+
+    // dsv1 DeltaSource
+    DeltaLog deltaLog = DeltaLog.forTable(spark, new Path(testTablePath));
+    DeltaSource deltaSource = createDeltaSource(deltaLog, testTablePath);
+    DeltaOptions options = new DeltaOptions(Map$.MODULE$.empty(), spark.sessionState().conf());
+
+    scala.Option scalaMaxFiles =
+        maxFiles.isPresent() ? scala.Option.apply(maxFiles.get()) : scala.Option.empty();
+    scala.Option scalaMaxBytes =
+        maxBytes.isPresent() ? scala.Option.apply(maxBytes.get()) : scala.Option.empty();
+    Option dsv1Limits =
+        deltaSource.createAdmissionLimits(scalaMaxFiles, scalaMaxBytes);
+
+    ClosableIterator deltaChanges =
+        deltaSource.getFileChangesWithRateLimit(
+            /*fromVersion=*/ 0L,
+            /* fromIndex=*/ DeltaSourceOffset.BASE_INDEX(),
+            /* isInitialSnapshot=*/ false,
+            dsv1Limits);
+    List deltaFilesList = new ArrayList<>();
+    while (deltaChanges.hasNext()) {
+      deltaFilesList.add(deltaChanges.next());
+    }
+    deltaChanges.close();
+
+    // dsv2 SparkMicroBatchStream
+    SparkMicroBatchStream stream = new SparkMicroBatchStream(testTablePath, new Configuration());
+    // We need a separate AdmissionLimits object for DSv2 because the method is stateful.
+    scala.Option dsv2Limits =
+        deltaSource.createAdmissionLimits(scalaMaxFiles, scalaMaxBytes);
+
+    try (CloseableIterator kernelChanges =
+        stream.getFileChangesWithRateLimit(
+            /*fromVersion=*/ 0L,
+            /* fromIndex=*/ DeltaSourceOffset.BASE_INDEX(),
+            /* isInitialSnapshot=*/ false,
+            dsv2Limits)) {
+      List kernelFilesList = new ArrayList<>();
+      while (kernelChanges.hasNext()) {
+        kernelFilesList.add(kernelChanges.next());
+      }
+      compareFileChanges(deltaFilesList, kernelFilesList);
+    }
+  }
+
+  /** Provides test parameters for the parameterized getFileChangesWithRateLimit test. */
+  private static Stream getFileChangesWithRateLimitParameters() {
+    Optional noMaxFiles = Optional.empty();
+    Optional noMaxBytes = Optional.empty();
+
+    return Stream.of(
+        // No rate limits
+        Arguments.of(noMaxFiles, noMaxBytes, "No limits"),
+        // MaxFiles only
+        Arguments.of(Optional.of(5), noMaxBytes, "MaxFiles"),
+        // MaxBytes only
+        Arguments.of(noMaxFiles, Optional.of(5000L), "MaxBytes"),
+        // Both limits
+        Arguments.of(Optional.of(10), Optional.of(10000L), "MaxFiles and MaxBytes"));
+  }
+
   private void compareFileChanges(
       List deltaSourceFiles, List kernelFiles) {
@@ -312,8 +388,7 @@ public void testGetFileChanges_EmptyVersions(
 
     // Test DSv1 DeltaSource
     DeltaLog deltaLog = DeltaLog.forTable(spark, new Path(testTablePath));
-    org.apache.spark.sql.delta.sources.DeltaSource deltaSource =
-        createDeltaSource(deltaLog, testTablePath);
+    DeltaSource deltaSource = createDeltaSource(deltaLog, testTablePath);
 
     ClosableIterator deltaChanges =
         deltaSource.getFileChanges(
@@ -390,8 +465,7 @@ public void testGetFileChanges_OnRemoveFile_throwError(
 
     // Test DSv1 DeltaSource
     DeltaLog deltaLog = DeltaLog.forTable(spark, new Path(testTablePath));
-    org.apache.spark.sql.delta.sources.DeltaSource deltaSource =
-        createDeltaSource(deltaLog, testTablePath);
+    DeltaSource deltaSource = createDeltaSource(deltaLog, testTablePath);
 
     UnsupportedOperationException dsv1Exception =
         assertThrows(
@@ -535,9 +609,27 @@ private static void sql(String query, Object... args) {
     SparkDsv2TestBase.spark.sql(String.format(query, args));
   }
 
+  /**
+   * Helper method to insert multiple versions of data into a test table.
+   *
+   * @param tableName The name of the table to insert into
+   * @param numVersions The number of versions (commits) to create
+   * @param rowsPerVersion The number of rows to insert per version
+   */
+  private void insertVersions(String tableName, int numVersions, int rowsPerVersion) {
+    for (int i = 0; i < numVersions; i++) {
+      StringBuilder values = new StringBuilder();
+      for (int j = 0; j < rowsPerVersion; j++) {
+        if (j > 0) values.append(", ");
+        int id = i * rowsPerVersion + j;
+        values.append(String.format("(%d, 'User%d')", id, id));
+      }
+      sql("INSERT INTO %s VALUES %s", tableName, values.toString());
+    }
+  }
+
   /** Helper method to create a DeltaSource instance for testing. */
-  private org.apache.spark.sql.delta.sources.DeltaSource createDeltaSource(
-      DeltaLog deltaLog, String tablePath) {
+  private DeltaSource createDeltaSource(DeltaLog deltaLog, String tablePath) {
     DeltaOptions options = new DeltaOptions(Map$.MODULE$.empty(), spark.sessionState().conf());
     scala.collection.immutable.Seq emptySeq =
         scala.collection.JavaConverters.asScalaBuffer(
@@ -545,7 +637,7 @@ private org.apache.spark.sql.delta.sources.DeltaSource createDeltaSource(
             .toList();
     org.apache.spark.sql.delta.Snapshot snapshot =
         deltaLog.update(false, Option.empty(), Option.empty());
-    return new org.apache.spark.sql.delta.sources.DeltaSource(
+    return new DeltaSource(
         spark,
         deltaLog,
         /* catalogTableOpt= */ Option.empty(),
diff --git a/spark/src/main/java/org/apache/spark/sql/delta/sources/AdmittableFile.java b/spark/src/main/java/org/apache/spark/sql/delta/sources/AdmittableFile.java
new file mode 100644
index 00000000000..3d0075295fa
--- /dev/null
+++ b/spark/src/main/java/org/apache/spark/sql/delta/sources/AdmittableFile.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright (2021) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.delta.sources;
+
+/**
+ * Interface for files that can be admitted by admission control in Delta streaming sources.
+ * This abstraction allows both DSv1 and DSv2 IndexedFile implementations to be used with
+ * the admission control logic.
+ */
+public interface AdmittableFile {
+  /**
+   * Returns true if this file has an associated file action (AddFile, RemoveFile, or CDCFile).
+   * Placeholder IndexedFiles with no file action will return false.
+   */
+  boolean hasFileAction();
+
+  /**
+   * Returns the size of the file in bytes.
+   * This method should only be called when hasFileAction() returns true.
+   */
+  long getFileSize();
+}
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala
index 735a058d1f6..baa6ed7afe7 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala
@@ -76,7 +76,7 @@ private[delta] case class IndexedFile(
     add: AddFile,
     remove: RemoveFile = null,
     cdc: AddCDCFile = null,
-    shouldSkip: Boolean = false) {
+    shouldSkip: Boolean = false) extends AdmittableFile {
 
   require(Option(add).size + Option(remove).size + Option(cdc).size <= 1,
     "IndexedFile must have at most one of add, remove, or cdc")
@@ -91,11 +91,11 @@ private[delta] case class IndexedFile(
     }
   }
 
-  def hasFileAction: Boolean = {
+  override def hasFileAction(): Boolean = {
     getFileAction != null
   }
 
-  def getFileSize: Long = {
+  override def getFileSize(): Long = {
     if (add != null) {
       add.size
     } else if (remove != null) {
@@ -1279,7 +1279,27 @@ case class DeltaSource(
 
   override def toString(): String = s"DeltaSource[${deltaLog.dataPath}]"
 
-  trait DeltaSourceAdmissionBase { self: AdmissionLimits =>
+  /**
+   * Helper method for Java callers to create AdmissionLimits.
+   * Returns None if both maxFiles and maxBytes are empty/None.
+   */
+  def createAdmissionLimits(
+      maxFiles: Option[Int],
+      maxBytes: Option[Long]): Option[AdmissionLimits] = {
+    if (maxFiles.isEmpty && maxBytes.isEmpty) {
+      None
+    } else {
+      Some(new AdmissionLimits(maxFiles, maxBytes.getOrElse(Long.MaxValue)))
+    }
+  }
+
+  /**
+   * Class that helps controlling how much data should be processed by a single micro-batch.
+   */
+  case class AdmissionLimits(
+      maxFiles: Option[Int] = options.maxFilesPerTrigger,
+      var bytesToTake: Long = options.maxBytesPerTrigger.getOrElse(Long.MaxValue)
+  ) {
 
     // This variable indicates whether a commit has already been processed by a batch or not.
     var commitProcessedInBatch = false
@@ -1292,9 +1312,9 @@ case class DeltaSource(
      * This overloaded method checks if all the FileActions for a commit can be accommodated by
      * the rate limit.
      */
-    def admit(indexedFiles: Seq[IndexedFile]): Boolean = {
-      def getSize(actions: Seq[IndexedFile]): Long = {
-        actions.filter(_.hasFileAction).foldLeft(0L) { (l, r) => l + r.getFileAction.getFileSize }
+    def admit(indexedFiles: Seq[AdmittableFile]): Boolean = {
+      def getSize(actions: Seq[AdmittableFile]): Long = {
+        actions.filter(_.hasFileAction).foldLeft(0L) { (l, r) => l + r.getFileSize }
       }
       if (indexedFiles.isEmpty) {
         true
@@ -1315,7 +1335,7 @@ case class DeltaSource(
      * Whether to admit the next file. Dummy IndexedFile entries with no attached file action are
      * always admitted.
      */
-    def admit(indexedFile: IndexedFile): Boolean = {
+    def admit(indexedFile: AdmittableFile): Boolean = {
       commitProcessedInBatch = true
 
       if (!indexedFile.hasFileAction) {
@@ -1328,7 +1348,7 @@ case class DeltaSource(
       // will even admit a file when it is larger than the remaining capacity, and that we will
       // admit at least one file.
       val shouldAdmit = hasCapacity
-      take(files = 1, bytes = indexedFile.getFileAction.getFileSize)
+      take(files = 1, bytes = indexedFile.getFileSize)
       shouldAdmit
     }
 
@@ -1337,16 +1357,6 @@ case class DeltaSource(
       filesToTake > 0 && bytesToTake > 0
     }
 
-  }
-
-  /**
-   * Class that helps controlling how much data should be processed by a single micro-batch.
-   */
-  case class AdmissionLimits(
-      maxFiles: Option[Int] = options.maxFilesPerTrigger,
-      var bytesToTake: Long = options.maxBytesPerTrigger.getOrElse(Long.MaxValue)
-  ) extends DeltaSourceAdmissionBase {
-
     var filesToTake = maxFiles.getOrElse {
       if (options.maxBytesPerTrigger.isEmpty) {
         DeltaOptions.MAX_FILES_PER_TRIGGER_OPTION_DEFAULT
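The createAdmissionLimits helper added above follows a simple contract: no limits object when neither option is set, and a missing byte limit falls back to an effectively unlimited budget. The sketch below restates that contract in plain Java for reference only; LimitSpec is a hypothetical type, not part of this change.

import java.util.Optional;

// Hypothetical mirror of the createAdmissionLimits contract shown in the Scala diff:
// return no limits at all when neither maxFiles nor maxBytes is configured, otherwise
// default the byte budget to Long.MAX_VALUE (effectively unlimited).
final class LimitSpec {
  final Optional<Integer> maxFiles;
  final long bytesToTake;

  private LimitSpec(Optional<Integer> maxFiles, long bytesToTake) {
    this.maxFiles = maxFiles;
    this.bytesToTake = bytesToTake;
  }

  static Optional<LimitSpec> create(Optional<Integer> maxFiles, Optional<Long> maxBytes) {
    if (maxFiles.isEmpty() && maxBytes.isEmpty()) {
      return Optional.empty(); // no rate limiting requested
    }
    return Optional.of(new LimitSpec(maxFiles, maxBytes.orElse(Long.MAX_VALUE)));
  }
}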