Skip to content

Use bloom filters to collect large DFs #25009

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@
"experimental.dynamic-filtering-refresh-interval",
"experimental.enable-dynamic-filtering",
"enable-coordinator-dynamic-filters-distribution",
"dynamic-filtering.small-partitioned.range-row-limit-per-driver",
"dynamic-filtering.large-partitioned.range-row-limit-per-driver",
"dynamic-filtering.small-broadcast.range-row-limit-per-driver",
"dynamic-filtering.large-broadcast.range-row-limit-per-driver",
})
public class DynamicFilterConfig
{
Expand All @@ -59,23 +63,21 @@ public class DynamicFilterConfig
*/
private int smallMaxDistinctValuesPerDriver = 1_000;
private DataSize smallMaxSizePerDriver = DataSize.of(100, KILOBYTE);
private int smallRangeRowLimitPerDriver = 2_000;
private DataSize smallMaxSizePerOperator = DataSize.of(1, MEGABYTE);
private int smallPartitionedMaxDistinctValuesPerDriver = 100;
private DataSize smallPartitionedMaxSizePerDriver = DataSize.of(50, KILOBYTE);
private int smallPartitionedRangeRowLimitPerDriver = 500;
private DataSize smallPartitionedMaxSizePerOperator = DataSize.of(500, KILOBYTE);
private DataSize smallMaxSizePerFilter = DataSize.of(5, MEGABYTE);

private int largeMaxDistinctValuesPerDriver = 50_000;
private DataSize largeMaxSizePerDriver = DataSize.of(4, MEGABYTE);
private int largeRangeRowLimitPerDriver = 100_000;
private DataSize largeMaxSizePerOperator = DataSize.of(5, MEGABYTE);
private int largePartitionedMaxDistinctValuesPerDriver = 20_000;
private DataSize largePartitionedMaxSizePerDriver = DataSize.of(200, KILOBYTE);
private int largePartitionedRangeRowLimitPerDriver = 30_000;
private DataSize largePartitionedMaxSizePerOperator = DataSize.of(5, MEGABYTE);
private DataSize largeMaxSizePerFilter = DataSize.of(10, MEGABYTE);
private int bloomFilterMaxDistinctValuesPerDriver = 100_000;
private int partitionedBloomFilterMaxDistinctValuesPerDriver = 25_000;

public boolean isEnableDynamicFiltering()
{
Expand Down Expand Up @@ -157,20 +159,6 @@ public DynamicFilterConfig setSmallMaxSizePerDriver(DataSize smallMaxSizePerDriv
return this;
}

@Min(0)
public int getSmallRangeRowLimitPerDriver()
{
return smallRangeRowLimitPerDriver;
}

@LegacyConfig("dynamic-filtering.small-broadcast.range-row-limit-per-driver")
@Config("dynamic-filtering.small.range-row-limit-per-driver")
public DynamicFilterConfig setSmallRangeRowLimitPerDriver(int smallRangeRowLimitPerDriver)
{
this.smallRangeRowLimitPerDriver = smallRangeRowLimitPerDriver;
return this;
}

@MaxDataSize("10MB")
public DataSize getSmallMaxSizePerOperator()
{
Expand Down Expand Up @@ -211,19 +199,6 @@ public DynamicFilterConfig setSmallPartitionedMaxSizePerDriver(DataSize smallPar
return this;
}

@Min(0)
public int getSmallPartitionedRangeRowLimitPerDriver()
{
return smallPartitionedRangeRowLimitPerDriver;
}

@Config("dynamic-filtering.small-partitioned.range-row-limit-per-driver")
public DynamicFilterConfig setSmallPartitionedRangeRowLimitPerDriver(int smallPartitionedRangeRowLimitPerDriver)
{
this.smallPartitionedRangeRowLimitPerDriver = smallPartitionedRangeRowLimitPerDriver;
return this;
}

@MaxDataSize("10MB")
public DataSize getSmallPartitionedMaxSizePerOperator()
{
Expand Down Expand Up @@ -278,20 +253,6 @@ public DynamicFilterConfig setLargeMaxSizePerDriver(DataSize largeMaxSizePerDriv
return this;
}

@Min(0)
public int getLargeRangeRowLimitPerDriver()
{
return largeRangeRowLimitPerDriver;
}

@LegacyConfig("dynamic-filtering.large-broadcast.range-row-limit-per-driver")
@Config("dynamic-filtering.large.range-row-limit-per-driver")
public DynamicFilterConfig setLargeRangeRowLimitPerDriver(int largeRangeRowLimitPerDriver)
{
this.largeRangeRowLimitPerDriver = largeRangeRowLimitPerDriver;
return this;
}

@MaxDataSize("100MB")
public DataSize getLargeMaxSizePerOperator()
{
Expand Down Expand Up @@ -331,19 +292,6 @@ public DynamicFilterConfig setLargePartitionedMaxSizePerDriver(DataSize largePar
return this;
}

@Min(0)
public int getLargePartitionedRangeRowLimitPerDriver()
{
return largePartitionedRangeRowLimitPerDriver;
}

@Config("dynamic-filtering.large-partitioned.range-row-limit-per-driver")
public DynamicFilterConfig setLargePartitionedRangeRowLimitPerDriver(int largePartitionedRangeRowLimitPerDriver)
{
this.largePartitionedRangeRowLimitPerDriver = largePartitionedRangeRowLimitPerDriver;
return this;
}

@MaxDataSize("50MB")
public DataSize getLargePartitionedMaxSizePerOperator()
{
Expand All @@ -370,4 +318,28 @@ public DynamicFilterConfig setLargeMaxSizePerFilter(DataSize largeMaxSizePerFilt
this.largeMaxSizePerFilter = largeMaxSizePerFilter;
return this;
}

public int getBloomFilterMaxDistinctValuesPerDriver()
{
return bloomFilterMaxDistinctValuesPerDriver;
}

@Config("dynamic-filtering.bloom-filter.max-distinct-values-per-driver")
public DynamicFilterConfig setBloomFilterMaxDistinctValuesPerDriver(int bloomFilterMaxDistinctValuesPerDriver)
{
this.bloomFilterMaxDistinctValuesPerDriver = bloomFilterMaxDistinctValuesPerDriver;
return this;
}

public int getPartitionedBloomFilterMaxDistinctValuesPerDriver()
{
return partitionedBloomFilterMaxDistinctValuesPerDriver;
}

@Config("dynamic-filtering.partitioned-bloom-filter.max-distinct-values-per-driver")
public DynamicFilterConfig setPartitionedBloomFilterMaxDistinctValuesPerDriver(int partitionedBloomFilterMaxDistinctValuesPerDriver)
{
this.partitionedBloomFilterMaxDistinctValuesPerDriver = partitionedBloomFilterMaxDistinctValuesPerDriver;
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableMap;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import io.trino.spi.predicate.Domain;
import io.trino.sql.planner.DynamicFilterDomain;
import io.trino.sql.planner.plan.DynamicFilterId;

import java.util.HashMap;
Expand All @@ -44,15 +44,15 @@ public DynamicFiltersCollector(Runnable notifyTaskStatusChanged)
this.notifyTaskStatusChanged = requireNonNull(notifyTaskStatusChanged, "notifyTaskStatusChanged is null");
}

public void updateDomains(Map<DynamicFilterId, Domain> newDynamicFilterDomains)
public void updateDomains(Map<DynamicFilterId, DynamicFilterDomain> newDynamicFilterDomains)
{
if (newDynamicFilterDomains.isEmpty()) {
return;
}

synchronized (this) {
long currentVersion = ++this.currentVersion;
for (Map.Entry<DynamicFilterId, Domain> entry : newDynamicFilterDomains.entrySet()) {
for (Map.Entry<DynamicFilterId, DynamicFilterDomain> entry : newDynamicFilterDomains.entrySet()) {
dynamicFilterDomains.merge(
entry.getKey(),
new VersionedDomain(currentVersion, entry.getValue()),
Expand Down Expand Up @@ -95,10 +95,10 @@ public synchronized VersionedDynamicFilterDomains getCurrentDynamicFilterDomains
public static class VersionedDynamicFilterDomains
{
private final long version;
private final Map<DynamicFilterId, Domain> dynamicFilterDomains;
private final Map<DynamicFilterId, DynamicFilterDomain> dynamicFilterDomains;

@JsonCreator
public VersionedDynamicFilterDomains(long version, Map<DynamicFilterId, Domain> dynamicFilterDomains)
public VersionedDynamicFilterDomains(long version, Map<DynamicFilterId, DynamicFilterDomain> dynamicFilterDomains)
{
this.version = version;
this.dynamicFilterDomains = ImmutableMap.copyOf(requireNonNull(dynamicFilterDomains, "dynamicFilterDomains is null"));
Expand All @@ -111,7 +111,7 @@ public long getVersion()
}

@JsonProperty
public Map<DynamicFilterId, Domain> getDynamicFilterDomains()
public Map<DynamicFilterId, DynamicFilterDomain> getDynamicFilterDomains()
{
return dynamicFilterDomains;
}
Expand All @@ -120,9 +120,9 @@ public Map<DynamicFilterId, Domain> getDynamicFilterDomains()
private static class VersionedDomain
{
private final long version;
private final Domain domain;
private final DynamicFilterDomain domain;

private VersionedDomain(long version, Domain domain)
private VersionedDomain(long version, DynamicFilterDomain domain)
{
this.version = version;
this.domain = requireNonNull(domain, "domain is null");
Expand All @@ -133,7 +133,7 @@ public long getVersion()
return version;
}

public Domain getDomain()
public DynamicFilterDomain getDomain()
{
return domain;
}
Expand Down
4 changes: 2 additions & 2 deletions core/trino-main/src/main/java/io/trino/execution/SqlTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import io.trino.operator.TaskContext;
import io.trino.operator.TaskStats;
import io.trino.spi.connector.CatalogHandle;
import io.trino.spi.predicate.Domain;
import io.trino.sql.planner.DynamicFilterDomain;
import io.trino.sql.planner.PlanFragment;
import io.trino.sql.planner.plan.DynamicFilterId;
import io.trino.sql.planner.plan.PlanNodeId;
Expand Down Expand Up @@ -490,7 +490,7 @@ public TaskInfo updateTask(
Optional<PlanFragment> fragment,
List<SplitAssignment> splitAssignments,
OutputBuffers outputBuffers,
Map<DynamicFilterId, Domain> dynamicFilterDomains,
Map<DynamicFilterId, DynamicFilterDomain> dynamicFilterDomains,
boolean speculative)
{
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@
import io.trino.spi.VersionEmbedder;
import io.trino.spi.catalog.CatalogProperties;
import io.trino.spi.connector.CatalogHandle;
import io.trino.spi.predicate.Domain;
import io.trino.spiller.LocalSpillManager;
import io.trino.spiller.NodeSpillConfig;
import io.trino.sql.planner.DynamicFilterDomain;
import io.trino.sql.planner.LocalExecutionPlanner;
import io.trino.sql.planner.PlanFragment;
import io.trino.sql.planner.plan.DynamicFilterId;
Expand Down Expand Up @@ -500,7 +500,7 @@ public TaskInfo updateTask(
Optional<PlanFragment> fragment,
List<SplitAssignment> splitAssignments,
OutputBuffers outputBuffers,
Map<DynamicFilterId, Domain> dynamicFilterDomains,
Map<DynamicFilterId, DynamicFilterDomain> dynamicFilterDomains,
boolean speculative)
{
try {
Expand All @@ -520,7 +520,7 @@ private TaskInfo doUpdateTask(
Optional<PlanFragment> fragment,
List<SplitAssignment> splitAssignments,
OutputBuffers outputBuffers,
Map<DynamicFilterId, Domain> dynamicFilterDomains,
Map<DynamicFilterId, DynamicFilterDomain> dynamicFilterDomains,
boolean speculative)
{
requireNonNull(session, "session is null");
Expand Down
Loading
Loading