Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ydb/core/kqp/compile_service/kqp_compile_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -669,6 +669,7 @@ void ApplyServiceConfig(TKikimrConfiguration& kqpConfig, const TTableServiceConf
kqpConfig.EnableOrderOptimizaionFSM = serviceConfig.GetEnableOrderOptimizaionFSM();
kqpConfig.EnableTopSortSelectIndex = serviceConfig.GetEnableTopSortSelectIndex();
kqpConfig.EnablePointPredicateSortAutoSelectIndex = serviceConfig.GetEnablePointPredicateSortAutoSelectIndex();
kqpConfig.EnableDqHashCombineByDefault = serviceConfig.GetEnableDqHashCombineByDefault();

if (const auto limit = serviceConfig.GetResourceManager().GetMkqlHeavyProgramMemoryLimit()) {
kqpConfig._KqpYqlCombinerMemoryLimit = std::max(1_GB, limit - (limit >> 2U));
Expand Down
5 changes: 4 additions & 1 deletion ydb/core/kqp/compile_service/kqp_compile_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -359,6 +359,8 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
bool enableTopSortSelectIndex = TableServiceConfig.GetEnableTopSortSelectIndex();
bool enablePointPredicateSortAutoSelectIndex = TableServiceConfig.GetEnablePointPredicateSortAutoSelectIndex();

bool enableDqHashCombineByDefault = TableServiceConfig.GetEnableDqHashCombineByDefault();

TableServiceConfig.Swap(event.MutableConfig()->MutableTableServiceConfig());
LOG_INFO(*TlsActivationContext, NKikimrServices::KQP_COMPILE_SERVICE, "Updated config");

Expand Down Expand Up @@ -404,7 +406,8 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
TableServiceConfig.GetEnableOlapPushdownAggregate() != enableOlapPushdownAggregate ||
TableServiceConfig.GetEnableOrderOptimizaionFSM() != enableOrderOptimizaionFSM ||
TableServiceConfig.GetEnableTopSortSelectIndex() != enableTopSortSelectIndex ||
TableServiceConfig.GetEnablePointPredicateSortAutoSelectIndex() != enablePointPredicateSortAutoSelectIndex)
TableServiceConfig.GetEnablePointPredicateSortAutoSelectIndex() != enablePointPredicateSortAutoSelectIndex ||
TableServiceConfig.GetEnableDqHashCombineByDefault() != enableDqHashCombineByDefault)
{

QueryCache->Clear();
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/kqp/provider/yql_kikimr_settings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -230,4 +230,9 @@ bool TKikimrConfiguration::GetEnableParallelUnionAllConnectionsForExtend() const
bool TKikimrConfiguration::GetEnableOlapPushdownAggregate() const {
return ((GetOptionalFlagValue(OptEnableOlapPushdownAggregate.Get()) == EOptionalFlag::Enabled) || EnableOlapPushdownAggregate);
}

bool TKikimrConfiguration::GetUseDqHashCombine() const {
return UseDqHashCombine.Get().GetOrElse(EnableDqHashCombineByDefault);
}

}
2 changes: 2 additions & 0 deletions ydb/core/kqp/provider/yql_kikimr_settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ struct TKikimrConfiguration : public TKikimrSettings, public NCommon::TSettingDi
bool EnableTopSortSelectIndex = true;
bool EnablePointPredicateSortAutoSelectIndex = true;
bool EnableSimpleProgramsSinglePartitionOptimization = true;
bool EnableDqHashCombineByDefault = false;

ui32 LangVer = NYql::MinLangVersion;
NYql::EBackportCompatibleFeaturesMode BackportMode = NYql::EBackportCompatibleFeaturesMode::Released;
Expand All @@ -232,6 +233,7 @@ struct TKikimrConfiguration : public TKikimrSettings, public NCommon::TSettingDi
bool GetEnableOlapPushdownProjections() const;
bool GetEnableParallelUnionAllConnectionsForExtend() const;
bool GetEnableOlapPushdownAggregate() const;
bool GetUseDqHashCombine() const;
};

}
2 changes: 2 additions & 0 deletions ydb/core/protos/table_service_config.proto
Original file line number Diff line number Diff line change
Expand Up @@ -426,4 +426,6 @@ message TTableServiceConfig {
optional bool EnablePointPredicateSortAutoSelectIndex = 101 [ default = false ];

optional bool EnableDataShardCreateTableAs = 102 [default = false];

optional bool EnableDqHashCombineByDefault = 105 [default = false];
};
130 changes: 94 additions & 36 deletions ydb/library/yql/dq/comp_nodes/dq_hash_combine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,10 @@ struct TWideUnboxedHasher
const TKeyTypes& Types;
};

bool HasMemoryForProcessing() {
return !TlsAllocState->IsMemoryYellowZoneEnabled();
}

using TEqualsPtr = bool(*)(const NUdf::TUnboxedValuePod*, const NUdf::TUnboxedValuePod*);
using THashPtr = NUdf::THashType(*)(const NUdf::TUnboxedValuePod*);

Expand Down Expand Up @@ -99,21 +103,38 @@ struct TStorageWrapper
}
};

std::optional<size_t> EstimateUvPackSize(const TUnboxedValuePod* items, size_t width) {
std::optional<size_t> EstimateUvPackSize(const TArrayRef<const TUnboxedValuePod> items, const TArrayRef<TType* const> types) {
constexpr const size_t uvSize = sizeof(TUnboxedValuePod);

size_t sizeSum = 0;

const TUnboxedValuePod* itemPtr = items;
for (size_t i = 0; i < width; ++i, ++itemPtr) {
const TUnboxedValuePod& item = *itemPtr;
auto currType = types.begin();
for (const auto& item : items) {
if (!item.HasValue() || item.IsEmbedded() || item.IsInvalid()) {
sizeSum += uvSize;
} else if (item.IsString()) {
sizeSum += uvSize + item.AsStringRef().Size();
} else {
} else if (!item.IsBoxed()) {
return {};
} else {
auto ty = *currType;
while (ty->IsOptional()) {
ty = AS_TYPE(TOptionalType, ty)->GetItemType();
}
if (ty->IsTuple()) {
auto tupleType = AS_TYPE(TTupleType, ty);
auto elements = tupleType->GetElements();
auto tupleSize = EstimateUvPackSize(TArrayRef(item.GetElements(), elements.size()), elements);
if (!tupleSize.has_value()) {
return {};
}
// Tuple contents are generally boxed into a TDirectArrayHolderInplace instance
sizeSum += uvSize + sizeof(TDirectArrayHolderInplace) + tupleSize.value();
} else {
return {};
}
}
++currType;
}

return sizeSum;
Expand All @@ -124,25 +145,40 @@ class TMemoryEstimationHelper
{
private:
static std::optional<size_t> GetUVSizeBound(TType* type) {
using NYql::NUdf::EDataSlot;
if (type->IsData()) {
using NYql::NUdf::EDataSlot;

bool optional = false;
auto dataSlot = UnpackOptionalData(type, optional)->GetDataSlot();
bool optional = false;
auto dataSlot = UnpackOptionalData(type, optional)->GetDataSlot();

if (dataSlot.Empty()) {
return {};
}
if (dataSlot.Empty()) {
return {};
}

switch (dataSlot.GetRef()) {
case EDataSlot::DyNumber:
case EDataSlot::Json:
case EDataSlot::JsonDocument:
case EDataSlot::Yson:
case EDataSlot::Utf8:
case EDataSlot::String:
switch (dataSlot.GetRef()) {
case EDataSlot::DyNumber:
case EDataSlot::Json:
case EDataSlot::JsonDocument:
case EDataSlot::Yson:
case EDataSlot::Utf8:
case EDataSlot::String:
return {};
default:
return sizeof(TUnboxedValuePod);
}
} else if (type->IsTuple()) {
size_t result = 0;
const auto tupleElements = AS_TYPE(TTupleType, type)->GetElements();
for (auto* element : tupleElements) {
auto sz = GetUVSizeBound(element);
if (!sz.has_value()) {
return {};
}
result += sz.value();
}
return result + sizeof(TUnboxedValuePod);
} else {
return {};
default:
return sizeof(TUnboxedValuePod);
}
}

Expand All @@ -163,17 +199,21 @@ class TMemoryEstimationHelper
std::optional<size_t> StateSizeBound;
std::optional<size_t> KeySizeBound;
const size_t KeyWidth;
const std::vector<TType*> KeyItemTypes;

TMemoryEstimationHelper(std::vector<TType*> keyItemTypes, std::vector<TType*> stateItemTypes)
: KeyWidth(keyItemTypes.size())
, KeyItemTypes(keyItemTypes)
{
KeySizeBound = GetMultiUVSizeBound(keyItemTypes);
StateSizeBound = GetMultiUVSizeBound(stateItemTypes);
}

std::optional<size_t> EstimateKeySize(const TUnboxedValuePod* keyPack) const
{
return EstimateUvPackSize(keyPack, KeyWidth);
return EstimateUvPackSize(
TArrayRef<const TUnboxedValuePod>(keyPack, KeyWidth),
TArrayRef<TType* const>(KeyItemTypes.begin(), KeyItemTypes.end()));
}
};

Expand Down Expand Up @@ -215,16 +255,19 @@ class TGenericAggregation: public IAggregation
const NDqHashOperatorCommon::TCombinerNodes& Nodes;
size_t StateWidth;
size_t StateSize;
const std::vector<TType*>& StateItemTypes;

public:
TGenericAggregation(
TComputationContext& ctx,
const NDqHashOperatorCommon::TCombinerNodes& nodes
const NDqHashOperatorCommon::TCombinerNodes& nodes,
const std::vector<TType*>& stateItemTypes
)
: Ctx(ctx)
, Nodes(nodes)
, StateWidth(Nodes.StateNodes.size())
, StateSize(StateWidth * sizeof(TUnboxedValue))
, StateItemTypes(stateItemTypes)
{
}

Expand All @@ -233,7 +276,10 @@ class TGenericAggregation: public IAggregation
}

std::optional<size_t> GetStateMemoryUsage(void* rawState) const override {
return EstimateUvPackSize(static_cast<const TUnboxedValuePod*>(rawState), StateWidth);
return EstimateUvPackSize(
TArrayRef<const TUnboxedValuePod>(static_cast<const TUnboxedValuePod*>(rawState), StateWidth),
TArrayRef<TType* const>(StateItemTypes)
);
}

// Assumes the input row and extracted keys have already been copied into the input nodes, so row isn't even used here
Expand Down Expand Up @@ -412,7 +458,7 @@ class TBaseAggregationState: public TComputationValue<TBaseAggregationState>
}

if (isNew) {
if (Map->GetSize() >= MaxRowCount) {
if (Map->GetSize() >= MaxRowCount || (!HasMemoryForProcessing() && Map->GetSize() >= LowerFixedRowCount)) {
OpenDrain();
return EFillState::Drain;
}
Expand All @@ -433,7 +479,7 @@ class TBaseAggregationState: public TComputationValue<TBaseAggregationState>

TBaseAggregationState(
TMemoryUsageInfo* memInfo, TComputationContext& ctx, const TMemoryEstimationHelper& memoryHelper, size_t memoryLimit, size_t inputWidth,
const NDqHashOperatorCommon::TCombinerNodes& nodes, ui32 wideFieldsIndex, const TKeyTypes& keyTypes
const NDqHashOperatorCommon::TCombinerNodes& nodes, ui32 wideFieldsIndex, const TKeyTypes& keyTypes, const std::vector<TType*>& stateItemTypes
)
: TBase(memInfo)
, Ctx(ctx)
Expand All @@ -445,7 +491,6 @@ class TBaseAggregationState: public TComputationValue<TBaseAggregationState>
, KeyTypes(keyTypes)
, Hasher(TWideUnboxedHasher(KeyTypes))
, Equals(TWideUnboxedEqual(KeyTypes))
, HasGenericAggregation(nodes.StateNodes.size() > 0)
, KeyStateBuffer(nullptr)
, Draining(false)
, SourceEmpty(false)
Expand All @@ -459,7 +504,7 @@ class TBaseAggregationState: public TComputationValue<TBaseAggregationState>
MaxRowCount = TryAllocMapForRowCount(MaxRowCount);

if (HasGenericAggregation) {
Aggs.push_back(std::make_unique<TGenericAggregation>(Ctx, Nodes));
Aggs.push_back(std::make_unique<TGenericAggregation>(Ctx, Nodes, stateItemTypes));
}

MKQL_ENSURE(Aggs.size(), "No aggregations defined");
Expand Down Expand Up @@ -489,6 +534,7 @@ class TBaseAggregationState: public TComputationValue<TBaseAggregationState>
size_t TryAllocMapForRowCount(size_t rowCount)
{
// Avoid reallocating the map
// TODO: although Clear()-ing might be actually more expensive than reallocation
if (Map) {
const size_t oldCapacity = Map->GetCapacity();
size_t newCapacity = GetMapCapacity(rowCount);
Expand All @@ -503,6 +549,10 @@ class TBaseAggregationState: public TComputationValue<TBaseAggregationState>
size_t newCapacity = GetMapCapacity(rows);
try {
Map.Reset(new TMap(Hasher, Equals, newCapacity));
if (!HasMemoryForProcessing()) {
Map.Reset(nullptr);
return false;
}
return true;
}
catch (TMemoryLimitExceededException) {
Expand All @@ -517,6 +567,7 @@ class TBaseAggregationState: public TComputationValue<TBaseAggregationState>
rowCount = rowCount / 2;
}

// This can emit uncaught TMemoryLimitExceededException if we can't afford even a tiny map
size_t smallCapacity = GetMapCapacity(LowerFixedRowCount);
Map.Reset(new TMap(Hasher, Equals, smallCapacity));
return LowerFixedRowCount;
Expand Down Expand Up @@ -596,7 +647,7 @@ class TBaseAggregationState: public TComputationValue<TBaseAggregationState>
const TKeyTypes& KeyTypes;
THashFunc const Hasher;
TEqualsFunc const Equals;
const bool HasGenericAggregation;
constexpr static const bool HasGenericAggregation = true;

using TStore = TStorageWrapper<char>;
std::unique_ptr<TStore> Store;
Expand Down Expand Up @@ -631,9 +682,11 @@ class TWideAggregationState: public TBaseAggregationState
size_t outputWidth,
const NDqHashOperatorCommon::TCombinerNodes& nodes,
ui32 wideFieldsIndex,
const TKeyTypes& keyTypes
const TKeyTypes& keyTypes,
const std::vector<TType*>& stateItemTypes

)
: TBaseAggregationState(memInfo, ctx, memoryHelper, memoryLimit, inputWidth, nodes, wideFieldsIndex, keyTypes)
: TBaseAggregationState(memInfo, ctx, memoryHelper, memoryLimit, inputWidth, nodes, wideFieldsIndex, keyTypes, stateItemTypes)
, StartMoment(TInstant::Now()) // Temporary. Helps correlate debug outputs with SVGs
, OutputWidth(outputWidth)
, DrainMapIterator(nullptr)
Expand Down Expand Up @@ -830,9 +883,10 @@ class TBlockAggregationState: public TBaseAggregationState
size_t inputWidth,
const NDqHashOperatorCommon::TCombinerNodes& nodes,
ui32 wideFieldsIndex,
const TKeyTypes& keyTypes
const TKeyTypes& keyTypes,
const std::vector<TType*>& stateItemTypes
)
: TBaseAggregationState(memInfo, ctx, memoryHelper, memoryLimit, inputWidth, nodes, wideFieldsIndex, keyTypes)
: TBaseAggregationState(memInfo, ctx, memoryHelper, memoryLimit, inputWidth, nodes, wideFieldsIndex, keyTypes, stateItemTypes)
, InputTypes(inputTypes)
, OutputTypes(outputTypes)
, InputColumns(inputTypes.size() - 1)
Expand Down Expand Up @@ -1156,6 +1210,7 @@ class TDqHashCombineFlowWrapper: public TStatefulWideFlowCodegeneratorNode<TDqHa
, Source(source)
, InputTypes(inputTypes)
, OutputTypes(outputTypes)
, StateItemTypes(stateItemTypes)
, InputWidth(inputWidth)
, Nodes(std::move(nodes))
, KeyTypes(std::move(keyTypes))
Expand Down Expand Up @@ -1437,16 +1492,17 @@ class TDqHashCombineFlowWrapper: public TStatefulWideFlowCodegeneratorNode<TDqHa
UDF_LOG(logger, logComponent, NUdf::ELogLevel::Debug, TStringBuilder() << "State initialized");

if (!BlockMode) {
state = ctx.HolderFactory.Create<TWideAggregationState>(ctx, MemoryHelper, MemoryLimit, InputWidth, OutputTypes.size(), Nodes, WideFieldsIndex, KeyTypes);
state = ctx.HolderFactory.Create<TWideAggregationState>(ctx, MemoryHelper, MemoryLimit, InputWidth, OutputTypes.size(), Nodes, WideFieldsIndex, KeyTypes, StateItemTypes);
} else {
state = ctx.HolderFactory.Create<TBlockAggregationState>(ctx, MemoryHelper, MemoryLimit, InputTypes, OutputTypes, InputWidth, Nodes, WideFieldsIndex, KeyTypes);
state = ctx.HolderFactory.Create<TBlockAggregationState>(ctx, MemoryHelper, MemoryLimit, InputTypes, OutputTypes, InputWidth, Nodes, WideFieldsIndex, KeyTypes, StateItemTypes);
}
}

const bool BlockMode;
IComputationWideFlowNode *const Source;
std::vector<TType*> InputTypes;
std::vector<TType*> OutputTypes;
const std::vector<TType*> StateItemTypes;
size_t InputWidth;
const NDqHashOperatorCommon::TCombinerNodes Nodes;
const TKeyTypes KeyTypes;
Expand All @@ -1473,6 +1529,7 @@ class TDqHashCombineStreamWrapper: public TMutableComputationNode<TDqHashCombine
, StreamSource(streamSource)
, InputTypes(inputTypes)
, OutputTypes(outputTypes)
, StateItemTypes(stateItemTypes)
, InputWidth(inputWidth)
, Nodes(std::move(nodes))
, KeyTypes(std::move(keyTypes))
Expand All @@ -1499,9 +1556,9 @@ class TDqHashCombineStreamWrapper: public TMutableComputationNode<TDqHashCombine
UDF_LOG(logger, logComponent, NUdf::ELogLevel::Debug, TStringBuilder() << "State initialized");

if (!BlockMode) {
state = ctx.HolderFactory.Create<TWideAggregationState>(ctx, MemoryHelper, MemoryLimit, InputWidth, OutputTypes.size(), Nodes, WideFieldsIndex, KeyTypes);
state = ctx.HolderFactory.Create<TWideAggregationState>(ctx, MemoryHelper, MemoryLimit, InputWidth, OutputTypes.size(), Nodes, WideFieldsIndex, KeyTypes, StateItemTypes);
} else {
state = ctx.HolderFactory.Create<TBlockAggregationState>(ctx, MemoryHelper, MemoryLimit, InputTypes, OutputTypes, InputWidth, Nodes, WideFieldsIndex, KeyTypes);
state = ctx.HolderFactory.Create<TBlockAggregationState>(ctx, MemoryHelper, MemoryLimit, InputTypes, OutputTypes, InputWidth, Nodes, WideFieldsIndex, KeyTypes, StateItemTypes);
}
}

Expand All @@ -1517,6 +1574,7 @@ class TDqHashCombineStreamWrapper: public TMutableComputationNode<TDqHashCombine
IComputationNode *const StreamSource;
std::vector<TType*> InputTypes;
std::vector<TType*> OutputTypes;
const std::vector<TType*> StateItemTypes;
size_t InputWidth;
const NDqHashOperatorCommon::TCombinerNodes Nodes;
const TKeyTypes KeyTypes;
Expand Down
Loading