Add function metadata ability to push down struct argument in optimizer #25175

Open · wants to merge 1 commit into base: master

Conversation

@kevintang2022 (Contributor) commented May 22, 2025

Summary:
For some user-defined functions, the subfield pushdown optimizer should transparently push down the utilized subfields of a struct-typed argument. The goal is to make the query plan look the same as if the UDF were not being called on the struct. To accomplish this, the optimizer needs to take the struct argument passed into the UDF and unwrap it when converting an expression to a subfield.

Since there is no guarantee that the struct argument is always the first argument of the UDF, the UDF needs to specify in its metadata which argument index to push down.
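The unwrapping described above can be sketched as a toy model (illustrative Python, not Presto code; the class and field names here are assumptions for the sketch, not the actual optimizer types):

```python
# Toy model of the optimizer step described above: when an expression is a
# call to a function whose metadata marks one argument for subfield pushdown,
# the converter "unwraps" the call and keeps resolving subfields against that
# argument, so DEREFERENCE(f(person, ...), 'age') resolves to person.age.
from dataclasses import dataclass, field
from typing import Optional, Union

@dataclass
class ColumnRef:
    name: str

@dataclass
class FunctionCall:
    name: str
    args: list
    # -1 means "no pushdown"; mirrors the default described in this PR
    pushdown_subfield_arg_index: int = -1

@dataclass
class Dereference:
    base: Union["FunctionCall", "Dereference", ColumnRef]
    field_name: str

def to_subfield(expr) -> Optional[str]:
    """Convert an expression tree to a dotted subfield path, or None."""
    if isinstance(expr, ColumnRef):
        return expr.name
    if isinstance(expr, Dereference):
        base = to_subfield(expr.base)
        return None if base is None else f"{base}.{expr.field_name}"
    if isinstance(expr, FunctionCall) and expr.pushdown_subfield_arg_index >= 0:
        # Unwrap: resolve subfields against the marked struct argument.
        return to_subfield(expr.args[expr.pushdown_subfield_arg_index])
    return None  # opaque function call: nothing can be pushed down

expr = Dereference(
    FunctionCall("fb_reshape_row", [ColumnRef("person"), None],
                 pushdown_subfield_arg_index=0),
    "age")
print(to_subfield(expr))  # person.age
```

With the metadata set, the dereference over the function call resolves to `person.age`; without it (index left at -1), the call stays opaque and no subfield is extracted, matching the "non relevant function" plan in the test plan below.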

T224244100

Presto version 0.293-20250525.210422-369

Differential Revision: D74738214

Test plan:
With this change, both of the queries below produce the same query plan after the table scan node rewrite.

explain with shaped as (SELECT fb_reshape_row(person,CAST(NULL AS ROW(age INTEGER, city VARCHAR))) AS pcol FROM tangk_struct_table),
raw as (select person as pcol from tangk_struct_table)
select pcol.age from raw;
explain with shaped as (SELECT fb_reshape_row(person,CAST(NULL AS ROW(age INTEGER, city VARCHAR))) AS pcol FROM tangk_struct_table),
raw as (select person as pcol from tangk_struct_table)
select pcol.age from shaped;

20250525_235045_00003_tu7a9 correct query plan with pushed down subfield

Fragment 0 [SINGLE]
    CPU: 0.00ns, Scheduled: 0.00ns, Input: 0 rows (0B); per task: avg.: 0.00 std.dev.: 0.00, Output: 0 rows (0B), 1 tasks
    Output layout: [field]
    Output partitioning: SINGLE []
    Output encoding: COLUMNAR
    Stage Execution Strategy: UNGROUPED_EXECUTION
    - Output[PlanNodeId 6][Query Plan] => [field:varchar(807)]
            Query Plan := field
        - Values[PlanNodeId 0] => [field:varchar(807)]
                (VARCHAR'- Output[PlanNodeId 10][age] => [expr_3:integer]
                        age := expr_3 (3:8)
                    - RemoteStreamingExchange[PlanNodeId 218][GATHER - COLUMNAR] => [expr_3:integer]
                        - ScanProject[PlanNodeId 0,6][table = TableHandle {connectorId=''prism'', connectorHandle=''PrismTableHandle{schemaName=di, tableName=tangk_struct_table, analyzePartitionValues=Optional.empty, sideTableFeatureIds=[]}'', layout=''Optional[di.tangk_struct_table{}]''}, projectLocality = LOCAL] => [expr_3:integer]
                                expr_3 := DEREFERENCE(fb_reshape_row(person, null), INTEGER''0'') (1:114)
                                LAYOUT: di.tangk_struct_table{}
                                person := person:struct<age:int,city:string>:0:REGULAR:[person.age] (1:113)
                                id:bigint:-13:PARTITION_KEY
                                    :: [["1"], ["2"], ["3"], ["4"], ["5"]]
                ')

20250525_235408_00004_tu7a9 query plan with non relevant function

Fragment 0 [SINGLE]
    CPU: 0.00ns, Scheduled: 0.00ns, Input: 0 rows (0B); per task: avg.: 0.00 std.dev.: 0.00, Output: 0 rows (0B), 1 tasks
    Output layout: [field]
    Output partitioning: SINGLE []
    Output encoding: COLUMNAR
    Stage Execution Strategy: UNGROUPED_EXECUTION
    - Output[PlanNodeId 6][Query Plan] => [field:varchar(798)]
            Query Plan := field
        - Values[PlanNodeId 0] => [field:varchar(798)]
                (VARCHAR'- Output[PlanNodeId 10][age] => [expr_3:integer]
                        age := expr_3 (3:8)
                    - RemoteStreamingExchange[PlanNodeId 218][GATHER - COLUMNAR] => [expr_3:integer]
                        - ScanProject[PlanNodeId 0,6][table = TableHandle {connectorId=''prism'', connectorHandle=''PrismTableHandle{schemaName=di, tableName=tangk_struct_table, analyzePartitionValues=Optional.empty, sideTableFeatureIds=[]}'', layout=''Optional[di.tangk_struct_table{}]''}, projectLocality = LOCAL] => [expr_3:integer]
                                expr_3 := DEREFERENCE(fb_reshape_row_old(person, null), INTEGER''0'') (1:118)
                                LAYOUT: di.tangk_struct_table{}
                                person := person:struct<age:int,city:string>:0:REGULAR (1:117)
                                id:bigint:-13:PARTITION_KEY
                                    :: [["1"], ["2"], ["3"], ["4"], ["5"]]
                ')

20250525_235845_00005_tu7a9 expected plan

Fragment 0 [SINGLE]
    CPU: 0.00ns, Scheduled: 0.00ns, Input: 0 rows (0B); per task: avg.: 0.00 std.dev.: 0.00, Output: 0 rows (0B), 1 tasks
    Output layout: [field]
    Output partitioning: SINGLE []
    Output encoding: COLUMNAR
    Stage Execution Strategy: UNGROUPED_EXECUTION
    - Output[PlanNodeId 6][Query Plan] => [field:varchar(783)]
            Query Plan := field
        - Values[PlanNodeId 0] => [field:varchar(783)]
                (VARCHAR'- Output[PlanNodeId 10][age] => [expr_4:integer]
                        age := expr_4 (3:8)
                    - RemoteStreamingExchange[PlanNodeId 211][GATHER - COLUMNAR] => [expr_4:integer]
                        - ScanProject[PlanNodeId 0,6][table = TableHandle {connectorId=''prism'', connectorHandle=''PrismTableHandle{schemaName=di, tableName=tangk_struct_table, analyzePartitionValues=Optional.empty, sideTableFeatureIds=[]}'', layout=''Optional[di.tangk_struct_table{}]''}, projectLocality = LOCAL] => [expr_4:integer]
                                expr_4 := DEREFERENCE(person, INTEGER''0'') (2:17)
                                LAYOUT: di.tangk_struct_table{}
                                person := person:struct<age:int,city:string>:0:REGULAR:[person.age] (2:36)
                                id:bigint:-13:PARTITION_KEY
                                    :: [["1"], ["2"], ["3"], ["4"], ["5"]]
                ')


Verifier suite build: 20250521_205359_71488_cm4iz

pt suite build --predicate "lower(query) like '%fb_reshape_row%'" --suite atn_fb_reshape_row_subfields_udf --region atn --days 100

UDF only
https://www.internalfb.com/intern/presto/verifier/results/?test_id=223902
General
https://our.intern.facebook.com/intern/presto/verifier/results/?test_id=223903

Release Notes


== RELEASE NOTES ==

General Changes
* Add pushdownSubfieldArgIndex parameter to the @ScalarFunction and @CodegenScalarFunction annotations to enable subfield pushdown optimization during query planning




@kevintang2022 kevintang2022 requested a review from a team as a code owner May 22, 2025 17:10
@facebook-github-bot (Collaborator)

This pull request was exported from Phabricator. Differential Revision: D74738214

@kevintang2022 kevintang2022 requested a review from rschlussel May 22, 2025 17:22
@kevintang2022 changed the title from "fb_reshape_row always push down subfields" to "Add function metadata ability to push down struct argument in optimizer" May 22, 2025
@rschlussel (Contributor) left a comment:

Looks good! Can you add a test? Maybe add one to TestHiveLogicalPlanner verifying that the subfields are being pushed down, and then a query correctness test (see TestLambdaSubfieldPruning for examples of tests for a related feature). You may have to register a function in the test that uses the new field you added (e.g. a passthrough function that takes a row and returns it unchanged) to exercise your code.

@@ -33,4 +33,6 @@
boolean deterministic() default true;

boolean calledOnNullInput() default false;

int pushdownSubfieldArgIndex() default -1;
Contributor:

This interface method, and the ones on ScalarFunction and SqlInvokedScalarFunction, are parameters to an annotation. Does a user implementing a function using the Presto SPI manually specify this value in the annotation? If not, then these should be removed.

Contributor (author):

Yes they are specified in the annotation. If it is not specified, then it is -1 by default

Annotation looks like this

@CodegenScalarFunction(value = "function_name", calledOnNullInput = true, pushdownSubfieldArgIndex = 0)

@tdcmeehan (Contributor) commented May 22, 2025:

Can you add some documentation on how this needs to be used to our documentation?

Also, why not add an annotation to the argument itself? Something like @RowMayBeDereferenced. (BTW, can you give an example of how a function could know that this is safe to do?) You could annotate multiple of them, and we could validate that the argument is, indeed, a struct to begin with.

Also, your example references internal queries; can you add pastes of the explain plans?

Contributor:

Can you also add some tests?

Contributor (author):

Instead of using an annotation on the argument, I have chosen to pass the arg index in the codegen decorator because this allows it to be part of the FunctionMetadata.

I added some tests to TestHiveLogicalPlanner. One more change I will make is to validate that the specified argIndex does correspond to a row type, and to throw a warning when the code path is not reached due to an invalid index.

Contributor (author):

I will add this description to the documentation once this change is merged in

pushdownSubfieldArgIndex specifies which parameter of the scalar function should have its subfields pushed down to filter plan nodes during query planning and optimization. This helps reduce the amount of scanning done for queries involving structs: only the utilized subfields of the struct are scanned, and unused subfields can be pruned from the query plan.

In the example below, pushdownSubfieldArgIndex is set to 0, which means the first parameter of the custom_struct_with_passthrough function will have its subfields pushed down to filter plan nodes. So for a query such as

SELECT struct.a FROM (SELECT CUSTOM_STRUCT_WITH_PASSTHROUGH(CAST(ROW(orderkey, comment) AS ROW(a BIGINT, b VARCHAR))) AS struct FROM lineitem)

the query plan will only include the orderkey subfield in the table scan, and the comment subfield will be pruned because it is not utilized in the outer query.

@ScalarFunction(value = "custom_struct_with_passthrough", pushdownSubfieldArgIndex = 0)
@TypeParameter(value = "T", boundedBy = ROW)
@SqlType("T")
public static Block customStructWithPassthrough(@SqlType("T") Block struct)
{
    return struct;
}

Contributor:

Is there a non-trivial example you can add? And can you please add the documentation with the current PR?

Correct me if I'm wrong, this field basically says to the planner, "I'm OK if the function receives a pruned version of the struct, because this function just transparently utilizes and returns this argument." But I'm really struggling to understand under what circumstance that would ever apply (or, how you would ever come up with a non-trivial example). How does fb_reshape_row work? Are there any other non-public functions that this would be beneficial for at Meta?

If this acknowledged to be esoteric, wouldn't it just make sense to add it as a field in ComplexTypeFunctionDescriptor (which is a part of FunctionMetadata), and convert the esoteric fb_reshape_row implementation to manually extend SqlScalarFunction, rather than use the annotation based approach? Usually when we need to do something unusual and where it's awkward to modify the public SPI, this is the route we prefer. Some examples to refer to in OSS include ArrayConcatFunction, ArrayFlattenFunction, etc.

Contributor:

fb_reshape_row is basically a specialized cast. It makes it easier to work with rows that might have had subfields (at any level of nesting) added to them, types changed, etc., by being able to take a row of one form and cast it to some other shape. It doesn't care if any fields are null/empty/missing.
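As a rough illustration of that behavior, here is a toy Python sketch of the semantics just described (a model of "reshape a row to a target shape", not the actual fb_reshape_row implementation):

```python
# Toy sketch of reshaping a row: keep the fields named in the target shape,
# fill fields missing from the input with None, and recurse into nested rows.
def reshape_row(row: dict, target_shape: dict) -> dict:
    out = {}
    for field_name, subshape in target_shape.items():
        value = row.get(field_name)  # fields absent from the input become None
        if isinstance(subshape, dict) and isinstance(value, dict):
            value = reshape_row(value, subshape)  # nested row: reshape recursively
        out[field_name] = value
    return out

person = {"age": 30, "city": "NYC", "zip": "10001"}
# Target shape adds "country" (filled with None) and drops "zip".
print(reshape_row(person, {"age": None, "city": None, "country": None}))
# {'age': 30, 'city': 'NYC', 'country': None}
```

The key property, mirroring the comment above, is that missing or extra fields never cause an error: the output always matches the requested shape.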

@steveburnett (Contributor):

Suggest adding a release note, or a NO RELEASE NOTE block, as appropriate.

@facebook-github-bot (Collaborator):

@kevintang2022 has imported this pull request. If you are a Meta employee, you can view this diff on Phabricator.


@rschlussel (Contributor) left a comment:

Thanks for the new tests!

}
checkCondition(signature.getArgumentTypes().size() > pushdownSubfieldArgIndex, FUNCTION_IMPLEMENTATION_ERROR, "Method [%s] has out of range pushdown subfield arg index", method);
String typeVariableName = signature.getArgumentTypes().get(pushdownSubfieldArgIndex).toString();
checkCondition(typeVariableName.equals(com.facebook.presto.common.type.StandardTypes.ROW) || typeConstraintMapping.get(typeVariableName).equals(com.facebook.presto.common.type.StandardTypes.ROW), FUNCTION_IMPLEMENTATION_ERROR, "Method [%s] does not have a struct or row type as pushdown subfield arg", method);
Contributor:

or has no typeConstraintMapping entry (maybe no constraint on the type) or typeConstraintMapping value is null and TypeVariableConstraint.nonDecimalNumericRequired is false (i.e. there's no constraint preventing row type?) ?

checkCondition(pushdownSubfieldArgIndex.get() >= 0, FUNCTION_IMPLEMENTATION_ERROR, "Method [%s] has negative pushdown subfield arg index", method);
checkCondition(signature.getArgumentTypes().size() > pushdownSubfieldArgIndex.get(), FUNCTION_IMPLEMENTATION_ERROR, "Method [%s] has out of range pushdown subfield arg index", method);
String typeVariableName = signature.getArgumentTypes().get(pushdownSubfieldArgIndex.get()).toString();
checkCondition(typeVariableName.equals(com.facebook.presto.common.type.StandardTypes.ROW) || typeConstraintMapping.get(typeVariableName).equals(com.facebook.presto.common.type.StandardTypes.ROW), FUNCTION_IMPLEMENTATION_ERROR, "Method [%s] does not have a struct or row type as pushdown subfield arg", method);
Contributor:

Same comment as above: maybe this is too restrictive and needs some additional conditions.
