szehon-ho
Member

What changes were proposed in this pull request?

#51377 added a DataSourceV2 API that sends operation metrics along with the commit as a map from string to long. This PR replaces that map with a proper model.

Suggestion from @aokolnychyi

Why are the changes needed?

It would be cleaner to model the metrics as a proper object, so that it is clearer which metrics Spark sends and so that future metrics that are not long values can be handled.
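To illustrate the motivation, here is a minimal sketch contrasting the two shapes from a connector's point of view. The type and method names (`TypedMergeMetrics`, `MetricsShapes`, `copiedFromMap`, `copiedFromObject`) are hypothetical and are not part of this PR; only the metric name `numTargetRowsCopied` comes from the diff.

```java
import java.util.Map;

// Hypothetical stand-in for a typed metrics object. Only the method name
// numTargetRowsCopied is taken from this PR; the rest is illustrative.
interface TypedMergeMetrics {
  long numTargetRowsCopied();
}

final class MetricsShapes {
  private MetricsShapes() {}

  // Map-based shape: the connector has to know the key string and decide
  // what to do when the key is absent (here: fall back to -1).
  static long copiedFromMap(Map<String, Long> metrics) {
    return metrics.getOrDefault("numTargetRowsCopied", -1L);
  }

  // Typed shape: the available metrics are visible in the interface itself,
  // and a typo in the metric name becomes a compile error.
  static long copiedFromObject(TypedMergeMetrics metrics) {
    return metrics.numTargetRowsCopied();
  }
}
```

With the typed shape, a metric that is not a long value could later be exposed through a method with a different return type, which a `Map<String, Long>` cannot express.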

Does this PR introduce any user-facing change?

No, unreleased DSV2 API.

How was this patch tested?

Existing tests

Was this patch authored or co-authored using generative AI tooling?

No

* </ul>
* </li>
* </ul>
* @param operationMetrics operation metrics collected from the query producing write.
Contributor

@aokolnychyi aokolnychyi Oct 14, 2025

Are we missing an empty space here? Can we also mention that it is best effort to collect/compute these so that users don't report bugs for cases that we currently don't support?

import org.apache.spark.annotation.Evolving;

/**
* Interface that provides merge operation metrics.
Contributor

@aokolnychyi aokolnychyi Oct 14, 2025

Nit: merge -> MERGE?

* @since 4.1.0
*/
@Evolving
public interface OperationMetrics {
Contributor

This naming seems reasonable to me but if folks have better ideas, it would be great to hear them.

cc @cloud-fan @viirya @gengliangwang @dongjoon-hyun @huaxingao

Member

+1 for the AS-IS name, OperationMetrics.

/**
* Returns the number of target rows copied unmodified because they did not match any action.
*/
long numTargetRowsCopied();
Contributor

Seems like we will use -1 if unknown. Shall we document this?
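One possible way to document the sentinel is sketched below. This is an assumption about how the Javadoc could read, not the wording adopted in the PR; the interface name `DocumentedMergeMetrics` and the constant `UNKNOWN` are hypothetical.

```java
// Hypothetical sketch of documenting the "unknown" sentinel. The interface
// name and the UNKNOWN constant are illustrative, not part of the PR.
interface DocumentedMergeMetrics {
  /** Value reported in this sketch when a metric could not be collected. */
  long UNKNOWN = -1L;

  /**
   * Returns the number of target rows copied unmodified because they did not
   * match any action, or -1 if the value is unknown.
   */
  long numTargetRowsCopied();
}
```

A connector reading this contract can then treat `-1` as "not available" rather than as a real row count.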

collectFirst(query) { case m: MergeRowsExec => m }.map { n =>
val metrics = n.metrics
MergeOperationMetricsImpl(
metrics.get("numTargetRowsCopied").map(_.value).getOrElse(-1L),
Contributor

Can we add constants for these in a separate PR? It seems fragile.
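A rough sketch of what such constants could look like, written in Java for illustration even though the snippet under review is Scala. The class `MergeMetricNames` and the helper `valueOrUnknown` are hypothetical; only the string `numTargetRowsCopied` and the `-1` fallback come from the diff.

```java
import java.util.Map;

// Hypothetical holder for metric-name constants, so that the code defining a
// metric and the code reading it share one definition of the key.
final class MergeMetricNames {
  static final String NUM_TARGET_ROWS_COPIED = "numTargetRowsCopied";

  private MergeMetricNames() {}

  // Mirrors the getOrElse(-1L) fallback in the diff: an absent metric is
  // reported as -1.
  static long valueOrUnknown(Map<String, Long> metrics, String name) {
    Long value = metrics.get(name);
    return value == null ? -1L : value;
  }
}
```

With a shared constant, renaming a metric at its definition site cannot silently turn the lookup into the `-1` fallback.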

* @since 4.1.0
*/
@Evolving
public interface OperationMetrics {
Contributor

I am thinking about how to simplify consumption of these objects in connectors. The question is whether this interface should have some sort of operation method that would tell the type of metrics. That said, it is probably not the end of the world if connectors do a class check.
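For reference, the class-check option could look roughly like the sketch below on the connector side. All type names here (`OperationMetricsSketch`, `MergeMetricsSketch`, `ConnectorSketch`) are hypothetical stand-ins, and the sketch assumes the MERGE metrics type extends the base metrics interface.

```java
// Hypothetical stand-ins for the base interface and a MERGE-specific subtype.
interface OperationMetricsSketch {}

interface MergeMetricsSketch extends OperationMetricsSketch {
  long numTargetRowsCopied();
}

final class ConnectorSketch {
  private ConnectorSketch() {}

  // The connector inspects the runtime type to decide which metrics it can read.
  static String summarize(OperationMetricsSketch metrics) {
    if (metrics instanceof MergeMetricsSketch) {
      MergeMetricsSketch merge = (MergeMetricsSketch) metrics;
      return "merge: copied=" + merge.numTargetRowsCopied();
    }
    // Metrics types the connector does not recognize are ignored gracefully.
    return "unknown operation";
  }
}
```

An operation method on the base interface would replace the `instanceof` test with a switch on the returned value, at the cost of keeping that value in sync with the concrete subtype.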

Contributor

Thoughts anyone?

@aokolnychyi
Contributor

I feel like using a proper object is the right call here compared to the map. Left some questions.

Would love to hear what others think too.
