@@ -18,8 +18,11 @@
import static io.delta.kernel.spark.utils.ScalaUtils.toScalaMap;
import static java.util.Objects.requireNonNull;

import io.delta.kernel.Snapshot;
import io.delta.kernel.internal.SnapshotImpl;
import io.delta.kernel.spark.read.SparkScanBuilder;
import io.delta.kernel.spark.snapshot.PathBasedSnapshotManager;
import io.delta.kernel.spark.snapshot.SnapshotManager;
import io.delta.kernel.spark.utils.SchemaUtils;
import java.util.*;
import org.apache.hadoop.conf.Configuration;
@@ -40,10 +43,8 @@ public class SparkTable implements Table, SupportsRead {
EnumSet.of(TableCapability.BATCH_READ, TableCapability.MICRO_BATCH_READ));

private final Identifier identifier;
private final String tablePath;
private final Map<String, String> options;
// TODO: [delta-io/delta#5029] Add getProperties() in snapshot to avoid using Impl class.
private final SnapshotImpl snapshot;
private final SnapshotManager snapshotManager;
private final Configuration hadoopConf;

private final StructType schema;
@@ -54,12 +55,12 @@ public class SparkTable implements Table, SupportsRead {
private final Transform[] partitionTransforms;

/**
* Creates a SparkTable backed by a Delta Kernel snapshot and initializes Spark-facing metadata
* (schemas, partitioning, capabilities).
* Creates a SparkTable backed by a Delta Kernel snapshot manager and initializes Spark-facing
* metadata (schemas, partitioning, capabilities).
*
* <p>Side effects: - Loads the latest snapshot for the given tablePath. - Builds Hadoop
* configuration from options for subsequent I/O. - Derives data schema, partition schema, and
* full table schema from the snapshot.
* <p>Side effects: - Initializes a SnapshotManager for the given tablePath. - Loads the latest
* snapshot via the manager. - Builds Hadoop configuration from options for subsequent I/O. -
* Derives data schema, partition schema, and full table schema from the snapshot.
*
* <p>Notes: - Partition column order from the snapshot is preserved for partitioning and appended
* after data columns in the public Spark schema, per Spark conventions. - Read-time scan options
@@ -72,14 +73,13 @@ public class SparkTable implements Table, SupportsRead {
*/
public SparkTable(Identifier identifier, String tablePath, Map<String, String> options) {
this.identifier = requireNonNull(identifier, "identifier is null");
this.tablePath = requireNonNull(tablePath, "snapshot is null");
this.options = options;
this.hadoopConf =
SparkSession.active().sessionState().newHadoopConfWithOptions(toScalaMap(options));
this.snapshot =
(SnapshotImpl)
io.delta.kernel.TableManager.loadSnapshot(tablePath)
.build(io.delta.kernel.defaults.engine.DefaultEngine.create(hadoopConf));
this.snapshotManager = new PathBasedSnapshotManager(tablePath, hadoopConf);

// Load the initial snapshot through the manager
Snapshot snapshot = snapshotManager.loadLatestSnapshot();

this.schema = SchemaUtils.convertKernelSchemaToSparkSchema(snapshot.getSchema());
this.partColNames =
@@ -154,7 +154,7 @@ public Transform[] partitioning() {

@Override
public Map<String, String> properties() {
Map<String, String> props = new HashMap<>(snapshot.getMetadata().getConfiguration());
Map<String, String> props = new HashMap<>(snapshotManager.getMetadata().getConfiguration());
props.putAll(this.options);
return Collections.unmodifiableMap(props);
}
@@ -169,7 +169,10 @@ public ScanBuilder newScanBuilder(CaseInsensitiveStringMap scanOptions) {
Map<String, String> combined = new HashMap<>(this.options);
combined.putAll(scanOptions.asCaseSensitiveMap());
CaseInsensitiveStringMap merged = new CaseInsensitiveStringMap(combined);
return new SparkScanBuilder(name(), tablePath, dataSchema, partitionSchema, snapshot, merged);
return new SparkScanBuilder(name(), snapshotManager, dataSchema, partitionSchema, merged);
}

@Override
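The SnapshotManager abstraction that SparkTable now delegates to is defined outside this diff. Pieced together from the call sites in this PR (loadLatestSnapshot, loadSnapshotAt, unsafeVolatileSnapshot, getMetadata, getTableChanges), a minimal sketch of the contract might look like the following; the exact signatures and the Metadata return type are assumptions inferred from usage, not the actual interface.

// Minimal sketch of the SnapshotManager contract implied by this PR.
// Signatures are inferred from the call sites above and may differ from the real interface.
package io.delta.kernel.spark.snapshot;

import io.delta.kernel.CommitRange;
import io.delta.kernel.Snapshot;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.internal.actions.Metadata;
import java.util.Optional;

public interface SnapshotManager {
  // Loads (and caches) the latest snapshot of the table; called from the SparkTable constructor.
  Snapshot loadLatestSnapshot();

  // Loads the snapshot at a specific version; used for protocol validation in streaming reads.
  Snapshot loadSnapshotAt(long version);

  // Returns the most recently loaded snapshot without refreshing; "unsafe" because it may be stale.
  Snapshot unsafeVolatileSnapshot();

  // Metadata (e.g. table configuration) of the current snapshot; backs SparkTable#properties().
  Metadata getMetadata();

  // Commit range from startVersion (inclusive) to an optional end version (inclusive).
  CommitRange getTableChanges(Engine engine, long startVersion, Optional<Long> endVersion);
}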
@@ -42,7 +42,6 @@
import scala.collection.JavaConverters;

public class SparkBatch implements Batch {
private final String tablePath;
private final StructType readDataSchema;
private final StructType dataSchema;
private final StructType partitionSchema;
@@ -55,7 +54,6 @@ public class SparkBatch implements Batch {
private final List<PartitionedFile> partitionedFiles;

public SparkBatch(
String tablePath,
StructType dataSchema,
StructType partitionSchema,
StructType readDataSchema,
@@ -66,7 +64,6 @@ public SparkBatch(
scala.collection.immutable.Map<String, String> scalaOptions,
Configuration hadoopConf) {

this.tablePath = Objects.requireNonNull(tablePath, "tableName is null");
this.dataSchema = Objects.requireNonNull(dataSchema, "dataSchema is null");
this.partitionSchema = Objects.requireNonNull(partitionSchema, "partitionSchema is null");
this.readDataSchema = Objects.requireNonNull(readDataSchema, "readDataSchema is null");
@@ -125,8 +122,8 @@ public boolean equals(Object obj) {
if (!(obj instanceof SparkBatch)) return false;

SparkBatch that = (SparkBatch) obj;
return Objects.equals(this.tablePath, that.tablePath)
&& Objects.equals(this.readDataSchema, that.readDataSchema)
return Objects.equals(this.readDataSchema, that.readDataSchema)
&& Objects.equals(this.dataSchema, that.dataSchema)
&& Objects.equals(this.partitionSchema, that.partitionSchema)
&& Arrays.equals(this.pushedToKernelFilters, that.pushedToKernelFilters)
&& Arrays.equals(this.dataFilters, that.dataFilters)
@@ -135,8 +132,8 @@

@Override
public int hashCode() {
int result = tablePath.hashCode();
result = 31 * result + readDataSchema.hashCode();
int result = readDataSchema.hashCode();
result = 31 * result + dataSchema.hashCode();
result = 31 * result + partitionSchema.hashCode();
result = 31 * result + Arrays.hashCode(pushedToKernelFilters);
result = 31 * result + Arrays.hashCode(dataFilters);
@@ -15,18 +15,27 @@
*/
package io.delta.kernel.spark.read;

import io.delta.kernel.*;
import io.delta.kernel.CommitRange;
import io.delta.kernel.Snapshot;
import io.delta.kernel.data.ColumnarBatch;
import io.delta.kernel.defaults.engine.DefaultEngine;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.internal.DeltaLogActionUtils.DeltaAction;
import io.delta.kernel.internal.actions.AddFile;
import io.delta.kernel.internal.actions.RemoveFile;
import io.delta.kernel.internal.util.Utils;
import io.delta.kernel.spark.snapshot.SnapshotManager;
import io.delta.kernel.spark.utils.StreamingHelper;
import io.delta.kernel.utils.CloseableIterator;
import java.io.IOException;
import java.util.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.sql.connector.read.InputPartition;
import org.apache.spark.sql.connector.read.PartitionReaderFactory;
@@ -43,10 +52,10 @@ public class SparkMicroBatchStream implements MicroBatchStream {
new HashSet<>(Arrays.asList(DeltaAction.ADD, DeltaAction.REMOVE)));

private final Engine engine;
private final String tablePath;
private final SnapshotManager snapshotManager;

public SparkMicroBatchStream(String tablePath, Configuration hadoopConf) {
this.tablePath = tablePath;
public SparkMicroBatchStream(SnapshotManager snapshotManager, Configuration hadoopConf) {
this.snapshotManager = Objects.requireNonNull(snapshotManager, "snapshotManager is null");
this.engine = DefaultEngine.create(hadoopConf);
}

@@ -153,21 +162,14 @@ CloseableIterator<IndexedFile> getFileChanges(
private CloseableIterator<IndexedFile> filterDeltaLogs(
long startVersion, Option<DeltaSourceOffset> endOffset) {
List<IndexedFile> allIndexedFiles = new ArrayList<>();
// StartBoundary (inclusive)
CommitRangeBuilder builder =
TableManager.loadCommitRange(tablePath)
.withStartBoundary(CommitRangeBuilder.CommitBoundary.atVersion(startVersion));
if (endOffset.isDefined()) {
// EndBoundary (inclusive)
builder =
builder.withEndBoundary(
CommitRangeBuilder.CommitBoundary.atVersion(endOffset.get().reservoirVersion()));
}
CommitRange commitRange = builder.build(engine);

// Get commit range via snapshot manager
Optional<Long> endVersionOpt =
endOffset.isDefined() ? Optional.of(endOffset.get().reservoirVersion()) : Optional.empty();
CommitRange commitRange = snapshotManager.getTableChanges(engine, startVersion, endVersionOpt);
// Required by kernel: perform protocol validation by creating a snapshot at startVersion.
// TODO(#5318): This is not working with ccv2 table
Snapshot startSnapshot =
TableManager.loadSnapshot(tablePath).atVersion(startVersion).build(engine);
Snapshot startSnapshot = snapshotManager.loadSnapshotAt(startVersion);
try (CloseableIterator<ColumnarBatch> actionsIter =
commitRange.getActions(engine, startSnapshot, ACTION_SET)) {
// Each ColumnarBatch belongs to a single commit version,
@@ -263,7 +265,8 @@ private void validateCommit(
if (removeOpt.isPresent()) {
RemoveFile removeFile = removeOpt.get();
Throwable error =
DeltaErrors.deltaSourceIgnoreDeleteError(version, removeFile.getPath(), tablePath);
DeltaErrors.deltaSourceIgnoreDeleteError(
version, removeFile.getPath(), snapshotManager.unsafeVolatileSnapshot().getPath());
if (error instanceof RuntimeException) {
throw (RuntimeException) error;
} else {
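The getTableChanges call above replaces the CommitRangeBuilder logic that used to live in filterDeltaLogs. A plausible path-based implementation is simply the removed code moved behind the manager; the sketch below assumes exactly that (with tablePath as the manager's table root) and is not taken from this PR.

// Hypothetical implementation of getTableChanges for a path-based manager, assuming it
// wraps the CommitRangeBuilder calls previously inlined in filterDeltaLogs above.
// Both boundaries are inclusive, matching the original comments.
import io.delta.kernel.CommitRange;
import io.delta.kernel.CommitRangeBuilder;
import io.delta.kernel.TableManager;
import io.delta.kernel.engine.Engine;
import java.util.Optional;

class PathBasedCommitRangeLoader {
  private final String tablePath; // table root path, as previously held by SparkMicroBatchStream

  PathBasedCommitRangeLoader(String tablePath) {
    this.tablePath = tablePath;
  }

  CommitRange getTableChanges(Engine engine, long startVersion, Optional<Long> endVersion) {
    // Start boundary (inclusive).
    CommitRangeBuilder builder =
        TableManager.loadCommitRange(tablePath)
            .withStartBoundary(CommitRangeBuilder.CommitBoundary.atVersion(startVersion));
    // End boundary (inclusive), only when the caller supplied one.
    if (endVersion.isPresent()) {
      builder =
          builder.withEndBoundary(CommitRangeBuilder.CommitBoundary.atVersion(endVersion.get()));
    }
    return builder.build(engine);
  }
}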
@@ -16,6 +16,7 @@
package io.delta.kernel.spark.read;

import java.io.IOException;
import java.util.Objects;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.read.PartitionReader;
import org.apache.spark.sql.execution.datasources.FilePartition;
@@ -38,8 +39,8 @@ public class SparkPartitionReader<T> implements PartitionReader<T> {

public SparkPartitionReader(
Function1<PartitionedFile, Iterator<InternalRow>> readFunc, FilePartition partition) {
this.readFunc = java.util.Objects.requireNonNull(readFunc, "readFunc");
this.partition = java.util.Objects.requireNonNull(partition, "partition");
this.readFunc = Objects.requireNonNull(readFunc, "readFunc");
this.partition = Objects.requireNonNull(partition, "partition");
}

@Override
@@ -17,12 +17,15 @@

import static io.delta.kernel.spark.utils.ExpressionUtils.dsv2PredicateToCatalystExpression;

import io.delta.kernel.data.FilteredColumnarBatch;
import io.delta.kernel.data.MapValue;
import io.delta.kernel.data.Row;
import io.delta.kernel.defaults.engine.DefaultEngine;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.expressions.Predicate;
import io.delta.kernel.internal.actions.AddFile;
import io.delta.kernel.internal.data.ScanStateRow;
import io.delta.kernel.spark.snapshot.SnapshotManager;
import io.delta.kernel.spark.utils.ScalaUtils;
import io.delta.kernel.utils.CloseableIterator;
import java.io.IOException;
@@ -50,7 +53,7 @@
/** Spark DSV2 Scan implementation backed by Delta Kernel. */
public class SparkScan implements Scan, SupportsReportStatistics, SupportsRuntimeV2Filtering {

private final String tablePath;
private final SnapshotManager snapshotManager;
private final StructType readDataSchema;
private final StructType dataSchema;
private final StructType partitionSchema;
@@ -69,7 +72,7 @@ public class SparkScan implements Scan, SupportsReportStatistics, SupportsRuntimeV2Filtering {
private volatile boolean planned = false;

public SparkScan(
String tablePath,
SnapshotManager snapshotManager,
StructType dataSchema,
StructType partitionSchema,
StructType readDataSchema,
@@ -78,9 +81,7 @@ public SparkScan(
io.delta.kernel.Scan kernelScan,
CaseInsensitiveStringMap options) {

final String normalizedTablePath = Objects.requireNonNull(tablePath, "tablePath is null");
this.tablePath =
normalizedTablePath.endsWith("/") ? normalizedTablePath : normalizedTablePath + "/";
this.snapshotManager = Objects.requireNonNull(snapshotManager, "snapshotManager is null");
this.dataSchema = Objects.requireNonNull(dataSchema, "dataSchema is null");
this.partitionSchema = Objects.requireNonNull(partitionSchema, "partitionSchema is null");
this.readDataSchema = Objects.requireNonNull(readDataSchema, "readDataSchema is null");
@@ -112,7 +113,6 @@ public StructType readSchema() {
public Batch toBatch() {
ensurePlanned();
return new SparkBatch(
tablePath,
dataSchema,
partitionSchema,
readDataSchema,
@@ -126,7 +126,7 @@ public Batch toBatch() {

@Override
public MicroBatchStream toMicroBatchStream(String checkpointLocation) {
return new SparkMicroBatchStream(tablePath, hadoopConf);
return new SparkMicroBatchStream(snapshotManager, hadoopConf);
}

@Override
Expand Down Expand Up @@ -202,15 +202,20 @@ private InternalRow getPartitionRow(MapValue partitionValues) {
*/
private void planScanFiles() {
final Engine tableEngine = DefaultEngine.create(hadoopConf);
final Iterator<io.delta.kernel.data.FilteredColumnarBatch> scanFileBatches =
kernelScan.getScanFiles(tableEngine);

// Get table path from scan state
final Row scanState = kernelScan.getScanState(tableEngine);
final String tableRoot = ScanStateRow.getTableRoot(scanState).toUri().toString();
final String tablePath = tableRoot.endsWith("/") ? tableRoot : tableRoot + "/";

final Iterator<FilteredColumnarBatch> scanFileBatches = kernelScan.getScanFiles(tableEngine);

final String[] locations = new String[0];
final scala.collection.immutable.Map<String, Object> otherConstantMetadataColumnValues =
scala.collection.immutable.Map$.MODULE$.empty();

while (scanFileBatches.hasNext()) {
final io.delta.kernel.data.FilteredColumnarBatch batch = scanFileBatches.next();
final FilteredColumnarBatch batch = scanFileBatches.next();

try (CloseableIterator<Row> addFileRowIter = batch.getRows()) {
while (addFileRowIter.hasNext()) {
@@ -20,6 +20,7 @@
import io.delta.kernel.expressions.And;
import io.delta.kernel.expressions.Predicate;
import io.delta.kernel.internal.SnapshotImpl;
import io.delta.kernel.spark.snapshot.SnapshotManager;
import io.delta.kernel.spark.utils.ExpressionUtils;
import java.util.*;
import java.util.stream.Collectors;
@@ -39,7 +40,7 @@ public class SparkScanBuilder
implements ScanBuilder, SupportsPushDownRequiredColumns, SupportsPushDownFilters {

private io.delta.kernel.ScanBuilder kernelScanBuilder;
private final String tablePath;
private final SnapshotManager snapshotManager;
private final StructType dataSchema;
private final StructType partitionSchema;
private final CaseInsensitiveStringMap options;
@@ -55,13 +56,12 @@ public class SparkScanBuilder

public SparkScanBuilder(
String tableName,
String tablePath,
SnapshotManager snapshotManager,
StructType dataSchema,
StructType partitionSchema,
SnapshotImpl snapshot,
CaseInsensitiveStringMap options) {
this.kernelScanBuilder = requireNonNull(snapshot, "snapshot is null").getScanBuilder();
this.tablePath = requireNonNull(tablePath, "tablePath is null");
this.snapshotManager = requireNonNull(snapshotManager, "snapshotManager is null");
this.kernelScanBuilder = this.snapshotManager.unsafeVolatileSnapshot().getScanBuilder();
this.dataSchema = requireNonNull(dataSchema, "dataSchema is null");
this.partitionSchema = requireNonNull(partitionSchema, "partitionSchema is null");
this.options = requireNonNull(options, "options is null");
@@ -146,7 +146,7 @@ public Filter[] pushedFilters() {
@Override
public org.apache.spark.sql.connector.read.Scan build() {
return new SparkScan(
tablePath,
snapshotManager,
dataSchema,
partitionSchema,
requiredDataSchema,
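Taken together, these changes thread the snapshot manager from SparkTable through SparkScanBuilder, SparkScan, SparkBatch, and SparkMicroBatchStream instead of passing a raw table path and snapshot. A rough usage sketch of the reworked read path follows; the SparkTable package name, identifiers, and paths are illustrative only, and an active SparkSession is assumed since the constructor derives its Hadoop configuration from it.

// Illustrative only: shows how the DSV2 entry points from this PR fit together.
import io.delta.kernel.spark.table.SparkTable; // package assumed; not shown in this diff
import java.util.Collections;
import org.apache.spark.sql.connector.catalog.Identifier;
import org.apache.spark.sql.connector.read.Batch;
import org.apache.spark.sql.connector.read.Scan;
import org.apache.spark.sql.connector.read.streaming.MicroBatchStream;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;

public class SparkTableReadExample {
  public static void main(String[] args) {
    // SparkTable now creates a PathBasedSnapshotManager internally; callers still pass only
    // the table path and options (an active SparkSession is required by the constructor).
    SparkTable table =
        new SparkTable(
            Identifier.of(new String[] {"default"}, "events"),
            "/tmp/delta/events",
            Collections.emptyMap());

    // The manager, not the path, flows into the scan builder, scan, batch, and stream.
    Scan scan = table.newScanBuilder(CaseInsensitiveStringMap.empty()).build();
    Batch batch = scan.toBatch();
    MicroBatchStream stream = scan.toMicroBatchStream("/tmp/checkpoints/events");
  }
}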