Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,30 @@ public <T> T timeCallable(Callable<T> callable) throws Exception {
}
}

/**
* Times an operation that can throw a specific checked exception type.
*
* @param <T> The return type
* @param <E> The exception type that can be thrown
*/
@FunctionalInterface
public interface ThrowingSupplier<T, E extends Exception> {
T get() throws E;
}

/** Times an operation that can throw a specific checked exception type. */
@SuppressWarnings("unchecked")
public <T, E extends Exception> T timeChecked(ThrowingSupplier<T, E> operation) throws E {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want to be able to time a method that throws a checked exception E, without having to catch the aprent Exception type. See:

public static <T, E extends Exception> T timeCheckedOperation(

try (Timed ignore = start()) {
return operation.get();
} catch (RuntimeException | Error e) {
throw e;
} catch (Exception e) {
// Safe cast since operation can only throw E or unchecked exceptions (handled above)
throw (E) e;
}
}

public void time(Runnable runnable) {
try (Timed ignore = start()) {
runnable.run();
Expand Down
114 changes: 75 additions & 39 deletions unity/src/main/java/io/delta/unity/UCCatalogManagedCommitter.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import static io.delta.kernel.internal.util.Preconditions.checkArgument;
import static io.delta.kernel.internal.util.Preconditions.checkState;
import static io.delta.unity.UCCatalogManagedClient.UC_TABLE_ID_KEY;
import static io.delta.unity.utils.OperationTimer.timeCheckedOperation;
import static java.util.Objects.requireNonNull;

import io.delta.kernel.commit.*;
Expand All @@ -37,6 +36,7 @@
import io.delta.storage.commit.uccommitcoordinator.UCCommitCoordinatorException;
import io.delta.unity.adapters.MetadataAdapter;
import io.delta.unity.adapters.ProtocolAdapter;
import io.delta.unity.metrics.UcCommitTelemetry;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -83,16 +83,34 @@ public CommitResponse commit(
requireNonNull(commitMetadata, "commitMetadata is null");
validateLogPathBelongsToThisUcTable(commitMetadata);

final CommitMetadata.CommitType commitType = commitMetadata.getCommitType();
final UcCommitTelemetry telemetry =
new UcCommitTelemetry(ucTableId, tablePath.toString(), commitMetadata);
final UcCommitTelemetry.MetricsCollector metricsCollector = telemetry.getMetricsCollector();

if (commitType == CommitMetadata.CommitType.CATALOG_CREATE) {
return createImpl(engine, finalizedActions, commitMetadata);
}
if (commitType == CommitMetadata.CommitType.CATALOG_WRITE) {
return writeImpl(engine, finalizedActions, commitMetadata);
try {
final CommitResponse response =
metricsCollector.totalCommitTimer.timeChecked(
() -> {
final CommitMetadata.CommitType commitType = commitMetadata.getCommitType();

if (commitType == CommitMetadata.CommitType.CATALOG_CREATE) {
return createImpl(engine, finalizedActions, commitMetadata, metricsCollector);
}
if (commitType == CommitMetadata.CommitType.CATALOG_WRITE) {
return writeImpl(engine, finalizedActions, commitMetadata, metricsCollector);
}

throw new UnsupportedOperationException("Unsupported commit type: " + commitType);
});

final UcCommitTelemetry.Report successfulReport = telemetry.createSuccessReport();
engine.getMetricsReporters().forEach(r -> r.report(successfulReport));
return response;
} catch (CommitFailedException | RuntimeException e) {
final UcCommitTelemetry.Report failureReport = telemetry.createFailureReport(e);
engine.getMetricsReporters().forEach(r -> r.report(failureReport));
throw e;
}

throw new UnsupportedOperationException("Unsupported commit type: " + commitType);
}

@Override
Expand Down Expand Up @@ -144,15 +162,19 @@ public Map<String, String> getRequiredTableProperties() {
*/
// TODO: [delta-io/delta#5118] If UC changes CREATE semantics, update logic here.
private CommitResponse createImpl(
Engine engine, CloseableIterator<Row> finalizedActions, CommitMetadata commitMetadata)
Engine engine,
CloseableIterator<Row> finalizedActions,
CommitMetadata commitMetadata,
UcCommitTelemetry.MetricsCollector metricsCollector)
throws CommitFailedException {
checkArgument(
commitMetadata.getVersion() == 0,
"Expected version 0, but got %s",
commitMetadata.getVersion());

final FileStatus kernelPublishedDeltaFileStatus =
writeDeltaFile(engine, finalizedActions, commitMetadata.getPublishedDeltaFilePath());
writeDeltaFile(
engine, finalizedActions, commitMetadata.getPublishedDeltaFilePath(), metricsCollector);

return new CommitResponse(
ParsedPublishedDeltaData.forFileStatus(kernelPublishedDeltaFileStatus));
Expand All @@ -163,15 +185,22 @@ private CommitResponse createImpl(
* call) to UC server.
*/
private CommitResponse writeImpl(
Engine engine, CloseableIterator<Row> finalizedActions, CommitMetadata commitMetadata)
Engine engine,
CloseableIterator<Row> finalizedActions,
CommitMetadata commitMetadata,
UcCommitTelemetry.MetricsCollector metricsCollector)
throws CommitFailedException {
checkArgument(
commitMetadata.getVersion() > 0, "Can only write staged commit files for versions > 0");

final FileStatus kernelStagedCommitFileStatus =
writeDeltaFile(engine, finalizedActions, commitMetadata.generateNewStagedCommitFilePath());
writeDeltaFile(
engine,
finalizedActions,
commitMetadata.generateNewStagedCommitFilePath(),
metricsCollector);

commitToUC(commitMetadata, kernelStagedCommitFileStatus);
commitToUC(commitMetadata, kernelStagedCommitFileStatus, metricsCollector);

return new CommitResponse(ParsedCatalogCommitData.forFileStatus(kernelStagedCommitFileStatus));
}
Expand Down Expand Up @@ -259,43 +288,50 @@ private void validateLogPathBelongsToThisUcTable(CommitMetadata cm) {
* </ul>
*/
private FileStatus writeDeltaFile(
Engine engine, CloseableIterator<Row> finalizedActions, String filePath)
Engine engine,
CloseableIterator<Row> finalizedActions,
String filePath,
UcCommitTelemetry.MetricsCollector metricsCollector)
throws CommitFailedException {
try {
return timeCheckedOperation(
logger,
"Write file: " + filePath,
ucTableId,
() -> {
return metricsCollector.writeCommitFileTimer.timeChecked(
() -> {
try {
logger.info("[{}] Writing file: {}", ucTableId, filePath);

// Note: the engine is responsible for closing the actions iterator once it has been
// fully consumed.
engine
.getJsonHandler()
.writeJsonFileAtomically(filePath, finalizedActions, true /* overwrite */);

return engine.getFileSystemClient().getFileStatus(filePath);
});
} catch (IOException ex) {
// Note that as per the JsonHandler::writeJsonFileAtomically API contract with overwrite=true,
// FileAlreadyExistsException should not be possible here.

throw new CommitFailedException(
true /* retryable */,
false /* conflict */,
"Failed to write delta file due to: " + ex.getMessage(),
ex);
}
} catch (IOException ex) {
// Note that as per the JsonHandler::writeJsonFileAtomically API contract with
// overwrite=true, FileAlreadyExistsException should not be possible here.

throw new CommitFailedException(
true /* retryable */,
false /* conflict */,
"Failed to write delta file due to: " + ex.getMessage(),
ex);
}
});
}

private void commitToUC(CommitMetadata commitMetadata, FileStatus kernelStagedCommitFileStatus)
private void commitToUC(
CommitMetadata commitMetadata,
FileStatus kernelStagedCommitFileStatus,
UcCommitTelemetry.MetricsCollector metricsCollector)
throws CommitFailedException {
timeCheckedOperation(
logger,
"Commit staged commit file to UC: " + kernelStagedCommitFileStatus.getPath(),
ucTableId,
metricsCollector.commitToUcServerTimer.timeChecked(
() -> {
// commitToUc is only for normal catalog WRITES, not for CREATE, or UPGRADE, or DOWNGRADE,
// or anything filesystem related.
logger.info(
"[{}] Committing staged commit file to UC: {}",
ucTableId,
kernelStagedCommitFileStatus.getPath());

// commitToUc is only for normal catalog WRITES, not for CREATE, or UPGRADE, or
// DOWNGRADE, or anything filesystem related.
checkState(
commitMetadata.getCommitType() == CommitMetadata.CommitType.CATALOG_WRITE,
"Only supported commit type is CATALOG_WRITE, but got: %s");
Expand Down
111 changes: 111 additions & 0 deletions unity/src/main/java/io/delta/unity/metrics/UcCommitTelemetry.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Copyright (2025) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.delta.unity.metrics;

import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import io.delta.kernel.commit.CommitMetadata;
import io.delta.kernel.internal.metrics.Timer;
import java.util.Optional;

/**
* Telemetry framework for Unity Catalog commit operations.
*
* <p>Collects timing metrics for commit operations and generates reports for successful and failed
* commits.
*/
public class UcCommitTelemetry {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My goal here is to be VERY succint -- I want the metrics collection and metrics result and the overall report to succinctly be in one class


private final String ucTableId;
private final String ucTablePath;
private final CommitMetadata commitMetadata;
private final MetricsCollector metricsCollector;

public UcCommitTelemetry(String ucTableId, String ucTablePath, CommitMetadata commitMetadata) {
this.ucTableId = ucTableId;
this.ucTablePath = ucTablePath;
this.commitMetadata = commitMetadata;
this.metricsCollector = new MetricsCollector();
}

public MetricsCollector getMetricsCollector() {
return metricsCollector;
}

public Report createSuccessReport() {
return new Report(metricsCollector.capture(), Optional.empty());
}

public Report createFailureReport(Exception error) {
return new Report(metricsCollector.capture(), Optional.of(error));
}

/** Mutable collector for gathering metrics during commit. */
public static class MetricsCollector {
public final Timer totalCommitTimer = new Timer();
public final Timer writeCommitFileTimer = new Timer();
public final Timer commitToUcServerTimer = new Timer();

public MetricsResult capture() {
return new MetricsResult(this);
}
}

/** Immutable snapshot of collected metric results. */
@JsonPropertyOrder({
"totalCommitDurationNs",
"writeCommitFileDurationNs",
"commitToUcServerDurationNs"
})
public static class MetricsResult {
public final long totalCommitDurationNs;
public final long writeCommitFileDurationNs;
public final long commitToUcServerDurationNs;

MetricsResult(MetricsCollector collector) {
this.totalCommitDurationNs = collector.totalCommitTimer.totalDurationNs();
this.writeCommitFileDurationNs = collector.writeCommitFileTimer.totalDurationNs();
this.commitToUcServerDurationNs = collector.commitToUcServerTimer.totalDurationNs();
}
}

/** Complete UC commit report with metadata and metrics. */
@JsonPropertyOrder({
"operationType",
"reportUUID",
"ucTableId",
"ucTablePath",
"commitVersion",
"commitType",
"metrics",
"exception"
})
public class Report implements io.delta.kernel.metrics.MetricsReport {
public final String operationType = "UcCommit";
public final String reportUUID = java.util.UUID.randomUUID().toString();
public final String ucTableId = UcCommitTelemetry.this.ucTableId;
public final String ucTablePath = UcCommitTelemetry.this.ucTablePath;
public final long commitVersion = commitMetadata.getVersion();
public final CommitMetadata.CommitType commitType = commitMetadata.getCommitType();
public final MetricsResult metrics;
public final Optional<String> exception;

public Report(MetricsResult metrics, Optional<Exception> exception) {
this.metrics = metrics;
this.exception = exception.map(Exception::toString);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,4 +99,22 @@ trait UCCatalogManagedTestUtils extends TestUtils with ActionUtils with WriteUti
val simpleRow = DataBuilderUtils.row(schema, elem)
singletonCloseableIterator(simpleRow)
}

/** Creates a UCCatalogManagedClient with an InMemoryUCClient for testing */
def createUCClientAndCatalogManagedClient(
metastoreId: String = "ucMetastoreId"): (InMemoryUCClient, UCCatalogManagedClient) = {
val ucClient = new InMemoryUCClient(metastoreId)
val ucCatalogManagedClient = new UCCatalogManagedClient(ucClient)
(ucClient, ucCatalogManagedClient)
}

/**
* Initializes a UC table in the InMemoryUCClient after creation.
* This should be called after creating a table with buildCreateTableTransaction.
*/
def initializeUCTable(ucClient: InMemoryUCClient, ucTableId: String): Unit = {
val tableData =
new InMemoryUCClient.TableData(-1, scala.collection.mutable.ArrayBuffer[Commit]())
ucClient.createTableIfNotExistsOrThrow(ucTableId, tableData)
}
}
Loading
Loading