Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,17 @@ public void setupInvocation() throws Exception {
runner.setup();
}

/**
 * Runs after every benchmark invocation and delegates to {@link WorkloadRunner#cleanup()} so that
 * any state produced by the previous run is discarded before the next one starts.
 *
 * @throws Exception if the underlying runner fails while cleaning up.
 */
@TearDown(Level.Invocation)
public void teardownInvocation() throws Exception {
  this.runner.cleanup();
}

/**
* Returns an instance of the desired engine based on the provided engine name.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,10 @@ private static List<WorkloadSpec> loadSpecsFromTable(Path tableDir) {
validateTableStructure(tableDir);

Path tableInfoPath = tableDir.resolve(TABLE_INFO_FILE_NAME);
Path deltaDir = tableDir.resolve(DELTA_DIR_NAME);
Path specsDir = tableDir.resolve(SPECS_DIR_NAME);

TableInfo tableInfo =
TableInfo.fromJsonPath(tableInfoPath.toString(), deltaDir.toAbsolutePath().toString());
TableInfo.fromJsonPath(tableInfoPath.toString(), tableDir.toAbsolutePath().toString());

return findSpecDirectories(specsDir).stream()
.map(specDir -> loadSingleSpec(specDir, tableInfo))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,15 @@ public static class DefaultBenchmarkState extends AbstractBenchmarkState {
@Override
protected Engine getEngine(String engineName) {
if (engineName.equals("default")) {
return DefaultEngine.create(new Configuration());
return DefaultEngine.create(
new Configuration() {
{
// Set the batch size. This is required for writes.
set("delta.kernel.default.parquet.reader.batch-size", "1024");
set("delta.kernel.default.json.reader.batch-size", "1024");
set("delta.kernel.default.parquet.writer.targetMaxFileSize", "10485760"); // 1 MB
}
});
} else {
throw new IllegalArgumentException("Unsupported engine: " + engineName);
}
Expand Down Expand Up @@ -78,6 +86,7 @@ public void benchmarkWorkload(DefaultBenchmarkState state, Blackhole blackhole)
public static void main(String[] args) throws RunnerException, IOException {
// Get workload specs from the workloads directory
List<WorkloadSpec> workloadSpecs = BenchmarkUtils.loadAllWorkloads(WORKLOAD_SPECS_DIR);
System.out.println("Loaded " + workloadSpecs.size() + " workload specs");
if (workloadSpecs.isEmpty()) {
throw new RunnerException(
"No workloads found. Please add workload specs to the workloads directory.");
Expand All @@ -102,8 +111,8 @@ public static void main(String[] args) throws RunnerException, IOException {
// TODO: In the future, this can be extended to support multiple engines.
.param("engineName", "default")
.forks(1)
.warmupIterations(3) // Proper warmup for production benchmarks
.measurementIterations(5) // Proper measurement iterations for production benchmarks
.warmupIterations(3)
.measurementIterations(5)
.warmupTime(TimeValue.seconds(1))
.measurementTime(TimeValue.seconds(1))
.addProfiler(KernelMetricsProfiler.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@

package io.delta.kernel.defaults.benchmarks.models;

import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.File;
import java.io.IOException;
import org.codehaus.jackson.map.ObjectMapper;
import java.nio.file.Paths;

/**
* Represents metadata about a Delta table used in benchmark workloads.
Expand All @@ -30,17 +32,26 @@
*
* <p>TableInfo instances are typically loaded from JSON files in the workload specifications
* directory structure. Each table directory should contain a {@code table_info.json} file with the
* table metadata and a {@code delta} subdirectory containing the actual table data. The table root
* path is the absolute path to the root of the table and is provided separately in {@link
* WorkloadSpec#fromJsonPath(String, String, TableInfo)}.
* table metadata and a {@code delta} subdirectory containing the actual table data.
*
* <p>Example JSON structure:
* <p>Example JSON structure for relative table (default):
*
* <pre>{@code
* {
* "name": "large-table",
* "description": "A large Delta table with multi-part checkpoints for performance testing",
* "engineInfo": "Apache-Spark/3.5.1 Delta-Lake/3.1.0"
* "name": "basic_table",
* "description": "A basic Delta table for benchmarking",
* "engine_info": "Apache-Spark/3.5.1 Delta-Lake/3.1.0"
* }
* }</pre>
*
* <p>Example JSON structure for absolute path table:
*
* <pre>{@code
* {
* "name": "s3_table",
* "description": "Table stored in S3",
* "table_type": "absolute",
* "table_path": "s3://my-bucket/path/to/table"
* }
* }</pre>
*/
Expand All @@ -60,23 +71,28 @@ public class TableInfo {
@JsonProperty("engine_info")
public String engineInfo;

/** The root path where the Delta table is stored. */
@JsonProperty("table_root")
public String tableRoot;
/**
* The type of table location: "relative" (default) or "absolute".
*
* <p>When "relative", the table is located at {table_info_directory}/delta. When "absolute", the
* table is located at the path specified in {@link #tablePath}.
*/
@JsonProperty("table_type")
private String tableType;

/** @return the absolute path to the root of the table */
public String getTableRoot() {
return tableRoot;
}
/**
* The absolute path to the table when {@link #tableType} is "absolute". Can be a local path
* (file:///) or S3 path (s3://). Null when table_type is "relative".
*/
@JsonProperty("table_path")
private String tablePath;

/**
* Sets the root path of the Delta table.
*
* @param tableRoot the absolute path to the root of the table
* The resolved absolute path to the root of the table. This is computed after deserialization
* based on {@link #tableType} and {@link #tablePath}.
*/
public void setTableRoot(String tableRoot) {
this.tableRoot = tableRoot;
}
@JsonProperty("table_info_path")
private String tableInfoPath;

/**
* Default constructor for Jackson deserialization.
Expand All @@ -86,23 +102,47 @@ public void setTableRoot(String tableRoot) {
*/
public TableInfo() {}

/**
 * Resolves the absolute table root from the configured {@code table_type} and {@code table_path}.
 *
 * <p>Any value other than {@code "absolute"} (including {@code null}) is treated as
 * {@code "relative"}, in which case the table is expected to live in the {@code delta}
 * subdirectory next to {@code table_info.json}.
 *
 * @return the resolved absolute path to the root of the table.
 * @throws IllegalStateException when table_type is "absolute" but no table_path was provided.
 */
@JsonIgnore
public String getResolvedTableRoot() {
  boolean isAbsolute = "absolute".equals(tableType);
  if (!isAbsolute) {
    // Relative layout (the default): {table_info_directory}/delta.
    return Paths.get(tableInfoPath, "delta").toAbsolutePath().toString();
  }
  if (tablePath == null || tablePath.trim().isEmpty()) {
    throw new IllegalStateException(
        "table_path must be specified when table_type is 'absolute'");
  }
  return tablePath;
}

/** @return the directory that contains this table's {@code table_info.json}. */
public String getTableInfoPath() {
  return this.tableInfoPath;
}

/**
 * Records the directory containing {@code table_info.json}; used to resolve relative table roots.
 *
 * @param directory the table_info directory.
 */
public void setTableInfoPath(String directory) {
  this.tableInfoPath = directory;
}

/**
* Creates a TableInfo instance by reading from a JSON file at the specified path.
*
* <p>This method loads table metadata from a JSON file and sets the table root path. The JSON
* file should contain the table name and description, while the table root path is provided
* separately with the absolute path.
* <p>This method loads table metadata from a JSON file and resolves the table root path. The JSON
* file should contain the table name and description, while the table root path is computed based
* on the table_type and table_path fields.
*
* @param jsonPath the path to the JSON file containing the TableInfo metadata
* @param tableRoot the absolute path to the root of the Delta table
* @return a TableInfo instance populated from the JSON file and table root path
* @param tableInfoPath the directory containing the table_info.json file (used for relative path
* resolution)
* @return a TableInfo instance populated from the JSON file with resolved table root path
* @throws RuntimeException if there is an error reading or parsing the JSON file
*/
public static TableInfo fromJsonPath(String jsonPath, String tableRoot) {
public static TableInfo fromJsonPath(String jsonPath, String tableInfoPath) {
ObjectMapper mapper = new ObjectMapper();
try {
TableInfo info = mapper.readValue(new File(jsonPath), TableInfo.class);
info.setTableRoot(tableRoot);
info.setTableInfoPath(tableInfoPath);
return info;
} catch (IOException e) {
throw new RuntimeException("Failed to read TableInfo from JSON file: " + jsonPath, e);
Expand All @@ -112,8 +152,8 @@ public static TableInfo fromJsonPath(String jsonPath, String tableRoot) {
/**
* Returns a string representation of this TableInfo.
*
* <p>The string includes the table name, description, and engine info, but excludes the table
* root path for security reasons (as it may contain sensitive path information).
* <p>The string includes the table name, description, engine info, and CCv2 status, but excludes
* the table root path for security reasons (as it may contain sensitive path information).
*
* @return a string representation of this TableInfo
*/
Expand All @@ -125,6 +165,8 @@ public String toString() {
+ description
+ "', engineInfo='"
+ engineInfo
+ "'}";
+ "', tableType='"
+ tableType
+ "}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,10 @@
* field in the JSON.
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type")
@JsonSubTypes({@JsonSubTypes.Type(value = ReadSpec.class, name = "read")})
@JsonSubTypes({
@JsonSubTypes.Type(value = ReadSpec.class, name = "read"),
@JsonSubTypes.Type(value = WriteSpec.class, name = "write")
})
public abstract class WorkloadSpec {
/**
* The type of workload (e.g., "read"). This is used by Jackson's polymorphic deserialization to
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
/*
* Copyright (2025) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.kernel.defaults.benchmarks.models;

import com.fasterxml.jackson.annotation.JsonProperty;
import io.delta.kernel.defaults.benchmarks.workloadrunners.WorkloadRunner;
import io.delta.kernel.defaults.benchmarks.workloadrunners.WriteRunner;
import io.delta.kernel.engine.Engine;
import java.util.Collections;
import java.util.List;

/**
* Workload specification for write benchmarks. Defines test cases for writing to Delta tables with
* one or more commits containing add/remove actions.
*
* <h2>Usage</h2>
*
* <p>To run this workload, use {@link WorkloadSpec#getRunner(Engine)} to get the appropriate {@link
* WriteRunner}.
*
* @see WriteRunner
*/
public class WriteSpec extends WorkloadSpec {

  /**
   * Describes one commit to apply during the benchmark.
   *
   * <p>A commit points at a file holding the Delta log JSON actions (add/remove files) that make
   * up that commit.
   */
  public static class CommitSpec {
    /**
     * Path, relative to the spec directory (where spec.json lives), of the file containing the
     * Delta log JSON actions for this commit.
     *
     * <p>Example: "commit_a.json"
     */
    @JsonProperty("data_files_path")
    private String dataFilesPath;

    /** No-arg constructor required by Jackson. */
    public CommitSpec() {}

    public CommitSpec(String dataFilesPath) {
      this.dataFilesPath = dataFilesPath;
    }

    public String getDataFilesPath() {
      return dataFilesPath;
    }
  }

  /**
   * Commits to apply in order. Each entry names a file of Delta log JSON actions, and every
   * commit is executed inside the timed portion of the benchmark.
   */
  @JsonProperty("commits")
  private List<CommitSpec> commits;

  /** No-arg constructor required by Jackson; registers the workload type as "write". */
  public WriteSpec() {
    super("write");
  }

  /**
   * Returns the commits to execute, never {@code null}.
   *
   * @return the configured commit specifications, or an empty list when none were given.
   */
  public List<CommitSpec> getCommits() {
    if (commits == null) {
      return Collections.emptyList();
    }
    return commits;
  }

  /** @return the full workload name, built from the table name, case name, and workload type. */
  @Override
  public String getFullName() {
    return String.join("/", this.tableInfo.name, caseName, "write");
  }

  /** @return a {@link WriteRunner} that executes this spec against the given engine. */
  @Override
  public WorkloadRunner getRunner(Engine engine) {
    return new WriteRunner(this, engine);
  }

  /**
   * Expands this spec into its workload variants.
   *
   * <p>A WriteSpec currently yields exactly one variant: itself. Later this could fan out into
   * variants covering different write patterns or configurations.
   *
   * @return a single-element list containing this spec.
   */
  @Override
  public List<WorkloadSpec> getWorkloadVariants() {
    return Collections.singletonList(this);
  }

  @Override
  public String toString() {
    int commitCount = getCommits().size();
    return String.format(
        "Write{caseName='%s', commits=%d, tableInfo='%s'}", caseName, commitCount, tableInfo);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,8 @@ public ReadMetadataRunner(ReadSpec workloadSpec, Engine engine) {
}

@Override
public void setup() {
String workloadTableRoot = workloadSpec.getTableInfo().getTableRoot();
public void setup() throws Exception {
String workloadTableRoot = workloadSpec.getTableInfo().getResolvedTableRoot();
SnapshotBuilder builder = TableManager.loadSnapshot(workloadTableRoot);
if (workloadSpec.getVersion() != null) {
builder.atVersion(workloadSpec.getVersion());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,19 @@ public WorkloadRunner() {}
*/
public abstract void executeAsBenchmark(Blackhole blackhole) throws Exception;

/**
 * Hook invoked after each benchmark invocation to undo any state the run produced, so the next
 * invocation starts clean. Write workloads override this to remove added files and revert the
 * table; the base implementation does nothing, which is the right behavior for read workloads.
 *
 * @throws Exception if any error occurs during cleanup.
 */
public void cleanup() throws Exception {
  // Intentionally empty: read workloads have nothing to undo.
}

// TODO: Add executeAsTest() method for correctness validation
// public abstract void executeAsTest() throws Exception;
}
Loading
Loading