Commit c3f150b: Add 449 docs

1 parent 9f9f50b commit c3f150b

File tree: 1,300 files changed (+1230816 −2 lines)


449/_images/functions_color_bar.png (48.1 KB)

449/_sources/admin.md.txt (+28)

@@ -0,0 +1,28 @@
# Administration

```{toctree}
:maxdepth: 1

admin/web-interface
admin/tuning
admin/jmx
admin/opentelemetry
admin/properties
admin/spill
admin/resource-groups
admin/session-property-managers
admin/dist-sort
admin/dynamic-filtering
admin/graceful-shutdown
admin/fault-tolerant-execution
```

## Event listeners

```{toctree}
:titlesonly: true

admin/event-listeners-http
admin/event-listeners-mysql
admin/event-listeners-openlineage
```

449/_sources/admin/dist-sort.md.txt (+15)

@@ -0,0 +1,15 @@
# Distributed sort

Distributed sort allows sorting of data that exceeds `query.max-memory-per-node`.
Distributed sort is enabled via the `distributed_sort` session property, or the
`distributed-sort` configuration property set in
`etc/config.properties` of the coordinator. Distributed sort is enabled by
default.

When distributed sort is enabled, the sort operator executes in parallel on multiple
nodes in the cluster. Partially sorted data from each Trino worker node is then streamed
to a single worker node for a final merge. This technique makes it possible to utilize
the memory of multiple Trino worker nodes for sorting. The primary purpose of distributed
sort is to allow sorting of data sets that do not fit into single-node memory. A performance
improvement can be expected, but it does not scale linearly with the number of nodes, since the
data needs to be merged by a single node.
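
As a minimal sketch, the session property named above can be toggled per session, for example to compare memory behavior of a large `ORDER BY` with and without the feature:

```sql
-- Disable distributed sort for the current session only; the cluster-wide
-- default from etc/config.properties still applies to other sessions.
SET SESSION distributed_sort = false;
```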
+264 lines changed

@@ -0,0 +1,264 @@
# Dynamic filtering

Dynamic filtering optimizations significantly improve the performance of queries
with selective joins by avoiding reading data that would be filtered out by the join condition.

Consider the following query, which captures a common pattern of a fact table `store_sales`
joined with a filtered dimension table `date_dim`:

> SELECT count(\*)
> FROM store_sales
> JOIN date_dim ON store_sales.ss_sold_date_sk = date_dim.d_date_sk
> WHERE d_following_holiday='Y' AND d_year = 2000;

Without dynamic filtering, Trino pushes predicates for the dimension table to the
table scan on `date_dim`, and it scans all the data in the fact table since there
are no filters on `store_sales` in the query. The join operator ends up throwing away
most of the probe-side rows, as the join criteria is highly selective.
When dynamic filtering is enabled, Trino collects candidate values for the join condition
from the processed dimension table on the right side of the join. In the case of broadcast joins,
the runtime predicates generated from this collection are pushed into the local table scan
on the left side of the join running on the same worker.

Additionally, these runtime predicates are communicated to the coordinator over the network
so that dynamic filtering can also be performed on the coordinator during enumeration of
table scan splits.

For example, in the case of the Hive connector, dynamic filters are used
to skip loading of partitions which don't match the join criteria.
This is known as **dynamic partition pruning**.

After completing the collection of dynamic filters, the coordinator also distributes them
to worker nodes over the network for partitioned joins. This allows push down of dynamic
filters from partitioned joins into the table scans on the left side of that join.
Distribution of dynamic filters from the coordinator to workers is enabled by default.
It can be disabled by setting either the `enable-coordinator-dynamic-filters-distribution`
configuration property, or the session property
`enable_coordinator_dynamic_filters_distribution`, to `false`.
The results of dynamic filtering optimization can include the following benefits:

- improved overall query performance
- reduced network traffic between Trino and the data source
- reduced load on the remote data source

Dynamic filtering is enabled by default. It can be disabled by setting either the
`enable-dynamic-filtering` configuration property, or the session property
`enable_dynamic_filtering`, to `false`.

Support for push down of dynamic filters is specific to each connector,
and the relevant underlying database or storage system. The documentation for
specific connectors with support for dynamic filtering includes further details,
for example the {ref}`Hive connector <hive-dynamic-filtering>`
or the {ref}`Memory connector <memory-dynamic-filtering>`.
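For instance, the session property named above can be used to turn the optimization off for a single session, which is useful when comparing query behavior with and without it:

```sql
-- Disable dynamic filtering for the current session only:
SET SESSION enable_dynamic_filtering = false;
```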
## Analysis and confirmation

Dynamic filtering depends on a number of factors:

- Planner support for dynamic filtering for a given join operation in Trino.
  Currently, inner and right joins with `=`, `<`, `<=`, `>`, `>=` or
  `IS NOT DISTINCT FROM` join conditions, and
  semi-joins with `IN` conditions are supported.
- Connector support for utilizing dynamic filters pushed into the table scan at runtime.
  For example, the Hive connector can push dynamic filters into ORC and Parquet readers
  to perform stripe or row-group pruning.
- Connector support for utilizing dynamic filters at the splits enumeration stage.
- Size of the right (build) side of the join.
You can take a closer look at the {doc}`EXPLAIN plan </sql/explain>` of the query
to analyze if the planner is adding dynamic filters to a specific query's plan.
For example, the explain plan for the above query can be obtained by running
the following statement:

```
EXPLAIN
SELECT count(*)
FROM store_sales
JOIN date_dim ON store_sales.ss_sold_date_sk = date_dim.d_date_sk
WHERE d_following_holiday='Y' AND d_year = 2000;
```
The explain plan for this query shows `dynamicFilterAssignments` in the
`InnerJoin` node with dynamic filter `df_370` collected from build symbol `d_date_sk`.
You can also see the `dynamicFilter` predicate as part of the Hive `ScanFilterProject`
operator where `df_370` is associated with probe symbol `ss_sold_date_sk`.
This shows you that the planner is successful in pushing dynamic filters
down to the connector in the query plan.

```text
...

Fragment 1 [SOURCE]
    Output layout: [count_3]
    Output partitioning: SINGLE []
    Aggregate(PARTIAL)
    │   Layout: [count_3:bigint]
    │   count_3 := count(*)
    └─ InnerJoin[("ss_sold_date_sk" = "d_date_sk")][$hashvalue, $hashvalue_4]
       │   Layout: []
       │   Estimates: {rows: 0 (0B), cpu: 0, memory: 0B, network: 0B}
       │   Distribution: REPLICATED
       │   dynamicFilterAssignments = {d_date_sk -> #df_370}
       ├─ ScanFilterProject[table = hive:default:store_sales, grouped = false, filterPredicate = true, dynamicFilters = {"ss_sold_date_sk" = #df_370}]
       │      Layout: [ss_sold_date_sk:bigint, $hashvalue:bigint]
       │      Estimates: {rows: 0 (0B), cpu: 0, memory: 0B, network: 0B}/{rows: 0 (0B), cpu: 0, memory: 0B, network: 0B}/{rows: 0 (0B), cpu: 0, memory: 0B, network: 0B}
       │      $hashvalue := combine_hash(bigint '0', COALESCE("$operator$hash_code"("ss_sold_date_sk"), 0))
       │      ss_sold_date_sk := ss_sold_date_sk:bigint:REGULAR
       └─ LocalExchange[HASH][$hashvalue_4] ("d_date_sk")
          │   Layout: [d_date_sk:bigint, $hashvalue_4:bigint]
          │   Estimates: {rows: 0 (0B), cpu: 0, memory: 0B, network: 0B}
          └─ RemoteSource[2]
                 Layout: [d_date_sk:bigint, $hashvalue_5:bigint]

Fragment 2 [SOURCE]
    Output layout: [d_date_sk, $hashvalue_6]
    Output partitioning: BROADCAST []
    ScanFilterProject[table = hive:default:date_dim, grouped = false, filterPredicate = (("d_following_holiday" = CAST('Y' AS char(1))) AND ("d_year" = 2000))]
        Layout: [d_date_sk:bigint, $hashvalue_6:bigint]
        Estimates: {rows: 0 (0B), cpu: 0, memory: 0B, network: 0B}/{rows: 0 (0B), cpu: 0, memory: 0B, network: 0B}/{rows: 0 (0B), cpu: 0, memory: 0B, network: 0B}
        $hashvalue_6 := combine_hash(bigint '0', COALESCE("$operator$hash_code"("d_date_sk"), 0))
        d_following_holiday := d_following_holiday:char(1):REGULAR
        d_date_sk := d_date_sk:bigint:REGULAR
        d_year := d_year:int:REGULAR
```
During execution of a query with dynamic filters, Trino populates statistics
about dynamic filters in the QueryInfo JSON available through the
{doc}`/admin/web-interface`.
In the `queryStats` section, statistics about dynamic filters collected
by the coordinator can be found in the `dynamicFiltersStats` structure.

```text
"dynamicFiltersStats" : {
      "dynamicFilterDomainStats" : [ {
        "dynamicFilterId" : "df_370",
        "simplifiedDomain" : "[ SortedRangeSet[type=bigint, ranges=3, {[2451546], ..., [2451905]}] ]",
        "collectionDuration" : "2.34s"
      } ],
      "lazyDynamicFilters" : 1,
      "replicatedDynamicFilters" : 1,
      "totalDynamicFilters" : 1,
      "dynamicFiltersCompleted" : 1
}
```
Push down of dynamic filters into a table scan on the worker nodes can be
verified by looking at the operator statistics for that table scan.
`dynamicFilterSplitsProcessed` records the number of splits
processed after a dynamic filter is pushed down to the table scan.

```text
"operatorType" : "ScanFilterAndProjectOperator",
"totalDrivers" : 1,
"addInputCalls" : 762,
"addInputWall" : "0.00ns",
"addInputCpu" : "0.00ns",
"physicalInputDataSize" : "0B",
"physicalInputPositions" : 28800991,
"inputPositions" : 28800991,
"dynamicFilterSplitsProcessed" : 1,
```
Dynamic filters are reported as a part of the
{doc}`EXPLAIN ANALYZE plan </sql/explain-analyze>` in the statistics for
`ScanFilterProject` nodes.
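For example, statistics like those shown below can be gathered by executing the example query from earlier under `EXPLAIN ANALYZE`; note that the exact filter identifiers, row counts, and timings vary from run to run:

```sql
EXPLAIN ANALYZE
SELECT count(*)
FROM store_sales
JOIN date_dim ON store_sales.ss_sold_date_sk = date_dim.d_date_sk
WHERE d_following_holiday='Y' AND d_year = 2000;
```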
```text
...

└─ InnerJoin[("ss_sold_date_sk" = "d_date_sk")][$hashvalue, $hashvalue_4]
   │   Layout: []
   │   Estimates: {rows: 11859 (0B), cpu: 8.84M, memory: 3.19kB, network: 3.19kB}
   │   CPU: 78.00ms (30.00%), Scheduled: 295.00ms (47.05%), Output: 296 rows (0B)
   │   Left (probe) Input avg.: 120527.00 rows, Input std.dev.: 0.00%
   │   Right (build) Input avg.: 0.19 rows, Input std.dev.: 208.17%
   │   Distribution: REPLICATED
   │   dynamicFilterAssignments = {d_date_sk -> #df_370}
   ├─ ScanFilterProject[table = hive:default:store_sales, grouped = false, filterPredicate = true, dynamicFilters = {"ss_sold_date_sk" = #df_370}]
   │      Layout: [ss_sold_date_sk:bigint, $hashvalue:bigint]
   │      Estimates: {rows: 120527 (2.03MB), cpu: 1017.64k, memory: 0B, network: 0B}/{rows: 120527 (2.03MB), cpu: 1.99M, memory: 0B, network: 0B}/{rows: 120527 (2.03MB), cpu: 4.02M, memory: 0B, network: 0B}
   │      CPU: 49.00ms (18.85%), Scheduled: 123.00ms (19.62%), Output: 120527 rows (2.07MB)
   │      Input avg.: 120527.00 rows, Input std.dev.: 0.00%
   │      $hashvalue := combine_hash(bigint '0', COALESCE("$operator$hash_code"("ss_sold_date_sk"), 0))
   │      ss_sold_date_sk := ss_sold_date_sk:bigint:REGULAR
   │      Input: 120527 rows (1.03MB), Filtered: 0.00%
   │      Dynamic filters:
   │          - df_370, [ SortedRangeSet[type=bigint, ranges=3, {[2451546], ..., [2451905]}] ], collection time=2.34s
   |
...
```
## Dynamic filter collection thresholds

In order for dynamic filtering to work, the smaller dimension table
needs to be chosen as a join's build side. The cost-based optimizer can automatically
do this using table statistics provided by connectors. Therefore, it is recommended
to keep {doc}`table statistics </optimizer/statistics>` up to date and rely on the
CBO to correctly choose the smaller table on the build side of the join.

Collection of values of the join key columns from the build side for
dynamic filtering may incur additional CPU overhead during query execution.
Therefore, to limit the overhead of collecting dynamic filters
to the cases where the join operator is likely to be selective,
Trino defines thresholds on the size of dynamic filters collected from build side tasks.
Collection of dynamic filters for joins with large build sides can be enabled
using the `enable-large-dynamic-filters` configuration property or the
`enable_large_dynamic_filters` session property.
When large dynamic filters are enabled, limits on the size of dynamic filters can
be configured using the configuration properties
`dynamic-filtering.large.max-distinct-values-per-driver`,
`dynamic-filtering.large.max-size-per-driver`,
`dynamic-filtering.large.range-row-limit-per-driver`,
`dynamic-filtering.large-partitioned.max-distinct-values-per-driver`,
`dynamic-filtering.large-partitioned.max-size-per-driver`, and
`dynamic-filtering.large-partitioned.range-row-limit-per-driver`.

Similarly, limits for dynamic filters when `enable-large-dynamic-filters`
is not enabled can be configured using the configuration properties
`dynamic-filtering.small.max-distinct-values-per-driver`,
`dynamic-filtering.small.max-size-per-driver`,
`dynamic-filtering.small.range-row-limit-per-driver`,
`dynamic-filtering.small-partitioned.max-distinct-values-per-driver`,
`dynamic-filtering.small-partitioned.max-size-per-driver`, and
`dynamic-filtering.small-partitioned.range-row-limit-per-driver`.

The `dynamic-filtering.large.*` and `dynamic-filtering.small.*` limits are applied
when dynamic filters are collected before the build side is partitioned on join
keys (when a broadcast join is chosen, or when fault-tolerant execution is enabled). The
`dynamic-filtering.large-partitioned.*` and `dynamic-filtering.small-partitioned.*`
limits are applied when dynamic filters are collected after the build side is partitioned
on join keys (when a partitioned join is chosen and fault-tolerant execution is disabled).
The properties based on `max-distinct-values-per-driver` and `max-size-per-driver`
define thresholds for the size up to which dynamic filters are collected in a
distinct values data structure. When the build side exceeds these thresholds,
Trino switches to collecting min and max values per column to reduce overhead.
This min-max filter has much lower granularity than the distinct values filter.
However, it may still be beneficial in filtering some data from the probe side,
especially when a range of values is selected from the build side of the join.
The limits for min-max filter collection are defined by the properties
based on `range-row-limit-per-driver`.
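Putting the above together, a coordinator `etc/config.properties` fragment might look like the following sketch. The property names are those listed above; the values are purely illustrative assumptions, not recommendations, and should be tuned for your workload:

```properties
# Collect dynamic filters even for joins with large build sides (illustrative values)
enable-large-dynamic-filters=true
dynamic-filtering.large.max-distinct-values-per-driver=20000
dynamic-filtering.large.max-size-per-driver=5MB
dynamic-filtering.large.range-row-limit-per-driver=100000
```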
## Dimension tables layout
246+
247+
Dynamic filtering works best for dimension tables where
248+
table keys are correlated with columns.
249+
250+
For example, a date dimension key column should be correlated with a date column,
251+
so the table keys monotonically increase with date values.
252+
An address dimension key can be composed of other columns such as
253+
`COUNTRY-STATE-ZIP-ADDRESS_ID` with an example value of `US-NY-10001-1234`.
254+
This usage allows dynamic filtering to succeed even with a large number
255+
of selected rows from the dimension table.
256+
257+
## Limitations

- Min-max dynamic filter collection is not supported for `DOUBLE`, `REAL`, and unorderable data types.
- Dynamic filtering is not supported for `DOUBLE` and `REAL` data types when using the `IS NOT DISTINCT FROM` predicate.
- Dynamic filtering is supported when the join key contains a cast from the build key type to the
  probe key type. Dynamic filtering is also supported in limited scenarios when there is an implicit
  cast from the probe key type to the build key type. For example, dynamic filtering is supported when
  the build side key is of `DOUBLE` type and the probe side key is of `REAL` or `INTEGER` type.
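
As a sketch of the implicit-cast case in the last point, assume hypothetical tables `facts` with a `REAL` column `r_key` and `dims` with a `DOUBLE` column `d_key`:

```sql
-- The probe key facts.r_key (REAL) is implicitly cast to DOUBLE to match the
-- build key dims.d_key; dynamic filtering still applies in this scenario.
SELECT count(*)
FROM facts
JOIN dims ON facts.r_key = dims.d_key;
```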
