From 4e42e933f20cf0d2def37b64d369543d31fe123f Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Fri, 17 Oct 2025 17:24:18 -0700 Subject: [PATCH 01/12] working write bencmark --- .../benchmarks/AbstractBenchmarkState.java | 15 +- .../defaults/benchmarks/BenchmarkUtils.java | 3 +- .../benchmarks/WorkloadBenchmark.java | 29 +- .../defaults/benchmarks/models/TableInfo.java | 104 +++++-- .../benchmarks/models/WorkloadSpec.java | 5 +- .../defaults/benchmarks/models/WriteSpec.java | 118 ++++++++ .../workloadrunners/ReadMetadataRunner.java | 4 +- .../workloadrunners/WorkloadRunner.java | 13 + .../workloadrunners/WriteRunner.java | 269 ++++++++++++++++++ .../specs/add_and_remove/commit_add.json | 1 + .../specs/add_and_remove/commit_remove.json | 1 + .../specs/add_and_remove/spec.json | 8 + .../specs/single_add/commit_add.json | 1 + .../basic_append/specs/single_add/spec.json | 7 + .../basic_append/table_info.json | 2 +- 15 files changed, 536 insertions(+), 44 deletions(-) create mode 100644 kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/WriteSpec.java create mode 100644 kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WriteRunner.java create mode 100644 kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/add_and_remove/commit_add.json create mode 100644 kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/add_and_remove/commit_remove.json create mode 100644 kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/add_and_remove/spec.json create mode 100644 kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/single_add/commit_add.json create mode 100644 kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/single_add/spec.json diff --git a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/AbstractBenchmarkState.java b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/AbstractBenchmarkState.java index 6e3bcfc6fbb..9497784caa2 100644 --- a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/AbstractBenchmarkState.java +++ b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/AbstractBenchmarkState.java @@ -40,14 +40,14 @@ public abstract class AbstractBenchmarkState { * dynamically by JMH. The value is set in the main method. */ @Param({}) - private String workloadSpecJson; + public String workloadSpecJson; /** * The engine to use for this benchmark. Note: This parameter will be set dynamically by JMH. The * value is set in the main method. */ @Param({}) - private String engineName; + public String engineName; /** The workload runner initialized for this benchmark invocation. */ private WorkloadRunner runner; @@ -76,6 +76,17 @@ public void setupInvocation() throws Exception { runner.setup(); } + /** + * Teardown method that runs after each benchmark invocation. This calls the {@link + * WorkloadRunner#cleanup()} to clean up any state created during execution. + * + * @throws Exception If any error occurs during cleanup. + */ + @TearDown(Level.Invocation) + public void teardownInvocation() throws Exception { + runner.cleanup(); + } + /** * Returns an instance of the desired engine based on the provided engine name. * diff --git a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/BenchmarkUtils.java b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/BenchmarkUtils.java index 9fa0b976936..1cb533e9727 100644 --- a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/BenchmarkUtils.java +++ b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/BenchmarkUtils.java @@ -115,11 +115,10 @@ private static List loadSpecsFromTable(Path tableDir) { validateTableStructure(tableDir); Path tableInfoPath = tableDir.resolve(TABLE_INFO_FILE_NAME); - Path deltaDir = tableDir.resolve(DELTA_DIR_NAME); Path specsDir = tableDir.resolve(SPECS_DIR_NAME); TableInfo tableInfo = - TableInfo.fromJsonPath(tableInfoPath.toString(), deltaDir.toAbsolutePath().toString()); + TableInfo.fromJsonPath(tableInfoPath.toString(), tableDir.toAbsolutePath().toString()); return findSpecDirectories(specsDir).stream() .map(specDir -> loadSingleSpec(specDir, tableInfo)) diff --git a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/WorkloadBenchmark.java b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/WorkloadBenchmark.java index 6806a573d8f..17a1ffe1f0c 100644 --- a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/WorkloadBenchmark.java +++ b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/WorkloadBenchmark.java @@ -33,6 +33,8 @@ import org.openjdk.jmh.runner.options.Options; import org.openjdk.jmh.runner.options.OptionsBuilder; import org.openjdk.jmh.runner.options.TimeValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Generic JMH benchmark for all workload types. Automatically loads and runs benchmarks based on @@ -45,12 +47,23 @@ @Measurement(iterations = 5, time = 1) public class WorkloadBenchmark { + private static final Logger log = LoggerFactory.getLogger(WorkloadBenchmark.class); + /** Default implementation of BenchmarkState that supports only the "default" engine. */ public static class DefaultBenchmarkState extends AbstractBenchmarkState { @Override protected Engine getEngine(String engineName) { if (engineName.equals("default")) { - return DefaultEngine.create(new Configuration()); + return DefaultEngine.create( + new Configuration() { + { + // Set the batch sizes to small so that we get to test the multiple batch/file + // scenarios. + set("delta.kernel.default.parquet.reader.batch-size", "20"); + set("delta.kernel.default.json.reader.batch-size", "20"); + set("delta.kernel.default.parquet.writer.targetMaxFileSize", "20"); + } + }); } else { throw new IllegalArgumentException("Unsupported engine: " + engineName); } @@ -78,6 +91,7 @@ public void benchmarkWorkload(DefaultBenchmarkState state, Blackhole blackhole) public static void main(String[] args) throws RunnerException, IOException { // Get workload specs from the workloads directory List workloadSpecs = BenchmarkUtils.loadAllWorkloads(WORKLOAD_SPECS_DIR); + System.out.println("Loaded " + workloadSpecs.size() + " workload specs"); if (workloadSpecs.isEmpty()) { throw new RunnerException( "No workloads found. Please add workload specs to the workloads directory."); @@ -87,7 +101,12 @@ public static void main(String[] args) throws RunnerException, IOException { List filteredSpecs = new ArrayList<>(); for (WorkloadSpec spec : workloadSpecs) { // TODO: In the future, we can filter specific workloads using command line args here. - filteredSpecs.addAll(spec.getWorkloadVariants()); + List variants = spec.getWorkloadVariants(); + for (WorkloadSpec variant : variants) { + if (variant.getType().equals("write")) { + filteredSpecs.add(variant); + } + } } // Convert paths into a String array for JMH. JMH requires that parameters be of type String[]. @@ -102,13 +121,13 @@ public static void main(String[] args) throws RunnerException, IOException { // TODO: In the future, this can be extended to support multiple engines. .param("engineName", "default") .forks(1) - .warmupIterations(3) // Proper warmup for production benchmarks - .measurementIterations(5) // Proper measurement iterations for production benchmarks + .warmupIterations(3) + .measurementIterations(5) .warmupTime(TimeValue.seconds(1)) .measurementTime(TimeValue.seconds(1)) .addProfiler(KernelMetricsProfiler.class) .build(); - new Runner(opt, new WorkloadOutputFormat()).run(); + new Runner(opt).run(); } } diff --git a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/TableInfo.java b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/TableInfo.java index 647b52e5eb9..21464fa84d6 100644 --- a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/TableInfo.java +++ b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/TableInfo.java @@ -16,10 +16,12 @@ package io.delta.kernel.defaults.benchmarks.models; +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.ObjectMapper; import java.io.File; import java.io.IOException; -import org.codehaus.jackson.map.ObjectMapper; +import java.nio.file.Paths; /** * Represents metadata about a Delta table used in benchmark workloads. @@ -30,17 +32,26 @@ * *

TableInfo instances are typically loaded from JSON files in the workload specifications * directory structure. Each table directory should contain a {@code table_info.json} file with the - * table metadata and a {@code delta} subdirectory containing the actual table data. The table root - * path is the absolute path to the root of the table and is provided separately in {@link - * WorkloadSpec#fromJsonPath(String, String, TableInfo)}. + * table metadata and a {@code delta} subdirectory containing the actual table data. * - *

Example JSON structure: + *

Example JSON structure for relative table (default): * *

{@code
  * {
- *   "name": "large-table",
- *   "description": "A large Delta table with multi-part checkpoints for performance testing",
- *   "engineInfo": "Apache-Spark/3.5.1 Delta-Lake/3.1.0"
+ *   "name": "basic_table",
+ *   "description": "A basic Delta table for benchmarking",
+ *   "engine_info": "Apache-Spark/3.5.1 Delta-Lake/3.1.0"
+ * }
+ * }
+ * + *

Example JSON structure for absolute path table: + * + *

{@code
+ * {
+ *   "name": "s3_table",
+ *   "description": "Table stored in S3",
+ *   "table_type": "absolute",
+ *   "table_path": "s3://my-bucket/path/to/table"
  * }
  * }
*/ @@ -60,23 +71,28 @@ public class TableInfo { @JsonProperty("engine_info") public String engineInfo; - /** The root path where the Delta table is stored. */ - @JsonProperty("table_root") - public String tableRoot; + /** + * The type of table location: "relative" (default) or "absolute". + * + *

When "relative", the table is located at {table_info_directory}/delta. When "absolute", the + * table is located at the path specified in {@link #tablePath}. + */ + @JsonProperty("table_type") + private String tableType; - /** @return the absolute path to the root of the table */ - public String getTableRoot() { - return tableRoot; - } + /** + * The absolute path to the table when {@link #tableType} is "absolute". Can be a local path + * (file:///) or S3 path (s3://). Null when table_type is "relative". + */ + @JsonProperty("table_path") + private String tablePath; /** - * Sets the root path of the Delta table. - * - * @param tableRoot the absolute path to the root of the table + * The resolved absolute path to the root of the table. This is computed after deserialization + * based on {@link #tableType} and {@link #tablePath}. */ - public void setTableRoot(String tableRoot) { - this.tableRoot = tableRoot; - } + @JsonProperty("table_info_path") + private String tableInfoPath; /** * Default constructor for Jackson deserialization. @@ -86,23 +102,47 @@ public void setTableRoot(String tableRoot) { */ public TableInfo() {} + /** Resolves the table root path based on the table type and location configuration. */ + @JsonIgnore + public String getResolvedTableRoot() { + if ("absolute".equals(tableType)) { + if (tablePath == null || tablePath.trim().isEmpty()) { + throw new IllegalStateException( + "table_path must be specified when table_type is 'absolute'"); + } + return tablePath; + } else { + // Default to "relative" if tableType is null or "relative" + return Paths.get(tableInfoPath, "delta").toAbsolutePath().toString(); + } + } + + public String getTableInfoPath() { + return tableInfoPath; + } + + public void setTableInfoPath(String tableInfoDirectory) { + this.tableInfoPath = tableInfoDirectory; + } + /** * Creates a TableInfo instance by reading from a JSON file at the specified path. * - *

This method loads table metadata from a JSON file and sets the table root path. The JSON - * file should contain the table name and description, while the table root path is provided - * separately with the absolute path. + *

This method loads table metadata from a JSON file and resolves the table root path. The JSON + * file should contain the table name and description, while the table root path is computed based + * on the table_type and table_path fields. * * @param jsonPath the path to the JSON file containing the TableInfo metadata - * @param tableRoot the absolute path to the root of the Delta table - * @return a TableInfo instance populated from the JSON file and table root path + * @param tableInfoPath the directory containing the table_info.json file (used for relative path + * resolution) + * @return a TableInfo instance populated from the JSON file with resolved table root path * @throws RuntimeException if there is an error reading or parsing the JSON file */ - public static TableInfo fromJsonPath(String jsonPath, String tableRoot) { + public static TableInfo fromJsonPath(String jsonPath, String tableInfoPath) { ObjectMapper mapper = new ObjectMapper(); try { TableInfo info = mapper.readValue(new File(jsonPath), TableInfo.class); - info.setTableRoot(tableRoot); + info.setTableInfoPath(tableInfoPath); return info; } catch (IOException e) { throw new RuntimeException("Failed to read TableInfo from JSON file: " + jsonPath, e); @@ -112,8 +152,8 @@ public static TableInfo fromJsonPath(String jsonPath, String tableRoot) { /** * Returns a string representation of this TableInfo. * - *

The string includes the table name, description, and engine info, but excludes the table - * root path for security reasons (as it may contain sensitive path information). + *

The string includes the table name, description, engine info, and CCv2 status, but excludes + * the table root path for security reasons (as it may contain sensitive path information). * * @return a string representation of this TableInfo */ @@ -125,6 +165,8 @@ public String toString() { + description + "', engineInfo='" + engineInfo - + "'}"; + + "', tableType='" + + tableType + + "}"; } } diff --git a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/WorkloadSpec.java b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/WorkloadSpec.java index 1d1ed0ba71c..861de96187b 100644 --- a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/WorkloadSpec.java +++ b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/WorkloadSpec.java @@ -33,7 +33,10 @@ * field in the JSON. */ @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type") -@JsonSubTypes({@JsonSubTypes.Type(value = ReadSpec.class, name = "read")}) +@JsonSubTypes({ + @JsonSubTypes.Type(value = ReadSpec.class, name = "read"), + @JsonSubTypes.Type(value = WriteSpec.class, name = "write") +}) public abstract class WorkloadSpec { /** * The type of workload (e.g., "read"). This is used by Jackson's polymorphic deserialization to diff --git a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/WriteSpec.java b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/WriteSpec.java new file mode 100644 index 00000000000..2a321c34dc0 --- /dev/null +++ b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/WriteSpec.java @@ -0,0 +1,118 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.kernel.defaults.benchmarks.models; + +import com.fasterxml.jackson.annotation.JsonProperty; +import io.delta.kernel.defaults.benchmarks.workloadrunners.WorkloadRunner; +import io.delta.kernel.defaults.benchmarks.workloadrunners.WriteRunner; +import io.delta.kernel.engine.Engine; +import java.util.Collections; +import java.util.List; + +/** + * Workload specification for write benchmarks. Defines test cases for writing to Delta tables with + * one or more commits containing add/remove actions. + * + *

Usage

+ * + *

To run this workload, use {@link WorkloadSpec#getRunner(Engine)} to get the appropriate {@link + * WriteRunner}. + * + * @see WriteRunner + */ +public class WriteSpec extends WorkloadSpec { + + /** + * Container for a single commit's configuration. + * + *

Each commit references a file containing the Delta log JSON actions (add/remove files) to be + * committed. + */ + public static class CommitSpec { + /** + * Path to the commit file containing Delta log JSON actions. The path is relative to the spec + * directory (where spec.json is located). + * + *

Example: "commit_a.json" + */ + @JsonProperty("data_files_path") + private String dataFilesPath; + + /** Default constructor for Jackson. */ + public CommitSpec() {} + + public CommitSpec(String dataFilesPath) { + this.dataFilesPath = dataFilesPath; + } + + public String getDataFilesPath() { + return dataFilesPath; + } + } + + /** + * List of commits to execute in sequence. Each commit contains a reference to a file with Delta + * log JSON actions. All commits are executed as part of the timed benchmark. + */ + @JsonProperty("commits") + private List commits; + + // Default constructor for Jackson + public WriteSpec() { + super("write"); + } + + /** + * Gets the list of commits to execute. + * + * @return list of commit specifications + */ + public List getCommits() { + return commits != null ? commits : Collections.emptyList(); + } + + /** @return the full name of this workload, derived from table name, case name, and type. */ + @Override + public String getFullName() { + return this.tableInfo.name + "/" + caseName + "/write"; + } + + @Override + public WorkloadRunner getRunner(Engine engine) { + return new WriteRunner(this, engine); + } + + /** + * Generates workload variants from this test case specification. + * + *

Currently, WriteSpec generates a single variant (itself). In the future, this could be + * extended to generate variants for different write patterns or configurations. + * + * @return list of WriteSpec variants, each representing a separate workload execution + */ + @Override + public List getWorkloadVariants() { + return Collections.singletonList(this); + } + + @Override + public String toString() { + return String.format( + "Write{caseName='%s', commits=%d, tableInfo='%s'}", + caseName, getCommits().size(), tableInfo); + } +} diff --git a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/ReadMetadataRunner.java b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/ReadMetadataRunner.java index 8b264b11665..7a2e4ee4b31 100644 --- a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/ReadMetadataRunner.java +++ b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/ReadMetadataRunner.java @@ -56,8 +56,8 @@ public ReadMetadataRunner(ReadSpec workloadSpec, Engine engine) { } @Override - public void setup() { - String workloadTableRoot = workloadSpec.getTableInfo().getTableRoot(); + public void setup() throws Exception { + String workloadTableRoot = workloadSpec.getTableInfo().getResolvedTableRoot(); SnapshotBuilder builder = TableManager.loadSnapshot(workloadTableRoot); if (workloadSpec.getVersion() != null) { builder.atVersion(workloadSpec.getVersion()); diff --git a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WorkloadRunner.java b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WorkloadRunner.java index 5107bab2a99..d3916413eb5 100644 --- a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WorkloadRunner.java +++ b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WorkloadRunner.java @@ -61,6 +61,19 @@ public WorkloadRunner() {} */ public abstract void executeAsBenchmark(Blackhole blackhole) throws Exception; + /** + * Cleans up any state created during benchmark execution. For write workloads, this removes added + * files and reverts table state. For read workloads, this is typically a no-op. + * + *

This method is called after each benchmark invocation to ensure a clean state for the next + * run. + * + * @throws Exception if any error occurs during cleanup. + */ + public void cleanup() throws Exception { + // Default implementation: no-op for read workloads + } + // TODO: Add executeAsTest() method for correctness validation // public abstract void executeAsTest() throws Exception; } diff --git a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WriteRunner.java b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WriteRunner.java new file mode 100644 index 00000000000..1d93977f1ce --- /dev/null +++ b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WriteRunner.java @@ -0,0 +1,269 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.delta.kernel.defaults.benchmarks.workloadrunners; + +import static io.delta.kernel.internal.util.Utils.toCloseableIterator; + +import io.delta.kernel.*; +import io.delta.kernel.data.ColumnarBatch; +import io.delta.kernel.data.Row; +import io.delta.kernel.defaults.benchmarks.models.WorkloadSpec; +import io.delta.kernel.defaults.benchmarks.models.WriteSpec; +import io.delta.kernel.engine.Engine; +import io.delta.kernel.internal.actions.AddFile; +import io.delta.kernel.internal.actions.RemoveFile; +import io.delta.kernel.internal.actions.SingleAction; +import io.delta.kernel.internal.util.FileNames; +import io.delta.kernel.transaction.UpdateTableTransactionBuilder; +import io.delta.kernel.types.StructType; +import io.delta.kernel.utils.CloseableIterator; +import io.delta.kernel.utils.FileStatus; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import org.openjdk.jmh.infra.Blackhole; + +/** + * A WorkloadRunner that executes write workloads as benchmarks. This runner performs one or more + * commits to a Delta table and measures the performance of those commits. + * + *

The runner executes commits specified in the {@link WriteSpec}, where each commit contains a + * set of Delta log actions (add/remove files) defined in external JSON files. + * + *

If run as a benchmark using {@link #executeAsBenchmark(Blackhole)}, this measures the time to + * execute all commits. Setup (loading commit files) and cleanup (reverting changes) are not + * included in the benchmark timing. + */ +public class WriteRunner extends WorkloadRunner { + private final Engine engine; + private final WriteSpec workloadSpec; + private List> commitActions; + private List committedVersions; + private Snapshot currentSnapshot; + private long originalVersion; + + /** + * Constructs the WriteRunner from the workload spec and engine. + * + * @param workloadSpec The write workload specification. + * @param engine The engine to use for executing the workload. + */ + public WriteRunner(WriteSpec workloadSpec, Engine engine) { + this.workloadSpec = workloadSpec; + this.engine = engine; + this.committedVersions = new ArrayList<>(); + this.originalVersion = -1; + } + + @Override + public void setup() throws Exception { + commitActions = new ArrayList<>(); + + String tableRoot = workloadSpec.getTableInfo().getResolvedTableRoot(); + + // Get the current version before any commits + SnapshotBuilder builder = TableManager.loadSnapshot(tableRoot); + currentSnapshot = builder.build(engine); + originalVersion = currentSnapshot.getVersion(); + + // Load and parse all commit files + for (WriteSpec.CommitSpec commitSpec : workloadSpec.getCommits()) { + String commitFilePath = + workloadSpec.getTableInfo().getTableInfoPath() + + "/specs/" + + workloadSpec.getCaseName() + + "/" + + commitSpec.getDataFilesPath(); + List actions = parseCommitFile(commitFilePath); + commitActions.add(actions); + } + } + + /** @return the name of this workload. */ + @Override + public String getName() { + return "write"; + } + + /** @return The workload specification used to create this runner. */ + @Override + public WorkloadSpec getWorkloadSpec() { + return workloadSpec; + } + + /** + * Executes the write workload as a benchmark, consuming results via the provided Blackhole. + * + *

This method executes all commits specified in the workload spec in sequence. The timing + * includes only the commit execution, not the setup or cleanup. We reuse the post-commit snapshot + * from each transaction to avoid reloading from disk, which makes the benchmark more efficient + * and realistic. + * + * @param blackhole The Blackhole to consume results and avoid dead code elimination. + */ + @Override + public void executeAsBenchmark(Blackhole blackhole) throws Exception { + // Execute all commits in sequence (timed) + for (List actions : commitActions) { + // Create transaction from table (first iteration) or from post-commit snapshot (subsequent) + UpdateTableTransactionBuilder txnBuilder = + currentSnapshot.buildUpdateTableTransaction("Delta-Kernel-Benchmarks", Operation.WRITE); + + // Build and commit the transaction + Transaction txn = txnBuilder.build(engine); + + // Convert actions list to CloseableIterable (required by commit API) + CloseableIterator actionsIter = toCloseableIterator(actions.iterator()); + io.delta.kernel.utils.CloseableIterable dataActions = + io.delta.kernel.utils.CloseableIterable.inMemoryIterable(actionsIter); + + TransactionCommitResult result = txn.commit(engine, dataActions); + + long version = result.getVersion(); + committedVersions.add(version); + blackhole.consume(version); + + // Use the post-commit snapshot for the next transaction + // Post-commit snapshot should always be present unless there was a conflict + currentSnapshot = + result + .getPostCommitSnapshot() + .orElseThrow( + () -> + new IllegalStateException( + "Post-commit snapshot not available. This indicates a conflict occurred during " + + "the benchmark, which should not happen. Ensure no other processes are writing " + + "to the table: " + + workloadSpec.getTableInfo().getResolvedTableRoot())); + } + } + + /** + * Parses a Delta log JSON file and returns a list of action Rows. + * + *

Uses Kernel's built-in JsonHandler to read Delta log format JSON files. + * + * @param commitFilePath path to the commit file containing Delta log JSON actions + * @return list of Row objects representing the actions (AddFile, RemoveFile, etc.) + * @throws IOException if there's an error reading or parsing the file + */ + private List parseCommitFile(String commitFilePath) throws IOException { + /** Schema for reading commit files with both "add" and "remove" actions. */ + final StructType COMMIT_FILE_SCHEMA = + new StructType() + .add("add", AddFile.FULL_SCHEMA, true /* nullable */) + .add("remove", RemoveFile.FULL_SCHEMA, true /* nullable */); + + List actions = new ArrayList<>(); + + File file = new File(commitFilePath); + if (!file.exists()) { + throw new IOException("Commit file not found: " + commitFilePath); + } + + // Create a FileStatus for the commit file + FileStatus fileStatus = FileStatus.of(commitFilePath, file.length(), file.lastModified()); + + // Use Kernel's JsonHandler to read the file + try (CloseableIterator fileIter = + toCloseableIterator(Collections.singletonList(fileStatus).iterator()); + CloseableIterator batchIter = + engine.getJsonHandler().readJsonFiles(fileIter, COMMIT_FILE_SCHEMA, Optional.empty())) { + + while (batchIter.hasNext()) { + ColumnarBatch batch = batchIter.next(); + + // Process each row in the batch + try (CloseableIterator rowIter = batch.getRows()) { + while (rowIter.hasNext()) { + Row singleActionRow = rowIter.next(); + + // Extract the actual action Row and wrap it in SingleAction format + // Check if this row has an "add" action + if (!singleActionRow.isNullAt(COMMIT_FILE_SCHEMA.indexOf("add"))) { + Row addRow = singleActionRow.getStruct(COMMIT_FILE_SCHEMA.indexOf("add")); + // Wrap in SingleAction format for commit + actions.add(SingleAction.createAddFileSingleAction(addRow)); + } + // Check if this row has a "remove" action + else if (!singleActionRow.isNullAt(COMMIT_FILE_SCHEMA.indexOf("remove"))) { + Row removeRow = singleActionRow.getStruct(COMMIT_FILE_SCHEMA.indexOf("remove")); + // Wrap in SingleAction format for commit + actions.add(SingleAction.createRemoveFileSingleAction(removeRow)); + } else { + // Throw an error if the action is not recognized (not "add" or "remove") + throw new IOException( + "Unrecognized action in commit file row: " + singleActionRow.toString()); + } + } + } + } + } + + return actions; + } + + /** + * Cleans up the state created during benchmark execution by reverting all committed changes. + * + *

This method removes the commit files for any version greater than the original version, + * effectively reverting all changes made during the benchmark. This ensures the table is returned + * to its original state for the next benchmark iteration. + */ + @Override + public void cleanup() throws Exception { + if (originalVersion < 0) { + // Setup was never called or failed + return; + } + + // Delete all Delta log files with version > originalVersion + io.delta.kernel.internal.fs.Path deltaLogPath = + new io.delta.kernel.internal.fs.Path( + workloadSpec.getTableInfo().getResolvedTableRoot(), "_delta_log"); + + // Use listingPrefix to start listing from originalVersion + 1 + // This is more efficient than listing from version 0 + String startPath = FileNames.listingPrefix(deltaLogPath, originalVersion + 1); + + try (CloseableIterator filesIter = + engine.getFileSystemClient().listFrom(startPath)) { + + while (filesIter.hasNext()) { + FileStatus file = filesIter.next(); + String filePath = file.getPath(); + + try { + // Get version from any recognized Delta log file (commit, checkpoint, checksum/CRC, etc.) + // FileNames.getFileVersion handles: .json, .checkpoint.parquet, .crc, .compacted.json + long version = FileNames.getFileVersion(new io.delta.kernel.internal.fs.Path(filePath)); + if (version > originalVersion) { + // Delete this file (includes CRC files automatically) + engine.getFileSystemClient().delete(filePath); + } + } catch (IllegalArgumentException e) { + // Skip unrecognized files - ensures forward compatibility + } + } + } + + committedVersions.clear(); + } +} diff --git a/kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/add_and_remove/commit_add.json b/kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/add_and_remove/commit_add.json new file mode 100644 index 00000000000..1f37b2b9871 --- /dev/null +++ b/kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/add_and_remove/commit_add.json @@ -0,0 +1 @@ +{"add":{"path":"part-00000-a9daef62-5a40-43c5-ac63-3ad4a7d749ae-c000.snappy.parquet","partitionValues":{},"size":1024,"modificationTime":1712091404545,"dataChange":true,"stats":"{\"numRecords\":10,\"minValues\":{\"letter\":\"a\",\"number\":1,\"a_float\":1.1},\"maxValues\":{\"letter\":\"j\",\"number\":10,\"a_float\":10.10},\"nullCount\":{\"letter\":0,\"number\":0,\"a_float\":0}}"}} \ No newline at end of file diff --git a/kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/add_and_remove/commit_remove.json b/kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/add_and_remove/commit_remove.json new file mode 100644 index 00000000000..bd9089e46f4 --- /dev/null +++ b/kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/add_and_remove/commit_remove.json @@ -0,0 +1 @@ +{"remove":{"path":"part-00000-a9daef62-5a40-43c5-ac63-3ad4a7d749ae-c000.snappy.parquet","partitionValues":{},"size":1024,"modificationTime":1712091404545,"dataChange":true,"stats":"{\"numRecords\":10,\"minValues\":{\"letter\":\"a\",\"number\":1,\"a_float\":1.1},\"maxValues\":{\"letter\":\"j\",\"number\":10,\"a_float\":10.10},\"nullCount\":{\"letter\":0,\"number\":0,\"a_float\":0}}"}} \ No newline at end of file diff --git a/kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/add_and_remove/spec.json b/kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/add_and_remove/spec.json new file mode 100644 index 00000000000..3b8c404532c --- /dev/null +++ b/kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/add_and_remove/spec.json @@ -0,0 +1,8 @@ +{ + "type": "write", + "commits": [ + {"data_files_path": "commit_add.json"}, + {"data_files_path": "commit_remove.json"} + ] +} + diff --git a/kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/single_add/commit_add.json b/kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/single_add/commit_add.json new file mode 100644 index 00000000000..1f37b2b9871 --- /dev/null +++ b/kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/single_add/commit_add.json @@ -0,0 +1 @@ +{"add":{"path":"part-00000-a9daef62-5a40-43c5-ac63-3ad4a7d749ae-c000.snappy.parquet","partitionValues":{},"size":1024,"modificationTime":1712091404545,"dataChange":true,"stats":"{\"numRecords\":10,\"minValues\":{\"letter\":\"a\",\"number\":1,\"a_float\":1.1},\"maxValues\":{\"letter\":\"j\",\"number\":10,\"a_float\":10.10},\"nullCount\":{\"letter\":0,\"number\":0,\"a_float\":0}}"}} \ No newline at end of file diff --git a/kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/single_add/spec.json b/kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/single_add/spec.json new file mode 100644 index 00000000000..207c7ce20a2 --- /dev/null +++ b/kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/single_add/spec.json @@ -0,0 +1,7 @@ +{ + "type": "write", + "commits": [ + {"data_files_path": "commit_add.json"} + ] +} + diff --git a/kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/table_info.json b/kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/table_info.json index ad470d337d1..94637d8fc33 100644 --- a/kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/table_info.json +++ b/kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/table_info.json @@ -1,5 +1,5 @@ { "name": "basic_append", "description": "A basic table with two append writes.", - "engineInfo": "Apache-Spark/3.5.1 Delta-Lake/3.1.0" + "engine_info": "Apache-Spark/3.5.1 Delta-Lake/3.1.0" } \ No newline at end of file From 2472fc3a5f6410fc8dbb7e5797ed366be7fe0c00 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Mon, 27 Oct 2025 09:59:41 -0700 Subject: [PATCH 02/12] cleanup --- .../benchmarks/AbstractBenchmarkState.java | 4 ++-- .../benchmarks/WorkloadBenchmark.java | 22 +++++-------------- 2 files changed, 8 insertions(+), 18 deletions(-) diff --git a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/AbstractBenchmarkState.java b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/AbstractBenchmarkState.java index 9497784caa2..9869dbb10f9 100644 --- a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/AbstractBenchmarkState.java +++ b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/AbstractBenchmarkState.java @@ -40,14 +40,14 @@ public abstract class AbstractBenchmarkState { * dynamically by JMH. The value is set in the main method. */ @Param({}) - public String workloadSpecJson; + private String workloadSpecJson; /** * The engine to use for this benchmark. Note: This parameter will be set dynamically by JMH. The * value is set in the main method. */ @Param({}) - public String engineName; + private String engineName; /** The workload runner initialized for this benchmark invocation. */ private WorkloadRunner runner; diff --git a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/WorkloadBenchmark.java b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/WorkloadBenchmark.java index 17a1ffe1f0c..f9c09174f22 100644 --- a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/WorkloadBenchmark.java +++ b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/WorkloadBenchmark.java @@ -33,8 +33,6 @@ import org.openjdk.jmh.runner.options.Options; import org.openjdk.jmh.runner.options.OptionsBuilder; import org.openjdk.jmh.runner.options.TimeValue; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; /** * Generic JMH benchmark for all workload types. Automatically loads and runs benchmarks based on @@ -47,8 +45,6 @@ @Measurement(iterations = 5, time = 1) public class WorkloadBenchmark { - private static final Logger log = LoggerFactory.getLogger(WorkloadBenchmark.class); - /** Default implementation of BenchmarkState that supports only the "default" engine. */ public static class DefaultBenchmarkState extends AbstractBenchmarkState { @Override @@ -57,11 +53,10 @@ protected Engine getEngine(String engineName) { return DefaultEngine.create( new Configuration() { { - // Set the batch sizes to small so that we get to test the multiple batch/file - // scenarios. - set("delta.kernel.default.parquet.reader.batch-size", "20"); - set("delta.kernel.default.json.reader.batch-size", "20"); - set("delta.kernel.default.parquet.writer.targetMaxFileSize", "20"); + // Set the batch size. This is required for writes. + set("delta.kernel.default.parquet.reader.batch-size", "1024"); + set("delta.kernel.default.json.reader.batch-size", "1024"); + set("delta.kernel.default.parquet.writer.targetMaxFileSize", "10485760"); // 1 MB } }); } else { @@ -101,12 +96,7 @@ public static void main(String[] args) throws RunnerException, IOException { List filteredSpecs = new ArrayList<>(); for (WorkloadSpec spec : workloadSpecs) { // TODO: In the future, we can filter specific workloads using command line args here. - List variants = spec.getWorkloadVariants(); - for (WorkloadSpec variant : variants) { - if (variant.getType().equals("write")) { - filteredSpecs.add(variant); - } - } + filteredSpecs.addAll(spec.getWorkloadVariants()); } // Convert paths into a String array for JMH. JMH requires that parameters be of type String[]. @@ -128,6 +118,6 @@ public static void main(String[] args) throws RunnerException, IOException { .addProfiler(KernelMetricsProfiler.class) .build(); - new Runner(opt).run(); + new Runner(opt, new WorkloadOutputFormat()).run(); } } From 99da4298b883d60e4536a0abd05f7aa714c4e613 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Mon, 27 Oct 2025 10:16:30 -0700 Subject: [PATCH 03/12] cleanup --- .../workloadrunners/WriteRunner.java | 69 ++++++++++--------- 1 file changed, 37 insertions(+), 32 deletions(-) diff --git a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WriteRunner.java b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WriteRunner.java index 1d93977f1ce..bc2dbf55dcb 100644 --- a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WriteRunner.java +++ b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WriteRunner.java @@ -27,17 +27,13 @@ import io.delta.kernel.internal.actions.AddFile; import io.delta.kernel.internal.actions.RemoveFile; import io.delta.kernel.internal.actions.SingleAction; -import io.delta.kernel.internal.util.FileNames; import io.delta.kernel.transaction.UpdateTableTransactionBuilder; import io.delta.kernel.types.StructType; import io.delta.kernel.utils.CloseableIterator; import io.delta.kernel.utils.FileStatus; import java.io.File; import java.io.IOException; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Optional; +import java.util.*; import org.openjdk.jmh.infra.Blackhole; /** @@ -58,6 +54,7 @@ public class WriteRunner extends WorkloadRunner { private List committedVersions; private Snapshot currentSnapshot; private long originalVersion; + private Set initialDeltaLogFiles; /** * Constructs the WriteRunner from the workload spec and engine. @@ -78,10 +75,12 @@ public void setup() throws Exception { String tableRoot = workloadSpec.getTableInfo().getResolvedTableRoot(); - // Get the current version before any commits + // Get the current snapshot SnapshotBuilder builder = TableManager.loadSnapshot(tableRoot); currentSnapshot = builder.build(engine); - originalVersion = currentSnapshot.getVersion(); + + // Capture initial listing of delta log files + initialDeltaLogFiles = captureFileListing(); // Load and parse all commit files for (WriteSpec.CommitSpec commitSpec : workloadSpec.getCommits()) { @@ -129,7 +128,7 @@ public void executeAsBenchmark(Blackhole blackhole) throws Exception { // Build and commit the transaction Transaction txn = txnBuilder.build(engine); - // Convert actions list to CloseableIterable (required by commit API) + // Convert actions list to CloseableIterable CloseableIterator actionsIter = toCloseableIterator(actions.iterator()); io.delta.kernel.utils.CloseableIterable dataActions = io.delta.kernel.utils.CloseableIterable.inMemoryIterable(actionsIter); @@ -234,36 +233,42 @@ public void cleanup() throws Exception { return; } - // Delete all Delta log files with version > originalVersion - io.delta.kernel.internal.fs.Path deltaLogPath = - new io.delta.kernel.internal.fs.Path( - workloadSpec.getTableInfo().getResolvedTableRoot(), "_delta_log"); + // Delete any files that weren't present initially + Set currentFiles = captureFileListing(); + for (String filePath : currentFiles) { + if (!initialDeltaLogFiles.contains(filePath)) { + engine.getFileSystemClient().delete(filePath); + } + } - // Use listingPrefix to start listing from originalVersion + 1 - // This is more efficient than listing from version 0 - String startPath = FileNames.listingPrefix(deltaLogPath, originalVersion + 1); + committedVersions.clear(); + } - try (CloseableIterator filesIter = - engine.getFileSystemClient().listFrom(startPath)) { + /** + * Captures a listing of all files whose paths start with the given prefix. Use a trailing slash + * to list files inside a directory. + * + * @return a set of all file paths starting with that prefix + */ + private Set captureFileListing() throws IOException { + // Construct path prefix for all files in `_delta_log/`. The prefix is for file with name `0` + // because + // the filesystem client lists all _sibling_ files in the directory with a path greater than + // `0`. + String deltaLogPathPrefix = + new io.delta.kernel.internal.fs.Path( + workloadSpec.getTableInfo().getResolvedTableRoot(), "_delta_log/0") + .toUri() + .getPath(); + Set files = new HashSet<>(); + try (CloseableIterator filesIter = + engine.getFileSystemClient().listFrom(deltaLogPathPrefix)) { while (filesIter.hasNext()) { FileStatus file = filesIter.next(); - String filePath = file.getPath(); - - try { - // Get version from any recognized Delta log file (commit, checkpoint, checksum/CRC, etc.) - // FileNames.getFileVersion handles: .json, .checkpoint.parquet, .crc, .compacted.json - long version = FileNames.getFileVersion(new io.delta.kernel.internal.fs.Path(filePath)); - if (version > originalVersion) { - // Delete this file (includes CRC files automatically) - engine.getFileSystemClient().delete(filePath); - } - } catch (IllegalArgumentException e) { - // Skip unrecognized files - ensures forward compatibility - } + files.add(file.getPath()); } } - - committedVersions.clear(); + return files; } } From d100845e7f8fe715f4e22bed45990745ba6aac58 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Mon, 27 Oct 2025 10:21:44 -0700 Subject: [PATCH 04/12] fix cleanup, remove prints --- .../benchmarks/workloadrunners/WriteRunner.java | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WriteRunner.java b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WriteRunner.java index bc2dbf55dcb..cede23b109a 100644 --- a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WriteRunner.java +++ b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WriteRunner.java @@ -66,7 +66,6 @@ public WriteRunner(WriteSpec workloadSpec, Engine engine) { this.workloadSpec = workloadSpec; this.engine = engine; this.committedVersions = new ArrayList<>(); - this.originalVersion = -1; } @Override @@ -228,11 +227,6 @@ else if (!singleActionRow.isNullAt(COMMIT_FILE_SCHEMA.indexOf("remove"))) { */ @Override public void cleanup() throws Exception { - if (originalVersion < 0) { - // Setup was never called or failed - return; - } - // Delete any files that weren't present initially Set currentFiles = captureFileListing(); for (String filePath : currentFiles) { @@ -252,9 +246,8 @@ public void cleanup() throws Exception { */ private Set captureFileListing() throws IOException { // Construct path prefix for all files in `_delta_log/`. The prefix is for file with name `0` - // because - // the filesystem client lists all _sibling_ files in the directory with a path greater than - // `0`. + // because the filesystem client lists all _sibling_ files in the directory with a path greater + // than `0`. String deltaLogPathPrefix = new io.delta.kernel.internal.fs.Path( workloadSpec.getTableInfo().getResolvedTableRoot(), "_delta_log/0") From b8da66105026dea509881b425cd0d1d2dbce98c6 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Mon, 27 Oct 2025 10:33:22 -0700 Subject: [PATCH 05/12] improve comments --- .../defaults/benchmarks/workloadrunners/WriteRunner.java | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WriteRunner.java b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WriteRunner.java index cede23b109a..3e35070def7 100644 --- a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WriteRunner.java +++ b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WriteRunner.java @@ -220,10 +220,6 @@ else if (!singleActionRow.isNullAt(COMMIT_FILE_SCHEMA.indexOf("remove"))) { /** * Cleans up the state created during benchmark execution by reverting all committed changes. - * - *

This method removes the commit files for any version greater than the original version, - * effectively reverting all changes made during the benchmark. This ensures the table is returned - * to its original state for the next benchmark iteration. */ @Override public void cleanup() throws Exception { @@ -239,10 +235,7 @@ public void cleanup() throws Exception { } /** - * Captures a listing of all files whose paths start with the given prefix. Use a trailing slash - * to list files inside a directory. - * - * @return a set of all file paths starting with that prefix + * @return a set of all file paths in the the `_delta_log/` directory of the table. */ private Set captureFileListing() throws IOException { // Construct path prefix for all files in `_delta_log/`. The prefix is for file with name `0` From 41584465b3c9f86546e20514305f876a9e40ab4c Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Mon, 27 Oct 2025 10:47:29 -0700 Subject: [PATCH 06/12] clean up tableinfo --- .../defaults/benchmarks/models/TableInfo.java | 37 ++----------------- .../workloadrunners/WriteRunner.java | 8 +--- 2 files changed, 5 insertions(+), 40 deletions(-) diff --git a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/TableInfo.java b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/TableInfo.java index 21464fa84d6..d7617d5f20c 100644 --- a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/TableInfo.java +++ b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/TableInfo.java @@ -50,8 +50,6 @@ * { * "name": "s3_table", * "description": "Table stored in S3", - * "table_type": "absolute", - * "table_path": "s3://my-bucket/path/to/table" * } * } */ @@ -71,26 +69,7 @@ public class TableInfo { @JsonProperty("engine_info") public String engineInfo; - /** - * The type of table location: "relative" (default) or "absolute". - * - *

When "relative", the table is located at {table_info_directory}/delta. When "absolute", the - * table is located at the path specified in {@link #tablePath}. - */ - @JsonProperty("table_type") - private String tableType; - - /** - * The absolute path to the table when {@link #tableType} is "absolute". Can be a local path - * (file:///) or S3 path (s3://). Null when table_type is "relative". - */ - @JsonProperty("table_path") - private String tablePath; - - /** - * The resolved absolute path to the root of the table. This is computed after deserialization - * based on {@link #tableType} and {@link #tablePath}. - */ + /** The resolved absolute path to the root of the table. */ @JsonProperty("table_info_path") private String tableInfoPath; @@ -105,16 +84,8 @@ public TableInfo() {} /** Resolves the table root path based on the table type and location configuration. */ @JsonIgnore public String getResolvedTableRoot() { - if ("absolute".equals(tableType)) { - if (tablePath == null || tablePath.trim().isEmpty()) { - throw new IllegalStateException( - "table_path must be specified when table_type is 'absolute'"); - } - return tablePath; - } else { - // Default to "relative" if tableType is null or "relative" - return Paths.get(tableInfoPath, "delta").toAbsolutePath().toString(); - } + // Default to "relative" if tableType is null or "relative" + return Paths.get(tableInfoPath, "delta").toAbsolutePath().toString(); } public String getTableInfoPath() { @@ -165,8 +136,6 @@ public String toString() { + description + "', engineInfo='" + engineInfo - + "', tableType='" - + tableType + "}"; } } diff --git a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WriteRunner.java b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WriteRunner.java index 3e35070def7..35955fba5e6 100644 --- a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WriteRunner.java +++ b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WriteRunner.java @@ -218,9 +218,7 @@ else if (!singleActionRow.isNullAt(COMMIT_FILE_SCHEMA.indexOf("remove"))) { return actions; } - /** - * Cleans up the state created during benchmark execution by reverting all committed changes. - */ + /** Cleans up the state created during benchmark execution by reverting all committed changes. */ @Override public void cleanup() throws Exception { // Delete any files that weren't present initially @@ -234,9 +232,7 @@ public void cleanup() throws Exception { committedVersions.clear(); } - /** - * @return a set of all file paths in the the `_delta_log/` directory of the table. - */ + /** @return a set of all file paths in the the `_delta_log/` directory of the table. */ private Set captureFileListing() throws IOException { // Construct path prefix for all files in `_delta_log/`. The prefix is for file with name `0` // because the filesystem client lists all _sibling_ files in the directory with a path greater From 9364cd8a43194bb53634cd7f4dbf1747c093e5e4 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Mon, 27 Oct 2025 10:58:30 -0700 Subject: [PATCH 07/12] final tableinfo cleanup --- .../benchmarks/WorkloadBenchmark.java | 1 - .../defaults/benchmarks/models/TableInfo.java | 35 ++++++++----------- 2 files changed, 14 insertions(+), 22 deletions(-) diff --git a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/WorkloadBenchmark.java b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/WorkloadBenchmark.java index f9c09174f22..f64717a1c3b 100644 --- a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/WorkloadBenchmark.java +++ b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/WorkloadBenchmark.java @@ -86,7 +86,6 @@ public void benchmarkWorkload(DefaultBenchmarkState state, Blackhole blackhole) public static void main(String[] args) throws RunnerException, IOException { // Get workload specs from the workloads directory List workloadSpecs = BenchmarkUtils.loadAllWorkloads(WORKLOAD_SPECS_DIR); - System.out.println("Loaded " + workloadSpecs.size() + " workload specs"); if (workloadSpecs.isEmpty()) { throw new RunnerException( "No workloads found. Please add workload specs to the workloads directory."); diff --git a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/TableInfo.java b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/TableInfo.java index d7617d5f20c..6f3d598cd5c 100644 --- a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/TableInfo.java +++ b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/TableInfo.java @@ -32,24 +32,17 @@ * *

TableInfo instances are typically loaded from JSON files in the workload specifications * directory structure. Each table directory should contain a {@code table_info.json} file with the - * table metadata and a {@code delta} subdirectory containing the actual table data. + * table metadata and a {@code delta} subdirectory containing the actual table data. The table root + * path is the absolute path to the root of the table and is provided separately in {@link + * WorkloadSpec#fromJsonPath(String, String, TableInfo)}. * - *

Example JSON structure for relative table (default): + *

Example JSON structure: * *

{@code
  * {
- *   "name": "basic_table",
- *   "description": "A basic Delta table for benchmarking",
- *   "engine_info": "Apache-Spark/3.5.1 Delta-Lake/3.1.0"
- * }
- * }
- * - *

Example JSON structure for absolute path table: - * - *

{@code
- * {
- *   "name": "s3_table",
- *   "description": "Table stored in S3",
+ *   "name": "large-table",
+ *   "description": "A large Delta table with multi-part checkpoints for performance testing",
+ *   "engineInfo": "Apache-Spark/3.5.1 Delta-Lake/3.1.0"
  * }
  * }
*/ @@ -99,14 +92,14 @@ public void setTableInfoPath(String tableInfoDirectory) { /** * Creates a TableInfo instance by reading from a JSON file at the specified path. * - *

This method loads table metadata from a JSON file and resolves the table root path. The JSON - * file should contain the table name and description, while the table root path is computed based - * on the table_type and table_path fields. + *

This method loads table metadata from a JSON file and sets the table root path. The JSON + * file should contain the table name and description, while the table root path is provided + * separately with the absolute path. * * @param jsonPath the path to the JSON file containing the TableInfo metadata * @param tableInfoPath the directory containing the table_info.json file (used for relative path * resolution) - * @return a TableInfo instance populated from the JSON file with resolved table root path + * @return a TableInfo instance populated from the JSON file and table root path * @throws RuntimeException if there is an error reading or parsing the JSON file */ public static TableInfo fromJsonPath(String jsonPath, String tableInfoPath) { @@ -123,8 +116,8 @@ public static TableInfo fromJsonPath(String jsonPath, String tableInfoPath) { /** * Returns a string representation of this TableInfo. * - *

The string includes the table name, description, engine info, and CCv2 status, but excludes - * the table root path for security reasons (as it may contain sensitive path information). + *

The string includes the table name, description, and engine info, but excludes the table + * root path for security reasons (as it may contain sensitive path information). * * @return a string representation of this TableInfo */ @@ -136,6 +129,6 @@ public String toString() { + description + "', engineInfo='" + engineInfo - + "}"; + + "'}"; } } From a6324c5e57ae551645a8d75361deea9518d30df0 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Mon, 27 Oct 2025 13:48:28 -0700 Subject: [PATCH 08/12] refactor --- .../benchmarks/models/WorkloadSpec.java | 5 + .../defaults/benchmarks/models/WriteSpec.java | 82 ++++++++++++++++ .../workloadrunners/WriteRunner.java | 96 ++----------------- 3 files changed, 94 insertions(+), 89 deletions(-) diff --git a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/WorkloadSpec.java b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/WorkloadSpec.java index 861de96187b..66c1c8349f1 100644 --- a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/WorkloadSpec.java +++ b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/WorkloadSpec.java @@ -83,6 +83,11 @@ public TableInfo getTableInfo() { return tableInfo; } + @JsonIgnore + public String getSpecDirectoryPath() { + return tableInfo.getTableInfoPath() + "/specs/" + caseName; + } + /** * Sets the table information for this workload specification. * diff --git a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/WriteSpec.java b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/WriteSpec.java index 2a321c34dc0..a34b03871fb 100644 --- a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/WriteSpec.java +++ b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/WriteSpec.java @@ -16,12 +16,28 @@ package io.delta.kernel.defaults.benchmarks.models; +import static io.delta.kernel.internal.util.Utils.toCloseableIterator; + +import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; +import io.delta.kernel.data.ColumnarBatch; +import io.delta.kernel.data.Row; import io.delta.kernel.defaults.benchmarks.workloadrunners.WorkloadRunner; import io.delta.kernel.defaults.benchmarks.workloadrunners.WriteRunner; import io.delta.kernel.engine.Engine; +import io.delta.kernel.internal.actions.AddFile; +import io.delta.kernel.internal.actions.RemoveFile; +import io.delta.kernel.internal.actions.SingleAction; +import io.delta.kernel.internal.fs.Path; +import io.delta.kernel.types.StructType; +import io.delta.kernel.utils.CloseableIterator; +import io.delta.kernel.utils.FileStatus; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Optional; /** * Workload specification for write benchmarks. Defines test cases for writing to Delta tables with @@ -52,6 +68,13 @@ public static class CommitSpec { @JsonProperty("data_files_path") private String dataFilesPath; + /** Schema for reading commit files with both "add" and "remove" actions. */ + @JsonIgnore + private static final StructType COMMIT_SPEC_SCHEMA = + new StructType() + .add("add", AddFile.FULL_SCHEMA, true /* nullable */) + .add("remove", RemoveFile.FULL_SCHEMA, true /* nullable */); + /** Default constructor for Jackson. */ public CommitSpec() {} @@ -62,6 +85,65 @@ public CommitSpec(String dataFilesPath) { public String getDataFilesPath() { return dataFilesPath; } + + /** + * Uses Kernel's built-in JsonHandler to read commit spec containing data files. + * + * @param engine the Engine instance to use for reading JSON files + * @param specPath the base path where the commit file is located + * @throws IOException if there's an error reading or parsing the file + */ + public List parseActions(Engine engine, String specPath) throws IOException { + String commitFilePath = new Path(specPath, getDataFilesPath()).toString(); + + System.out.println("CommitSpec: Reading commit file at " + commitFilePath); + File file = new File(commitFilePath); + if (!file.exists()) { + throw new IOException("Commit file not found: " + commitFilePath); + } + + // Create a FileStatus for the commit file + FileStatus fileStatus = FileStatus.of(commitFilePath, file.length(), file.lastModified()); + + // Use Kernel's JsonHandler to read the file and collect actions + List actions = new ArrayList<>(); + try (CloseableIterator fileIter = + toCloseableIterator(Collections.singletonList(fileStatus).iterator()); + CloseableIterator batchIter = + engine + .getJsonHandler() + .readJsonFiles(fileIter, COMMIT_SPEC_SCHEMA, Optional.empty())) { + + while (batchIter.hasNext()) { + ColumnarBatch batch = batchIter.next(); + + // Process each row in the batch + try (CloseableIterator rowIter = batch.getRows()) { + while (rowIter.hasNext()) { + Row singleActionRow = rowIter.next(); + + // Extract the actual action Row and wrap it in SingleAction format + // Check if this row has an "add" action + if (!singleActionRow.isNullAt(COMMIT_SPEC_SCHEMA.indexOf("add"))) { + Row addRow = singleActionRow.getStruct(COMMIT_SPEC_SCHEMA.indexOf("add")); + // Wrap in SingleAction format for commit + actions.add(SingleAction.createAddFileSingleAction(addRow)); + } + // Check if this row has a "remove" action + else if (!singleActionRow.isNullAt(COMMIT_SPEC_SCHEMA.indexOf("remove"))) { + Row removeRow = singleActionRow.getStruct(COMMIT_SPEC_SCHEMA.indexOf("remove")); + // Wrap in SingleAction format for commit + actions.add(SingleAction.createRemoveFileSingleAction(removeRow)); + } else { + // Throw an error if the action is not recognized (not "add" or "remove") + throw new IOException("Unrecognized action in commit file row: " + singleActionRow); + } + } + } + } + } + return actions; + } } /** diff --git a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WriteRunner.java b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WriteRunner.java index 35955fba5e6..7a4ff9233e7 100644 --- a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WriteRunner.java +++ b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WriteRunner.java @@ -19,19 +19,15 @@ import static io.delta.kernel.internal.util.Utils.toCloseableIterator; import io.delta.kernel.*; -import io.delta.kernel.data.ColumnarBatch; import io.delta.kernel.data.Row; import io.delta.kernel.defaults.benchmarks.models.WorkloadSpec; import io.delta.kernel.defaults.benchmarks.models.WriteSpec; import io.delta.kernel.engine.Engine; -import io.delta.kernel.internal.actions.AddFile; -import io.delta.kernel.internal.actions.RemoveFile; -import io.delta.kernel.internal.actions.SingleAction; +import io.delta.kernel.internal.fs.Path; import io.delta.kernel.transaction.UpdateTableTransactionBuilder; -import io.delta.kernel.types.StructType; +import io.delta.kernel.utils.CloseableIterable; import io.delta.kernel.utils.CloseableIterator; import io.delta.kernel.utils.FileStatus; -import java.io.File; import java.io.IOException; import java.util.*; import org.openjdk.jmh.infra.Blackhole; @@ -51,9 +47,7 @@ public class WriteRunner extends WorkloadRunner { private final Engine engine; private final WriteSpec workloadSpec; private List> commitActions; - private List committedVersions; private Snapshot currentSnapshot; - private long originalVersion; private Set initialDeltaLogFiles; /** @@ -65,7 +59,6 @@ public class WriteRunner extends WorkloadRunner { public WriteRunner(WriteSpec workloadSpec, Engine engine) { this.workloadSpec = workloadSpec; this.engine = engine; - this.committedVersions = new ArrayList<>(); } @Override @@ -81,16 +74,11 @@ public void setup() throws Exception { // Capture initial listing of delta log files initialDeltaLogFiles = captureFileListing(); + System.out.println("Parsing commit files..."); // Load and parse all commit files for (WriteSpec.CommitSpec commitSpec : workloadSpec.getCommits()) { - String commitFilePath = - workloadSpec.getTableInfo().getTableInfoPath() - + "/specs/" - + workloadSpec.getCaseName() - + "/" - + commitSpec.getDataFilesPath(); - List actions = parseCommitFile(commitFilePath); - commitActions.add(actions); + System.out.println("Parsing commitSpec: " + commitSpec.getDataFilesPath()); + commitActions.add(commitSpec.parseActions(engine, workloadSpec.getSpecDirectoryPath())); } } @@ -129,13 +117,11 @@ public void executeAsBenchmark(Blackhole blackhole) throws Exception { // Convert actions list to CloseableIterable CloseableIterator actionsIter = toCloseableIterator(actions.iterator()); - io.delta.kernel.utils.CloseableIterable dataActions = - io.delta.kernel.utils.CloseableIterable.inMemoryIterable(actionsIter); + CloseableIterable dataActions = CloseableIterable.inMemoryIterable(actionsIter); TransactionCommitResult result = txn.commit(engine, dataActions); long version = result.getVersion(); - committedVersions.add(version); blackhole.consume(version); // Use the post-commit snapshot for the next transaction @@ -153,71 +139,6 @@ public void executeAsBenchmark(Blackhole blackhole) throws Exception { } } - /** - * Parses a Delta log JSON file and returns a list of action Rows. - * - *

Uses Kernel's built-in JsonHandler to read Delta log format JSON files. - * - * @param commitFilePath path to the commit file containing Delta log JSON actions - * @return list of Row objects representing the actions (AddFile, RemoveFile, etc.) - * @throws IOException if there's an error reading or parsing the file - */ - private List parseCommitFile(String commitFilePath) throws IOException { - /** Schema for reading commit files with both "add" and "remove" actions. */ - final StructType COMMIT_FILE_SCHEMA = - new StructType() - .add("add", AddFile.FULL_SCHEMA, true /* nullable */) - .add("remove", RemoveFile.FULL_SCHEMA, true /* nullable */); - - List actions = new ArrayList<>(); - - File file = new File(commitFilePath); - if (!file.exists()) { - throw new IOException("Commit file not found: " + commitFilePath); - } - - // Create a FileStatus for the commit file - FileStatus fileStatus = FileStatus.of(commitFilePath, file.length(), file.lastModified()); - - // Use Kernel's JsonHandler to read the file - try (CloseableIterator fileIter = - toCloseableIterator(Collections.singletonList(fileStatus).iterator()); - CloseableIterator batchIter = - engine.getJsonHandler().readJsonFiles(fileIter, COMMIT_FILE_SCHEMA, Optional.empty())) { - - while (batchIter.hasNext()) { - ColumnarBatch batch = batchIter.next(); - - // Process each row in the batch - try (CloseableIterator rowIter = batch.getRows()) { - while (rowIter.hasNext()) { - Row singleActionRow = rowIter.next(); - - // Extract the actual action Row and wrap it in SingleAction format - // Check if this row has an "add" action - if (!singleActionRow.isNullAt(COMMIT_FILE_SCHEMA.indexOf("add"))) { - Row addRow = singleActionRow.getStruct(COMMIT_FILE_SCHEMA.indexOf("add")); - // Wrap in SingleAction format for commit - actions.add(SingleAction.createAddFileSingleAction(addRow)); - } - // Check if this row has a "remove" action - else if (!singleActionRow.isNullAt(COMMIT_FILE_SCHEMA.indexOf("remove"))) { - Row removeRow = singleActionRow.getStruct(COMMIT_FILE_SCHEMA.indexOf("remove")); - // Wrap in SingleAction format for commit - actions.add(SingleAction.createRemoveFileSingleAction(removeRow)); - } else { - // Throw an error if the action is not recognized (not "add" or "remove") - throw new IOException( - "Unrecognized action in commit file row: " + singleActionRow.toString()); - } - } - } - } - } - - return actions; - } - /** Cleans up the state created during benchmark execution by reverting all committed changes. */ @Override public void cleanup() throws Exception { @@ -228,8 +149,6 @@ public void cleanup() throws Exception { engine.getFileSystemClient().delete(filePath); } } - - committedVersions.clear(); } /** @return a set of all file paths in the the `_delta_log/` directory of the table. */ @@ -238,8 +157,7 @@ private Set captureFileListing() throws IOException { // because the filesystem client lists all _sibling_ files in the directory with a path greater // than `0`. String deltaLogPathPrefix = - new io.delta.kernel.internal.fs.Path( - workloadSpec.getTableInfo().getResolvedTableRoot(), "_delta_log/0") + new Path(workloadSpec.getTableInfo().getResolvedTableRoot(), "_delta_log/0") .toUri() .getPath(); From 3e155b77ae1670766c71383fd39010e79a10ffa1 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Mon, 27 Oct 2025 13:58:43 -0700 Subject: [PATCH 09/12] final cleanup --- .../delta/kernel/defaults/benchmarks/models/WriteSpec.java | 1 - .../defaults/benchmarks/workloadrunners/WorkloadRunner.java | 2 +- .../defaults/benchmarks/workloadrunners/WriteRunner.java | 6 ++---- 3 files changed, 3 insertions(+), 6 deletions(-) diff --git a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/WriteSpec.java b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/WriteSpec.java index a34b03871fb..c95a54d39f4 100644 --- a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/WriteSpec.java +++ b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/models/WriteSpec.java @@ -96,7 +96,6 @@ public String getDataFilesPath() { public List parseActions(Engine engine, String specPath) throws IOException { String commitFilePath = new Path(specPath, getDataFilesPath()).toString(); - System.out.println("CommitSpec: Reading commit file at " + commitFilePath); File file = new File(commitFilePath); if (!file.exists()) { throw new IOException("Commit file not found: " + commitFilePath); diff --git a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WorkloadRunner.java b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WorkloadRunner.java index d3916413eb5..35709b6eeed 100644 --- a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WorkloadRunner.java +++ b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WorkloadRunner.java @@ -71,7 +71,7 @@ public WorkloadRunner() {} * @throws Exception if any error occurs during cleanup. */ public void cleanup() throws Exception { - // Default implementation: no-op for read workloads + // Default implementation is no-op } // TODO: Add executeAsTest() method for correctness validation diff --git a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WriteRunner.java b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WriteRunner.java index 7a4ff9233e7..86f74e2ea9a 100644 --- a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WriteRunner.java +++ b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WriteRunner.java @@ -74,10 +74,8 @@ public void setup() throws Exception { // Capture initial listing of delta log files initialDeltaLogFiles = captureFileListing(); - System.out.println("Parsing commit files..."); // Load and parse all commit files for (WriteSpec.CommitSpec commitSpec : workloadSpec.getCommits()) { - System.out.println("Parsing commitSpec: " + commitSpec.getDataFilesPath()); commitActions.add(commitSpec.parseActions(engine, workloadSpec.getSpecDirectoryPath())); } } @@ -116,8 +114,8 @@ public void executeAsBenchmark(Blackhole blackhole) throws Exception { Transaction txn = txnBuilder.build(engine); // Convert actions list to CloseableIterable - CloseableIterator actionsIter = toCloseableIterator(actions.iterator()); - CloseableIterable dataActions = CloseableIterable.inMemoryIterable(actionsIter); + CloseableIterable dataActions = + CloseableIterable.inMemoryIterable(toCloseableIterator(actions.iterator())); TransactionCommitResult result = txn.commit(engine, dataActions); From 0bb12f2b5e67d3395a64a8ae9ec8f64e259d8ad0 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Mon, 27 Oct 2025 14:03:46 -0700 Subject: [PATCH 10/12] more cleanup --- .../workload_specs/basic_append/specs/add_and_remove/spec.json | 3 +-- .../specs/{single_add => single_append}/commit_add.json | 0 .../basic_append/specs/{single_add => single_append}/spec.json | 3 +-- 3 files changed, 2 insertions(+), 4 deletions(-) rename kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/{single_add => single_append}/commit_add.json (100%) rename kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/{single_add => single_append}/spec.json (96%) diff --git a/kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/add_and_remove/spec.json b/kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/add_and_remove/spec.json index 3b8c404532c..0330c385d5d 100644 --- a/kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/add_and_remove/spec.json +++ b/kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/add_and_remove/spec.json @@ -4,5 +4,4 @@ {"data_files_path": "commit_add.json"}, {"data_files_path": "commit_remove.json"} ] -} - +} \ No newline at end of file diff --git a/kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/single_add/commit_add.json b/kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/single_append/commit_add.json similarity index 100% rename from kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/single_add/commit_add.json rename to kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/single_append/commit_add.json diff --git a/kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/single_add/spec.json b/kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/single_append/spec.json similarity index 96% rename from kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/single_add/spec.json rename to kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/single_append/spec.json index 207c7ce20a2..b5434ddb398 100644 --- a/kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/single_add/spec.json +++ b/kernel/kernel-defaults/src/test/resources/workload_specs/basic_append/specs/single_append/spec.json @@ -3,5 +3,4 @@ "commits": [ {"data_files_path": "commit_add.json"} ] -} - +} \ No newline at end of file From 3d2c4c91c02ecba5e46a3e24aa748f068b259e70 Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Mon, 27 Oct 2025 14:34:59 -0700 Subject: [PATCH 11/12] try to fix style check --- .../defaults/benchmarks/workloadrunners/WriteRunner.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WriteRunner.java b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WriteRunner.java index 86f74e2ea9a..4e61fda7757 100644 --- a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WriteRunner.java +++ b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/workloadrunners/WriteRunner.java @@ -130,9 +130,9 @@ public void executeAsBenchmark(Blackhole blackhole) throws Exception { .orElseThrow( () -> new IllegalStateException( - "Post-commit snapshot not available. This indicates a conflict occurred during " - + "the benchmark, which should not happen. Ensure no other processes are writing " - + "to the table: " + "Post-commit snapshot not available. This indicates a conflict " + + "occurred during the benchmark, which should not happen. " + + "Ensure no other processes are writing to the table: " + workloadSpec.getTableInfo().getResolvedTableRoot())); } } From 91c7ffd0c8d5ae6c034813117eb12c8619008ceb Mon Sep 17 00:00:00 2001 From: Oussama Saoudi Date: Mon, 27 Oct 2025 14:46:27 -0700 Subject: [PATCH 12/12] simplify workload benchmark --- .../kernel/defaults/benchmarks/WorkloadBenchmark.java | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/WorkloadBenchmark.java b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/WorkloadBenchmark.java index f64717a1c3b..b60fd7a0c02 100644 --- a/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/WorkloadBenchmark.java +++ b/kernel/kernel-defaults/src/test/java/io/delta/kernel/defaults/benchmarks/WorkloadBenchmark.java @@ -50,15 +50,7 @@ public static class DefaultBenchmarkState extends AbstractBenchmarkState { @Override protected Engine getEngine(String engineName) { if (engineName.equals("default")) { - return DefaultEngine.create( - new Configuration() { - { - // Set the batch size. This is required for writes. - set("delta.kernel.default.parquet.reader.batch-size", "1024"); - set("delta.kernel.default.json.reader.batch-size", "1024"); - set("delta.kernel.default.parquet.writer.targetMaxFileSize", "10485760"); // 1 MB - } - }); + return DefaultEngine.create(new Configuration()); } else { throw new IllegalArgumentException("Unsupported engine: " + engineName); }