Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 40 additions & 10 deletions ydb/library/yql/providers/common/pushdown/collection.cpp
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
#include "collection.h"

#include <yql/essentials/core/yql_expr_type_annotation.h>
#include <yql/essentials/utils/log/log.h>

#include <vector>

Expand All @@ -24,9 +23,10 @@ class TPredicateMarkup {
using EFlag = TSettings::EFeatureFlag;

public:
TPredicateMarkup(const TExprBase& lambdaArg, const TSettings& settings)
TPredicateMarkup(const TExprBase& lambdaArg, const TSettings& settings, TExprContext& ctx)
: LambdaArg(lambdaArg)
, Settings(settings)
, Ctx(ctx)
{}

void MarkupPredicates(const TExprBase& predicate, TPredicateNode& predicateTree) {
Expand Down Expand Up @@ -207,6 +207,9 @@ class TPredicateMarkup {
if (Settings.IsEnabled(EFlag::StringTypes) && (node.Maybe<TCoUtf8>() || node.Maybe<TCoString>())) {
return true;
}
if (Settings.IsEnabled(EFlag::DecimalCtor) && node.Maybe<TCoDecimal>()) {
return true;
}
return false;
}

Expand Down Expand Up @@ -249,6 +252,22 @@ class TPredicateMarkup {
return CheckExpressionNodeForPushdown(toBytesExpr);
}

bool IsSupportedToString(const TExprBase& toString) {
if (!Settings.IsEnabled(EFlag::ToStringFromStringExpressions)) {
return false;
}

if (toString.Ref().ChildrenSize() != 1) {
return false;
}

auto toStringExpr = TExprBase(toString.Ref().Child(0));
if (!IsStringExpr(toStringExpr)) {
return false;
}
return CheckExpressionNodeForPushdown(toStringExpr);
}

bool IsSupportedLambda(const TCoLambda& lambda, ui64 numberArguments) {
const auto args = lambda.Args();
if (args.Size() != numberArguments) {
Expand Down Expand Up @@ -299,6 +318,9 @@ class TPredicateMarkup {
if (node.Ref().IsCallable({"ToBytes"})) {
return IsSupportedToBytes(node);
}
if (node.Ref().IsCallable({"ToString"})) {
return IsSupportedToString(node);
}
if (auto maybeData = node.Maybe<TCoDataCtor>()) {
return IsSupportedDataType(maybeData.Cast());
}
Expand Down Expand Up @@ -621,33 +643,41 @@ class TPredicateMarkup {
}

bool ApplyCanBePushed(const TCoApply& apply) {
// Check callable
if (auto udf = apply.Callable().Maybe<TCoUdf>()) {
if (!UdfCanBePushed(udf.Cast(), apply.Ref().ChildrenList())) {
return false;
}
// Check if callable is a UDF and can be pushed
auto udf = apply.Callable().Maybe<TCoUdf>();
if (!udf || !UdfCanBePushed(udf.Cast(), apply.Ref().ChildrenList())) {
return false;
}

// Check arguments
// Check if all arguments can be pushed
for (size_t i = 1; i < apply.Ref().ChildrenSize(); ++i) {
if (!CheckExpressionNodeForPushdown(TExprBase(apply.Ref().Child(i)))) {
return false;
}
}

return true;
}

private:
const TExprBase& LambdaArg; // Predicate input item, has struct type
const TSettings& Settings;
[[maybe_unused]] TExprContext& Ctx; // To be used for pretty printing

std::unordered_set<const TExprNode*> LambdaArguments;
};

} // anonymous namespace end

void CollectPredicates(const TExprBase& predicate, TPredicateNode& predicateTree, const TExprBase& lambdaArg, const TExprBase& /*lambdaBody*/, const TSettings& settings) {
TPredicateMarkup markup(lambdaArg, settings);
void CollectPredicates(
TExprContext& ctx,
const TExprBase& predicate,
TPredicateNode& predicateTree,
const TExprBase& lambdaArg,
const TExprBase& /*lambdaBody*/,
const TSettings& settings
) {
TPredicateMarkup markup(lambdaArg, settings, ctx);
markup.MarkupPredicates(predicate, predicateTree);
}

Expand Down
3 changes: 2 additions & 1 deletion ydb/library/yql/providers/common/pushdown/collection.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
namespace NYql::NPushdown {

// Collects subpredicate that we can then push down
void CollectPredicates(const NNodes::TExprBase& predicate, TPredicateNode& predicateTree,
void CollectPredicates(TExprContext& ctx,
const NNodes::TExprBase& predicate, TPredicateNode& predicateTree,
const NNodes::TExprBase& lambdaArg, const NNodes::TExprBase& lambdaBody,
const TSettings& settings);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ NPushdown::TPredicateNode MakePushdownNode(const NNodes::TCoLambda& lambda, TExp

TCoOptionalIf optionalIf = maybeOptionalIf.Cast();
NPushdown::TPredicateNode predicateTree(optionalIf.Predicate());
NPushdown::CollectPredicates(optionalIf.Predicate(), predicateTree, TExprBase(lambdaArg), TExprBase(lambdaArg), settings);
NPushdown::CollectPredicates(ctx, optionalIf.Predicate(), predicateTree, TExprBase(lambdaArg), TExprBase(lambdaArg), settings);
YQL_ENSURE(predicateTree.IsValid(), "Collected filter predicates are invalid");

return SplitForPartialPushdown(predicateTree, ctx, pos, settings);
Expand Down
7 changes: 6 additions & 1 deletion ydb/library/yql/providers/common/pushdown/settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,12 @@ struct TSettings {
// In case of unsupported / complicated expressions $B and $D
SplitOrOperator = 1 << 22,
ToBytesFromStringExpressions = 1 << 23, // ToBytes(string like)
FlatMapOverOptionals = 1 << 24 // FlatMap(Optional<T>, Lmabda (T) -> Optional<U>)
FlatMapOverOptionals = 1 << 24, // FlatMap(Optional<T>, Lambda (T) -> Optional<U>)
ToStringFromStringExpressions = 1 << 25, // ToString(string like)
IntervalCtor = 1 << 26,
MinMax = 1 << 27,
NonDeterministic = 1 << 28,
DecimalCtor = 1 << 29,
};

explicit TSettings(NLog::EComponent logComponent)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,9 @@ message TPredicate {
G = 6; // "$column > value"
IND = 7;// "$column IS NOT DISTINCT value"
ID = 8; // "$column IS DISTINCT value"
STARTS_WITH = 9;
ENDS_WITH = 10;
CONTAINS = 11;
}

EOperation operation = 1;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,29 +1,29 @@
#include <ydb/library/yql/dq/expr_nodes/dq_expr_nodes.h>
#include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h>
#include <ydb/library/yql/providers/dq/common/yql_dq_settings.h>
#include <ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.h>
#include <ydb/library/yql/providers/generic/expr_nodes/yql_generic_expr_nodes.h>
#include <ydb/library/yql/providers/generic/proto/source.pb.h>
#include <ydb/library/yql/providers/generic/provider/yql_generic_state.h>
#include <ydb/library/yql/providers/generic/provider/yql_generic_provider.h>
#include <ydb/library/yql/providers/generic/provider/yql_generic_state.h>

#include <yql/essentials/ast/yql_ast.h>
#include <yql/essentials/ast/yql_expr.h>
#include <yql/essentials/core/dq_integration/yql_dq_integration.h>
#include <yql/essentials/core/services/yql_out_transformers.h>
#include <yql/essentials/core/services/yql_transform_pipeline.h>
#include <yql/essentials/core/yql_graph_transformer.h>
#include <yql/essentials/core/yql_type_annotation.h>
#include <yql/essentials/core/services/yql_transform_pipeline.h>
#include <yql/essentials/core/services/yql_out_transformers.h>
#include <yql/essentials/core/dq_integration/yql_dq_integration.h>
#include <yql/essentials/minikql/invoke_builtins/mkql_builtins.h>
#include <yql/essentials/minikql/mkql_function_registry.h>
#include <ydb/library/yql/providers/common/db_id_async_resolver/db_async_resolver.h>
#include <yql/essentials/providers/common/provider/yql_provider_names.h>
#include <yql/essentials/providers/common/transform/yql_optimize.h>
#include <ydb/library/yql/providers/dq/common/yql_dq_settings.h>
#include <ydb/library/yql/providers/dq/expr_nodes/dqs_expr_nodes.h>
#include <ydb/library/yql/dq/expr_nodes/dq_expr_nodes.h>
#include <yql/essentials/providers/common/udf_resolve/yql_simple_udf_resolver.h>
#include <yql/essentials/providers/result/provider/yql_result_provider.h>
#include <yql/essentials/sql/sql.h>
#include <yql/essentials/utils/log/log.h>

#include <library/cpp/testing/unittest/registar.h>

#include <library/cpp/random_provider/random_provider.h>

#include <google/protobuf/text_format.h>
Expand Down Expand Up @@ -143,6 +143,24 @@ struct TFakeGenericClient: public NConnector::IClient {
PRIMITIVE_TYPE_COL("json_document", JSON_DOCUMENT);
PRIMITIVE_TYPE_COL("dynumber", DYNUMBER);

// Add decimal columns
{
auto* col = schema.add_columns();
col->set_name("col_decimal_precision10_scale0");
auto* t = col->mutable_type();
auto* decimalType = t->mutable_decimal_type();
decimalType->set_precision(10);
decimalType->set_scale(0);
}
{
auto* col = schema.add_columns();
col->set_name("col_decimal_precision4_scale2");
auto* t = col->mutable_type();
auto* decimalType = t->mutable_decimal_type();
decimalType->set_precision(4);
decimalType->set_scale(2);
}

return NThreading::MakeFuture<NConnector::TDescribeTableAsyncResult::value_type>(std::move(result));
}

Expand Down Expand Up @@ -246,7 +264,12 @@ struct TPushdownFixture: public NUnitTest::TBaseFixture {
TypesCtx = MakeIntrusive<TTypeAnnotationContext>();
TypesCtx->RandomProvider = CreateDeterministicRandomProvider(1);

FunctionRegistry = CreateFunctionRegistry(CreateBuiltinRegistry())->Clone(); // TODO: remove Clone()
auto functionRegistry = CreateFunctionRegistry(&PrintBackTrace, NKikimr::NMiniKQL::CreateBuiltinRegistry(), false, {})->Clone();
NKikimr::NMiniKQL::FillStaticModules(*functionRegistry);
FunctionRegistry = std::move(functionRegistry);

TypesCtx->UdfResolver = NYql::NCommon::CreateSimpleUdfResolver(FunctionRegistry.Get());
TypesCtx->UserDataStorage = MakeIntrusive<TUserDataStorage>(nullptr, TUserDataTable(), nullptr, nullptr);

{
auto* setting = GatewaysCfg.MutableGeneric()->AddDefaultSettings();
Expand Down Expand Up @@ -711,4 +734,118 @@ Y_UNIT_TEST_SUITE_F(PushdownTest, TPushdownFixture) {
)proto"
);
}

// Feeds AssertFilter a predicate of the form
//   Coalesce(Apply(Udf "Re2.Grep" (pattern, Nothing<options struct>), Member($row, "col_string")), false)
// together with the expected filter proto: a `regexp` node whose `value` is the
// column "col_string" and whose `pattern` is a STRING typed value.
// NOTE(review): both fixtures are raw string literals, so the pattern text really
// contains four backslashes before `d+`; confirm that, after the s-expression /
// text-format unescaping, this is the pattern the test intends to exercise.
Y_UNIT_TEST(RegexpPushdown) {
AssertFilter(
// Test REGEXP pushdown with a simple pattern matching digits
R"ast(
(Coalesce
(Apply (Udf '"Re2.Grep" '((String '"\\\\d+") (Nothing
(OptionalType
(StructType
'('"CaseSensitive" (DataType 'Bool))
'('"DotNl" (DataType 'Bool))
'('"Literal" (DataType 'Bool))
'('"LogErrors" (DataType 'Bool))
'('"LongestMatch" (DataType 'Bool))
'('"MaxMem" (DataType 'Uint64))
'('"NeverCapture" (DataType 'Bool))
'('"NeverNl" (DataType 'Bool))
'('"OneLine" (DataType 'Bool))
'('"PerlClasses" (DataType 'Bool))
'('"PosixSyntax" (DataType 'Bool))
'('"Utf8" (DataType 'Bool))
'('"WordBoundary" (DataType 'Bool))
)
)
)))
(Member $row '"col_string")
)
(Bool '"false")
)
)ast",
R"proto(
regexp {
value {
column: "col_string"
}
pattern {
typed_value {
type {
type_id: STRING
}
value {
bytes_value: "\\\\d+"
}
}
}
}
)proto"
);
}

// Feeds AssertFilter an equality between the column
// "col_decimal_precision10_scale0" and the literal Decimal("1", 10, 0), and the
// expected filter proto: an EQ comparison whose right-hand side is a typed value
// with decimal_type {precision: 10, scale: 0}.
// The expected bytes_value starts with 0x01 followed by zero bytes, which is
// consistent with the integer 1 stored little-endian -- presumably a 128-bit
// representation; confirm against the serializer.
Y_UNIT_TEST(DecimalPushdownPrecision10Scale0) {
AssertFilter(
R"ast(
(==
(Member $row '"col_decimal_precision10_scale0")
(Decimal '"1" '"10" '"0")
)
)ast",
R"proto(
comparison {
operation: EQ
left_value {
column: "col_decimal_precision10_scale0"
}
right_value {
typed_value {
type {
decimal_type {
precision: 10
scale: 0
}
}
value {
bytes_value: "\001\000\000\000\000\000\000\000\000\000\000\000\000\000\000\000"
}
}
}
}
)proto"
);
}

// Feeds AssertFilter an equality between the column
// "col_decimal_precision4_scale2" and the literal Decimal("-22.22", 4, 2), and
// the expected filter proto: an EQ comparison whose right-hand side is a typed
// value with decimal_type {precision: 4, scale: 2}.
// The expected bytes_value begins with 'R' (0x52), 0xF7 and continues with 0xFF
// bytes; read little-endian as two's complement that is -2222, i.e. -22.22
// scaled by 10^2 -- presumably the serializer's unscaled-integer encoding.
// NOTE(review): "Precesion" in the test name is a typo for "Precision"; it is
// left as is here because renaming changes the test identifier.
Y_UNIT_TEST(DecimalPushdownPrecesion4Scale2) {
// Test with negative decimal value with fractional part
AssertFilter(
R"ast(
(==
(Member $row '"col_decimal_precision4_scale2")
(Decimal '"-22.22" '"4" '"2")
)
)ast",
R"proto(
comparison {
operation: EQ
left_value {
column: "col_decimal_precision4_scale2"
}
right_value {
typed_value {
type {
decimal_type {
precision: 4
scale: 2
}
}
value {
bytes_value: "R\367\377\377\377\377\377\377\377\377\377\377\377\377\377\377"
}
}
}
}
)proto"
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,19 @@ SRCS(
PEERDIR(
contrib/libs/fmt
library/cpp/random_provider
ydb/library/yql/dq/expr_nodes
ydb/library/yql/providers/common/db_id_async_resolver
ydb/library/yql/providers/generic/expr_nodes
yql/essentials/ast
yql/essentials/core
yql/essentials/core/services
ydb/library/yql/dq/expr_nodes
yql/essentials/minikql
ydb/library/yql/providers/common/db_id_async_resolver
ydb/library/yql/providers/generic/expr_nodes
yql/essentials/minikql/invoke_builtins/llvm16
yql/essentials/providers/result/provider
yql/essentials/public/udf/service/stub
yql/essentials/sql
yql/essentials/minikql/invoke_builtins/llvm16
yql/essentials/sql/pg_dummy
yql/essentials/udfs/common/re2
)

SIZE(SMALL)
Expand Down
Loading
Loading