Skip to content

Commit 216c41b

Browse files
committed
Use bloom filters to collect large DFs
BenchmarkDynamicPageFilter.filterPages

(filterSize)  (inputDataSet)  (inputNullChance)  (nonNullsSelectivity)  (nullsAllowed)  Mode   Cnt  Score using fastutil set  Score using bloom filter
1000          INT64_RANDOM    0.05               0.2                    false           thrpt  20   30.282 ± 0.792 ops/s      65.017 ± 0.566 ops/s
10000         INT64_RANDOM    0.05               0.2                    false           thrpt  20   33.799 ± 0.511 ops/s      63.218 ± 1.783 ops/s
100000        INT64_RANDOM    0.05               0.2                    false           thrpt  20   29.464 ± 0.469 ops/s      63.482 ± 1.626 ops/s
1000000       INT64_RANDOM    0.05               0.2                    false           thrpt  20   18.854 ± 0.558 ops/s      63.690 ± 1.662 ops/s

BenchmarkDynamicFilterSourceOperator.dynamicFilterCollect, maxDistinctValuesCount = 600572

Collection type  (positionsPerPage)  Mode  Cnt  Score    Error    Units
Hash set         4096                avgt  45   39.950   ± 0.281  ms/op
Bloom filter     4096                avgt  45   10.297   ± 0.065  ms/op
Min-max          4096                avgt  45   5.845    ± 0.038  ms/op
no-op            4096                avgt  45   0.075    ± 0.001  ms/op

BenchmarkDynamicFilterSourceOperator.dynamicFilterCollect, maxDistinctValuesCount = 6001215

Collection type  (positionsPerPage)  Mode  Cnt  Score    Error     Units
Hash set         4096                avgt  45   590.042  ± 22.009  ms/op
Bloom filter     4096                avgt  45   98.025   ± 0.750   ms/op
Min-max          4096                avgt  45   61.982   ± 6.330   ms/op
no-op            4096                avgt  45   0.092    ± 0.001   ms/op
1 parent 4c32d11 commit 216c41b

File tree

55 files changed

+2662
-693
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

55 files changed

+2662
-693
lines changed

Diff for: core/trino-main/src/main/java/io/trino/execution/DynamicFilterConfig.java

+30-58
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,10 @@
3737
"experimental.dynamic-filtering-refresh-interval",
3838
"experimental.enable-dynamic-filtering",
3939
"enable-coordinator-dynamic-filters-distribution",
40+
"dynamic-filtering.small-partitioned.range-row-limit-per-driver",
41+
"dynamic-filtering.large-partitioned.range-row-limit-per-driver",
42+
"dynamic-filtering.small-broadcast.range-row-limit-per-driver",
43+
"dynamic-filtering.large-broadcast.range-row-limit-per-driver",
4044
})
4145
public class DynamicFilterConfig
4246
{
@@ -59,23 +63,21 @@ public class DynamicFilterConfig
5963
*/
6064
private int smallMaxDistinctValuesPerDriver = 1_000;
6165
private DataSize smallMaxSizePerDriver = DataSize.of(100, KILOBYTE);
62-
private int smallRangeRowLimitPerDriver = 2_000;
6366
private DataSize smallMaxSizePerOperator = DataSize.of(1, MEGABYTE);
6467
private int smallPartitionedMaxDistinctValuesPerDriver = 100;
6568
private DataSize smallPartitionedMaxSizePerDriver = DataSize.of(50, KILOBYTE);
66-
private int smallPartitionedRangeRowLimitPerDriver = 500;
6769
private DataSize smallPartitionedMaxSizePerOperator = DataSize.of(500, KILOBYTE);
6870
private DataSize smallMaxSizePerFilter = DataSize.of(5, MEGABYTE);
6971

7072
private int largeMaxDistinctValuesPerDriver = 50_000;
7173
private DataSize largeMaxSizePerDriver = DataSize.of(4, MEGABYTE);
72-
private int largeRangeRowLimitPerDriver = 100_000;
7374
private DataSize largeMaxSizePerOperator = DataSize.of(5, MEGABYTE);
7475
private int largePartitionedMaxDistinctValuesPerDriver = 20_000;
7576
private DataSize largePartitionedMaxSizePerDriver = DataSize.of(200, KILOBYTE);
76-
private int largePartitionedRangeRowLimitPerDriver = 30_000;
7777
private DataSize largePartitionedMaxSizePerOperator = DataSize.of(5, MEGABYTE);
7878
private DataSize largeMaxSizePerFilter = DataSize.of(10, MEGABYTE);
79+
private int bloomFilterMaxDistinctValuesPerDriver = 100_000;
80+
private int partitionedBloomFilterMaxDistinctValuesPerDriver = 25_000;
7981

8082
public boolean isEnableDynamicFiltering()
8183
{
@@ -157,20 +159,6 @@ public DynamicFilterConfig setSmallMaxSizePerDriver(DataSize smallMaxSizePerDriv
157159
return this;
158160
}
159161

160-
@Min(0)
161-
public int getSmallRangeRowLimitPerDriver()
162-
{
163-
return smallRangeRowLimitPerDriver;
164-
}
165-
166-
@LegacyConfig("dynamic-filtering.small-broadcast.range-row-limit-per-driver")
167-
@Config("dynamic-filtering.small.range-row-limit-per-driver")
168-
public DynamicFilterConfig setSmallRangeRowLimitPerDriver(int smallRangeRowLimitPerDriver)
169-
{
170-
this.smallRangeRowLimitPerDriver = smallRangeRowLimitPerDriver;
171-
return this;
172-
}
173-
174162
@MaxDataSize("10MB")
175163
public DataSize getSmallMaxSizePerOperator()
176164
{
@@ -211,19 +199,6 @@ public DynamicFilterConfig setSmallPartitionedMaxSizePerDriver(DataSize smallPar
211199
return this;
212200
}
213201

214-
@Min(0)
215-
public int getSmallPartitionedRangeRowLimitPerDriver()
216-
{
217-
return smallPartitionedRangeRowLimitPerDriver;
218-
}
219-
220-
@Config("dynamic-filtering.small-partitioned.range-row-limit-per-driver")
221-
public DynamicFilterConfig setSmallPartitionedRangeRowLimitPerDriver(int smallPartitionedRangeRowLimitPerDriver)
222-
{
223-
this.smallPartitionedRangeRowLimitPerDriver = smallPartitionedRangeRowLimitPerDriver;
224-
return this;
225-
}
226-
227202
@MaxDataSize("10MB")
228203
public DataSize getSmallPartitionedMaxSizePerOperator()
229204
{
@@ -278,20 +253,6 @@ public DynamicFilterConfig setLargeMaxSizePerDriver(DataSize largeMaxSizePerDriv
278253
return this;
279254
}
280255

281-
@Min(0)
282-
public int getLargeRangeRowLimitPerDriver()
283-
{
284-
return largeRangeRowLimitPerDriver;
285-
}
286-
287-
@LegacyConfig("dynamic-filtering.large-broadcast.range-row-limit-per-driver")
288-
@Config("dynamic-filtering.large.range-row-limit-per-driver")
289-
public DynamicFilterConfig setLargeRangeRowLimitPerDriver(int largeRangeRowLimitPerDriver)
290-
{
291-
this.largeRangeRowLimitPerDriver = largeRangeRowLimitPerDriver;
292-
return this;
293-
}
294-
295256
@MaxDataSize("100MB")
296257
public DataSize getLargeMaxSizePerOperator()
297258
{
@@ -331,19 +292,6 @@ public DynamicFilterConfig setLargePartitionedMaxSizePerDriver(DataSize largePar
331292
return this;
332293
}
333294

334-
@Min(0)
335-
public int getLargePartitionedRangeRowLimitPerDriver()
336-
{
337-
return largePartitionedRangeRowLimitPerDriver;
338-
}
339-
340-
@Config("dynamic-filtering.large-partitioned.range-row-limit-per-driver")
341-
public DynamicFilterConfig setLargePartitionedRangeRowLimitPerDriver(int largePartitionedRangeRowLimitPerDriver)
342-
{
343-
this.largePartitionedRangeRowLimitPerDriver = largePartitionedRangeRowLimitPerDriver;
344-
return this;
345-
}
346-
347295
@MaxDataSize("50MB")
348296
public DataSize getLargePartitionedMaxSizePerOperator()
349297
{
@@ -370,4 +318,28 @@ public DynamicFilterConfig setLargeMaxSizePerFilter(DataSize largeMaxSizePerFilt
370318
this.largeMaxSizePerFilter = largeMaxSizePerFilter;
371319
return this;
372320
}
321+
322+
public int getBloomFilterMaxDistinctValuesPerDriver()
323+
{
324+
return bloomFilterMaxDistinctValuesPerDriver;
325+
}
326+
327+
@Config("dynamic-filtering.bloom-filter.max-distinct-values-per-driver")
328+
public DynamicFilterConfig setBloomFilterMaxDistinctValuesPerDriver(int bloomFilterMaxDistinctValuesPerDriver)
329+
{
330+
this.bloomFilterMaxDistinctValuesPerDriver = bloomFilterMaxDistinctValuesPerDriver;
331+
return this;
332+
}
333+
334+
public int getPartitionedBloomFilterMaxDistinctValuesPerDriver()
335+
{
336+
return partitionedBloomFilterMaxDistinctValuesPerDriver;
337+
}
338+
339+
@Config("dynamic-filtering.partitioned-bloom-filter.max-distinct-values-per-driver")
340+
public DynamicFilterConfig setPartitionedBloomFilterMaxDistinctValuesPerDriver(int partitionedBloomFilterMaxDistinctValuesPerDriver)
341+
{
342+
this.partitionedBloomFilterMaxDistinctValuesPerDriver = partitionedBloomFilterMaxDistinctValuesPerDriver;
343+
return this;
344+
}
373345
}

Diff for: core/trino-main/src/main/java/io/trino/execution/DynamicFiltersCollector.java

+9-9
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
import com.fasterxml.jackson.annotation.JsonProperty;
1818
import com.google.common.collect.ImmutableMap;
1919
import com.google.errorprone.annotations.concurrent.GuardedBy;
20-
import io.trino.spi.predicate.Domain;
20+
import io.trino.sql.planner.DynamicFilterDomain;
2121
import io.trino.sql.planner.plan.DynamicFilterId;
2222

2323
import java.util.HashMap;
@@ -44,15 +44,15 @@ public DynamicFiltersCollector(Runnable notifyTaskStatusChanged)
4444
this.notifyTaskStatusChanged = requireNonNull(notifyTaskStatusChanged, "notifyTaskStatusChanged is null");
4545
}
4646

47-
public void updateDomains(Map<DynamicFilterId, Domain> newDynamicFilterDomains)
47+
public void updateDomains(Map<DynamicFilterId, DynamicFilterDomain> newDynamicFilterDomains)
4848
{
4949
if (newDynamicFilterDomains.isEmpty()) {
5050
return;
5151
}
5252

5353
synchronized (this) {
5454
long currentVersion = ++this.currentVersion;
55-
for (Map.Entry<DynamicFilterId, Domain> entry : newDynamicFilterDomains.entrySet()) {
55+
for (Map.Entry<DynamicFilterId, DynamicFilterDomain> entry : newDynamicFilterDomains.entrySet()) {
5656
dynamicFilterDomains.merge(
5757
entry.getKey(),
5858
new VersionedDomain(currentVersion, entry.getValue()),
@@ -95,10 +95,10 @@ public synchronized VersionedDynamicFilterDomains getCurrentDynamicFilterDomains
9595
public static class VersionedDynamicFilterDomains
9696
{
9797
private final long version;
98-
private final Map<DynamicFilterId, Domain> dynamicFilterDomains;
98+
private final Map<DynamicFilterId, DynamicFilterDomain> dynamicFilterDomains;
9999

100100
@JsonCreator
101-
public VersionedDynamicFilterDomains(long version, Map<DynamicFilterId, Domain> dynamicFilterDomains)
101+
public VersionedDynamicFilterDomains(long version, Map<DynamicFilterId, DynamicFilterDomain> dynamicFilterDomains)
102102
{
103103
this.version = version;
104104
this.dynamicFilterDomains = ImmutableMap.copyOf(requireNonNull(dynamicFilterDomains, "dynamicFilterDomains is null"));
@@ -111,7 +111,7 @@ public long getVersion()
111111
}
112112

113113
@JsonProperty
114-
public Map<DynamicFilterId, Domain> getDynamicFilterDomains()
114+
public Map<DynamicFilterId, DynamicFilterDomain> getDynamicFilterDomains()
115115
{
116116
return dynamicFilterDomains;
117117
}
@@ -120,9 +120,9 @@ public Map<DynamicFilterId, Domain> getDynamicFilterDomains()
120120
private static class VersionedDomain
121121
{
122122
private final long version;
123-
private final Domain domain;
123+
private final DynamicFilterDomain domain;
124124

125-
private VersionedDomain(long version, Domain domain)
125+
private VersionedDomain(long version, DynamicFilterDomain domain)
126126
{
127127
this.version = version;
128128
this.domain = requireNonNull(domain, "domain is null");
@@ -133,7 +133,7 @@ public long getVersion()
133133
return version;
134134
}
135135

136-
public Domain getDomain()
136+
public DynamicFilterDomain getDomain()
137137
{
138138
return domain;
139139
}

Diff for: core/trino-main/src/main/java/io/trino/execution/SqlTask.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
import io.trino.operator.TaskContext;
4343
import io.trino.operator.TaskStats;
4444
import io.trino.spi.connector.CatalogHandle;
45-
import io.trino.spi.predicate.Domain;
45+
import io.trino.sql.planner.DynamicFilterDomain;
4646
import io.trino.sql.planner.PlanFragment;
4747
import io.trino.sql.planner.plan.DynamicFilterId;
4848
import io.trino.sql.planner.plan.PlanNodeId;
@@ -490,7 +490,7 @@ public TaskInfo updateTask(
490490
Optional<PlanFragment> fragment,
491491
List<SplitAssignment> splitAssignments,
492492
OutputBuffers outputBuffers,
493-
Map<DynamicFilterId, Domain> dynamicFilterDomains,
493+
Map<DynamicFilterId, DynamicFilterDomain> dynamicFilterDomains,
494494
boolean speculative)
495495
{
496496
try {

Diff for: core/trino-main/src/main/java/io/trino/execution/SqlTaskManager.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,9 @@
5454
import io.trino.spi.VersionEmbedder;
5555
import io.trino.spi.catalog.CatalogProperties;
5656
import io.trino.spi.connector.CatalogHandle;
57-
import io.trino.spi.predicate.Domain;
5857
import io.trino.spiller.LocalSpillManager;
5958
import io.trino.spiller.NodeSpillConfig;
59+
import io.trino.sql.planner.DynamicFilterDomain;
6060
import io.trino.sql.planner.LocalExecutionPlanner;
6161
import io.trino.sql.planner.PlanFragment;
6262
import io.trino.sql.planner.plan.DynamicFilterId;
@@ -500,7 +500,7 @@ public TaskInfo updateTask(
500500
Optional<PlanFragment> fragment,
501501
List<SplitAssignment> splitAssignments,
502502
OutputBuffers outputBuffers,
503-
Map<DynamicFilterId, Domain> dynamicFilterDomains,
503+
Map<DynamicFilterId, DynamicFilterDomain> dynamicFilterDomains,
504504
boolean speculative)
505505
{
506506
try {
@@ -520,7 +520,7 @@ private TaskInfo doUpdateTask(
520520
Optional<PlanFragment> fragment,
521521
List<SplitAssignment> splitAssignments,
522522
OutputBuffers outputBuffers,
523-
Map<DynamicFilterId, Domain> dynamicFilterDomains,
523+
Map<DynamicFilterId, DynamicFilterDomain> dynamicFilterDomains,
524524
boolean speculative)
525525
{
526526
requireNonNull(session, "session is null");

0 commit comments

Comments
 (0)