Skip to content

Commit 35e139b

Browse files
committed
Use bloom filters to collect large DFs
1 parent d6907ac commit 35e139b

File tree

45 files changed

+1812
-514
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

45 files changed

+1812
-514
lines changed

Diff for: core/trino-main/src/main/java/io/trino/execution/DynamicFilterConfig.java

+30-58
Original file line number | Diff line number | Diff line change
@@ -37,6 +37,10 @@
3737
"experimental.dynamic-filtering-refresh-interval",
3838
"experimental.enable-dynamic-filtering",
3939
"enable-coordinator-dynamic-filters-distribution",
40+
"dynamic-filtering.small-partitioned.range-row-limit-per-driver",
41+
"dynamic-filtering.large-partitioned.range-row-limit-per-driver",
42+
"dynamic-filtering.small-broadcast.range-row-limit-per-driver",
43+
"dynamic-filtering.large-broadcast.range-row-limit-per-driver",
4044
})
4145
public class DynamicFilterConfig
4246
{
@@ -59,23 +63,21 @@ public class DynamicFilterConfig
5963
*/
6064
private int smallMaxDistinctValuesPerDriver = 1_000;
6165
private DataSize smallMaxSizePerDriver = DataSize.of(100, KILOBYTE);
62-
private int smallRangeRowLimitPerDriver = 2_000;
6366
private DataSize smallMaxSizePerOperator = DataSize.of(1, MEGABYTE);
6467
private int smallPartitionedMaxDistinctValuesPerDriver = 100;
6568
private DataSize smallPartitionedMaxSizePerDriver = DataSize.of(50, KILOBYTE);
66-
private int smallPartitionedRangeRowLimitPerDriver = 500;
6769
private DataSize smallPartitionedMaxSizePerOperator = DataSize.of(500, KILOBYTE);
6870
private DataSize smallMaxSizePerFilter = DataSize.of(5, MEGABYTE);
6971

7072
private int largeMaxDistinctValuesPerDriver = 50_000;
7173
private DataSize largeMaxSizePerDriver = DataSize.of(4, MEGABYTE);
72-
private int largeRangeRowLimitPerDriver = 100_000;
7374
private DataSize largeMaxSizePerOperator = DataSize.of(5, MEGABYTE);
7475
private int largePartitionedMaxDistinctValuesPerDriver = 20_000;
7576
private DataSize largePartitionedMaxSizePerDriver = DataSize.of(200, KILOBYTE);
76-
private int largePartitionedRangeRowLimitPerDriver = 30_000;
7777
private DataSize largePartitionedMaxSizePerOperator = DataSize.of(5, MEGABYTE);
7878
private DataSize largeMaxSizePerFilter = DataSize.of(10, MEGABYTE);
79+
private int bloomFilterMaxDistinctValuesPerDriver = 100_000;
80+
private int partitionedBloomFilterMaxDistinctValuesPerDriver = 25_000;
7981

8082
public boolean isEnableDynamicFiltering()
8183
{
@@ -157,20 +159,6 @@ public DynamicFilterConfig setSmallMaxSizePerDriver(DataSize smallMaxSizePerDriv
157159
return this;
158160
}
159161

160-
@Min(0)
161-
public int getSmallRangeRowLimitPerDriver()
162-
{
163-
return smallRangeRowLimitPerDriver;
164-
}
165-
166-
@LegacyConfig("dynamic-filtering.small-broadcast.range-row-limit-per-driver")
167-
@Config("dynamic-filtering.small.range-row-limit-per-driver")
168-
public DynamicFilterConfig setSmallRangeRowLimitPerDriver(int smallRangeRowLimitPerDriver)
169-
{
170-
this.smallRangeRowLimitPerDriver = smallRangeRowLimitPerDriver;
171-
return this;
172-
}
173-
174162
@MaxDataSize("10MB")
175163
public DataSize getSmallMaxSizePerOperator()
176164
{
@@ -211,19 +199,6 @@ public DynamicFilterConfig setSmallPartitionedMaxSizePerDriver(DataSize smallPar
211199
return this;
212200
}
213201

214-
@Min(0)
215-
public int getSmallPartitionedRangeRowLimitPerDriver()
216-
{
217-
return smallPartitionedRangeRowLimitPerDriver;
218-
}
219-
220-
@Config("dynamic-filtering.small-partitioned.range-row-limit-per-driver")
221-
public DynamicFilterConfig setSmallPartitionedRangeRowLimitPerDriver(int smallPartitionedRangeRowLimitPerDriver)
222-
{
223-
this.smallPartitionedRangeRowLimitPerDriver = smallPartitionedRangeRowLimitPerDriver;
224-
return this;
225-
}
226-
227202
@MaxDataSize("10MB")
228203
public DataSize getSmallPartitionedMaxSizePerOperator()
229204
{
@@ -278,20 +253,6 @@ public DynamicFilterConfig setLargeMaxSizePerDriver(DataSize largeMaxSizePerDriv
278253
return this;
279254
}
280255

281-
@Min(0)
282-
public int getLargeRangeRowLimitPerDriver()
283-
{
284-
return largeRangeRowLimitPerDriver;
285-
}
286-
287-
@LegacyConfig("dynamic-filtering.large-broadcast.range-row-limit-per-driver")
288-
@Config("dynamic-filtering.large.range-row-limit-per-driver")
289-
public DynamicFilterConfig setLargeRangeRowLimitPerDriver(int largeRangeRowLimitPerDriver)
290-
{
291-
this.largeRangeRowLimitPerDriver = largeRangeRowLimitPerDriver;
292-
return this;
293-
}
294-
295256
@MaxDataSize("100MB")
296257
public DataSize getLargeMaxSizePerOperator()
297258
{
@@ -331,19 +292,6 @@ public DynamicFilterConfig setLargePartitionedMaxSizePerDriver(DataSize largePar
331292
return this;
332293
}
333294

334-
@Min(0)
335-
public int getLargePartitionedRangeRowLimitPerDriver()
336-
{
337-
return largePartitionedRangeRowLimitPerDriver;
338-
}
339-
340-
@Config("dynamic-filtering.large-partitioned.range-row-limit-per-driver")
341-
public DynamicFilterConfig setLargePartitionedRangeRowLimitPerDriver(int largePartitionedRangeRowLimitPerDriver)
342-
{
343-
this.largePartitionedRangeRowLimitPerDriver = largePartitionedRangeRowLimitPerDriver;
344-
return this;
345-
}
346-
347295
@MaxDataSize("50MB")
348296
public DataSize getLargePartitionedMaxSizePerOperator()
349297
{
@@ -370,4 +318,28 @@ public DynamicFilterConfig setLargeMaxSizePerFilter(DataSize largeMaxSizePerFilt
370318
this.largeMaxSizePerFilter = largeMaxSizePerFilter;
371319
return this;
372320
}
321+
322+
public int getBloomFilterMaxDistinctValuesPerDriver()
323+
{
324+
return bloomFilterMaxDistinctValuesPerDriver;
325+
}
326+
327+
@Config("dynamic-filtering.bloom-filter.max-distinct-values-per-driver")
328+
public DynamicFilterConfig setBloomFilterMaxDistinctValuesPerDriver(int bloomFilterMaxDistinctValuesPerDriver)
329+
{
330+
this.bloomFilterMaxDistinctValuesPerDriver = bloomFilterMaxDistinctValuesPerDriver;
331+
return this;
332+
}
333+
334+
public int getPartitionedBloomFilterMaxDistinctValuesPerDriver()
335+
{
336+
return partitionedBloomFilterMaxDistinctValuesPerDriver;
337+
}
338+
339+
@Config("dynamic-filtering.partitioned-bloom-filter.max-distinct-values-per-driver")
340+
public DynamicFilterConfig setPartitionedBloomFilterMaxDistinctValuesPerDriver(int partitionedBloomFilterMaxDistinctValuesPerDriver)
341+
{
342+
this.partitionedBloomFilterMaxDistinctValuesPerDriver = partitionedBloomFilterMaxDistinctValuesPerDriver;
343+
return this;
344+
}
373345
}

Diff for: core/trino-main/src/main/java/io/trino/execution/DynamicFiltersCollector.java

+9-9
Original file line number | Diff line number | Diff line change
@@ -17,7 +17,7 @@
1717
import com.fasterxml.jackson.annotation.JsonProperty;
1818
import com.google.common.collect.ImmutableMap;
1919
import com.google.errorprone.annotations.concurrent.GuardedBy;
20-
import io.trino.spi.predicate.Domain;
20+
import io.trino.sql.planner.DynamicFilterDomain;
2121
import io.trino.sql.planner.plan.DynamicFilterId;
2222

2323
import java.util.HashMap;
@@ -44,15 +44,15 @@ public DynamicFiltersCollector(Runnable notifyTaskStatusChanged)
4444
this.notifyTaskStatusChanged = requireNonNull(notifyTaskStatusChanged, "notifyTaskStatusChanged is null");
4545
}
4646

47-
public void updateDomains(Map<DynamicFilterId, Domain> newDynamicFilterDomains)
47+
public void updateDomains(Map<DynamicFilterId, DynamicFilterDomain> newDynamicFilterDomains)
4848
{
4949
if (newDynamicFilterDomains.isEmpty()) {
5050
return;
5151
}
5252

5353
synchronized (this) {
5454
long currentVersion = ++this.currentVersion;
55-
for (Map.Entry<DynamicFilterId, Domain> entry : newDynamicFilterDomains.entrySet()) {
55+
for (Map.Entry<DynamicFilterId, DynamicFilterDomain> entry : newDynamicFilterDomains.entrySet()) {
5656
dynamicFilterDomains.merge(
5757
entry.getKey(),
5858
new VersionedDomain(currentVersion, entry.getValue()),
@@ -95,10 +95,10 @@ public synchronized VersionedDynamicFilterDomains getCurrentDynamicFilterDomains
9595
public static class VersionedDynamicFilterDomains
9696
{
9797
private final long version;
98-
private final Map<DynamicFilterId, Domain> dynamicFilterDomains;
98+
private final Map<DynamicFilterId, DynamicFilterDomain> dynamicFilterDomains;
9999

100100
@JsonCreator
101-
public VersionedDynamicFilterDomains(long version, Map<DynamicFilterId, Domain> dynamicFilterDomains)
101+
public VersionedDynamicFilterDomains(long version, Map<DynamicFilterId, DynamicFilterDomain> dynamicFilterDomains)
102102
{
103103
this.version = version;
104104
this.dynamicFilterDomains = ImmutableMap.copyOf(requireNonNull(dynamicFilterDomains, "dynamicFilterDomains is null"));
@@ -111,7 +111,7 @@ public long getVersion()
111111
}
112112

113113
@JsonProperty
114-
public Map<DynamicFilterId, Domain> getDynamicFilterDomains()
114+
public Map<DynamicFilterId, DynamicFilterDomain> getDynamicFilterDomains()
115115
{
116116
return dynamicFilterDomains;
117117
}
@@ -120,9 +120,9 @@ public Map<DynamicFilterId, Domain> getDynamicFilterDomains()
120120
private static class VersionedDomain
121121
{
122122
private final long version;
123-
private final Domain domain;
123+
private final DynamicFilterDomain domain;
124124

125-
private VersionedDomain(long version, Domain domain)
125+
private VersionedDomain(long version, DynamicFilterDomain domain)
126126
{
127127
this.version = version;
128128
this.domain = requireNonNull(domain, "domain is null");
@@ -133,7 +133,7 @@ public long getVersion()
133133
return version;
134134
}
135135

136-
public Domain getDomain()
136+
public DynamicFilterDomain getDomain()
137137
{
138138
return domain;
139139
}

Diff for: core/trino-main/src/main/java/io/trino/execution/SqlTask.java

+2-2
Original file line number | Diff line number | Diff line change
@@ -42,7 +42,7 @@
4242
import io.trino.operator.TaskContext;
4343
import io.trino.operator.TaskStats;
4444
import io.trino.spi.connector.CatalogHandle;
45-
import io.trino.spi.predicate.Domain;
45+
import io.trino.sql.planner.DynamicFilterDomain;
4646
import io.trino.sql.planner.PlanFragment;
4747
import io.trino.sql.planner.plan.DynamicFilterId;
4848
import io.trino.sql.planner.plan.PlanNodeId;
@@ -490,7 +490,7 @@ public TaskInfo updateTask(
490490
Optional<PlanFragment> fragment,
491491
List<SplitAssignment> splitAssignments,
492492
OutputBuffers outputBuffers,
493-
Map<DynamicFilterId, Domain> dynamicFilterDomains,
493+
Map<DynamicFilterId, DynamicFilterDomain> dynamicFilterDomains,
494494
boolean speculative)
495495
{
496496
try {

Diff for: core/trino-main/src/main/java/io/trino/execution/SqlTaskManager.java

+3-3
Original file line number | Diff line number | Diff line change
@@ -54,9 +54,9 @@
5454
import io.trino.spi.VersionEmbedder;
5555
import io.trino.spi.catalog.CatalogProperties;
5656
import io.trino.spi.connector.CatalogHandle;
57-
import io.trino.spi.predicate.Domain;
5857
import io.trino.spiller.LocalSpillManager;
5958
import io.trino.spiller.NodeSpillConfig;
59+
import io.trino.sql.planner.DynamicFilterDomain;
6060
import io.trino.sql.planner.LocalExecutionPlanner;
6161
import io.trino.sql.planner.PlanFragment;
6262
import io.trino.sql.planner.plan.DynamicFilterId;
@@ -500,7 +500,7 @@ public TaskInfo updateTask(
500500
Optional<PlanFragment> fragment,
501501
List<SplitAssignment> splitAssignments,
502502
OutputBuffers outputBuffers,
503-
Map<DynamicFilterId, Domain> dynamicFilterDomains,
503+
Map<DynamicFilterId, DynamicFilterDomain> dynamicFilterDomains,
504504
boolean speculative)
505505
{
506506
try {
@@ -520,7 +520,7 @@ private TaskInfo doUpdateTask(
520520
Optional<PlanFragment> fragment,
521521
List<SplitAssignment> splitAssignments,
522522
OutputBuffers outputBuffers,
523-
Map<DynamicFilterId, Domain> dynamicFilterDomains,
523+
Map<DynamicFilterId, DynamicFilterDomain> dynamicFilterDomains,
524524
boolean speculative)
525525
{
526526
requireNonNull(session, "session is null");

0 commit comments

Comments
 (0)