From 4e42e933f20cf0d2def37b64d369543d31fe123f Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Fri, 17 Oct 2025 17:24:18 -0700 Subject: [PATCH 01/13] working write bencmark --- .../benchmarks/AbstractBenchmarkState.java | 15 +- .../defaults/benchmarks/BenchmarkUtils.java | 3 +- .../benchmarks/WorkloadBenchmark.java | 29 +- .../defaults/benchmarks/models/TableInfo.java | 104 +++++-- .../benchmarks/models/WorkloadSpec.java | 5 +- .../defaults/benchmarks/models/WriteSpec.java | 118 ++++++++ .../workloadrunners/ReadMetadataRunner.java | 4 +- .../workloadrunners/WorkloadRunner.java | 13 + .../workloadrunners/WriteRunner.java | 269 ++++++++++++++++++ .../specs/add_and_remove/commit_add.json | 1 + .../specs/add_and_remove/commit_remove.json | 1 + .../specs/add_and_remove/spec.json | 8 + .../specs/single_add/commit_add.json | 1 + .../basic_append/specs/single_add/spec.json | 7 + .../basic_append/table_info.json | 2 +- 15 files changed, 536 insertions(+), 44 deletions(-) create mode 100644 kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/WriteSpec.java create mode 100644 kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WriteRunner.java create mode 100644 kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/add_and_remove/commit_add.json create mode 100644 kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/add_and_remove/commit_remove.json create mode 100644 kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/add_and_remove/spec.json create mode 100644 kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/single_add/commit_add.json create mode 100644 kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/single_add/spec.json diff --git a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/AbstractBenchmarkState.java b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/AbstractBenchmarkState.java index 6e3bcfc6fbb..9497784caa2 100644 --- a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/AbstractBenchmarkState.java +++ b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/AbstractBenchmarkState.java @@ -40,14 +40,14 @@ public abstract class AbstractBenchmarkState { * dynamically by JMH. The value is set in the main method. */ @Param({}) - private String workloadSpecJson; + public String workloadSpecJson; /** * The engine to use for this benchmark. Note: This parameter will be set dynamically by JMH. The * value is set in the main method. */ @Param({}) - private String engineName; + public String engineName; /** The workload runner initialized for this benchmark invocation. */ private WorkloadRunner runner; @@ -76,6 +76,17 @@ public void setupInvocation() throws Exception { runner.setup(); } + /** + * Teardown method that runs after each benchmark invocation. This calls the {@link + * WorkloadRunner#cleanup()} to clean up any state created during execution. + * + * @throws Exception If any error occurs during cleanup. + */ + @TearDown(Level.Invocation) + public void teardownInvocation() throws Exception { + runner.cleanup(); + } + /** * Returns an instance of the desired engine based on the provided engine name. * diff --git a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/BenchmarkUtils.java b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/BenchmarkUtils.java index 9fa0b976936..1cb533e9727 100644 --- a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/BenchmarkUtils.java +++ b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/BenchmarkUtils.java @@ -115,11 +115,10 @@ private static List loadSpecsFromTable(Path tableDir) { validateTableStructure(tableDir); Path tableInfoPath = tableDir.resolve(TABLE_INFO_FILE_NAME); - Path deltaDir = tableDir.resolve(DELTA_DIR_NAME); Path specsDir = tableDir.resolve(SPECS_DIR_NAME); TableInfo tableInfo = - TableInfo.fromJsonPath(tableInfoPath.toString(), deltaDir.toAbsolutePath().toString()); + TableInfo.fromJsonPath(tableInfoPath.toString(), tableDir.toAbsolutePath().toString()); return findSpecDirectories(specsDir).stream() .map(specDir -> loadSingleSpec(specDir, tableInfo)) diff --git a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/WorkloadBenchmark.java b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/WorkloadBenchmark.java index 6806a573d8f..17a1ffe1f0c 100644 --- a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/WorkloadBenchmark.java +++ b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/WorkloadBenchmark.java @@ -33,6 +33,8 @@ import org.openjdk.jmh.runner.options.Options; import org.openjdk.jmh.runner.options.OptionsBuilder; import org.openjdk.jmh.runner.options.TimeValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Generic JMH benchmark for all workload types. Automatically loads and runs benchmarks based on @@ -45,12 +47,23 @@ @Measurement(iterations = 5, time = 1) public class WorkloadBenchmark { + private static final Logger log = LoggerFactory.getLogger(WorkloadBenchmark.class); + /** Default implementation of BenchmarkState that supports only the "default" engine. */ public static class DefaultBenchmarkState extends AbstractBenchmarkState { @Override protected Engine getEngine(String engineName) { if (engineName.equals("default")) { - return DefaultEngine.create(new Configuration()); + return DefaultEngine.create( + new Configuration() { + { + // Set the batch sizes to small so that we get to test the multiple batch/file + // scenarios. + set("delta.kernel.default.parquet.reader.batch-size", "20"); + set("delta.kernel.default.json.reader.batch-size", "20"); + set("delta.kernel.default.parquet.writer.targetMaxFileSize", "20"); + } + }); } else { throw new IllegalArgumentException("Unsupported engine: " + engineName); } @@ -78,6 +91,7 @@ public void benchmarkWorkload(DefaultBenchmarkState state, Blackhole blackhole) public static void main(String[] args) throws RunnerException, IOException { // Get workload specs from the workloads directory List workloadSpecs = BenchmarkUtils.loadAllWorkloads(WORKLOAD_SPECS_DIR); + System.out.println("Loaded " + workloadSpecs.size() + " workload specs"); if (workloadSpecs.isEmpty()) { throw new RunnerException( "No workloads found. Please add workload specs to the workloads directory."); @@ -87,7 +101,12 @@ public static void main(String[] args) throws RunnerException, IOException { List filteredSpecs = new ArrayList<>(); for (WorkloadSpec spec : workloadSpecs) { // TODO: In the future, we can filter specific workloads using command line args here. - filteredSpecs.addAll(spec.getWorkloadVariants()); + List variants = spec.getWorkloadVariants(); + for (WorkloadSpec variant : variants) { + if (variant.getType().equals("write")) { + filteredSpecs.add(variant); + } + } } // Convert paths into a String array for JMH. JMH requires that parameters be of type String[]. @@ -102,13 +121,13 @@ public static void main(String[] args) throws RunnerException, IOException { // TODO: In the future, this can be extended to support multiple engines. .param("engineName", "default") .forks(1) - .warmupIterations(3) // Proper warmup for production benchmarks - .measurementIterations(5) // Proper measurement iterations for production benchmarks + .warmupIterations(3) + .measurementIterations(5) .warmupTime(TimeValue.seconds(1)) .measurementTime(TimeValue.seconds(1)) .addProfiler(KernelMetricsProfiler.class) .build(); - new Runner(opt, new WorkloadOutputFormat()).run(); + new Runner(opt).run(); } } diff --git a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/TableInfo.java b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/TableInfo.java index 647b52e5eb9..21464fa84d6 100644 --- a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/TableInfo.java +++ b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/TableInfo.java @@ -16,10 +16,12 @@ package io.delta.kernel.defaults.benchmarks.models; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.ObjectMapper; import java.io.File; import java.io.IOException; -import org.codehaus.jackson.map.ObjectMapper; +import java.nio.file.Paths; /** * Represents metadata about a Delta table used in benchmark workloads. @@ -30,17 +32,26 @@ * *

TableInfo instances are typically loaded from JSON files in the workload specifications * directory structure. Each table directory should contain a {@code table_info.json} file with the - * table metadata and a {@code delta} subdirectory containing the actual table data. The table root - * path is the absolute path to the root of the table and is provided separately in {@link - * WorkloadSpec#fromJsonPath(String, String, TableInfo)}. + * table metadata and a {@code delta} subdirectory containing the actual table data. * - *

Example JSON structure: + *

Example JSON structure for relative table (default): * *

{@code
  * {
- *   "name": "large-table",
- *   "description": "A large Delta table with multi-part checkpoints for performance testing",
- *   "engineInfo": "Apache-Spark/3.5.1 Delta-Lake/3.1.0"
+ *   "name": "basic_table",
+ *   "description": "A basic Delta table for benchmarking",
+ *   "engine_info": "Apache-Spark/3.5.1 Delta-Lake/3.1.0"
+ * }
+ * }
+ * + *

Example JSON structure for absolute path table: + * + *

{@code
+ * {
+ *   "name": "s3_table",
+ *   "description": "Table stored in S3",
+ *   "table_type": "absolute",
+ *   "table_path": "s3://my-bucket/path/to/table"
  * }
  * }
*/ @@ -60,23 +71,28 @@ public class TableInfo { @JsonProperty("engine_info") public String engineInfo; - /** The root path where the Delta table is stored. */ - @JsonProperty("table_root") - public String tableRoot; + /** + * The type of table location: "relative" (default) or "absolute". + * + *

When "relative", the table is located at {table_info_directory}/delta. When "absolute", the + * table is located at the path specified in {@link #tablePath}. + */ + @JsonProperty("table_type") + private String tableType; - /** @return the absolute path to the root of the table */ - public String getTableRoot() { - return tableRoot; - } + /** + * The absolute path to the table when {@link #tableType} is "absolute". Can be a local path + * (file:///) or S3 path (s3://). Null when table_type is "relative". + */ + @JsonProperty("table_path") + private String tablePath; /** - * Sets the root path of the Delta table. - * - * @param tableRoot the absolute path to the root of the table + * The resolved absolute path to the root of the table. This is computed after deserialization + * based on {@link #tableType} and {@link #tablePath}. */ - public void setTableRoot(String tableRoot) { - this.tableRoot = tableRoot; - } + @JsonProperty("table_info_path") + private String tableInfoPath; /** * Default constructor for Jackson deserialization. @@ -86,23 +102,47 @@ public void setTableRoot(String tableRoot) { */ public TableInfo() {} + /** Resolves the table root path based on the table type and location configuration. */ + @JsonIgnore + public String getResolvedTableRoot() { + if ("absolute".equals(tableType)) { + if (tablePath == null || tablePath.trim().isEmpty()) { + throw new IllegalStateException( + "table_path must be specified when table_type is 'absolute'"); + } + return tablePath; + } else { + // Default to "relative" if tableType is null or "relative" + return Paths.get(tableInfoPath, "delta").toAbsolutePath().toString(); + } + } + + public String getTableInfoPath() { + return tableInfoPath; + } + + public void setTableInfoPath(String tableInfoDirectory) { + this.tableInfoPath = tableInfoDirectory; + } + /** * Creates a TableInfo instance by reading from a JSON file at the specified path. * - *

This method loads table metadata from a JSON file and sets the table root path. The JSON - * file should contain the table name and description, while the table root path is provided - * separately with the absolute path. + *

This method loads table metadata from a JSON file and resolves the table root path. The JSON + * file should contain the table name and description, while the table root path is computed based + * on the table_type and table_path fields. * * @param jsonPath the path to the JSON file containing the TableInfo metadata - * @param tableRoot the absolute path to the root of the Delta table - * @return a TableInfo instance populated from the JSON file and table root path + * @param tableInfoPath the directory containing the table_info.json file (used for relative path + * resolution) + * @return a TableInfo instance populated from the JSON file with resolved table root path * @throws RuntimeException if there is an error reading or parsing the JSON file */ - public static TableInfo fromJsonPath(String jsonPath, String tableRoot) { + public static TableInfo fromJsonPath(String jsonPath, String tableInfoPath) { ObjectMapper mapper = new ObjectMapper(); try { TableInfo info = mapper.readValue(new File(jsonPath), TableInfo.class); - info.setTableRoot(tableRoot); + info.setTableInfoPath(tableInfoPath); return info; } catch (IOException e) { throw new RuntimeException("Failed to read TableInfo from JSON file: " + jsonPath, e); @@ -112,8 +152,8 @@ public static TableInfo fromJsonPath(String jsonPath, String tableRoot) { /** * Returns a string representation of this TableInfo. * - *

The string includes the table name, description, and engine info, but excludes the table - * root path for security reasons (as it may contain sensitive path information). + *

The string includes the table name, description, engine info, and CCv2 status, but excludes + * the table root path for security reasons (as it may contain sensitive path information). * * @return a string representation of this TableInfo */ @@ -125,6 +165,8 @@ public String toString() { + description + "', engineInfo='" + engineInfo - + "'}"; + + "', tableType='" + + tableType + + "}"; } } diff --git a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/WorkloadSpec.java b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/WorkloadSpec.java index 1d1ed0ba71c..861de96187b 100644 --- a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/WorkloadSpec.java +++ b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/WorkloadSpec.java @@ -33,7 +33,10 @@ * field in the JSON. */ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type") -@JsonSubTypes({@JsonSubTypes.Type(value = ReadSpec.class, name = "read")}) +@JsonSubTypes({ + @JsonSubTypes.Type(value = ReadSpec.class, name = "read"), + @JsonSubTypes.Type(value = WriteSpec.class, name = "write") +}) public abstract class WorkloadSpec { /** * The type of workload (e.g., "read"). This is used by Jackson's polymorphic deserialization to diff --git a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/WriteSpec.java b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/WriteSpec.java new file mode 100644 index 00000000000..2a321c34dc0 --- /dev/null +++ b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/WriteSpec.java @@ -0,0 +1,118 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.kernel.defaults.benchmarks.models; + +import com.fasterxml.jackson.annotation.JsonProperty; +import io.delta.kernel.defaults.benchmarks.workloadrunners.WorkloadRunner; +import io.delta.kernel.defaults.benchmarks.workloadrunners.WriteRunner; +import io.delta.kernel.engine.Engine; +import java.util.Collections; +import java.util.List; + +/** + * Workload specification for write benchmarks. Defines test cases for writing to Delta tables with + * one or more commits containing add/remove actions. + * + *

Usage

+ * + *

To run this workload, use {@link WorkloadSpec#getRunner(Engine)} to get the appropriate {@link + * WriteRunner}. + * + * @see WriteRunner + */ +public class WriteSpec extends WorkloadSpec { + + /** + * Container for a single commit's configuration. + * + *

Each commit references a file containing the Delta log JSON actions (add/remove files) to be + * committed. + */ + public static class CommitSpec { + /** + * Path to the commit file containing Delta log JSON actions. The path is relative to the spec + * directory (where spec.json is located). + * + *

Example: "commit_a.json" + */ + @JsonProperty("data_files_path") + private String dataFilesPath; + + /** Default constructor for Jackson. */ + public CommitSpec() {} + + public CommitSpec(String dataFilesPath) { + this.dataFilesPath = dataFilesPath; + } + + public String getDataFilesPath() { + return dataFilesPath; + } + } + + /** + * List of commits to execute in sequence. Each commit contains a reference to a file with Delta + * log JSON actions. All commits are executed as part of the timed benchmark. + */ + @JsonProperty("commits") + private List commits; + + // Default constructor for Jackson + public WriteSpec() { + super("write"); + } + + /** + * Gets the list of commits to execute. + * + * @return list of commit specifications + */ + public List getCommits() { + return commits != null ? commits : Collections.emptyList(); + } + + /** @return the full name of this workload, derived from table name, case name, and type. */ + @Override + public String getFullName() { + return this.tableInfo.name + "/" + caseName + "/write"; + } + + @Override + public WorkloadRunner getRunner(Engine engine) { + return new WriteRunner(this, engine); + } + + /** + * Generates workload variants from this test case specification. + * + *

Currently, WriteSpec generates a single variant (itself). In the future, this could be + * extended to generate variants for different write patterns or configurations. + * + * @return list of WriteSpec variants, each representing a separate workload execution + */ + @Override + public List getWorkloadVariants() { + return Collections.singletonList(this); + } + + @Override + public String toString() { + return String.format( + "Write{caseName='%s', commits=%d, tableInfo='%s'}", + caseName, getCommits().size(), tableInfo); + } +} diff --git a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/ReadMetadataRunner.java b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/ReadMetadataRunner.java index 8b264b11665..7a2e4ee4b31 100644 --- a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/ReadMetadataRunner.java +++ b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/ReadMetadataRunner.java @@ -56,8 +56,8 @@ public ReadMetadataRunner(ReadSpec workloadSpec, Engine engine) { } @Override - public void setup() { - String workloadTableRoot = workloadSpec.getTableInfo().getTableRoot(); + public void setup() throws Exception { + String workloadTableRoot = workloadSpec.getTableInfo().getResolvedTableRoot(); SnapshotBuilder builder = TableManager.loadSnapshot(workloadTableRoot); if (workloadSpec.getVersion() != null) { builder.atVersion(workloadSpec.getVersion()); diff --git a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WorkloadRunner.java b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WorkloadRunner.java index 5107bab2a99..d3916413eb5 100644 --- a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WorkloadRunner.java +++ b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WorkloadRunner.java @@ -61,6 +61,19 @@ public WorkloadRunner() {} */ public abstract void executeAsBenchmark(Blackhole blackhole) throws Exception; + /** + * Cleans up any state created during benchmark execution. For write workloads, this removes added + * files and reverts table state. For read workloads, this is typically a no-op. + * + *

This method is called after each benchmark invocation to ensure a clean state for the next + * run. + * + * @throws Exception if any error occurs during cleanup. + */ + public void cleanup() throws Exception { + // Default implementation: no-op for read workloads + } + // TODO: Add executeAsTest() method for correctness validation // public abstract void executeAsTest() throws Exception; } diff --git a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WriteRunner.java b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WriteRunner.java new file mode 100644 index 00000000000..1d93977f1ce --- /dev/null +++ b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WriteRunner.java @@ -0,0 +1,269 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.kernel.defaults.benchmarks.workloadrunners; + +import static io.delta.kernel.internal.util.Utils.toCloseableIterator; + +import io.delta.kernel.*; +import io.delta.kernel.data.ColumnarBatch; +import io.delta.kernel.data.Row; +import io.delta.kernel.defaults.benchmarks.models.WorkloadSpec; +import io.delta.kernel.defaults.benchmarks.models.WriteSpec; +import io.delta.kernel.engine.Engine; +import io.delta.kernel.internal.actions.AddFile; +import io.delta.kernel.internal.actions.RemoveFile; +import io.delta.kernel.internal.actions.SingleAction; +import io.delta.kernel.internal.util.FileNames; +import io.delta.kernel.transaction.UpdateTableTransactionBuilder; +import io.delta.kernel.types.StructType; +import io.delta.kernel.utils.CloseableIterator; +import io.delta.kernel.utils.FileStatus; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import org.openjdk.jmh.infra.Blackhole; + +/** + * A WorkloadRunner that executes write workloads as benchmarks. This runner performs one or more + * commits to a Delta table and measures the performance of those commits. + * + *

The runner executes commits specified in the {@link WriteSpec}, where each commit contains a + * set of Delta log actions (add/remove files) defined in external JSON files. + * + *

If run as a benchmark using {@link #executeAsBenchmark(Blackhole)}, this measures the time to + * execute all commits. Setup (loading commit files) and cleanup (reverting changes) are not + * included in the benchmark timing. + */ +public class WriteRunner extends WorkloadRunner { + private final Engine engine; + private final WriteSpec workloadSpec; + private List> commitActions; + private List committedVersions; + private Snapshot currentSnapshot; + private long originalVersion; + + /** + * Constructs the WriteRunner from the workload spec and engine. + * + * @param workloadSpec The write workload specification. + * @param engine The engine to use for executing the workload. + */ + public WriteRunner(WriteSpec workloadSpec, Engine engine) { + this.workloadSpec = workloadSpec; + this.engine = engine; + this.committedVersions = new ArrayList<>(); + this.originalVersion = -1; + } + + @Override + public void setup() throws Exception { + commitActions = new ArrayList<>(); + + String tableRoot = workloadSpec.getTableInfo().getResolvedTableRoot(); + + // Get the current version before any commits + SnapshotBuilder builder = TableManager.loadSnapshot(tableRoot); + currentSnapshot = builder.build(engine); + originalVersion = currentSnapshot.getVersion(); + + // Load and parse all commit files + for (WriteSpec.CommitSpec commitSpec : workloadSpec.getCommits()) { + String commitFilePath = + workloadSpec.getTableInfo().getTableInfoPath() + + "/specs/" + + workloadSpec.getCaseName() + + "/" + + commitSpec.getDataFilesPath(); + List actions = parseCommitFile(commitFilePath); + commitActions.add(actions); + } + } + + /** @return the name of this workload. */ + @Override + public String getName() { + return "write"; + } + + /** @return The workload specification used to create this runner. */ + @Override + public WorkloadSpec getWorkloadSpec() { + return workloadSpec; + } + + /** + * Executes the write workload as a benchmark, consuming results via the provided Blackhole. + * + *

This method executes all commits specified in the workload spec in sequence. The timing + * includes only the commit execution, not the setup or cleanup. We reuse the post-commit snapshot + * from each transaction to avoid reloading from disk, which makes the benchmark more efficient + * and realistic. + * + * @param blackhole The Blackhole to consume results and avoid dead code elimination. + */ + @Override + public void executeAsBenchmark(Blackhole blackhole) throws Exception { + // Execute all commits in sequence (timed) + for (List actions : commitActions) { + // Create transaction from table (first iteration) or from post-commit snapshot (subsequent) + UpdateTableTransactionBuilder txnBuilder = + currentSnapshot.buildUpdateTableTransaction("Delta-Kernel-Benchmarks", Operation.WRITE); + + // Build and commit the transaction + Transaction txn = txnBuilder.build(engine); + + // Convert actions list to CloseableIterable (required by commit API) + CloseableIterator actionsIter = toCloseableIterator(actions.iterator()); + io.delta.kernel.utils.CloseableIterable dataActions = + io.delta.kernel.utils.CloseableIterable.inMemoryIterable(actionsIter); + + TransactionCommitResult result = txn.commit(engine, dataActions); + + long version = result.getVersion(); + committedVersions.add(version); + blackhole.consume(version); + + // Use the post-commit snapshot for the next transaction + // Post-commit snapshot should always be present unless there was a conflict + currentSnapshot = + result + .getPostCommitSnapshot() + .orElseThrow( + () -> + new IllegalStateException( + "Post-commit snapshot not available. This indicates a conflict occurred during " + + "the benchmark, which should not happen. Ensure no other processes are writing " + + "to the table: " + + workloadSpec.getTableInfo().getResolvedTableRoot())); + } + } + + /** + * Parses a Delta log JSON file and returns a list of action Rows. + * + *

Uses Kernel's built-in JsonHandler to read Delta log format JSON files. + * + * @param commitFilePath path to the commit file containing Delta log JSON actions + * @return list of Row objects representing the actions (AddFile, RemoveFile, etc.) + * @throws IOException if there's an error reading or parsing the file + */ + private List parseCommitFile(String commitFilePath) throws IOException { + /** Schema for reading commit files with both "add" and "remove" actions. */ + final StructType COMMIT_FILE_SCHEMA = + new StructType() + .add("add", AddFile.FULL_SCHEMA, true /* nullable */) + .add("remove", RemoveFile.FULL_SCHEMA, true /* nullable */); + + List actions = new ArrayList<>(); + + File file = new File(commitFilePath); + if (!file.exists()) { + throw new IOException("Commit file not found: " + commitFilePath); + } + + // Create a FileStatus for the commit file + FileStatus fileStatus = FileStatus.of(commitFilePath, file.length(), file.lastModified()); + + // Use Kernel's JsonHandler to read the file + try (CloseableIterator fileIter = + toCloseableIterator(Collections.singletonList(fileStatus).iterator()); + CloseableIterator batchIter = + engine.getJsonHandler().readJsonFiles(fileIter, COMMIT_FILE_SCHEMA, Optional.empty())) { + + while (batchIter.hasNext()) { + ColumnarBatch batch = batchIter.next(); + + // Process each row in the batch + try (CloseableIterator rowIter = batch.getRows()) { + while (rowIter.hasNext()) { + Row singleActionRow = rowIter.next(); + + // Extract the actual action Row and wrap it in SingleAction format + // Check if this row has an "add" action + if (!singleActionRow.isNullAt(COMMIT_FILE_SCHEMA.indexOf("add"))) { + Row addRow = singleActionRow.getStruct(COMMIT_FILE_SCHEMA.indexOf("add")); + // Wrap in SingleAction format for commit + actions.add(SingleAction.createAddFileSingleAction(addRow)); + } + // Check if this row has a "remove" action + else if (!singleActionRow.isNullAt(COMMIT_FILE_SCHEMA.indexOf("remove"))) { + Row removeRow = singleActionRow.getStruct(COMMIT_FILE_SCHEMA.indexOf("remove")); + // Wrap in SingleAction format for commit + actions.add(SingleAction.createRemoveFileSingleAction(removeRow)); + } else { + // Throw an error if the action is not recognized (not "add" or "remove") + throw new IOException( + "Unrecognized action in commit file row: " + singleActionRow.toString()); + } + } + } + } + } + + return actions; + } + + /** + * Cleans up the state created during benchmark execution by reverting all committed changes. + * + *

This method removes the commit files for any version greater than the original version, + * effectively reverting all changes made during the benchmark. This ensures the table is returned + * to its original state for the next benchmark iteration. + */ + @Override + public void cleanup() throws Exception { + if (originalVersion < 0) { + // Setup was never called or failed + return; + } + + // Delete all Delta log files with version > originalVersion + io.delta.kernel.internal.fs.Path deltaLogPath = + new io.delta.kernel.internal.fs.Path( + workloadSpec.getTableInfo().getResolvedTableRoot(), "_delta_log"); + + // Use listingPrefix to start listing from originalVersion + 1 + // This is more efficient than listing from version 0 + String startPath = FileNames.listingPrefix(deltaLogPath, originalVersion + 1); + + try (CloseableIterator filesIter = + engine.getFileSystemClient().listFrom(startPath)) { + + while (filesIter.hasNext()) { + FileStatus file = filesIter.next(); + String filePath = file.getPath(); + + try { + // Get version from any recognized Delta log file (commit, checkpoint, checksum/CRC, etc.) + // FileNames.getFileVersion handles: .json, .checkpoint.parquet, .crc, .compacted.json + long version = FileNames.getFileVersion(new io.delta.kernel.internal.fs.Path(filePath)); + if (version > originalVersion) { + // Delete this file (includes CRC files automatically) + engine.getFileSystemClient().delete(filePath); + } + } catch (IllegalArgumentException e) { + // Skip unrecognized files - ensures forward compatibility + } + } + } + + committedVersions.clear(); + } +} diff --git a/kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/add_and_remove/commit_add.json b/kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/add_and_remove/commit_add.json new file mode 100644 index 00000000000..1f37b2b9871 --- /dev/null +++ b/kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/add_and_remove/commit_add.json @@ -0,0 +1 @@ +{"add":{"path":"part-00000-a9daef62-5a40-43c5-ac63-3ad4a7d749ae-c000.snappy.parquet","partitionValues":{},"size":1024,"modificationTime":1712091404545,"dataChange":true,"stats":"{\"numRecords\":10,\"minValues\":{\"letter\":\"a\",\"number\":1,\"a_float\":1.1},\"maxValues\":{\"letter\":\"j\",\"number\":10,\"a_float\":10.10},\"nullCount\":{\"letter\":0,\"number\":0,\"a_float\":0}}"}} \ No newline at end of file diff --git a/kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/add_and_remove/commit_remove.json b/kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/add_and_remove/commit_remove.json new file mode 100644 index 00000000000..bd9089e46f4 --- /dev/null +++ b/kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/add_and_remove/commit_remove.json @@ -0,0 +1 @@ +{"remove":{"path":"part-00000-a9daef62-5a40-43c5-ac63-3ad4a7d749ae-c000.snappy.parquet","partitionValues":{},"size":1024,"modificationTime":1712091404545,"dataChange":true,"stats":"{\"numRecords\":10,\"minValues\":{\"letter\":\"a\",\"number\":1,\"a_float\":1.1},\"maxValues\":{\"letter\":\"j\",\"number\":10,\"a_float\":10.10},\"nullCount\":{\"letter\":0,\"number\":0,\"a_float\":0}}"}} \ No newline at end of file diff --git a/kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/add_and_remove/spec.json b/kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/add_and_remove/spec.json new file mode 100644 index 00000000000..3b8c404532c --- /dev/null +++ b/kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/add_and_remove/spec.json @@ -0,0 +1,8 @@ +{ + "type": "write", + "commits": [ + {"data_files_path": "commit_add.json"}, + {"data_files_path": "commit_remove.json"} + ] +} + diff --git a/kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/single_add/commit_add.json b/kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/single_add/commit_add.json new file mode 100644 index 00000000000..1f37b2b9871 --- /dev/null +++ b/kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/single_add/commit_add.json @@ -0,0 +1 @@ +{"add":{"path":"part-00000-a9daef62-5a40-43c5-ac63-3ad4a7d749ae-c000.snappy.parquet","partitionValues":{},"size":1024,"modificationTime":1712091404545,"dataChange":true,"stats":"{\"numRecords\":10,\"minValues\":{\"letter\":\"a\",\"number\":1,\"a_float\":1.1},\"maxValues\":{\"letter\":\"j\",\"number\":10,\"a_float\":10.10},\"nullCount\":{\"letter\":0,\"number\":0,\"a_float\":0}}"}} \ No newline at end of file diff --git a/kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/single_add/spec.json b/kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/single_add/spec.json new file mode 100644 index 00000000000..207c7ce20a2 --- /dev/null +++ b/kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/single_add/spec.json @@ -0,0 +1,7 @@ +{ + "type": "write", + "commits": [ + {"data_files_path": "commit_add.json"} + ] +} + diff --git a/kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/table_info.json b/kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/table_info.json index ad470d337d1..94637d8fc33 100644 --- a/kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/table_info.json +++ b/kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/table_info.json @@ -1,5 +1,5 @@ { "name": "basic_append", "description": "A basic table with two append writes.", - "engineInfo": "Apache-Spark/3.5.1 Delta-Lake/3.1.0" + "engine_info": "Apache-Spark/3.5.1 Delta-Lake/3.1.0" } \ No newline at end of file From 2472fc3a5f6410fc8dbb7e5797ed366be7fe0c00 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Mon, 27 Oct 2025 09:59:41 -0700 Subject: [PATCH 02/13] cleanup --- .../benchmarks/AbstractBenchmarkState.java | 4 ++-- .../benchmarks/WorkloadBenchmark.java | 22 +++++-------------- 2 files changed, 8 insertions(+), 18 deletions(-) diff --git a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/AbstractBenchmarkState.java b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/AbstractBenchmarkState.java index 9497784caa2..9869dbb10f9 100644 --- a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/AbstractBenchmarkState.java +++ b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/AbstractBenchmarkState.java @@ -40,14 +40,14 @@ public abstract class AbstractBenchmarkState { * dynamically by JMH. The value is set in the main method. */ @Param({}) - public String workloadSpecJson; + private String workloadSpecJson; /** * The engine to use for this benchmark. Note: This parameter will be set dynamically by JMH. The * value is set in the main method. */ @Param({}) - public String engineName; + private String engineName; /** The workload runner initialized for this benchmark invocation. */ private WorkloadRunner runner; diff --git a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/WorkloadBenchmark.java b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/WorkloadBenchmark.java index 17a1ffe1f0c..f9c09174f22 100644 --- a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/WorkloadBenchmark.java +++ b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/WorkloadBenchmark.java @@ -33,8 +33,6 @@ import org.openjdk.jmh.runner.options.Options; import org.openjdk.jmh.runner.options.OptionsBuilder; import org.openjdk.jmh.runner.options.TimeValue; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Generic JMH benchmark for all workload types. Automatically loads and runs benchmarks based on @@ -47,8 +45,6 @@ @Measurement(iterations = 5, time = 1) public class WorkloadBenchmark { - private static final Logger log = LoggerFactory.getLogger(WorkloadBenchmark.class); - /** Default implementation of BenchmarkState that supports only the "default" engine. */ public static class DefaultBenchmarkState extends AbstractBenchmarkState { @Override @@ -57,11 +53,10 @@ protected Engine getEngine(String engineName) { return DefaultEngine.create( new Configuration() { { - // Set the batch sizes to small so that we get to test the multiple batch/file - // scenarios. - set("delta.kernel.default.parquet.reader.batch-size", "20"); - set("delta.kernel.default.json.reader.batch-size", "20"); - set("delta.kernel.default.parquet.writer.targetMaxFileSize", "20"); + // Set the batch size. This is required for writes. + set("delta.kernel.default.parquet.reader.batch-size", "1024"); + set("delta.kernel.default.json.reader.batch-size", "1024"); + set("delta.kernel.default.parquet.writer.targetMaxFileSize", "10485760"); // 1 MB } }); } else { @@ -101,12 +96,7 @@ public static void main(String[] args) throws RunnerException, IOException { List filteredSpecs = new ArrayList<>(); for (WorkloadSpec spec : workloadSpecs) { // TODO: In the future, we can filter specific workloads using command line args here. - List variants = spec.getWorkloadVariants(); - for (WorkloadSpec variant : variants) { - if (variant.getType().equals("write")) { - filteredSpecs.add(variant); - } - } + filteredSpecs.addAll(spec.getWorkloadVariants()); } // Convert paths into a String array for JMH. JMH requires that parameters be of type String[]. @@ -128,6 +118,6 @@ public static void main(String[] args) throws RunnerException, IOException { .addProfiler(KernelMetricsProfiler.class) .build(); - new Runner(opt).run(); + new Runner(opt, new WorkloadOutputFormat()).run(); } } From 99da4298b883d60e4536a0abd05f7aa714c4e613 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Mon, 27 Oct 2025 10:16:30 -0700 Subject: [PATCH 03/13] cleanup --- .../workloadrunners/WriteRunner.java | 69 ++++++++++--------- 1 file changed, 37 insertions(+), 32 deletions(-) diff --git a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WriteRunner.java b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WriteRunner.java index 1d93977f1ce..bc2dbf55dcb 100644 --- a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WriteRunner.java +++ b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WriteRunner.java @@ -27,17 +27,13 @@ import io.delta.kernel.internal.actions.AddFile; import io.delta.kernel.internal.actions.RemoveFile; import io.delta.kernel.internal.actions.SingleAction; -import io.delta.kernel.internal.util.FileNames; import io.delta.kernel.transaction.UpdateTableTransactionBuilder; import io.delta.kernel.types.StructType; import io.delta.kernel.utils.CloseableIterator; import io.delta.kernel.utils.FileStatus; import java.io.File; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Optional; +import java.util.*; import org.openjdk.jmh.infra.Blackhole; /** @@ -58,6 +54,7 @@ public class WriteRunner extends WorkloadRunner { private List committedVersions; private Snapshot currentSnapshot; private long originalVersion; + private Set initialDeltaLogFiles; /** * Constructs the WriteRunner from the workload spec and engine. @@ -78,10 +75,12 @@ public void setup() throws Exception { String tableRoot = workloadSpec.getTableInfo().getResolvedTableRoot(); - // Get the current version before any commits + // Get the current snapshot SnapshotBuilder builder = TableManager.loadSnapshot(tableRoot); currentSnapshot = builder.build(engine); - originalVersion = currentSnapshot.getVersion(); + + // Capture initial listing of delta log files + initialDeltaLogFiles = captureFileListing(); // Load and parse all commit files for (WriteSpec.CommitSpec commitSpec : workloadSpec.getCommits()) { @@ -129,7 +128,7 @@ public void executeAsBenchmark(Blackhole blackhole) throws Exception { // Build and commit the transaction Transaction txn = txnBuilder.build(engine); - // Convert actions list to CloseableIterable (required by commit API) + // Convert actions list to CloseableIterable CloseableIterator actionsIter = toCloseableIterator(actions.iterator()); io.delta.kernel.utils.CloseableIterable dataActions = io.delta.kernel.utils.CloseableIterable.inMemoryIterable(actionsIter); @@ -234,36 +233,42 @@ public void cleanup() throws Exception { return; } - // Delete all Delta log files with version > originalVersion - io.delta.kernel.internal.fs.Path deltaLogPath = - new io.delta.kernel.internal.fs.Path( - workloadSpec.getTableInfo().getResolvedTableRoot(), "_delta_log"); + // Delete any files that weren't present initially + Set currentFiles = captureFileListing(); + for (String filePath : currentFiles) { + if (!initialDeltaLogFiles.contains(filePath)) { + engine.getFileSystemClient().delete(filePath); + } + } - // Use listingPrefix to start listing from originalVersion + 1 - // This is more efficient than listing from version 0 - String startPath = FileNames.listingPrefix(deltaLogPath, originalVersion + 1); + committedVersions.clear(); + } - try (CloseableIterator filesIter = - engine.getFileSystemClient().listFrom(startPath)) { + /** + * Captures a listing of all files whose paths start with the given prefix. Use a trailing slash + * to list files inside a directory. + * + * @return a set of all file paths starting with that prefix + */ + private Set captureFileListing() throws IOException { + // Construct path prefix for all files in `_delta_log/`. The prefix is for file with name `0` + // because + // the filesystem client lists all _sibling_ files in the directory with a path greater than + // `0`. + String deltaLogPathPrefix = + new io.delta.kernel.internal.fs.Path( + workloadSpec.getTableInfo().getResolvedTableRoot(), "_delta_log/0") + .toUri() + .getPath(); + Set files = new HashSet<>(); + try (CloseableIterator filesIter = + engine.getFileSystemClient().listFrom(deltaLogPathPrefix)) { while (filesIter.hasNext()) { FileStatus file = filesIter.next(); - String filePath = file.getPath(); - - try { - // Get version from any recognized Delta log file (commit, checkpoint, checksum/CRC, etc.) - // FileNames.getFileVersion handles: .json, .checkpoint.parquet, .crc, .compacted.json - long version = FileNames.getFileVersion(new io.delta.kernel.internal.fs.Path(filePath)); - if (version > originalVersion) { - // Delete this file (includes CRC files automatically) - engine.getFileSystemClient().delete(filePath); - } - } catch (IllegalArgumentException e) { - // Skip unrecognized files - ensures forward compatibility - } + files.add(file.getPath()); } } - - committedVersions.clear(); + return files; } } From d100845e7f8fe715f4e22bed45990745ba6aac58 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Mon, 27 Oct 2025 10:21:44 -0700 Subject: [PATCH 04/13] fix cleanup, remove prints --- .../benchmarks/workloadrunners/WriteRunner.java | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WriteRunner.java b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WriteRunner.java index bc2dbf55dcb..cede23b109a 100644 --- a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WriteRunner.java +++ b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WriteRunner.java @@ -66,7 +66,6 @@ public WriteRunner(WriteSpec workloadSpec, Engine engine) { this.workloadSpec = workloadSpec; this.engine = engine; this.committedVersions = new ArrayList<>(); - this.originalVersion = -1; } @Override @@ -228,11 +227,6 @@ else if (!singleActionRow.isNullAt(COMMIT_FILE_SCHEMA.indexOf("remove"))) { */ @Override public void cleanup() throws Exception { - if (originalVersion < 0) { - // Setup was never called or failed - return; - } - // Delete any files that weren't present initially Set currentFiles = captureFileListing(); for (String filePath : currentFiles) { @@ -252,9 +246,8 @@ public void cleanup() throws Exception { */ private Set captureFileListing() throws IOException { // Construct path prefix for all files in `_delta_log/`. The prefix is for file with name `0` - // because - // the filesystem client lists all _sibling_ files in the directory with a path greater than - // `0`. + // because the filesystem client lists all _sibling_ files in the directory with a path greater + // than `0`. String deltaLogPathPrefix = new io.delta.kernel.internal.fs.Path( workloadSpec.getTableInfo().getResolvedTableRoot(), "_delta_log/0") From b8da66105026dea509881b425cd0d1d2dbce98c6 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Mon, 27 Oct 2025 10:33:22 -0700 Subject: [PATCH 05/13] improve comments --- .../defaults/benchmarks/workloadrunners/WriteRunner.java | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WriteRunner.java b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WriteRunner.java index cede23b109a..3e35070def7 100644 --- a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WriteRunner.java +++ b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WriteRunner.java @@ -220,10 +220,6 @@ else if (!singleActionRow.isNullAt(COMMIT_FILE_SCHEMA.indexOf("remove"))) { /** * Cleans up the state created during benchmark execution by reverting all committed changes. - * - *

This method removes the commit files for any version greater than the original version, - * effectively reverting all changes made during the benchmark. This ensures the table is returned - * to its original state for the next benchmark iteration. */ @Override public void cleanup() throws Exception { @@ -239,10 +235,7 @@ public void cleanup() throws Exception { } /** - * Captures a listing of all files whose paths start with the given prefix. Use a trailing slash - * to list files inside a directory. - * - * @return a set of all file paths starting with that prefix + * @return a set of all file paths in the the `_delta_log/` directory of the table. */ private Set captureFileListing() throws IOException { // Construct path prefix for all files in `_delta_log/`. The prefix is for file with name `0` From 41584465b3c9f86546e20514305f876a9e40ab4c Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Mon, 27 Oct 2025 10:47:29 -0700 Subject: [PATCH 06/13] clean up tableinfo --- .../defaults/benchmarks/models/TableInfo.java | 37 ++----------------- .../workloadrunners/WriteRunner.java | 8 +--- 2 files changed, 5 insertions(+), 40 deletions(-) diff --git a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/TableInfo.java b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/TableInfo.java index 21464fa84d6..d7617d5f20c 100644 --- a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/TableInfo.java +++ b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/TableInfo.java @@ -50,8 +50,6 @@ * { * "name": "s3_table", * "description": "Table stored in S3", - * "table_type": "absolute", - * "table_path": "s3://my-bucket/path/to/table" * } * } */ @@ -71,26 +69,7 @@ public class TableInfo { @JsonProperty("engine_info") public String engineInfo; - /** - * The type of table location: "relative" (default) or "absolute". - * - *

When "relative", the table is located at {table_info_directory}/delta. When "absolute", the - * table is located at the path specified in {@link #tablePath}. - */ - @JsonProperty("table_type") - private String tableType; - - /** - * The absolute path to the table when {@link #tableType} is "absolute". Can be a local path - * (file:///) or S3 path (s3://). Null when table_type is "relative". - */ - @JsonProperty("table_path") - private String tablePath; - - /** - * The resolved absolute path to the root of the table. This is computed after deserialization - * based on {@link #tableType} and {@link #tablePath}. - */ + /** The resolved absolute path to the root of the table. */ @JsonProperty("table_info_path") private String tableInfoPath; @@ -105,16 +84,8 @@ public TableInfo() {} /** Resolves the table root path based on the table type and location configuration. */ @JsonIgnore public String getResolvedTableRoot() { - if ("absolute".equals(tableType)) { - if (tablePath == null || tablePath.trim().isEmpty()) { - throw new IllegalStateException( - "table_path must be specified when table_type is 'absolute'"); - } - return tablePath; - } else { - // Default to "relative" if tableType is null or "relative" - return Paths.get(tableInfoPath, "delta").toAbsolutePath().toString(); - } + // Default to "relative" if tableType is null or "relative" + return Paths.get(tableInfoPath, "delta").toAbsolutePath().toString(); } public String getTableInfoPath() { @@ -165,8 +136,6 @@ public String toString() { + description + "', engineInfo='" + engineInfo - + "', tableType='" - + tableType + "}"; } } diff --git a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WriteRunner.java b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WriteRunner.java index 3e35070def7..35955fba5e6 100644 --- a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WriteRunner.java +++ b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WriteRunner.java @@ -218,9 +218,7 @@ else if (!singleActionRow.isNullAt(COMMIT_FILE_SCHEMA.indexOf("remove"))) { return actions; } - /** - * Cleans up the state created during benchmark execution by reverting all committed changes. - */ + /** Cleans up the state created during benchmark execution by reverting all committed changes. */ @Override public void cleanup() throws Exception { // Delete any files that weren't present initially @@ -234,9 +232,7 @@ public void cleanup() throws Exception { committedVersions.clear(); } - /** - * @return a set of all file paths in the the `_delta_log/` directory of the table. - */ + /** @return a set of all file paths in the the `_delta_log/` directory of the table. */ private Set captureFileListing() throws IOException { // Construct path prefix for all files in `_delta_log/`. The prefix is for file with name `0` // because the filesystem client lists all _sibling_ files in the directory with a path greater From 9364cd8a43194bb53634cd7f4dbf1747c093e5e4 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Mon, 27 Oct 2025 10:58:30 -0700 Subject: [PATCH 07/13] final tableinfo cleanup --- .../benchmarks/WorkloadBenchmark.java | 1 - .../defaults/benchmarks/models/TableInfo.java | 35 ++++++++----------- 2 files changed, 14 insertions(+), 22 deletions(-) diff --git a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/WorkloadBenchmark.java b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/WorkloadBenchmark.java index f9c09174f22..f64717a1c3b 100644 --- a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/WorkloadBenchmark.java +++ b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/WorkloadBenchmark.java @@ -86,7 +86,6 @@ public void benchmarkWorkload(DefaultBenchmarkState state, Blackhole blackhole) public static void main(String[] args) throws RunnerException, IOException { // Get workload specs from the workloads directory List workloadSpecs = BenchmarkUtils.loadAllWorkloads(WORKLOAD_SPECS_DIR); - System.out.println("Loaded " + workloadSpecs.size() + " workload specs"); if (workloadSpecs.isEmpty()) { throw new RunnerException( "No workloads found. Please add workload specs to the workloads directory."); diff --git a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/TableInfo.java b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/TableInfo.java index d7617d5f20c..6f3d598cd5c 100644 --- a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/TableInfo.java +++ b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/TableInfo.java @@ -32,24 +32,17 @@ * *

TableInfo instances are typically loaded from JSON files in the workload specifications * directory structure. Each table directory should contain a {@code table_info.json} file with the - * table metadata and a {@code delta} subdirectory containing the actual table data. + * table metadata and a {@code delta} subdirectory containing the actual table data. The table root + * path is the absolute path to the root of the table and is provided separately in {@link + * WorkloadSpec#fromJsonPath(String, String, TableInfo)}. * - *

Example JSON structure for relative table (default): + *

Example JSON structure: * *

{@code
  * {
- *   "name": "basic_table",
- *   "description": "A basic Delta table for benchmarking",
- *   "engine_info": "Apache-Spark/3.5.1 Delta-Lake/3.1.0"
- * }
- * }
- * - *

Example JSON structure for absolute path table: - * - *

{@code
- * {
- *   "name": "s3_table",
- *   "description": "Table stored in S3",
+ *   "name": "large-table",
+ *   "description": "A large Delta table with multi-part checkpoints for performance testing",
+ *   "engineInfo": "Apache-Spark/3.5.1 Delta-Lake/3.1.0"
  * }
  * }
*/ @@ -99,14 +92,14 @@ public void setTableInfoPath(String tableInfoDirectory) { /** * Creates a TableInfo instance by reading from a JSON file at the specified path. * - *

This method loads table metadata from a JSON file and resolves the table root path. The JSON - * file should contain the table name and description, while the table root path is computed based - * on the table_type and table_path fields. + *

This method loads table metadata from a JSON file and sets the table root path. The JSON + * file should contain the table name and description, while the table root path is provided + * separately with the absolute path. * * @param jsonPath the path to the JSON file containing the TableInfo metadata * @param tableInfoPath the directory containing the table_info.json file (used for relative path * resolution) - * @return a TableInfo instance populated from the JSON file with resolved table root path + * @return a TableInfo instance populated from the JSON file and table root path * @throws RuntimeException if there is an error reading or parsing the JSON file */ public static TableInfo fromJsonPath(String jsonPath, String tableInfoPath) { @@ -123,8 +116,8 @@ public static TableInfo fromJsonPath(String jsonPath, String tableInfoPath) { /** * Returns a string representation of this TableInfo. * - *

The string includes the table name, description, engine info, and CCv2 status, but excludes - * the table root path for security reasons (as it may contain sensitive path information). + *

The string includes the table name, description, and engine info, but excludes the table + * root path for security reasons (as it may contain sensitive path information). * * @return a string representation of this TableInfo */ @@ -136,6 +129,6 @@ public String toString() { + description + "', engineInfo='" + engineInfo - + "}"; + + "'}"; } } From a6324c5e57ae551645a8d75361deea9518d30df0 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Mon, 27 Oct 2025 13:48:28 -0700 Subject: [PATCH 08/13] refactor --- .../benchmarks/models/WorkloadSpec.java | 5 + .../defaults/benchmarks/models/WriteSpec.java | 82 ++++++++++++++++ .../workloadrunners/WriteRunner.java | 96 ++----------------- 3 files changed, 94 insertions(+), 89 deletions(-) diff --git a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/WorkloadSpec.java b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/WorkloadSpec.java index 861de96187b..66c1c8349f1 100644 --- a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/WorkloadSpec.java +++ b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/WorkloadSpec.java @@ -83,6 +83,11 @@ public TableInfo getTableInfo() { return tableInfo; } + @JsonIgnore + public String getSpecDirectoryPath() { + return tableInfo.getTableInfoPath() + "/specs/" + caseName; + } + /** * Sets the table information for this workload specification. * diff --git a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/WriteSpec.java b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/WriteSpec.java index 2a321c34dc0..a34b03871fb 100644 --- a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/WriteSpec.java +++ b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/WriteSpec.java @@ -16,12 +16,28 @@ package io.delta.kernel.defaults.benchmarks.models; +import static io.delta.kernel.internal.util.Utils.toCloseableIterator; + +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; +import io.delta.kernel.data.ColumnarBatch; +import io.delta.kernel.data.Row; import io.delta.kernel.defaults.benchmarks.workloadrunners.WorkloadRunner; import io.delta.kernel.defaults.benchmarks.workloadrunners.WriteRunner; import io.delta.kernel.engine.Engine; +import io.delta.kernel.internal.actions.AddFile; +import io.delta.kernel.internal.actions.RemoveFile; +import io.delta.kernel.internal.actions.SingleAction; +import io.delta.kernel.internal.fs.Path; +import io.delta.kernel.types.StructType; +import io.delta.kernel.utils.CloseableIterator; +import io.delta.kernel.utils.FileStatus; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Optional; /** * Workload specification for write benchmarks. Defines test cases for writing to Delta tables with @@ -52,6 +68,13 @@ public static class CommitSpec { @JsonProperty("data_files_path") private String dataFilesPath; + /** Schema for reading commit files with both "add" and "remove" actions. */ + @JsonIgnore + private static final StructType COMMIT_SPEC_SCHEMA = + new StructType() + .add("add", AddFile.FULL_SCHEMA, true /* nullable */) + .add("remove", RemoveFile.FULL_SCHEMA, true /* nullable */); + /** Default constructor for Jackson. */ public CommitSpec() {} @@ -62,6 +85,65 @@ public CommitSpec(String dataFilesPath) { public String getDataFilesPath() { return dataFilesPath; } + + /** + * Uses Kernel's built-in JsonHandler to read commit spec containing data files. + * + * @param engine the Engine instance to use for reading JSON files + * @param specPath the base path where the commit file is located + * @throws IOException if there's an error reading or parsing the file + */ + public List parseActions(Engine engine, String specPath) throws IOException { + String commitFilePath = new Path(specPath, getDataFilesPath()).toString(); + + System.out.println("CommitSpec: Reading commit file at " + commitFilePath); + File file = new File(commitFilePath); + if (!file.exists()) { + throw new IOException("Commit file not found: " + commitFilePath); + } + + // Create a FileStatus for the commit file + FileStatus fileStatus = FileStatus.of(commitFilePath, file.length(), file.lastModified()); + + // Use Kernel's JsonHandler to read the file and collect actions + List actions = new ArrayList<>(); + try (CloseableIterator fileIter = + toCloseableIterator(Collections.singletonList(fileStatus).iterator()); + CloseableIterator batchIter = + engine + .getJsonHandler() + .readJsonFiles(fileIter, COMMIT_SPEC_SCHEMA, Optional.empty())) { + + while (batchIter.hasNext()) { + ColumnarBatch batch = batchIter.next(); + + // Process each row in the batch + try (CloseableIterator rowIter = batch.getRows()) { + while (rowIter.hasNext()) { + Row singleActionRow = rowIter.next(); + + // Extract the actual action Row and wrap it in SingleAction format + // Check if this row has an "add" action + if (!singleActionRow.isNullAt(COMMIT_SPEC_SCHEMA.indexOf("add"))) { + Row addRow = singleActionRow.getStruct(COMMIT_SPEC_SCHEMA.indexOf("add")); + // Wrap in SingleAction format for commit + actions.add(SingleAction.createAddFileSingleAction(addRow)); + } + // Check if this row has a "remove" action + else if (!singleActionRow.isNullAt(COMMIT_SPEC_SCHEMA.indexOf("remove"))) { + Row removeRow = singleActionRow.getStruct(COMMIT_SPEC_SCHEMA.indexOf("remove")); + // Wrap in SingleAction format for commit + actions.add(SingleAction.createRemoveFileSingleAction(removeRow)); + } else { + // Throw an error if the action is not recognized (not "add" or "remove") + throw new IOException("Unrecognized action in commit file row: " + singleActionRow); + } + } + } + } + } + return actions; + } } /** diff --git a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WriteRunner.java b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WriteRunner.java index 35955fba5e6..7a4ff9233e7 100644 --- a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WriteRunner.java +++ b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WriteRunner.java @@ -19,19 +19,15 @@ import static io.delta.kernel.internal.util.Utils.toCloseableIterator; import io.delta.kernel.*; -import io.delta.kernel.data.ColumnarBatch; import io.delta.kernel.data.Row; import io.delta.kernel.defaults.benchmarks.models.WorkloadSpec; import io.delta.kernel.defaults.benchmarks.models.WriteSpec; import io.delta.kernel.engine.Engine; -import io.delta.kernel.internal.actions.AddFile; -import io.delta.kernel.internal.actions.RemoveFile; -import io.delta.kernel.internal.actions.SingleAction; +import io.delta.kernel.internal.fs.Path; import io.delta.kernel.transaction.UpdateTableTransactionBuilder; -import io.delta.kernel.types.StructType; +import io.delta.kernel.utils.CloseableIterable; import io.delta.kernel.utils.CloseableIterator; import io.delta.kernel.utils.FileStatus; -import java.io.File; import java.io.IOException; import java.util.*; import org.openjdk.jmh.infra.Blackhole; @@ -51,9 +47,7 @@ public class WriteRunner extends WorkloadRunner { private final Engine engine; private final WriteSpec workloadSpec; private List> commitActions; - private List committedVersions; private Snapshot currentSnapshot; - private long originalVersion; private Set initialDeltaLogFiles; /** @@ -65,7 +59,6 @@ public class WriteRunner extends WorkloadRunner { public WriteRunner(WriteSpec workloadSpec, Engine engine) { this.workloadSpec = workloadSpec; this.engine = engine; - this.committedVersions = new ArrayList<>(); } @Override @@ -81,16 +74,11 @@ public void setup() throws Exception { // Capture initial listing of delta log files initialDeltaLogFiles = captureFileListing(); + System.out.println("Parsing commit files..."); // Load and parse all commit files for (WriteSpec.CommitSpec commitSpec : workloadSpec.getCommits()) { - String commitFilePath = - workloadSpec.getTableInfo().getTableInfoPath() - + "/specs/" - + workloadSpec.getCaseName() - + "/" - + commitSpec.getDataFilesPath(); - List actions = parseCommitFile(commitFilePath); - commitActions.add(actions); + System.out.println("Parsing commitSpec: " + commitSpec.getDataFilesPath()); + commitActions.add(commitSpec.parseActions(engine, workloadSpec.getSpecDirectoryPath())); } } @@ -129,13 +117,11 @@ public void executeAsBenchmark(Blackhole blackhole) throws Exception { // Convert actions list to CloseableIterable CloseableIterator actionsIter = toCloseableIterator(actions.iterator()); - io.delta.kernel.utils.CloseableIterable dataActions = - io.delta.kernel.utils.CloseableIterable.inMemoryIterable(actionsIter); + CloseableIterable dataActions = CloseableIterable.inMemoryIterable(actionsIter); TransactionCommitResult result = txn.commit(engine, dataActions); long version = result.getVersion(); - committedVersions.add(version); blackhole.consume(version); // Use the post-commit snapshot for the next transaction @@ -153,71 +139,6 @@ public void executeAsBenchmark(Blackhole blackhole) throws Exception { } } - /** - * Parses a Delta log JSON file and returns a list of action Rows. - * - *

Uses Kernel's built-in JsonHandler to read Delta log format JSON files. - * - * @param commitFilePath path to the commit file containing Delta log JSON actions - * @return list of Row objects representing the actions (AddFile, RemoveFile, etc.) - * @throws IOException if there's an error reading or parsing the file - */ - private List parseCommitFile(String commitFilePath) throws IOException { - /** Schema for reading commit files with both "add" and "remove" actions. */ - final StructType COMMIT_FILE_SCHEMA = - new StructType() - .add("add", AddFile.FULL_SCHEMA, true /* nullable */) - .add("remove", RemoveFile.FULL_SCHEMA, true /* nullable */); - - List actions = new ArrayList<>(); - - File file = new File(commitFilePath); - if (!file.exists()) { - throw new IOException("Commit file not found: " + commitFilePath); - } - - // Create a FileStatus for the commit file - FileStatus fileStatus = FileStatus.of(commitFilePath, file.length(), file.lastModified()); - - // Use Kernel's JsonHandler to read the file - try (CloseableIterator fileIter = - toCloseableIterator(Collections.singletonList(fileStatus).iterator()); - CloseableIterator batchIter = - engine.getJsonHandler().readJsonFiles(fileIter, COMMIT_FILE_SCHEMA, Optional.empty())) { - - while (batchIter.hasNext()) { - ColumnarBatch batch = batchIter.next(); - - // Process each row in the batch - try (CloseableIterator rowIter = batch.getRows()) { - while (rowIter.hasNext()) { - Row singleActionRow = rowIter.next(); - - // Extract the actual action Row and wrap it in SingleAction format - // Check if this row has an "add" action - if (!singleActionRow.isNullAt(COMMIT_FILE_SCHEMA.indexOf("add"))) { - Row addRow = singleActionRow.getStruct(COMMIT_FILE_SCHEMA.indexOf("add")); - // Wrap in SingleAction format for commit - actions.add(SingleAction.createAddFileSingleAction(addRow)); - } - // Check if this row has a "remove" action - else if (!singleActionRow.isNullAt(COMMIT_FILE_SCHEMA.indexOf("remove"))) { - Row removeRow = singleActionRow.getStruct(COMMIT_FILE_SCHEMA.indexOf("remove")); - // Wrap in SingleAction format for commit - actions.add(SingleAction.createRemoveFileSingleAction(removeRow)); - } else { - // Throw an error if the action is not recognized (not "add" or "remove") - throw new IOException( - "Unrecognized action in commit file row: " + singleActionRow.toString()); - } - } - } - } - } - - return actions; - } - /** Cleans up the state created during benchmark execution by reverting all committed changes. */ @Override public void cleanup() throws Exception { @@ -228,8 +149,6 @@ public void cleanup() throws Exception { engine.getFileSystemClient().delete(filePath); } } - - committedVersions.clear(); } /** @return a set of all file paths in the the `_delta_log/` directory of the table. */ @@ -238,8 +157,7 @@ private Set captureFileListing() throws IOException { // because the filesystem client lists all _sibling_ files in the directory with a path greater // than `0`. String deltaLogPathPrefix = - new io.delta.kernel.internal.fs.Path( - workloadSpec.getTableInfo().getResolvedTableRoot(), "_delta_log/0") + new Path(workloadSpec.getTableInfo().getResolvedTableRoot(), "_delta_log/0") .toUri() .getPath(); From 3e155b77ae1670766c71383fd39010e79a10ffa1 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Mon, 27 Oct 2025 13:58:43 -0700 Subject: [PATCH 09/13] final cleanup --- .../delta/kernel/defaults/benchmarks/models/WriteSpec.java | 1 - .../defaults/benchmarks/workloadrunners/WorkloadRunner.java | 2 +- .../defaults/benchmarks/workloadrunners/WriteRunner.java | 6 ++---- 3 files changed, 3 insertions(+), 6 deletions(-) diff --git a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/WriteSpec.java b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/WriteSpec.java index a34b03871fb..c95a54d39f4 100644 --- a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/WriteSpec.java +++ b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/WriteSpec.java @@ -96,7 +96,6 @@ public String getDataFilesPath() { public List parseActions(Engine engine, String specPath) throws IOException { String commitFilePath = new Path(specPath, getDataFilesPath()).toString(); - System.out.println("CommitSpec: Reading commit file at " + commitFilePath); File file = new File(commitFilePath); if (!file.exists()) { throw new IOException("Commit file not found: " + commitFilePath); diff --git a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WorkloadRunner.java b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WorkloadRunner.java index d3916413eb5..35709b6eeed 100644 --- a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WorkloadRunner.java +++ b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WorkloadRunner.java @@ -71,7 +71,7 @@ public WorkloadRunner() {} * @throws Exception if any error occurs during cleanup. */ public void cleanup() throws Exception { - // Default implementation: no-op for read workloads + // Default implementation is no-op } // TODO: Add executeAsTest() method for correctness validation diff --git a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WriteRunner.java b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WriteRunner.java index 7a4ff9233e7..86f74e2ea9a 100644 --- a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WriteRunner.java +++ b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WriteRunner.java @@ -74,10 +74,8 @@ public void setup() throws Exception { // Capture initial listing of delta log files initialDeltaLogFiles = captureFileListing(); - System.out.println("Parsing commit files..."); // Load and parse all commit files for (WriteSpec.CommitSpec commitSpec : workloadSpec.getCommits()) { - System.out.println("Parsing commitSpec: " + commitSpec.getDataFilesPath()); commitActions.add(commitSpec.parseActions(engine, workloadSpec.getSpecDirectoryPath())); } } @@ -116,8 +114,8 @@ public void executeAsBenchmark(Blackhole blackhole) throws Exception { Transaction txn = txnBuilder.build(engine); // Convert actions list to CloseableIterable - CloseableIterator actionsIter = toCloseableIterator(actions.iterator()); - CloseableIterable dataActions = CloseableIterable.inMemoryIterable(actionsIter); + CloseableIterable dataActions = + CloseableIterable.inMemoryIterable(toCloseableIterator(actions.iterator())); TransactionCommitResult result = txn.commit(engine, dataActions); From 0bb12f2b5e67d3395a64a8ae9ec8f64e259d8ad0 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Mon, 27 Oct 2025 14:03:46 -0700 Subject: [PATCH 10/13] more cleanup --- .../workload_specs/basic_append/specs/add_and_remove/spec.json | 3 +-- .../specs/{single_add => single_append}/commit_add.json | 0 .../basic_append/specs/{single_add => single_append}/spec.json | 3 +-- 3 files changed, 2 insertions(+), 4 deletions(-) rename kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/{single_add => single_append}/commit_add.json (100%) rename kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/{single_add => single_append}/spec.json (96%) diff --git a/kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/add_and_remove/spec.json b/kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/add_and_remove/spec.json index 3b8c404532c..0330c385d5d 100644 --- a/kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/add_and_remove/spec.json +++ b/kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/add_and_remove/spec.json @@ -4,5 +4,4 @@ {"data_files_path": "commit_add.json"}, {"data_files_path": "commit_remove.json"} ] -} - +} \ No newline at end of file diff --git a/kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/single_add/commit_add.json b/kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/single_append/commit_add.json similarity index 100% rename from kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/single_add/commit_add.json rename to kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/single_append/commit_add.json diff --git a/kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/single_add/spec.json b/kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/single_append/spec.json similarity index 96% rename from kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/single_add/spec.json rename to kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/single_append/spec.json index 207c7ce20a2..b5434ddb398 100644 --- a/kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/single_add/spec.json +++ b/kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/single_append/spec.json @@ -3,5 +3,4 @@ "commits": [ {"data_files_path": "commit_add.json"} ] -} - +} \ No newline at end of file From 3d2c4c91c02ecba5e46a3e24aa748f068b259e70 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Mon, 27 Oct 2025 14:34:59 -0700 Subject: [PATCH 11/13] try to fix style check --- .../defaults/benchmarks/workloadrunners/WriteRunner.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WriteRunner.java b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WriteRunner.java index 86f74e2ea9a..4e61fda7757 100644 --- a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WriteRunner.java +++ b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WriteRunner.java @@ -130,9 +130,9 @@ public void executeAsBenchmark(Blackhole blackhole) throws Exception { .orElseThrow( () -> new IllegalStateException( - "Post-commit snapshot not available. This indicates a conflict occurred during " - + "the benchmark, which should not happen. Ensure no other processes are writing " - + "to the table: " + "Post-commit snapshot not available. This indicates a conflict " + + "occurred during the benchmark, which should not happen. " + + "Ensure no other processes are writing to the table: " + workloadSpec.getTableInfo().getResolvedTableRoot())); } } From 91c7ffd0c8d5ae6c034813117eb12c8619008ceb Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Mon, 27 Oct 2025 14:46:27 -0700 Subject: [PATCH 12/13] simplify workload benchmark --- .../kernel/defaults/benchmarks/WorkloadBenchmark.java | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/WorkloadBenchmark.java b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/WorkloadBenchmark.java index f64717a1c3b..b60fd7a0c02 100644 --- a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/WorkloadBenchmark.java +++ b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/WorkloadBenchmark.java @@ -50,15 +50,7 @@ public static class DefaultBenchmarkState extends AbstractBenchmarkState { @Override protected Engine getEngine(String engineName) { if (engineName.equals("default")) { - return DefaultEngine.create( - new Configuration() { - { - // Set the batch size. This is required for writes. - set("delta.kernel.default.parquet.reader.batch-size", "1024"); - set("delta.kernel.default.json.reader.batch-size", "1024"); - set("delta.kernel.default.parquet.writer.targetMaxFileSize", "10485760"); // 1 MB - } - }); + return DefaultEngine.create(new Configuration()); } else { throw new IllegalArgumentException("Unsupported engine: " + engineName); } From 4c205e254e39523647d3dc83d7fbf7e9c1935518 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Tue, 21 Oct 2025 14:11:04 -0700 Subject: [PATCH 13/13] first cut of ccv2 --- .../defaults/benchmarks/CCv2Context.java | 280 ++++++++++++++++++ .../benchmarks/WorkloadBenchmark.java | 10 +- .../benchmarks/WorkloadOutputFormat.java | 58 ++-- .../defaults/benchmarks/models/CCv2Info.java | 161 ++++++++++ .../defaults/benchmarks/models/TableInfo.java | 50 ++++ .../workloadrunners/ReadMetadataRunner.java | 10 +- .../workloadrunners/WorkloadRunner.java | 45 +++ .../workloadrunners/WriteRunner.java | 199 ++++++++++--- .../workload_specs/basic_ccv2/ccv2_info.json | 13 + .../_delta_log/00000000000000000000.json | 4 + .../_delta_log/00000000000000000001.json | 2 + ....a1b2c3d4-e5f6-4a7b-8c9d-0e1f2a3b4c5d.json | 2 + ....f7e8d9c0-b1a2-4536-9748-5a6b7c8d9e0f.json | 2 + ...43c5-ac63-3ad4a7d749ae-c000.snappy.parquet | Bin 0 -> 984 bytes ...45dd-b33d-ae9aa1b96909-c000.snappy.parquet | Bin 0 -> 996 bytes .../specs/read_with_staged/spec.json | 5 + .../write_with_staged/add_back_files.json | 2 + .../specs/write_with_staged/spec.json | 6 + .../workload_specs/basic_ccv2/table_info.json | 7 + 19 files changed, 793 insertions(+), 63 deletions(-) create mode 100644 kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/CCv2Context.java create mode 100644 kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/CCv2Info.java create mode 100644 kernel/kernel-defaults/src/test/resources/workload_specs/basic_ccv2/ccv2_info.json create mode 100644 kernel/kernel-defaults/src/test/resources/workload_specs/basic_ccv2/delta/_delta_log/00000000000000000000.json create mode 100644 kernel/kernel-defaults/src/test/resources/workload_specs/basic_ccv2/delta/_delta_log/00000000000000000001.json create mode 100644 kernel/kernel-defaults/src/test/resources/workload_specs/basic_ccv2/delta/_delta_log/_staged_commits/00000000000000000002.a1b2c3d4-e5f6-4a7b-8c9d-0e1f2a3b4c5d.json create mode 100644 kernel/kernel-defaults/src/test/resources/workload_specs/basic_ccv2/delta/_delta_log/_staged_commits/00000000000000000003.f7e8d9c0-b1a2-4536-9748-5a6b7c8d9e0f.json create mode 100644 kernel/kernel-defaults/src/test/resources/workload_specs/basic_ccv2/delta/part-00000-a9daef62-5a40-43c5-ac63-3ad4a7d749ae-c000.snappy.parquet create mode 100644 kernel/kernel-defaults/src/test/resources/workload_specs/basic_ccv2/delta/part-00000-c9f44819-b06d-45dd-b33d-ae9aa1b96909-c000.snappy.parquet create mode 100644 kernel/kernel-defaults/src/test/resources/workload_specs/basic_ccv2/specs/read_with_staged/spec.json create mode 100644 kernel/kernel-defaults/src/test/resources/workload_specs/basic_ccv2/specs/write_with_staged/add_back_files.json create mode 100644 kernel/kernel-defaults/src/test/resources/workload_specs/basic_ccv2/specs/write_with_staged/spec.json create mode 100644 kernel/kernel-defaults/src/test/resources/workload_specs/basic_ccv2/table_info.json diff --git a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/CCv2Context.java b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/CCv2Context.java new file mode 100644 index 00000000000..c71830200c4 --- /dev/null +++ b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/CCv2Context.java @@ -0,0 +1,280 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.kernel.defaults.benchmarks; + +import io.delta.kernel.commit.CommitFailedException; +import io.delta.kernel.commit.CommitMetadata; +import io.delta.kernel.commit.CommitResponse; +import io.delta.kernel.commit.Committer; +import io.delta.kernel.data.Row; +import io.delta.kernel.defaults.benchmarks.models.CCv2Info; +import io.delta.kernel.defaults.benchmarks.models.TableInfo; +import io.delta.kernel.engine.Engine; +import io.delta.kernel.internal.files.ParsedCatalogCommitData; +import io.delta.kernel.internal.files.ParsedLogData; +import io.delta.kernel.utils.CloseableIterator; +import io.delta.kernel.utils.FileStatus; +import io.delta.storage.commit.Commit; +import io.delta.storage.commit.uccommitcoordinator.UCClient; +import java.io.IOException; +import java.net.URI; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.UUID; +import org.apache.spark.sql.delta.coordinatedcommits.InMemoryUCClient; +import org.apache.spark.sql.delta.coordinatedcommits.InMemoryUCCommitCoordinator; +import scala.Option; + +/** + * Context for CCv2 (Coordinated Commits v2) tables in benchmarks. + * + *

This class encapsulates all the infrastructure needed to work with CCv2 tables, including the + * Unity Catalog client, commit coordinator, and custom committer. It's created from a {@link + * TableInfo} that has CCv2 configuration. + * + *

The context pre-populates the commit coordinator with staged commits from the CCv2Info + * configuration, allowing benchmarks to read from and write to CCv2 tables. + */ +public class CCv2Context { + + private final InMemoryUCCommitCoordinator coordinator; + private final InMemoryUCClient ucClient; + private final String tableId; + private final URI tableUri; + private final BenchmarkingCCv2Committer committer; + private final List parsedLogData; + private final String tableRoot; + + /** + * Private constructor. Use {@link #createFromTableInfo(TableInfo, Engine)} to create instances. + */ + private CCv2Context( + InMemoryUCCommitCoordinator coordinator, + InMemoryUCClient ucClient, + String tableId, + URI tableUri, + BenchmarkingCCv2Committer committer, + List parsedLogData, + String tableRoot) { + this.coordinator = coordinator; + this.ucClient = ucClient; + this.tableId = tableId; + this.tableUri = tableUri; + this.committer = committer; + this.parsedLogData = parsedLogData; + this.tableRoot = tableRoot; + } + + /** @return the committer for CCv2 commits */ + public Committer getCommitter() { + return committer; + } + + /** @return the list of parsed log data (staged commits) for SnapshotBuilder */ + public List getParsedLogData() { + return parsedLogData; + } + + /** + * Creates a CCv2Context from a TableInfo that has CCv2 configuration. + * + *

This method: + * + *

    + *
  1. Loads CCv2Info from the TableInfo + *
  2. Creates an InMemoryUCCommitCoordinator + *
  3. Pre-populates the coordinator with staged commits from the log_tail + *
  4. Creates an InMemoryUCClient wrapping the coordinator + *
  5. Creates a BenchmarkingCCv2Committer for writes + *
  6. Converts staged commits to ParsedLogData for SnapshotBuilder + *
+ * + * @param tableInfo the TableInfo containing CCv2 configuration + * @param engine the Engine to use for filesystem operations + * @return a CCv2Context ready for use + * @throws IllegalArgumentException if the TableInfo is not a CCv2 table + * @throws RuntimeException if there's an error setting up the CCv2 infrastructure + */ + public static CCv2Context createFromTableInfo(TableInfo tableInfo, Engine engine) { + if (!tableInfo.isCCv2Table()) { + throw new IllegalArgumentException("TableInfo does not have CCv2 configuration"); + } + + try { + // 1. Load CCv2Info + CCv2Info ccv2Info = tableInfo.getCCv2Info(); + String tableRoot = tableInfo.getResolvedTableRoot(); + + // 2. Create coordinator + InMemoryUCCommitCoordinator coordinator = new InMemoryUCCommitCoordinator(); + + // 3. Generate table ID and URI + String tableId = UUID.randomUUID().toString(); + URI tableUri = Paths.get(tableRoot).toUri(); + + // 4. Pre-populate coordinator with log_tail commits + List parsedLogDataList = new ArrayList<>(); + for (CCv2Info.StagedCommit stagedCommit : ccv2Info.getLogTail()) { + // Get file info for the staged commit using Engine's filesystem + String stagedCommitPath = stagedCommit.getFullPath(tableRoot); + FileStatus fileStatus = engine.getFileSystemClient().getFileStatus(stagedCommitPath); + + // Register with coordinator (use full path to the staged commit) + coordinator.commitToCoordinator( + tableId, + tableUri, + Option.apply(stagedCommitPath), // commitFileName (full path) + Option.apply(stagedCommit.getVersion()), // commitVersion + Option.apply(fileStatus.getSize()), // commitFileSize + Option.apply(fileStatus.getModificationTime()), // commitFileModTime + Option.apply(System.currentTimeMillis()), // commitTimestamp + Option.empty(), // lastKnownBackfilledVersion + false, // isDisownCommit + Option.empty(), // protocolOpt + Option.empty() // metadataOpt + ); + + // Convert to ParsedLogData + parsedLogDataList.add(ParsedCatalogCommitData.forFileStatus(fileStatus)); + } + + // 5. Create UCClient + String metastoreId = "benchmark-metastore-" + tableId; + InMemoryUCClient ucClient = new InMemoryUCClient(metastoreId, coordinator); + + // 6. Create committer + BenchmarkingCCv2Committer committer = + new BenchmarkingCCv2Committer(ucClient, tableId, tableUri, tableRoot); + + // 7. Return context + return new CCv2Context( + coordinator, ucClient, tableId, tableUri, committer, parsedLogDataList, tableRoot); + + } catch (Exception e) { + throw new RuntimeException("Failed to create CCv2Context", e); + } + } + + /** + * Helper method to convert Kernel FileStatus to Hadoop FileStatus. + * + * @param kernelFileStatus Kernel FileStatus to convert + * @return Hadoop FileStatus + */ + private static org.apache.hadoop.fs.FileStatus kernelFileStatusToHadoopFileStatus( + io.delta.kernel.utils.FileStatus kernelFileStatus) { + return new org.apache.hadoop.fs.FileStatus( + kernelFileStatus.getSize() /* length */, + false /* isDirectory */, + 1 /* blockReplication */, + 128 * 1024 * 1024 /* blockSize (128MB) */, + kernelFileStatus.getModificationTime() /* modificationTime */, + kernelFileStatus.getModificationTime() /* accessTime */, + org.apache.hadoop.fs.permission.FsPermission.getFileDefault() /* permission */, + "unknown" /* owner */, + "unknown" /* group */, + new org.apache.hadoop.fs.Path(kernelFileStatus.getPath()) /* path */); + } + + /** + * Committer implementation for CCv2 benchmarks. + * + *

This committer writes staged commits to the `_staged_commits/` directory and registers them + * with the Unity Catalog coordinator. + */ + static class BenchmarkingCCv2Committer implements Committer { + private final UCClient ucClient; + private final String tableId; + private final URI tableUri; + private final String tableRoot; + + public BenchmarkingCCv2Committer( + UCClient ucClient, String tableId, URI tableUri, String tableRoot) { + this.ucClient = ucClient; + this.tableId = tableId; + this.tableUri = tableUri; + this.tableRoot = tableRoot; + } + + @Override + public CommitResponse commit( + Engine engine, CloseableIterator finalizedActions, CommitMetadata commitMetadata) + throws CommitFailedException { + + long version = commitMetadata.getVersion(); + String stagedCommitsDir = Paths.get(tableRoot, "_delta_log", "_staged_commits").toString(); + + // Ensure _staged_commits directory exists using Engine's filesystem + try { + engine.getFileSystemClient().mkdirs(stagedCommitsDir); + } catch (IOException e) { + throw new CommitFailedException( + true /* retryable */, + false /* conflict */, + "Failed to create _staged_commits directory: " + e.getMessage(), + e); + } + + // 1. Write staged commit with UUID name + String commitUuid = UUID.randomUUID().toString(); + String stagedCommitFileName = String.format("%020d.%s.json", version, commitUuid); + String stagedCommitPath = Paths.get(stagedCommitsDir, stagedCommitFileName).toString(); + + try { + // Write the staged commit file + engine + .getJsonHandler() + .writeJsonFileAtomically(stagedCommitPath, finalizedActions, false /* overwrite */); + + // Get file status + FileStatus stagedFileStatus = engine.getFileSystemClient().getFileStatus(stagedCommitPath); + + // Convert to Hadoop FileStatus + org.apache.hadoop.fs.FileStatus hadoopFileStatus = + kernelFileStatusToHadoopFileStatus(stagedFileStatus); + + // 2. Register with UCClient + Commit commit = + new Commit( + version, hadoopFileStatus, System.currentTimeMillis() // commitTimestamp + ); + + ucClient.commit( + tableId, + tableUri, + Optional.of(commit), + Optional.empty(), // lastKnownBackfilledVersion + false, // disown + Optional.empty(), // newMetadata + Optional.empty() // newProtocol + ); + + // Return commit response with the staged commit file + return new CommitResponse(ParsedCatalogCommitData.forFileStatus(stagedFileStatus)); + + } catch (IOException e) { + throw new CommitFailedException( + true /* retryable */, false /* conflict */, "Failed to commit: " + e.getMessage(), e); + } catch (Exception e) { + throw new CommitFailedException( + false /* retryable */, false /* conflict */, "Failed to commit: " + e.getMessage(), e); + } + } + } +} diff --git a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/WorkloadBenchmark.java b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/WorkloadBenchmark.java index b60fd7a0c02..c43f125d5f1 100644 --- a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/WorkloadBenchmark.java +++ b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/WorkloadBenchmark.java @@ -87,13 +87,21 @@ public static void main(String[] args) throws RunnerException, IOException { List filteredSpecs = new ArrayList<>(); for (WorkloadSpec spec : workloadSpecs) { // TODO: In the future, we can filter specific workloads using command line args here. - filteredSpecs.addAll(spec.getWorkloadVariants()); + List variants = spec.getWorkloadVariants(); + for (WorkloadSpec variant : variants) { + System.out.println("Variant: " + variant.getFullName()); + if (variant.getTableInfo().name.equals("basic_ccv2")) { + filteredSpecs.add(variant); + } + } } // Convert paths into a String array for JMH. JMH requires that parameters be of type String[]. String[] workloadSpecsArray = filteredSpecs.stream().map(WorkloadSpec::toJsonString).toArray(String[]::new); + System.out.println("Filtered specs list: " + Arrays.toString(workloadSpecsArray)); + // Configure and run JMH benchmark with the loaded workload specs Options opt = new OptionsBuilder() diff --git a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/WorkloadOutputFormat.java b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/WorkloadOutputFormat.java index e7bd2372700..5f2f6189825 100644 --- a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/WorkloadOutputFormat.java +++ b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/WorkloadOutputFormat.java @@ -24,10 +24,7 @@ import java.util.HashMap; import org.openjdk.jmh.infra.BenchmarkParams; import org.openjdk.jmh.infra.IterationParams; -import org.openjdk.jmh.results.BenchmarkResult; -import org.openjdk.jmh.results.IterationResult; -import org.openjdk.jmh.results.Result; -import org.openjdk.jmh.results.RunResult; +import org.openjdk.jmh.results.*; import org.openjdk.jmh.runner.format.OutputFormat; import org.openjdk.jmh.util.Statistics; @@ -313,33 +310,40 @@ public void endRun(Collection result) { HashMap benchmarks = new HashMap<>(); for (RunResult res : result) { - for (BenchmarkResult br : res.getBenchmarkResults()) { - try { - WorkloadSpec spec = - WorkloadSpec.fromJsonString(br.getParams().getParam("workloadSpecJson")); - HashMap additionalParams = new HashMap<>(); - additionalParams.put("engine", br.getParams().getParam("engineName")); - - HashMap secondaryMetrics = new HashMap<>(); - for (String resultKey : br.getSecondaryResults().keySet()) { - Result r = br.getSecondaryResults().get(resultKey); - if (r instanceof org.openjdk.jmh.results.SampleTimeResult) { - secondaryMetrics.put(r.getLabel(), TimingMetric.fromResult(r)); - } else if (r instanceof org.openjdk.jmh.results.ScalarResult) { + System.out.println("Run result: " + res.toString()); + BenchmarkResult br = res.getAggregatedResult(); + System.out.println("Benchmark results: " + br.toString()); + try { + WorkloadSpec spec = + WorkloadSpec.fromJsonString(br.getParams().getParam("workloadSpecJson")); + HashMap additionalParams = new HashMap<>(); + additionalParams.put("engine", br.getParams().getParam("engineName")); + + HashMap secondaryMetrics = new HashMap<>(); + for (String resultKey : br.getSecondaryResults().keySet()) { + Result r = br.getSecondaryResults().get(resultKey); + if (r instanceof org.openjdk.jmh.results.SampleTimeResult) { + secondaryMetrics.put(r.getLabel(), TimingMetric.fromResult(r)); + } else if (r instanceof org.openjdk.jmh.results.ScalarResult) { + ScalarResult scalarResult = (ScalarResult) r; + if (scalarResult.getScoreUnit().equals("count")) { + // Treat as a long count metric secondaryMetrics.put(r.getLabel(), (long) r.getScore()); + } else { + secondaryMetrics.put(r.getLabel(), r.getScore()); } } - - BenchmarkDetails details = - new BenchmarkDetails( - spec, - additionalParams, - TimingMetric.fromResult(br.getPrimaryResult()), - secondaryMetrics); - benchmarks.put(spec.getFullName(), details); - } catch (IOException e) { - throw new RuntimeException(e); } + + BenchmarkDetails details = + new BenchmarkDetails( + spec, + additionalParams, + TimingMetric.fromResult(br.getPrimaryResult()), + secondaryMetrics); + benchmarks.put(spec.getFullName(), details); + } catch (IOException e) { + throw new RuntimeException(e); } } diff --git a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/CCv2Info.java b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/CCv2Info.java new file mode 100644 index 00000000000..dd7eacac5e1 --- /dev/null +++ b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/CCv2Info.java @@ -0,0 +1,161 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.kernel.defaults.benchmarks.models; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.File; +import java.io.IOException; +import java.nio.file.Paths; +import java.util.Collections; +import java.util.List; + +/** + * Represents CCv2 (Coordinated Commits v2) configuration for a Delta table used in benchmarks. + * + *

This class contains information about staged commits that need to be registered with the Unity + * Catalog commit coordinator for CCv2 tables. The staged commits are stored in the + * `_delta_log/_staged_commits/` directory. + * + *

Example JSON structure: + * + *

{@code
+ * {
+ *   "log_tail": [
+ *     {
+ *       "version": 0,
+ *       "staged_commit_name": "00000000-0000-0000-0000-000000000000.json"
+ *     },
+ *     {
+ *       "version": 1,
+ *       "staged_commit_name": "00000000-0000-0000-0000-000000000001.json"
+ *     }
+ *   ]
+ * }
+ * }
+ */ +public class CCv2Info { + + /** + * Represents a single staged commit in the CCv2 log tail. + * + *

Each staged commit corresponds to a UUID-based JSON file in the + * `_delta_log/_staged_commits/` directory that contains the Delta log actions for that version. + */ + public static class StagedCommit { + /** The version number of this commit. */ + @JsonProperty("version") + private long version; + + /** + * The filename of the staged commit, relative to `_delta_log/_staged_commits/`. + * + *

Example: "00000000-0000-0000-0000-000000000000.json" + */ + @JsonProperty("staged_commit_name") + private String stagedCommitName; + + /** Default constructor for Jackson deserialization. */ + public StagedCommit() {} + + /** + * Constructor for creating a StagedCommit. + * + * @param version the version number of the commit + * @param stagedCommitName the filename of the staged commit + */ + public StagedCommit(long version, String stagedCommitName) { + this.version = version; + this.stagedCommitName = stagedCommitName; + } + + /** @return the version number of this commit */ + public long getVersion() { + return version; + } + + /** @return the filename of the staged commit */ + public String getStagedCommitName() { + return stagedCommitName; + } + + /** + * Resolves the full path to the staged commit file. + * + * @param tableRoot the root path of the Delta table + * @return the full path to the staged commit file + */ + public String getFullPath(String tableRoot) { + return Paths.get(tableRoot, "_delta_log", "_staged_commits", stagedCommitName) + .toUri() // Format this as a URI + .toString(); + } + + @Override + public String toString() { + return String.format( + "StagedCommit{version=%d, stagedCommitName='%s'}", version, stagedCommitName); + } + } + + /** + * The list of staged commits that make up the log tail for this CCv2 table. + * + *

These commits have been ratified by the Unity Catalog coordinator but may not yet be + * backfilled to the regular Delta log. + */ + @JsonProperty("log_tail") + private List logTail; + + /** Default constructor for Jackson deserialization. */ + public CCv2Info() {} + + /** + * Constructor for creating CCv2Info. + * + * @param logTail the list of staged commits + */ + public CCv2Info(List logTail) { + this.logTail = logTail; + } + + /** + * Gets the list of staged commits in the log tail. + * + * @return the list of staged commits, or an empty list if none + */ + public List getLogTail() { + return logTail != null ? logTail : Collections.emptyList(); + } + + /** + * Loads a CCv2Info instance from a JSON file. + * + * @param jsonPath the path to the JSON file containing the CCv2Info + * @return the CCv2Info instance parsed from the file + * @throws IOException if there is an error reading or parsing the file + */ + public static CCv2Info fromJsonPath(String jsonPath) throws IOException { + ObjectMapper mapper = new ObjectMapper(); + return mapper.readValue(new File(jsonPath), CCv2Info.class); + } + + @Override + public String toString() { + return String.format("CCv2Info{logTail=%s}", logTail); + } +} diff --git a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/TableInfo.java b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/TableInfo.java index 6f3d598cd5c..a710ea0cc80 100644 --- a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/TableInfo.java +++ b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/TableInfo.java @@ -66,6 +66,19 @@ public class TableInfo { @JsonProperty("table_info_path") private String tableInfoPath; + /** + * Whether this table is a CCv2 (Coordinated Commits v2) table. If true, the CCv2 info is loaded + * from a fixed path: ccv2_info.json in the same directory as table_info.json. + */ + @JsonProperty("ccv2_enabled") + private boolean ccv2Enabled; + + /** + * Lazily loaded CCv2 information. This is populated when {@link #getCCv2Info()} is called for the + * first time. + */ + @JsonIgnore private CCv2Info ccv2Info; + /** * Default constructor for Jackson deserialization. * @@ -89,6 +102,43 @@ public void setTableInfoPath(String tableInfoDirectory) { this.tableInfoPath = tableInfoDirectory; } + /** + * Checks if this table is a CCv2 (Coordinated Commits v2) table. + * + * @return true if ccv2_enabled is true, false otherwise + */ + @JsonIgnore + public boolean isCCv2Table() { + return ccv2Enabled; + } + + /** + * Gets the CCv2 information for this table. Lazily loads the CCv2Info from ccv2_info.json in the + * same directory as table_info.json if not already loaded. + * + * @return the CCv2Info for this table + * @throws IllegalStateException if this is not a CCv2 table + * @throws RuntimeException if there is an error loading the CCv2Info + */ + @JsonIgnore + public CCv2Info getCCv2Info() { + if (!isCCv2Table()) { + throw new IllegalStateException( + "This is not a CCv2 table. ccv2_enabled is not set to true in table_info.json"); + } + + if (ccv2Info == null) { + String ccv2InfoFullPath = Paths.get(tableInfoPath, "ccv2_info.json").toString(); + try { + ccv2Info = CCv2Info.fromJsonPath(ccv2InfoFullPath); + } catch (java.io.IOException e) { + throw new RuntimeException("Failed to load CCv2Info from: " + ccv2InfoFullPath, e); + } + } + + return ccv2Info; + } + /** * Creates a TableInfo instance by reading from a JSON file at the specified path. * diff --git a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/ReadMetadataRunner.java b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/ReadMetadataRunner.java index 7a2e4ee4b31..648190f341a 100644 --- a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/ReadMetadataRunner.java +++ b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/ReadMetadataRunner.java @@ -18,10 +18,12 @@ import io.delta.kernel.*; import io.delta.kernel.data.FilteredColumnarBatch; +import io.delta.kernel.defaults.benchmarks.CCv2Context; import io.delta.kernel.defaults.benchmarks.models.ReadSpec; import io.delta.kernel.defaults.benchmarks.models.WorkloadSpec; import io.delta.kernel.engine.Engine; import io.delta.kernel.utils.CloseableIterator; +import java.util.Optional; import org.openjdk.jmh.infra.Blackhole; /** @@ -58,9 +60,13 @@ public ReadMetadataRunner(ReadSpec workloadSpec, Engine engine) { @Override public void setup() throws Exception { String workloadTableRoot = workloadSpec.getTableInfo().getResolvedTableRoot(); - SnapshotBuilder builder = TableManager.loadSnapshot(workloadTableRoot); + + // Create CCv2Context if this is a CCv2 table + Optional ccv2Context = createCCv2Context(workloadSpec.getTableInfo(), engine); + + SnapshotBuilder builder = getSnapshotBuilder(workloadTableRoot, ccv2Context); if (workloadSpec.getVersion() != null) { - builder.atVersion(workloadSpec.getVersion()); + builder = builder.atVersion(workloadSpec.getVersion()); } Snapshot snapshot = builder.build(engine); scan = snapshot.getScanBuilder().build(); diff --git a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WorkloadRunner.java b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WorkloadRunner.java index 35709b6eeed..fb09bb38d38 100644 --- a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WorkloadRunner.java +++ b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WorkloadRunner.java @@ -16,7 +16,13 @@ package io.delta.kernel.defaults.benchmarks.workloadrunners; +import io.delta.kernel.SnapshotBuilder; +import io.delta.kernel.TableManager; +import io.delta.kernel.defaults.benchmarks.CCv2Context; +import io.delta.kernel.defaults.benchmarks.models.TableInfo; import io.delta.kernel.defaults.benchmarks.models.WorkloadSpec; +import io.delta.kernel.engine.Engine; +import java.util.Optional; import org.openjdk.jmh.infra.Blackhole; /** @@ -35,6 +41,7 @@ *

The {@link #setup()} method must be called before any execution method. */ public abstract class WorkloadRunner { + public WorkloadRunner() {} /** @return the name of this workload derived from the contents of the workload specification. */ @@ -74,6 +81,44 @@ public void cleanup() throws Exception { // Default implementation is no-op } + /** + * Creates a CCv2Context from the given TableInfo if it's a CCv2 table. + * + *

This method checks if the table has CCv2 configuration and creates the necessary + * infrastructure (InMemoryUCClient, coordinator, committer) for working with coordinated commits. + * + * @param tableInfo the table information + * @return Optional containing CCv2Context if this is a CCv2 table, empty otherwise + */ + protected Optional createCCv2Context(TableInfo tableInfo, Engine engine) { + if (tableInfo.isCCv2Table()) { + return Optional.of(CCv2Context.createFromTableInfo(tableInfo, engine)); + } else { + return Optional.empty(); + } + } + + /** + * Creates a SnapshotBuilder for the given table root, optionally configuring it for CCv2 tables. + * + *

If ccv2Context is present, the builder is configured with the CCv2 committer and parsed log + * data to enable reading from and writing to coordinated commits. + * + * @param tableRoot the root path of the Delta table + * @param ccv2Context optional CCv2 context for coordinated commits tables + * @return a SnapshotBuilder configured appropriately for the table type + */ + protected SnapshotBuilder getSnapshotBuilder( + String tableRoot, Optional ccv2Context) { + SnapshotBuilder builder = TableManager.loadSnapshot(tableRoot); + if (ccv2Context.isPresent()) { + CCv2Context context = ccv2Context.get(); + builder = + builder.withCommitter(context.getCommitter()).withLogData(context.getParsedLogData()); + } + return builder; + } + // TODO: Add executeAsTest() method for correctness validation // public abstract void executeAsTest() throws Exception; } diff --git a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WriteRunner.java b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WriteRunner.java index 4e61fda7757..7d2e62b2398 100644 --- a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WriteRunner.java +++ b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WriteRunner.java @@ -19,17 +19,29 @@ import static io.delta.kernel.internal.util.Utils.toCloseableIterator; import io.delta.kernel.*; +import io.delta.kernel.data.ColumnarBatch; import io.delta.kernel.data.Row; +import io.delta.kernel.defaults.benchmarks.CCv2Context; import io.delta.kernel.defaults.benchmarks.models.WorkloadSpec; import io.delta.kernel.defaults.benchmarks.models.WriteSpec; import io.delta.kernel.engine.Engine; -import io.delta.kernel.internal.fs.Path; +import io.delta.kernel.internal.actions.AddFile; +import io.delta.kernel.internal.actions.RemoveFile; +import io.delta.kernel.internal.actions.SingleAction; +import io.delta.kernel.internal.util.FileNames; import io.delta.kernel.transaction.UpdateTableTransactionBuilder; -import io.delta.kernel.utils.CloseableIterable; +import io.delta.kernel.types.StructType; import io.delta.kernel.utils.CloseableIterator; import io.delta.kernel.utils.FileStatus; +import java.io.File; import java.io.IOException; -import java.util.*; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import org.jetbrains.annotations.NotNull; import org.openjdk.jmh.infra.Blackhole; /** @@ -47,7 +59,10 @@ public class WriteRunner extends WorkloadRunner { private final Engine engine; private final WriteSpec workloadSpec; private List> commitActions; + private List committedVersions; private Snapshot currentSnapshot; + private long originalVersion; + private Table table; private Set initialDeltaLogFiles; /** @@ -59,6 +74,8 @@ public class WriteRunner extends WorkloadRunner { public WriteRunner(WriteSpec workloadSpec, Engine engine) { this.workloadSpec = workloadSpec; this.engine = engine; + this.committedVersions = new ArrayList<>(); + this.originalVersion = -1; } @Override @@ -67,16 +84,34 @@ public void setup() throws Exception { String tableRoot = workloadSpec.getTableInfo().getResolvedTableRoot(); - // Get the current snapshot - SnapshotBuilder builder = TableManager.loadSnapshot(tableRoot); - currentSnapshot = builder.build(engine); + // Capture initial state of delta log directory (both main log and staged commits) + io.delta.kernel.internal.fs.Path deltaLogPath = + new io.delta.kernel.internal.fs.Path(tableRoot, "_delta_log"); // Capture initial listing of delta log files - initialDeltaLogFiles = captureFileListing(); + initialDeltaLogFiles = getCurrentDirectoryListing(); + + // Create CCv2Context if this is a CCv2 table + Optional ccv2ContextOpt = createCCv2Context(workloadSpec.getTableInfo(), engine); + + // Create Table object once - will be reused for all commits + table = Table.forPath(engine, tableRoot); + + // Get the current version before any commits + SnapshotBuilder builder = getSnapshotBuilder(tableRoot, ccv2ContextOpt); + currentSnapshot = builder.build(engine); + originalVersion = currentSnapshot.getVersion(); // Load and parse all commit files for (WriteSpec.CommitSpec commitSpec : workloadSpec.getCommits()) { - commitActions.add(commitSpec.parseActions(engine, workloadSpec.getSpecDirectoryPath())); + String commitFilePath = + workloadSpec.getTableInfo().getTableInfoPath() + + "/specs/" + + workloadSpec.getCaseName() + + "/" + + commitSpec.getDataFilesPath(); + List actions = parseCommitFile(commitFilePath); + commitActions.add(actions); } } @@ -113,13 +148,15 @@ public void executeAsBenchmark(Blackhole blackhole) throws Exception { // Build and commit the transaction Transaction txn = txnBuilder.build(engine); - // Convert actions list to CloseableIterable - CloseableIterable dataActions = - CloseableIterable.inMemoryIterable(toCloseableIterator(actions.iterator())); + // Convert actions list to CloseableIterable (required by commit API) + CloseableIterator actionsIter = toCloseableIterator(actions.iterator()); + io.delta.kernel.utils.CloseableIterable dataActions = + io.delta.kernel.utils.CloseableIterable.inMemoryIterable(actionsIter); TransactionCommitResult result = txn.commit(engine, dataActions); long version = result.getVersion(); + committedVersions.add(version); blackhole.consume(version); // Use the post-commit snapshot for the next transaction @@ -130,38 +167,91 @@ public void executeAsBenchmark(Blackhole blackhole) throws Exception { .orElseThrow( () -> new IllegalStateException( - "Post-commit snapshot not available. This indicates a conflict " - + "occurred during the benchmark, which should not happen. " - + "Ensure no other processes are writing to the table: " + "Post-commit snapshot not available. This indicates a conflict occurred during " + + "the benchmark, which should not happen. Ensure no other processes are writing " + + "to the table: " + workloadSpec.getTableInfo().getResolvedTableRoot())); } } - /** Cleans up the state created during benchmark execution by reverting all committed changes. */ - @Override - public void cleanup() throws Exception { - // Delete any files that weren't present initially - Set currentFiles = captureFileListing(); - for (String filePath : currentFiles) { - if (!initialDeltaLogFiles.contains(filePath)) { - engine.getFileSystemClient().delete(filePath); + /** + * Parses a Delta log JSON file and returns a list of action Rows. + * + *

Uses Kernel's built-in JsonHandler to read Delta log format JSON files. + * + * @param commitFilePath path to the commit file containing Delta log JSON actions + * @return list of Row objects representing the actions (AddFile, RemoveFile, etc.) + * @throws IOException if there's an error reading or parsing the file + */ + private List parseCommitFile(String commitFilePath) throws IOException { + /** Schema for reading commit files with both "add" and "remove" actions. */ + final StructType COMMIT_FILE_SCHEMA = + new StructType() + .add("add", AddFile.FULL_SCHEMA, true /* nullable */) + .add("remove", RemoveFile.FULL_SCHEMA, true /* nullable */); + + List actions = new ArrayList<>(); + + File file = new File(commitFilePath); + if (!file.exists()) { + throw new IOException("Commit file not found: " + commitFilePath); + } + + // Create a FileStatus for the commit file + FileStatus fileStatus = FileStatus.of(commitFilePath, file.length(), file.lastModified()); + + // Use Kernel's JsonHandler to read the file + try (CloseableIterator fileIter = + toCloseableIterator(Collections.singletonList(fileStatus).iterator()); + CloseableIterator batchIter = + engine.getJsonHandler().readJsonFiles(fileIter, COMMIT_FILE_SCHEMA, Optional.empty())) { + + while (batchIter.hasNext()) { + ColumnarBatch batch = batchIter.next(); + + // Process each row in the batch + try (CloseableIterator rowIter = batch.getRows()) { + while (rowIter.hasNext()) { + Row singleActionRow = rowIter.next(); + + // Extract the actual action Row and wrap it in SingleAction format + // Check if this row has an "add" action + if (!singleActionRow.isNullAt(COMMIT_FILE_SCHEMA.indexOf("add"))) { + Row addRow = singleActionRow.getStruct(COMMIT_FILE_SCHEMA.indexOf("add")); + // Wrap in SingleAction format for commit + actions.add(SingleAction.createAddFileSingleAction(addRow)); + } + // Check if this row has a "remove" action + else if (!singleActionRow.isNullAt(COMMIT_FILE_SCHEMA.indexOf("remove"))) { + Row removeRow = singleActionRow.getStruct(COMMIT_FILE_SCHEMA.indexOf("remove")); + // Wrap in SingleAction format for commit + actions.add(SingleAction.createRemoveFileSingleAction(removeRow)); + } else { + // Throw an error if the action is not recognized (not "add" or "remove") + throw new IOException( + "Unrecognized action in commit file row: " + singleActionRow.toString()); + } + } + } } } - } - /** @return a set of all file paths in the the `_delta_log/` directory of the table. */ - private Set captureFileListing() throws IOException { - // Construct path prefix for all files in `_delta_log/`. The prefix is for file with name `0` - // because the filesystem client lists all _sibling_ files in the directory with a path greater - // than `0`. - String deltaLogPathPrefix = - new Path(workloadSpec.getTableInfo().getResolvedTableRoot(), "_delta_log/0") - .toUri() - .getPath(); + return actions; + } + /** + * Captures a listing of all files whose paths start with the given prefix. Use a trailing slash + * to list files inside a directory. + * + * @param pathPrefix the path prefix to list from (e.g., "/path/to/dir/" to list files in dir) + * @return a set of all file paths starting with that prefix + */ + private Set captureInitialListing(String pathPrefix) throws IOException { Set files = new HashSet<>(); + // List from the lowest version in the prefix + String listFrom = pathPrefix + "/0"; try (CloseableIterator filesIter = - engine.getFileSystemClient().listFrom(deltaLogPathPrefix)) { + engine.getFileSystemClient().listFrom(listFrom)) { while (filesIter.hasNext()) { FileStatus file = filesIter.next(); files.add(file.getPath()); @@ -169,4 +259,47 @@ private Set captureFileListing() throws IOException { } return files; } + + /** + * Cleans up the state created during benchmark execution by reverting all committed changes. + * + *

This method removes any files in the delta log directory that weren't present at setup time, + * effectively reverting all changes made during the benchmark. This ensures the table is returned + * to its original state for the next benchmark iteration. + */ + @Override + public void cleanup() throws Exception { + if (originalVersion < 0 || initialDeltaLogFiles == null) { + // Setup was never called or failed + return; + } + + Set currentFiles = getCurrentDirectoryListing(); + + // Delete any files that weren't present initially + for (String filePath : currentFiles) { + if (!initialDeltaLogFiles.contains(filePath)) { + engine.getFileSystemClient().delete(filePath); + } + } + + committedVersions.clear(); + } + + private @NotNull Set getCurrentDirectoryListing() throws IOException { + // Get current state of delta log directory (both main log and staged commits) + io.delta.kernel.internal.fs.Path deltaLogPath = + new io.delta.kernel.internal.fs.Path( + workloadSpec.getTableInfo().getResolvedTableRoot(), "_delta_log"); + + // Add trailing slash to list files INSIDE the directory + Set currentFiles = new HashSet<>(captureInitialListing(deltaLogPath + "/")); + + if (workloadSpec.getTableInfo().isCCv2Table()) { + // Also capture staged commits directory if it exists + String stagedCommitsPath = FileNames.stagedCommitDirectory(deltaLogPath); + currentFiles.addAll(captureInitialListing(stagedCommitsPath + "/")); + } + return currentFiles; + } } diff --git a/kernel/kernel-defaults/src/test/resources/workload_specs/basic_ccv2/ccv2_info.json b/kernel/kernel-defaults/src/test/resources/workload_specs/basic_ccv2/ccv2_info.json new file mode 100644 index 00000000000..f8c1c2848e1 --- /dev/null +++ b/kernel/kernel-defaults/src/test/resources/workload_specs/basic_ccv2/ccv2_info.json @@ -0,0 +1,13 @@ +{ + "log_tail": [ + { + "version": 2, + "staged_commit_name": "00000000000000000002.a1b2c3d4-e5f6-4a7b-8c9d-0e1f2a3b4c5d.json" + }, + { + "version": 3, + "staged_commit_name": "00000000000000000003.f7e8d9c0-b1a2-4536-9748-5a6b7c8d9e0f.json" + } + ] +} + diff --git a/kernel/kernel-defaults/src/test/resources/workload_specs/basic_ccv2/delta/_delta_log/00000000000000000000.json b/kernel/kernel-defaults/src/test/resources/workload_specs/basic_ccv2/delta/_delta_log/00000000000000000000.json new file mode 100644 index 00000000000..0a698ce9566 --- /dev/null +++ b/kernel/kernel-defaults/src/test/resources/workload_specs/basic_ccv2/delta/_delta_log/00000000000000000000.json @@ -0,0 +1,4 @@ +{"commitInfo":{"timestamp":1712091396253,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[]"},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"3","numOutputBytes":"996"},"engineInfo":"Apache-Spark/3.5.1 Delta-Lake/3.1.0","txnId":"5df7dc20-b980-4207-a5fd-b69cb4541b2e"}} +{"metaData":{"id":"2a1e618f-d92a-4c94-bb06-a2808f8b39f3","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"letter\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"number\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"a_float\",\"type\":\"double\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1712091393302}} +{"protocol":{"minReaderVersion":1,"minWriterVersion":2}} +{"add":{"path":"part-00000-c9f44819-b06d-45dd-b33d-ae9aa1b96909-c000.snappy.parquet","partitionValues":{},"size":996,"modificationTime":1712091396057,"dataChange":true,"stats":"{\"numRecords\":3,\"minValues\":{\"letter\":\"a\",\"number\":1,\"a_float\":1.1},\"maxValues\":{\"letter\":\"c\",\"number\":3,\"a_float\":3.3},\"nullCount\":{\"letter\":0,\"number\":0,\"a_float\":0}}"}} \ No newline at end of file diff --git a/kernel/kernel-defaults/src/test/resources/workload_specs/basic_ccv2/delta/_delta_log/00000000000000000001.json b/kernel/kernel-defaults/src/test/resources/workload_specs/basic_ccv2/delta/_delta_log/00000000000000000001.json new file mode 100644 index 00000000000..656b7d4ff7f --- /dev/null +++ b/kernel/kernel-defaults/src/test/resources/workload_specs/basic_ccv2/delta/_delta_log/00000000000000000001.json @@ -0,0 +1,2 @@ +{"commitInfo":{"timestamp":1712091404556,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":0,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"2","numOutputBytes":"984"},"engineInfo":"Apache-Spark/3.5.1 Delta-Lake/3.1.0","txnId":"64562965-a4c4-48a7-84dd-e68ee934f467"}} +{"add":{"path":"part-00000-a9daef62-5a40-43c5-ac63-3ad4a7d749ae-c000.snappy.parquet","partitionValues":{},"size":984,"modificationTime":1712091404545,"dataChange":true,"stats":"{\"numRecords\":2,\"minValues\":{\"letter\":\"d\",\"number\":4,\"a_float\":4.4},\"maxValues\":{\"letter\":\"e\",\"number\":5,\"a_float\":5.5},\"nullCount\":{\"letter\":0,\"number\":0,\"a_float\":0}}"}} \ No newline at end of file diff --git a/kernel/kernel-defaults/src/test/resources/workload_specs/basic_ccv2/delta/_delta_log/_staged_commits/00000000000000000002.a1b2c3d4-e5f6-4a7b-8c9d-0e1f2a3b4c5d.json b/kernel/kernel-defaults/src/test/resources/workload_specs/basic_ccv2/delta/_delta_log/_staged_commits/00000000000000000002.a1b2c3d4-e5f6-4a7b-8c9d-0e1f2a3b4c5d.json new file mode 100644 index 00000000000..477b9c6d16d --- /dev/null +++ b/kernel/kernel-defaults/src/test/resources/workload_specs/basic_ccv2/delta/_delta_log/_staged_commits/00000000000000000002.a1b2c3d4-e5f6-4a7b-8c9d-0e1f2a3b4c5d.json @@ -0,0 +1,2 @@ +{"commitInfo":{"timestamp":1712091410000,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":1,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"2","numOutputBytes":"984"},"engineInfo":"Apache-Spark/3.5.1 Delta-Lake/3.1.0","txnId":"ccv2-test-txn-0002"}} +{"remove":{"path":"part-00000-a9daef62-5a40-43c5-ac63-3ad4a7d749ae-c000.snappy.parquet","partitionValues":{},"size":984,"modificationTime":1712091404545,"dataChange":true,"stats":"{\"numRecords\":2,\"minValues\":{\"letter\":\"d\",\"number\":4,\"a_float\":4.4},\"maxValues\":{\"letter\":\"e\",\"number\":5,\"a_float\":5.5},\"nullCount\":{\"letter\":0,\"number\":0,\"a_float\":0}}"}} diff --git a/kernel/kernel-defaults/src/test/resources/workload_specs/basic_ccv2/delta/_delta_log/_staged_commits/00000000000000000003.f7e8d9c0-b1a2-4536-9748-5a6b7c8d9e0f.json b/kernel/kernel-defaults/src/test/resources/workload_specs/basic_ccv2/delta/_delta_log/_staged_commits/00000000000000000003.f7e8d9c0-b1a2-4536-9748-5a6b7c8d9e0f.json new file mode 100644 index 00000000000..5059ae6c61e --- /dev/null +++ b/kernel/kernel-defaults/src/test/resources/workload_specs/basic_ccv2/delta/_delta_log/_staged_commits/00000000000000000003.f7e8d9c0-b1a2-4536-9748-5a6b7c8d9e0f.json @@ -0,0 +1,2 @@ +{"commitInfo":{"timestamp":1712091415000,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":2,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"3","numOutputBytes":"996"},"engineInfo":"Apache-Spark/3.5.1 Delta-Lake/3.1.0","txnId":"ccv2-test-txn-0003"}} +{"remove":{"path":"part-00000-c9f44819-b06d-45dd-b33d-ae9aa1b96909-c000.snappy.parquet","partitionValues":{},"size":996,"modificationTime":1712091396057,"dataChange":true,"stats":"{\"numRecords\":3,\"minValues\":{\"letter\":\"a\",\"number\":1,\"a_float\":1.1},\"maxValues\":{\"letter\":\"c\",\"number\":3,\"a_float\":3.3},\"nullCount\":{\"letter\":0,\"number\":0,\"a_float\":0}}"}} diff --git a/kernel/kernel-defaults/src/test/resources/workload_specs/basic_ccv2/delta/part-00000-a9daef62-5a40-43c5-ac63-3ad4a7d749ae-c000.snappy.parquet b/kernel/kernel-defaults/src/test/resources/workload_specs/basic_ccv2/delta/part-00000-a9daef62-5a40-43c5-ac63-3ad4a7d749ae-c000.snappy.parquet new file mode 100644 index 0000000000000000000000000000000000000000..2cc848773bdc1c8c20cc73904db3ff56b5204771 GIT binary patch literal 984 zcmb7D&x_MQ6n;s&-LUlF!p<;(97=G*F4WM_cDGxKtT&NGmg1@6E|bpeT1=a6ekf9U z@elAK2znCy7d-Uj!ILNP=0X1mFV?FsP0|)t5Qk(k^WOJ;Z|0lH>7A}afKDZnD2<*GWM!F~ix zgOv3qXeB+T5|tL|ecH9O&H?Nx5_g4*Ks$h-Q}jcD?9kULF~RDgFF7}@vEcUTnbqs8 zHq4eTP}p1Au!^p$e@@hAD!C2T43Bdwlh0SZ)SoF>Ah(QZxsOSK>(TWBKMI7>c~XGB zz>QiOeW~uC%T>nw;L_$}j&|A$XYHpl2nYU%ZPe|i?Eu@qh6t}b8xP?smXRI!&+MU$ zxZn|I-<63OJs(T6ZH7@0_an1vUJRrsLbLsFVvhJQNu_6nsTFiEy2SY?_B`%-h(WTf znz^poo=m6JB|kNN#rORGVP`jCMHhaY)ci;JM^j+;vQ6I_29?aEieo!<+o|li@xT)m y;WnIB-Kiakrh8a#$bR#vUK6duT3>MK95}L3YYE}S?mecejX8;WwJh)9x>4sFJ$!|(H- z->?3KF+Ek3^u^wWenZ)B&n&ZVUQnv{@PK$jbkW*YjU_)2>JM4E07` zPlNG1eJhvd7aOXW{+$WUhd2>0iu~31M!a{~+bhY|7S^bj=w@Lm?u^