Add function metadata ability to push down struct argument in optimizer #25175

Open · wants to merge 1 commit into base: master
@@ -208,6 +208,7 @@ private static class ScalarTranslationHeader
private final Optional<OperatorType> operatorType;
private final boolean deterministic;
private final boolean calledOnNullInput;
private final int pushdownSubfieldArgIndex;

public static List<ScalarTranslationHeader> fromAnnotatedElement(AnnotatedElement annotated)
{
@@ -218,37 +219,39 @@ public static List<ScalarTranslationHeader> fromAnnotatedElement(AnnotatedElemen

if (scalarFunction != null) {
String baseName = scalarFunction.value().isEmpty() ? camelToSnake(annotatedName(annotated)) : scalarFunction.value();
builder.add(new ScalarTranslationHeader(baseName, scalarFunction.deterministic(), scalarFunction.calledOnNullInput()));
builder.add(new ScalarTranslationHeader(baseName, scalarFunction.deterministic(), scalarFunction.calledOnNullInput(), scalarFunction.pushdownSubfieldArgIndex()));

for (String alias : scalarFunction.alias()) {
builder.add(new ScalarTranslationHeader(alias, scalarFunction.deterministic(), scalarFunction.calledOnNullInput()));
builder.add(new ScalarTranslationHeader(alias, scalarFunction.deterministic(), scalarFunction.calledOnNullInput(), scalarFunction.pushdownSubfieldArgIndex()));
}
}

if (scalarOperator != null) {
builder.add(new ScalarTranslationHeader(scalarOperator.value(), true, scalarOperator.value().isCalledOnNullInput()));
builder.add(new ScalarTranslationHeader(scalarOperator.value(), true, scalarOperator.value().isCalledOnNullInput(), -1));
}

List<ScalarTranslationHeader> result = builder.build();
checkArgument(!result.isEmpty());
return result;
}

private ScalarTranslationHeader(String name, boolean deterministic, boolean calledOnNullInput)
private ScalarTranslationHeader(String name, boolean deterministic, boolean calledOnNullInput, int pushdownSubfieldArgIndex)
{
// TODO This is a hack. Engine should provide an API for connectors to overwrite functions. Connector should not hard code the builtin function namespace.
this.name = requireNonNull(QualifiedObjectName.valueOf("presto", "default", name));
this.operatorType = Optional.empty();
this.deterministic = deterministic;
this.calledOnNullInput = calledOnNullInput;
this.pushdownSubfieldArgIndex = pushdownSubfieldArgIndex;
}

private ScalarTranslationHeader(OperatorType operatorType, boolean deterministic, boolean calledOnNullInput)
private ScalarTranslationHeader(OperatorType operatorType, boolean deterministic, boolean calledOnNullInput, int pushdownSubfieldArgIndex)
{
this.name = operatorType.getFunctionName();
this.operatorType = Optional.of(operatorType);
this.deterministic = deterministic;
this.calledOnNullInput = calledOnNullInput;
this.pushdownSubfieldArgIndex = pushdownSubfieldArgIndex;
}

private static String annotatedName(AnnotatedElement annotatedElement)
@@ -166,6 +166,13 @@ protected QueryRunner createQueryRunner()
Optional.empty());
}

@Override
protected QueryRunner createExpectedQueryRunner()
throws Exception
{
return getQueryRunner();
}

@Test
public void testMetadataQueryOptimizationWithLimit()
{
@@ -1366,6 +1373,18 @@ public void testPushdownSubfields()
assertPushdownSubfields("SELECT x.a FROM test_pushdown_struct_subfields WHERE x.a > 10 AND x.b LIKE 'abc%'", "test_pushdown_struct_subfields",
ImmutableMap.of("x", toSubfields("x.a", "x.b")));

assertQuery("SELECT struct.b FROM (SELECT CUSTOM_STRUCT_WITH_PASSTHROUGH(x) AS struct FROM test_pushdown_struct_subfields)");
assertQuery("SELECT struct.b FROM (SELECT CUSTOM_STRUCT_WITHOUT_PASSTHROUGH(x) AS struct FROM test_pushdown_struct_subfields)");

assertPushdownSubfields("SELECT struct.b FROM (SELECT CUSTOM_STRUCT_WITH_PASSTHROUGH(x) AS struct FROM test_pushdown_struct_subfields)", "test_pushdown_struct_subfields",
ImmutableMap.of("x", toSubfields("x.b")));

assertPushdownSubfields("SELECT struct.b FROM (SELECT CUSTOM_STRUCT_WITHOUT_PASSTHROUGH(x) AS struct FROM test_pushdown_struct_subfields)", "test_pushdown_struct_subfields",
ImmutableMap.of());

assertPushdownSubfields("SELECT struct.b FROM (SELECT x AS struct FROM test_pushdown_struct_subfields)", "test_pushdown_struct_subfields",
ImmutableMap.of("x", toSubfields("x.b")));

// Join
assertPlan("SELECT l.orderkey, x.a, mod(x.d.d1, 2) FROM lineitem l, test_pushdown_struct_subfields a WHERE l.linenumber = a.id",
anyTree(
@@ -161,4 +161,10 @@ private void testPushDownSubfieldsFromLambdas(String tableName)
assertQuery("SELECT TRANSFORM_VALUES(MAP_REMOVE_NULL_VALUES(row_with_map_varchar_key_row_value.map_varchar_key_row_value), (k,v) -> v.orderkey) FROM " + tableName);
assertQuery("SELECT TRANSFORM_VALUES(MAP_SUBSET(row_with_map_varchar_key_row_value.map_varchar_key_row_value, ARRAY['orderdata_ex']), (k,v) -> v.orderkey) FROM " + tableName);
}

@Test
public void testPushdownSubfieldArgIndexForScalar()
{
assertQuery("SELECT struct.a FROM (SELECT CUSTOM_STRUCT_WITH_PASSTHROUGH(CAST(ROW(orderkey, comment) AS ROW(a BIGINT, b VARCHAR))) AS struct FROM lineitem)");
assertQuery("SELECT struct.a FROM (SELECT CUSTOM_STRUCT_WITHOUT_PASSTHROUGH(CAST(ROW(orderkey, comment) AS ROW(a BIGINT, b VARCHAR))) AS struct FROM lineitem)");
}
}
@@ -1187,6 +1187,7 @@ else if (function instanceof SqlInvokedFunction) {
JAVA,
function.isDeterministic(),
function.isCalledOnNullInput(),
function.getPushdownSubfieldArgIndex(),
function.getComplexTypeFunctionDescriptor());
}
}
@@ -67,6 +67,12 @@ public boolean isCalledOnNullInput()
return details.isCalledOnNullInput();
}

@Override
public Optional<Integer> getPushdownSubfieldArgIndex()
{
return details.getPushdownSubfieldArgIndex();
}

@Override
public String getDescription()
{
@@ -23,13 +23,15 @@ public class ScalarHeader
private final SqlFunctionVisibility visibility;
private final boolean deterministic;
private final boolean calledOnNullInput;
private final int pushdownSubfieldArgIndex;

public ScalarHeader(Optional<String> description, SqlFunctionVisibility visibility, boolean deterministic, boolean calledOnNullInput)
public ScalarHeader(Optional<String> description, SqlFunctionVisibility visibility, boolean deterministic, boolean calledOnNullInput, int pushdownSubfieldArgIndex)
{
this.description = description;
this.visibility = visibility;
this.deterministic = deterministic;
this.calledOnNullInput = calledOnNullInput;
this.pushdownSubfieldArgIndex = pushdownSubfieldArgIndex;
}

public Optional<String> getDescription()
@@ -51,4 +53,12 @@ public boolean isCalledOnNullInput()
{
return calledOnNullInput;
}

public Optional<Integer> getPushdownSubfieldArgIndex()
{
if (pushdownSubfieldArgIndex < 0) {
return Optional.empty();
}
return Optional.of(pushdownSubfieldArgIndex);
}
}
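The sentinel convention used by `ScalarHeader` above (a negative raw index meaning "no pushdown argument", surfaced as `Optional.empty()` at the API boundary) can be sketched standalone. The class and method names below are illustrative, not Presto's:

```java
import java.util.Optional;

// Illustrative sketch (not Presto code): the -1 annotation default is kept
// as a raw int internally and converted to an Optional at the accessor,
// mirroring ScalarHeader.getPushdownSubfieldArgIndex.
public class PushdownIndexSketch
{
    public static Optional<Integer> toOptionalIndex(int rawIndex)
    {
        // any negative value means "no pushdown subfield argument"
        return rawIndex < 0 ? Optional.empty() : Optional.of(rawIndex);
    }
}
```

Keeping the raw `int` in the header and converting at the boundary lets the annotation keep a primitive default (`-1`) while callers get a null-safe `Optional`.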
@@ -36,6 +36,7 @@
import com.facebook.presto.spi.function.SqlNullable;
import com.facebook.presto.spi.function.SqlType;
import com.facebook.presto.spi.function.TypeParameter;
import com.facebook.presto.spi.function.TypeVariableConstraint;
import com.facebook.presto.sql.gen.lambda.BinaryFunctionInterface;
import com.facebook.presto.sql.gen.lambda.UnaryFunctionInterface;
import com.google.common.collect.ImmutableList;
@@ -44,7 +45,9 @@
import java.lang.invoke.MethodHandle;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

@@ -122,6 +125,8 @@ private static SqlScalarFunction createSqlScalarFunction(Method method)
Arrays.stream(method.getParameters()).map(p -> parseTypeSignature(p.getAnnotation(SqlType.class).value())).collect(toImmutableList()),
false);

checkPushdownSubfieldArgIndex(signature, codegenScalarFunction.pushdownSubfieldArgIndex(), method);

return new SqlScalarFunction(signature)
{
@Override
@@ -166,6 +171,28 @@ public boolean isCalledOnNullInput()
{
return codegenScalarFunction.calledOnNullInput();
}

@Override
public Optional<Integer> getPushdownSubfieldArgIndex()
{
if (codegenScalarFunction.pushdownSubfieldArgIndex() < 0) {
return Optional.empty();
}
return Optional.of(codegenScalarFunction.pushdownSubfieldArgIndex());
}
};
}

private static void checkPushdownSubfieldArgIndex(Signature signature, int pushdownSubfieldArgIndex, Method method)
{
if (pushdownSubfieldArgIndex >= 0) {
Map<String, String> typeConstraintMapping = new HashMap<>();
for (TypeVariableConstraint constraint : signature.getTypeVariableConstraints()) {
typeConstraintMapping.put(constraint.getName(), constraint.getVariadicBound());
}
checkCondition(signature.getArgumentTypes().size() > pushdownSubfieldArgIndex, FUNCTION_IMPLEMENTATION_ERROR, "Method [%s] has out of range pushdown subfield arg index", method);
String typeVariableName = signature.getArgumentTypes().get(pushdownSubfieldArgIndex).toString();
checkCondition(typeVariableName.equals(com.facebook.presto.common.type.StandardTypes.ROW) || typeConstraintMapping.get(typeVariableName).equals(com.facebook.presto.common.type.StandardTypes.ROW), FUNCTION_IMPLEMENTATION_ERROR, "Method [%s] does not have a struct or row type as pushdown subfield arg", method);
Contributor comment:

or has no typeConstraintMapping entry (maybe no constraint on the type) or typeConstraintMapping value is null and TypeVariableConstraint.nonDecimalNumericRequired is false (i.e. there's no constraint preventing row type?) ?

Contributor Author (@kevintang2022, May 27, 2025):

I don't think this is too restrictive. The argument index is only checked for these conditions if the pushdownSubfieldArgIndex.isPresent().

Also, the typeVariableName should always be present because signature.getArgumentTypes() always contains the same number of parameters as the function signature. There is always a type constraint imposed on a positional argument.

I think you might have it confused with typeVariableConstraints. typeVariableConstraints might be empty if we don't use an alias for a type (T for RowType). typeConstraintMapping is built using typeVariableConstraints.

So my check ensures that the parameter is either directly named a "row" type, or it has a type alias that maps to a "row" type

In summary: typeVariableConstraints is retrieved from this annotation: @TypeParameter(value = "T", boundedBy = ROW)

argumentTypes is retrieved from the function definition public static Block customStructWithPassthrough(@SqlType("T") Block struct)


Contributor Author:

To make it clearer, here's an example of what the variables look like when the function takes in more than one parameter. [Screenshots omitted.]

}
}
}
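The shape of the check in `checkPushdownSubfieldArgIndex` above can be sketched without Presto's types. This is an assumed simplification: plain strings stand in for `Signature`, `TypeVariableConstraint`, and `StandardTypes.ROW`, and the helper returns a boolean instead of throwing via `checkCondition`:

```java
import java.util.List;
import java.util.Map;

// Illustrative sketch of the validation: the index must be in range, and the
// argument at that position must either be declared "row" directly or be a
// type variable whose variadic bound is "row"
// (e.g. @TypeParameter(value = "T", boundedBy = ROW)).
public class PushdownIndexValidation
{
    public static boolean isValidPushdownIndex(
            List<String> argumentTypes,
            Map<String, String> typeVariableBounds,
            int index)
    {
        if (index < 0 || index >= argumentTypes.size()) {
            return false;
        }
        String type = argumentTypes.get(index);
        // "row".equals(null) is false, so an unconstrained type variable
        // is rejected without a null check
        return "row".equals(type) || "row".equals(typeVariableBounds.get(type));
    }
}
```

Putting the constant on the left of `equals` keeps the lookup null-safe when the type variable has no constraint entry.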
@@ -24,6 +24,7 @@
import com.facebook.presto.spi.function.Signature;
import com.facebook.presto.spi.function.SqlInvokedScalarFunction;
import com.facebook.presto.spi.function.SqlType;
import com.facebook.presto.spi.function.TypeVariableConstraint;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;

@@ -106,6 +107,7 @@ private static SqlScalarFunction parseParametricScalar(ScalarHeaderAndMethods sc
Map<SpecializedSignature, ParametricScalarImplementation.Builder> signatures = new HashMap<>();
for (Method method : scalar.getMethods()) {
ParametricScalarImplementation implementation = ParametricScalarImplementation.Parser.parseImplementation(header, method, constructor);
checkPushdownSubfieldArgIndex(implementation, header, method);
if (!signatures.containsKey(implementation.getSpecializedSignature())) {
ParametricScalarImplementation.Builder builder = new ParametricScalarImplementation.Builder(
implementation.getSignature(),
@@ -155,4 +157,21 @@ public Set<Method> getMethods()
return methods;
}
}

private static void checkPushdownSubfieldArgIndex(ParametricScalarImplementation implementation, ScalarImplementationHeader header, Method method)
{
Optional<Integer> pushdownSubfieldArgIndex = header.getHeader().getPushdownSubfieldArgIndex();
if (pushdownSubfieldArgIndex.isPresent()) {
Map<String, String> typeConstraintMapping = new HashMap<>();
Signature signature = implementation.getSignature();
for (TypeVariableConstraint constraint : signature.getTypeVariableConstraints()) {
typeConstraintMapping.put(constraint.getName(), constraint.getVariadicBound());
}

checkCondition(pushdownSubfieldArgIndex.get() >= 0, FUNCTION_IMPLEMENTATION_ERROR, "Method [%s] has negative pushdown subfield arg index", method);
checkCondition(signature.getArgumentTypes().size() > pushdownSubfieldArgIndex.get(), FUNCTION_IMPLEMENTATION_ERROR, "Method [%s] has out of range pushdown subfield arg index", method);
String typeVariableName = signature.getArgumentTypes().get(pushdownSubfieldArgIndex.get()).toString();
checkCondition(typeVariableName.equals(com.facebook.presto.common.type.StandardTypes.ROW) || typeConstraintMapping.get(typeVariableName).equals(com.facebook.presto.common.type.StandardTypes.ROW), FUNCTION_IMPLEMENTATION_ERROR, "Method [%s] does not have a struct or row type as pushdown subfield arg", method);
Contributor comment:

same comment about maybe this is too restrictive and need to add some other conditions.

}
}
}
@@ -81,15 +81,15 @@ public static List<ScalarImplementationHeader> fromAnnotatedElement(AnnotatedEle

if (scalarFunction != null) {
String baseName = scalarFunction.value().isEmpty() ? camelToSnake(annotatedName(annotated)) : scalarFunction.value();
builder.add(new ScalarImplementationHeader(baseName, new ScalarHeader(description, scalarFunction.visibility(), scalarFunction.deterministic(), scalarFunction.calledOnNullInput())));
builder.add(new ScalarImplementationHeader(baseName, new ScalarHeader(description, scalarFunction.visibility(), scalarFunction.deterministic(), scalarFunction.calledOnNullInput(), scalarFunction.pushdownSubfieldArgIndex())));

for (String alias : scalarFunction.alias()) {
builder.add(new ScalarImplementationHeader(alias, new ScalarHeader(description, scalarFunction.visibility(), scalarFunction.deterministic(), scalarFunction.calledOnNullInput())));
builder.add(new ScalarImplementationHeader(alias, new ScalarHeader(description, scalarFunction.visibility(), scalarFunction.deterministic(), scalarFunction.calledOnNullInput(), scalarFunction.pushdownSubfieldArgIndex())));
}
}

if (scalarOperator != null) {
builder.add(new ScalarImplementationHeader(scalarOperator.value(), new ScalarHeader(description, HIDDEN, true, scalarOperator.value().isCalledOnNullInput())));
builder.add(new ScalarImplementationHeader(scalarOperator.value(), new ScalarHeader(description, HIDDEN, true, scalarOperator.value().isCalledOnNullInput(), -1)));
}

List<ScalarImplementationHeader> result = builder.build();
@@ -582,6 +582,15 @@ private static Optional<Subfield> toSubfield(
if (expression instanceof VariableReferenceExpression) {
return Optional.of(new Subfield(((VariableReferenceExpression) expression).getName(), elements.build().reverse()));
}
if (expression instanceof CallExpression) {
Optional<Integer> pushdownSubfieldArgIndex = functionAndTypeManager.getFunctionMetadata(((CallExpression) expression).getFunctionHandle()).getPushdownSubfieldArgIndex();
if (pushdownSubfieldArgIndex.isPresent() &&
((CallExpression) expression).getArguments().size() > pushdownSubfieldArgIndex.get() &&
((CallExpression) expression).getArguments().get(pushdownSubfieldArgIndex.get()).getType() instanceof RowType) {
expression = ((CallExpression) expression).getArguments().get(pushdownSubfieldArgIndex.get());
continue;
}
}
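The unwrapping step added above can be sketched with a toy expression tree: when the subfield walk hits a call whose metadata exposes a pushdown argument index, it steps into that argument and continues. Everything below (Expr, Ref, Call) is a made-up stand-in for Presto's RowExpression classes, reduced to just the control flow of the new branch:

```java
import java.util.Optional;

// Toy model (not Presto classes) of the loop change in toSubfield:
// a Call that declares a pushdown arg index is transparent to the walk.
public class PassthroughUnwrap
{
    interface Expr {}

    record Ref(String column) implements Expr {}

    record Call(Expr[] args, Optional<Integer> pushdownSubfieldArgIndex) implements Expr {}

    // Returns the underlying column reference if the expression is a
    // (possibly nested) pass-through call over a reference.
    public static Optional<String> resolveColumn(Expr expression)
    {
        while (true) {
            if (expression instanceof Ref ref) {
                return Optional.of(ref.column());
            }
            if (expression instanceof Call call
                    && call.pushdownSubfieldArgIndex().isPresent()
                    && call.args().length > call.pushdownSubfieldArgIndex().get()) {
                // step into the pass-through argument and keep walking
                expression = call.args()[call.pushdownSubfieldArgIndex().get()];
                continue;
            }
            return Optional.empty();
        }
    }
}
```

The real code additionally requires the argument to be a `RowType` before stepping in, so the rewrite only fires for struct-valued pass-through arguments.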

if (expression instanceof SpecialFormExpression && ((SpecialFormExpression) expression).getForm() == DEREFERENCE) {
SpecialFormExpression dereference = (SpecialFormExpression) expression;
@@ -33,4 +33,6 @@
boolean deterministic() default true;

boolean calledOnNullInput() default false;

int pushdownSubfieldArgIndex() default -1;
Contributor comment:

This interface method, and the ones on ScalarFunction and SqlInvokedScalarFunction, are parameters to an annotation. Does a user implementing a function using the Presto SPI manually specify this value in the annotation? If not, then these should be removed.

Contributor Author:

Yes they are specified in the annotation. If it is not specified, then it is -1 by default

Annotation looks like this

@CodegenScalarFunction(value = "function_name", calledOnNullInput = true, pushdownSubfieldArgIndex = 0)

Contributor (@tdcmeehan, May 22, 2025):

Can you add some documentation on how this needs to be used to our documentation?

Also, why not add an annotation to the argument itself? Something like, @RowMayBeDereferenced. (BTW, can you give an example of how a function could know that this is safe to do?) You could annotate multiple of them, and we could validate that the argument is, indeed, a struct to begin with.

Also, your example reference internal queries, can you add pastes of the explain plans?

Contributor:

Can you also add some tests?

Contributor Author:

Instead of using annotation on the argument, I have chosen to pass the argindex in the codegen decorator because this allows it to be inside the FunctionMetadata.

I added some tests to TestHiveLogicalPlanner. One more change I will make is to perform some validation that the argIndex specified does correspond to a rowtype. And throw a warning when the code path is not reached due to invalid index

Contributor Author:

I will add this description to the documentation once this change is merged in

pushdownSubfieldArgIndex is used to specify which parameter in the scalar function corresponds to the parameter that should have its subfields pushed down to filter plan nodes during query planning and optimization. This is helpful to reduce the amount of scanning done for queries involving structs, and it ensures that only utilized subfields of the struct are scanned and unused subfields can be pruned from the query plan. In the below example, the pushdownSubfieldArgIndex is set to 0, which means that the first parameter of the custom_struct_with_passthrough function will have its subfields pushed down to filter plan nodes. So for a query such as SELECT struct.a FROM (SELECT CUSTOM_STRUCT_WITH_PASSTHROUGH(CAST(ROW(orderkey, comment) AS ROW(a BIGINT, b VARCHAR))) AS struct FROM lineitem), the query plan will only include the orderkey subfield in the table scan, and the comment subfield will be pruned because it is not utilized in the outer query.

@ScalarFunction(value = "custom_struct_with_passthrough", pushdownSubfieldArgIndex = 0)
@TypeParameter(value = "T", boundedBy = ROW)
@SqlType("T")
public static Block customStructWithPassthrough(@SqlType("T") Block struct)
{
    return struct;
}

Contributor:

Is there a non-trivial example you can add? And can you please add the documentation with the current PR?

Correct me if I'm wrong, this field basically says to the planner, "I'm OK if the function receives a pruned version of the struct, because this function just transparently utilizes and returns this argument." But I'm really struggling to understand under what circumstance that would ever apply (or, how you would ever come up with a non-trivial example). How does fb_reshape_row work? Are there any other non-public functions that this would be beneficial for at Meta?

If this acknowledged to be esoteric, wouldn't it just make sense to add it as a field in ComplexTypeFunctionDescriptor (which is a part of FunctionMetadata), and convert the esoteric fb_reshape_row implementation to manually extend SqlScalarFunction, rather than use the annotation based approach? Usually when we need to do something unusual and where it's awkward to modify the public SPI, this is the route we prefer. Some examples to refer to in OSS include ArrayConcatFunction, ArrayFlattenFunction, etc.

Contributor:

fb_reshape_row is basically a specialized cast. It makes it easier to work with rows that might have had subfields (with any layer of nesting) added to them, types changed, etc., by being able to take a row of one form and cast it to some other shape. It doesn't care if any fields are null/empty/missing.

}