IndexedFile.java
@@ -16,6 +16,7 @@
package io.delta.kernel.spark.read;

import io.delta.kernel.internal.actions.AddFile;
import org.apache.spark.sql.delta.sources.AdmittableFile;

/**
* Java version of IndexedFile.scala that uses Kernel's action classes.
@@ -24,7 +25,7 @@
*
* <p>Indexed: refers to the index in DeltaSourceOffset, assigned by the streaming engine.
*/
public class IndexedFile {
public class IndexedFile implements AdmittableFile {
private final long version;
private final long index;
private final AddFile addFile;
@@ -47,6 +48,16 @@ public AddFile getAddFile() {
return addFile;
}

@Override
public boolean hasFileAction() {
return addFile != null;
}

@Override
public long getFileSize() {
return addFile.getSize();
Review comment from a Collaborator on the line above:
nit: null check?
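One possible guarded variant, sketched here as a review suggestion (not part of the PR as shown); callers are still expected to check hasFileAction() first, so this is purely defensive:

@Override
public long getFileSize() {
  // Defensive sketch per the nit above: fail fast on placeholder entries
  // (addFile == null) instead of surfacing a bare NullPointerException.
  if (addFile == null) {
    throw new IllegalStateException(
        "getFileSize() called on an IndexedFile with no AddFile; check hasFileAction() first");
  }
  return addFile.getSize();
}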

}

@Override
public String toString() {
StringBuilder sb = new StringBuilder();
SparkMicroBatchStream.java
@@ -33,6 +33,7 @@
import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
import org.apache.spark.sql.connector.read.streaming.Offset;
import org.apache.spark.sql.delta.DeltaErrors;
import org.apache.spark.sql.delta.sources.DeltaSource;
import org.apache.spark.sql.delta.sources.DeltaSourceOffset;
import scala.Option;

@@ -101,6 +102,37 @@ public void stop() {
// getFileChanges //
////////////////////

/**
* Get file changes with rate limiting applied. Mimics DeltaSource.getFileChangesWithRateLimit.
*
* @param fromVersion The starting version (exclusive, together with fromIndex)
* @param fromIndex The starting index within fromVersion (exclusive)
* @param isInitialSnapshot Whether this is the initial snapshot
* @param limits Rate limits to apply (Option.empty for no limits)
* @return An iterator of IndexedFile with rate limiting applied
*/
CloseableIterator<IndexedFile> getFileChangesWithRateLimit(
long fromVersion,
long fromIndex,
boolean isInitialSnapshot,
Option<DeltaSource.AdmissionLimits> limits) {
// TODO(#5319): getFileChangesForCDC if CDC is enabled.

CloseableIterator<IndexedFile> changes =
getFileChanges(fromVersion, fromIndex, isInitialSnapshot, /*endOffset=*/ Option.empty());

// Take each change until we've seen the configured number of addFiles. Some changes don't
// represent file additions; we retain them for offset tracking, but they don't count toward
// the maxFilesPerTrigger conf.
if (limits.isDefined()) {
DeltaSource.AdmissionLimits admissionControl = limits.get();
changes = changes.takeWhile(admissionControl::admit);
}

// TODO(#5318): Stop at schema change barriers
return changes;
}
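For intuition, here is a minimal sketch of the stateful predicate that takeWhile consumes above. It mirrors the admission semantics described in the comment (placeholder entries always pass; each real file consumes file and byte budget), but the class and field names are illustrative, not the actual DeltaSource.AdmissionLimits implementation:

import org.apache.spark.sql.delta.sources.AdmittableFile;

// Illustrative stand-in for DeltaSource.AdmissionLimits (assumed semantics).
final class AdmissionLimitsSketch {
  private long filesToTake; // remaining file budget, e.g. maxFilesPerTrigger
  private long bytesToTake; // remaining byte budget, e.g. maxBytesPerTrigger

  AdmissionLimitsSketch(long maxFiles, long maxBytes) {
    this.filesToTake = maxFiles;
    this.bytesToTake = maxBytes;
  }

  // Entries without a file action are always admitted so the offset can
  // advance past them; they consume no budget.
  boolean admit(AdmittableFile file) {
    if (!file.hasFileAction()) {
      return true;
    }
    // Admit if any budget remained before this file, then consume it; the
    // byte budget may therefore be exceeded by the last admitted file.
    boolean shouldAdmit = filesToTake > 0 && bytesToTake > 0;
    filesToTake -= 1;
    bytesToTake -= file.getFileSize();
    return shouldAdmit;
  }
}

With a predicate like this, changes.takeWhile(limits::admit) ends the micro-batch at the first file that no longer fits the budget, which is the maxFilesPerTrigger / maxBytesPerTrigger behavior the comment describes.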

/**
* Get file changes between fromVersion/fromIndex and endOffset. This is the Kernel-based
* implementation of DeltaSource.getFileChanges.
SparkMicroBatchStream test suite
@@ -31,6 +31,7 @@
import org.apache.spark.sql.connector.read.streaming.Offset;
import org.apache.spark.sql.delta.DeltaLog;
import org.apache.spark.sql.delta.DeltaOptions;
import org.apache.spark.sql.delta.sources.DeltaSource;
import org.apache.spark.sql.delta.sources.DeltaSourceOffset;
import org.apache.spark.sql.delta.storage.ClosableIterator;
import org.junit.jupiter.api.BeforeEach;
@@ -138,21 +139,11 @@ public void testGetFileChanges(

// Create 5 versions of data (versions 1-5, version 0 is the CREATE TABLE)
// Insert 100 rows per commit to potentially trigger multiple batches
for (int i = 0; i < 5; i++) {
StringBuilder insertValues = new StringBuilder();
for (int j = 0; j < 100; j++) {
if (j > 0) insertValues.append(", ");
int id = i * 100 + j;
insertValues.append(String.format("(%d, 'User%d')", id, id));
}
sql("INSERT INTO %s VALUES %s", testTableName, insertValues.toString());
}
SparkMicroBatchStream stream = new SparkMicroBatchStream(testTablePath, new Configuration());
insertVersions(testTableName, /* numVersions= */ 5, /* rowsPerVersion= */ 100);

// dsv1 DeltaSource
DeltaLog deltaLog = DeltaLog.forTable(spark, new Path(testTablePath));
org.apache.spark.sql.delta.sources.DeltaSource deltaSource =
createDeltaSource(deltaLog, testTablePath);
DeltaSource deltaSource = createDeltaSource(deltaLog, testTablePath);

scala.Option<DeltaSourceOffset> scalaEndOffset = scala.Option.empty();
if (endVersion.isPresent()) {
@@ -176,6 +167,7 @@ public void testGetFileChanges(
deltaChanges.close();

// dsv2 SparkMicroBatchStream
SparkMicroBatchStream stream = new SparkMicroBatchStream(testTablePath, new Configuration());
Option<DeltaSourceOffset> endOffsetOption = scalaEndOffset;
try (CloseableIterator<IndexedFile> kernelChanges =
stream.getFileChanges(fromVersion, fromIndex, isInitialSnapshot, endOffsetOption)) {
@@ -237,6 +229,90 @@ private static Stream<Arguments> getFileChangesParameters() {
2L, 50L, notInitialSnapshot, Optional.of(2L), Optional.of(40L), "Empty Range"));
}

// ================================================================================================
// Tests for getFileChangesWithRateLimit parity between DSv1 and DSv2
// ================================================================================================

/**
* Test that verifies parity between DSv1 DeltaSource.getFileChangesWithRateLimit and DSv2
* SparkMicroBatchStream.getFileChangesWithRateLimit.
*/
@ParameterizedTest
@MethodSource("getFileChangesWithRateLimitParameters")
public void testGetFileChangesWithRateLimit(
Optional<Integer> maxFiles,
Optional<Long> maxBytes,
String testDescription,
@TempDir File tempDir)
throws Exception {
String testTablePath = tempDir.getAbsolutePath();
String testTableName =
"test_rate_limit_" + Math.abs(testDescription.hashCode()) + "_" + System.nanoTime();
createEmptyTestTable(testTablePath, testTableName);

// Create 5 versions with 10 rows each (versions 1-5)
insertVersions(testTableName, /* numVersions= */ 5, /* rowsPerVersion= */ 10);

// dsv1 DeltaSource
DeltaLog deltaLog = DeltaLog.forTable(spark, new Path(testTablePath));
DeltaSource deltaSource = createDeltaSource(deltaLog, testTablePath);
DeltaOptions options = new DeltaOptions(Map$.MODULE$.empty(), spark.sessionState().conf());

scala.Option<Object> scalaMaxFiles =
maxFiles.isPresent() ? scala.Option.apply(maxFiles.get()) : scala.Option.empty();
scala.Option<Object> scalaMaxBytes =
maxBytes.isPresent() ? scala.Option.apply(maxBytes.get()) : scala.Option.empty();
Option<DeltaSource.AdmissionLimits> dsv1Limits =
deltaSource.createAdmissionLimits(scalaMaxFiles, scalaMaxBytes);

ClosableIterator<org.apache.spark.sql.delta.sources.IndexedFile> deltaChanges =
deltaSource.getFileChangesWithRateLimit(
/*fromVersion=*/ 0L,
/* fromIndex=*/ DeltaSourceOffset.BASE_INDEX(),
/* isInitialSnapshot=*/ false,
dsv1Limits);
List<org.apache.spark.sql.delta.sources.IndexedFile> deltaFilesList = new ArrayList<>();
while (deltaChanges.hasNext()) {
deltaFilesList.add(deltaChanges.next());
}
deltaChanges.close();

// dsv2 SparkMicroBatchStream
SparkMicroBatchStream stream = new SparkMicroBatchStream(testTablePath, new Configuration());
// We need a separate AdmissionLimits object for DSv2 because AdmissionLimits
// is stateful: each admit() call consumes its remaining file/byte budget.
scala.Option<DeltaSource.AdmissionLimits> dsv2Limits =
deltaSource.createAdmissionLimits(scalaMaxFiles, scalaMaxBytes);

try (CloseableIterator<IndexedFile> kernelChanges =
stream.getFileChangesWithRateLimit(
/*fromVersion=*/ 0L,
/* fromIndex=*/ DeltaSourceOffset.BASE_INDEX(),
/* isInitialSnapshot=*/ false,
dsv2Limits)) {
List<IndexedFile> kernelFilesList = new ArrayList<>();
while (kernelChanges.hasNext()) {
kernelFilesList.add(kernelChanges.next());
}
compareFileChanges(deltaFilesList, kernelFilesList);
}
}

/** Provides test parameters for the parameterized getFileChangesWithRateLimit test. */
private static Stream<Arguments> getFileChangesWithRateLimitParameters() {
Optional<Integer> noMaxFiles = Optional.empty();
Optional<Long> noMaxBytes = Optional.empty();

return Stream.of(
// No rate limits
Arguments.of(noMaxFiles, noMaxBytes, "No limits"),
// MaxFiles only
Arguments.of(Optional.of(5), noMaxBytes, "MaxFiles"),
// MaxBytes only
Arguments.of(noMaxFiles, Optional.of(5000L), "MaxBytes"),
// Both limits
Arguments.of(Optional.of(10), Optional.of(10000L), "MaxFiles and MaxBytes"));
}

private void compareFileChanges(
List<org.apache.spark.sql.delta.sources.IndexedFile> deltaSourceFiles,
List<IndexedFile> kernelFiles) {
@@ -312,8 +388,7 @@ public void testGetFileChanges_EmptyVersions(

// Test DSv1 DeltaSource
DeltaLog deltaLog = DeltaLog.forTable(spark, new Path(testTablePath));
org.apache.spark.sql.delta.sources.DeltaSource deltaSource =
createDeltaSource(deltaLog, testTablePath);
DeltaSource deltaSource = createDeltaSource(deltaLog, testTablePath);

ClosableIterator<org.apache.spark.sql.delta.sources.IndexedFile> deltaChanges =
deltaSource.getFileChanges(
@@ -390,8 +465,7 @@ public void testGetFileChanges_OnRemoveFile_throwError(

// Test DSv1 DeltaSource
DeltaLog deltaLog = DeltaLog.forTable(spark, new Path(testTablePath));
org.apache.spark.sql.delta.sources.DeltaSource deltaSource =
createDeltaSource(deltaLog, testTablePath);
DeltaSource deltaSource = createDeltaSource(deltaLog, testTablePath);

UnsupportedOperationException dsv1Exception =
assertThrows(
@@ -535,17 +609,35 @@ private static void sql(String query, Object... args) {
SparkDsv2TestBase.spark.sql(String.format(query, args));
}

/**
* Helper method to insert multiple versions of data into a test table.
*
* @param tableName The name of the table to insert into
* @param numVersions The number of versions (commits) to create
* @param rowsPerVersion The number of rows to insert per version
*/
private void insertVersions(String tableName, int numVersions, int rowsPerVersion) {
for (int i = 0; i < numVersions; i++) {
StringBuilder values = new StringBuilder();
for (int j = 0; j < rowsPerVersion; j++) {
if (j > 0) values.append(", ");
int id = i * rowsPerVersion + j;
values.append(String.format("(%d, 'User%d')", id, id));
}
sql("INSERT INTO %s VALUES %s", tableName, values.toString());
}
}

/** Helper method to create a DeltaSource instance for testing. */
private org.apache.spark.sql.delta.sources.DeltaSource createDeltaSource(
DeltaLog deltaLog, String tablePath) {
private DeltaSource createDeltaSource(DeltaLog deltaLog, String tablePath) {
DeltaOptions options = new DeltaOptions(Map$.MODULE$.empty(), spark.sessionState().conf());
scala.collection.immutable.Seq<org.apache.spark.sql.catalyst.expressions.Expression> emptySeq =
scala.collection.JavaConverters.asScalaBuffer(
new java.util.ArrayList<org.apache.spark.sql.catalyst.expressions.Expression>())
.toList();
org.apache.spark.sql.delta.Snapshot snapshot =
deltaLog.update(false, Option.empty(), Option.empty());
return new org.apache.spark.sql.delta.sources.DeltaSource(
return new DeltaSource(
spark,
deltaLog,
/* catalogTableOpt= */ Option.empty(),
AdmittableFile.java (new file)
@@ -0,0 +1,36 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.delta.sources;

/**
* Interface for files that can be admitted by admission control in Delta streaming sources.
* This abstraction allows both DSv1 and DSv2 IndexedFile implementations to be used with
* the admission control logic.
*/
public interface AdmittableFile {
/**
* Returns true if this file has an associated file action (AddFile, RemoveFile, or CDCFile).
* Placeholder IndexedFiles with no file action will return false.
*/
boolean hasFileAction();

/**
* Returns the size of the file in bytes.
* This method should only be called when hasFileAction() returns true.
*/
long getFileSize();
}
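A short usage illustration of the contract above (hypothetical consumer code, not part of this PR): getFileSize() stays behind a hasFileAction() guard, so placeholder entries contribute nothing:

import org.apache.spark.sql.delta.sources.AdmittableFile;

final class AdmittableFileUsageSketch {
  /** Sums the sizes of real file actions; placeholder entries are skipped. */
  static long totalBytes(Iterable<? extends AdmittableFile> files) {
    long total = 0L;
    for (AdmittableFile file : files) {
      if (file.hasFileAction()) { // contract: guard before getFileSize()
        total += file.getFileSize();
      }
    }
    return total;
  }
}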