Skip to content

Commit 9f9431e

Browse files
committed
Remove split SPI backward-compatibility methods
1 parent 58cf613 commit 9f9431e

64 files changed

Lines changed: 672 additions & 983 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

core/trino-main/src/main/java/io/trino/split/SplitManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ public SplitSource getSplits(
9090
table.transaction(),
9191
connectorSession,
9292
table.connectorHandle(),
93-
dynamicFilter,
93+
dynamicFilter.getColumnsCovered(),
9494
constraint);
9595
}
9696

core/trino-spi/pom.xml

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,42 @@
250250
<new>method java.util.concurrent.CompletableFuture&lt;java.util.List&lt;io.trino.spi.connector.ConnectorSplit&gt;&gt; io.trino.spi.connector.FixedSplitSource::getNextBatch(int, io.trino.spi.connector.DynamicFilterSnapshot)</new>
251251
<justification>ConnectorSplitBatch wrapper removed — getNextBatch now returns List&lt;ConnectorSplit&gt; directly</justification>
252252
</item>
253+
<item>
254+
<code>java.method.noLongerDefault</code>
255+
<new>method java.util.concurrent.CompletableFuture&lt;java.util.List&lt;io.trino.spi.connector.ConnectorSplit&gt;&gt; io.trino.spi.connector.ConnectorSplitSource::getNextBatch(int, io.trino.spi.connector.DynamicFilterSnapshot)</new>
256+
<justification>Method is now abstract — split sources must implement it</justification>
257+
</item>
258+
<item>
259+
<code>java.method.nowAbstract</code>
260+
<new>method java.util.concurrent.CompletableFuture&lt;java.util.List&lt;io.trino.spi.connector.ConnectorSplit&gt;&gt; io.trino.spi.connector.ConnectorSplitSource::getNextBatch(int, io.trino.spi.connector.DynamicFilterSnapshot)</new>
261+
<justification>Method is now abstract — split sources must implement it</justification>
262+
</item>
263+
<item>
264+
<code>java.method.abstractMethodAddedToInterface</code>
265+
<new>method io.trino.spi.connector.ConnectorSplitSource io.trino.spi.connector.ConnectorSplitManager::getSplits(io.trino.spi.connector.ConnectorTransactionHandle, io.trino.spi.connector.ConnectorSession, io.trino.spi.connector.ConnectorTableHandle, java.util.Set&lt;io.trino.spi.connector.ColumnHandle&gt;, io.trino.spi.connector.Constraint)</new>
266+
<justification>Replaces getSplits(DynamicFilter) — connectors must override the new method to receive the dynamic filter column set</justification>
267+
</item>
268+
<item>
269+
<code>java.method.parameterTypeChanged</code>
270+
<old>method io.trino.spi.connector.ConnectorSplitSource io.trino.spi.connector.ConnectorSplitManager::getSplits(io.trino.spi.connector.ConnectorTransactionHandle, io.trino.spi.connector.ConnectorSession, io.trino.spi.connector.ConnectorTableHandle, io.trino.spi.connector.DynamicFilter, io.trino.spi.connector.Constraint)</old>
271+
<new>method io.trino.spi.connector.ConnectorSplitSource io.trino.spi.connector.ConnectorSplitManager::getSplits(io.trino.spi.connector.ConnectorTransactionHandle, io.trino.spi.connector.ConnectorSession, io.trino.spi.connector.ConnectorTableHandle, java.util.Set&lt;io.trino.spi.connector.ColumnHandle&gt;, io.trino.spi.connector.Constraint)</new>
272+
<justification>DynamicFilter replaced by Set&lt;ColumnHandle&gt; — predicate is now delivered per-batch via getNextBatch(int, DynamicFilterSnapshot)</justification>
273+
</item>
274+
<item>
275+
<code>java.method.noLongerDefault</code>
276+
<new>method io.trino.spi.connector.ConnectorSplitSource io.trino.spi.connector.ConnectorSplitManager::getSplits(io.trino.spi.connector.ConnectorTransactionHandle, io.trino.spi.connector.ConnectorSession, io.trino.spi.connector.ConnectorTableHandle, java.util.Set&lt;io.trino.spi.connector.ColumnHandle&gt;, io.trino.spi.connector.Constraint)</new>
277+
<justification>Method is now abstract — connectors must implement it</justification>
278+
</item>
279+
<item>
280+
<code>java.method.nowAbstract</code>
281+
<new>method io.trino.spi.connector.ConnectorSplitSource io.trino.spi.connector.ConnectorSplitManager::getSplits(io.trino.spi.connector.ConnectorTransactionHandle, io.trino.spi.connector.ConnectorSession, io.trino.spi.connector.ConnectorTableHandle, java.util.Set&lt;io.trino.spi.connector.ColumnHandle&gt;, io.trino.spi.connector.Constraint)</new>
282+
<justification>Method is now abstract — connectors must implement it</justification>
283+
</item>
284+
<item>
285+
<code>java.class.removed</code>
286+
<old>class io.trino.spi.connector.ConnectorSplitSource.ConnectorSplitBatch</old>
287+
<justification>ConnectorSplitBatch wrapper removed — getNextBatch now returns List&lt;ConnectorSplit&gt; directly; use isFinished() instead of isNoMoreSplits()</justification>
288+
</item>
253289
<!-- Any exclusions below can be deleted after each release -->
254290
<item>
255291
<ignore>true</ignore>

core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorSplitManager.java

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -27,25 +27,12 @@ public interface ConnectorSplitManager
2727
* read). The per-batch resolved predicate arrives separately via
2828
* {@link ConnectorSplitSource#getNextBatch(int, DynamicFilterSnapshot)}.
2929
*/
30-
default ConnectorSplitSource getSplits(
30+
ConnectorSplitSource getSplits(
3131
ConnectorTransactionHandle transaction,
3232
ConnectorSession session,
3333
ConnectorTableHandle table,
3434
Set<ColumnHandle> dynamicFilterColumns,
35-
Constraint constraint)
36-
{
37-
throw new UnsupportedOperationException();
38-
}
39-
40-
default ConnectorSplitSource getSplits(
41-
ConnectorTransactionHandle transaction,
42-
ConnectorSession session,
43-
ConnectorTableHandle table,
44-
DynamicFilter dynamicFilter,
45-
Constraint constraint)
46-
{
47-
return getSplits(transaction, session, table, dynamicFilter.getColumnsCovered(), constraint);
48-
}
35+
Constraint constraint);
4936

5037
default ConnectorSplitSource getSplits(
5138
ConnectorTransactionHandle transaction,

core/trino-spi/src/main/java/io/trino/spi/connector/ConnectorSplitSource.java

Lines changed: 4 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,6 @@
2020
import java.util.Optional;
2121
import java.util.concurrent.CompletableFuture;
2222

23-
import static java.util.Objects.requireNonNull;
24-
2523
/**
2624
* Source of splits to be processed.
2725
* <p>
@@ -50,16 +48,11 @@ public interface ConnectorSplitSource
5048
* immediately before this call. The engine waits up to
5149
* {@link #getRequestedDynamicFilterWaitTimeoutMillis()} before the <em>first</em> call;
5250
* subsequent calls receive the current state without additional waiting.
51+
* <p>
52+
* When the returned list is empty and {@link #isFinished()} returns {@code false}, the caller
53+
* should retry. No more batches will be produced once {@link #isFinished()} returns {@code true}.
5354
*/
54-
default CompletableFuture<List<ConnectorSplit>> getNextBatch(int maxSize, DynamicFilterSnapshot dynamicFilterSnapshot)
55-
{
56-
return getNextBatch(maxSize).thenApply(ConnectorSplitBatch::getSplits);
57-
}
58-
59-
default CompletableFuture<ConnectorSplitBatch> getNextBatch(int maxSize)
60-
{
61-
throw new UnsupportedOperationException();
62-
}
55+
CompletableFuture<List<ConnectorSplit>> getNextBatch(int maxSize, DynamicFilterSnapshot dynamicFilterSnapshot);
6356

6457
@Override
6558
void close();
@@ -103,26 +96,4 @@ default Metrics getMetrics()
10396
{
10497
return Metrics.EMPTY;
10598
}
106-
107-
class ConnectorSplitBatch
108-
{
109-
private final List<ConnectorSplit> splits;
110-
private final boolean noMoreSplits;
111-
112-
public ConnectorSplitBatch(List<ConnectorSplit> splits, boolean noMoreSplits)
113-
{
114-
this.splits = requireNonNull(splits, "splits is null");
115-
this.noMoreSplits = noMoreSplits;
116-
}
117-
118-
public List<ConnectorSplit> getSplits()
119-
{
120-
return splits;
121-
}
122-
123-
public boolean isNoMoreSplits()
124-
{
125-
return noMoreSplits;
126-
}
127-
}
12899
}

lib/trino-plugin-toolkit/src/main/java/io/trino/plugin/base/classloader/ClassLoaderSafeConnectorSplitManager.java

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import io.trino.spi.connector.ConnectorTableHandle;
2323
import io.trino.spi.connector.ConnectorTransactionHandle;
2424
import io.trino.spi.connector.Constraint;
25-
import io.trino.spi.connector.DynamicFilter;
2625
import io.trino.spi.function.table.ConnectorTableFunctionHandle;
2726

2827
import java.util.Set;
@@ -55,19 +54,6 @@ public ConnectorSplitSource getSplits(
5554
}
5655
}
5756

58-
@Override
59-
public ConnectorSplitSource getSplits(
60-
ConnectorTransactionHandle transaction,
61-
ConnectorSession session,
62-
ConnectorTableHandle table,
63-
DynamicFilter dynamicFilter,
64-
Constraint constraint)
65-
{
66-
try (ThreadContextClassLoader _ = new ThreadContextClassLoader(classLoader)) {
67-
return delegate.getSplits(transaction, session, table, dynamicFilter, constraint);
68-
}
69-
}
70-
7157
@Override
7258
public ConnectorSplitSource getSplits(
7359
ConnectorTransactionHandle transaction,

lib/trino-plugin-toolkit/src/test/java/io/trino/plugin/base/classloader/TestClassLoaderSafeWrappers.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
*/
1414
package io.trino.plugin.base.classloader;
1515

16-
import com.google.common.collect.ImmutableSet;
1716
import io.trino.spi.connector.ConnectorAccessControl;
1817
import io.trino.spi.connector.ConnectorMergeSink;
1918
import io.trino.spi.connector.ConnectorMetadata;
@@ -56,8 +55,7 @@ public void test()
5655
testClassLoaderSafe(ConnectorPageSourceProvider.class, ClassLoaderSafeConnectorPageSourceProvider.class);
5756
testClassLoaderSafe(ConnectorSplitManager.class, ClassLoaderSafeConnectorSplitManager.class);
5857
testClassLoaderSafe(ConnectorNodePartitioningProvider.class, ClassLoaderSafeNodePartitioningProvider.class);
59-
testClassLoaderSafe(ConnectorSplitSource.class, ClassLoaderSafeConnectorSplitSource.class, ImmutableSet.of(
60-
ConnectorSplitSource.class.getMethod("getNextBatch", int.class)));
58+
testClassLoaderSafe(ConnectorSplitSource.class, ClassLoaderSafeConnectorSplitSource.class);
6159
testClassLoaderSafe(SystemTable.class, ClassLoaderSafeSystemTable.class);
6260
testClassLoaderSafe(ConnectorRecordSetProvider.class, ClassLoaderSafeConnectorRecordSetProvider.class);
6361
testClassLoaderSafe(RecordSet.class, ClassLoaderSafeRecordSet.class);

plugin/trino-base-jdbc/src/main/java/io/trino/plugin/jdbc/DynamicFilteringJdbcSplitSource.java

Lines changed: 94 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,29 @@
1313
*/
1414
package io.trino.plugin.jdbc;
1515

16+
import com.google.common.collect.ImmutableList;
17+
import com.google.common.collect.ImmutableSet;
18+
import com.google.errorprone.annotations.concurrent.GuardedBy;
19+
import io.airlift.log.Logger;
20+
import io.trino.spi.connector.ColumnHandle;
21+
import io.trino.spi.connector.ConnectorSession;
22+
import io.trino.spi.connector.ConnectorSplit;
23+
import io.trino.spi.connector.ConnectorSplitManager;
1624
import io.trino.spi.connector.ConnectorSplitSource;
17-
import io.trino.spi.connector.DynamicFilter;
25+
import io.trino.spi.connector.ConnectorTransactionHandle;
26+
import io.trino.spi.connector.Constraint;
27+
import io.trino.spi.connector.DynamicFilterSnapshot;
28+
import io.trino.spi.connector.FixedSplitSource;
1829
import io.trino.spi.predicate.TupleDomain;
1930

31+
import java.util.List;
32+
import java.util.Optional;
33+
import java.util.Set;
2034
import java.util.concurrent.CompletableFuture;
2135

2236
import static com.google.common.base.Verify.verify;
2337
import static com.google.common.collect.ImmutableList.toImmutableList;
38+
import static io.trino.plugin.jdbc.JdbcDynamicFilteringSessionProperties.getDynamicFilteringWaitTimeout;
2439
import static java.util.Objects.requireNonNull;
2540

2641
/**
@@ -36,57 +51,102 @@
3651
public class DynamicFilteringJdbcSplitSource
3752
implements ConnectorSplitSource
3853
{
39-
private final ConnectorSplitSource delegateSplitSource;
40-
private final DynamicFilter dynamicFilter;
41-
private final JdbcTableHandle tableHandle;
54+
private static final Logger log = Logger.get(DynamicFilteringJdbcSplitSource.class);
4255

43-
DynamicFilteringJdbcSplitSource(ConnectorSplitSource delegateSplitSource, DynamicFilter dynamicFilter, JdbcTableHandle tableHandle)
56+
private final ConnectorSplitManager delegateSplitManager;
57+
private final ConnectorTransactionHandle transaction;
58+
private final ConnectorSession session;
59+
private final JdbcTableHandle table;
60+
private final Set<ColumnHandle> dynamicFilterColumns;
61+
private final Constraint constraint;
62+
private final long dynamicFilteringTimeoutMillis;
63+
64+
@GuardedBy("this")
65+
private Optional<ConnectorSplitSource> delegateSplitSource = Optional.empty();
66+
67+
DynamicFilteringJdbcSplitSource(
68+
ConnectorSplitManager delegateSplitManager,
69+
ConnectorTransactionHandle transaction,
70+
ConnectorSession session,
71+
JdbcTableHandle table,
72+
Set<ColumnHandle> dynamicFilterColumns,
73+
Constraint constraint)
4474
{
45-
this.delegateSplitSource = requireNonNull(delegateSplitSource, "delegateSplitSource is null");
46-
this.dynamicFilter = requireNonNull(dynamicFilter, "dynamicFilter is null");
47-
this.tableHandle = requireNonNull(tableHandle, "tableHandle is null");
75+
this.delegateSplitManager = requireNonNull(delegateSplitManager, "delegateSplitManager is null");
76+
this.transaction = requireNonNull(transaction, "transaction is null");
77+
this.session = requireNonNull(session, "session is null");
78+
this.table = requireNonNull(table, "table is null");
79+
this.dynamicFilterColumns = ImmutableSet.copyOf(dynamicFilterColumns);
80+
this.constraint = requireNonNull(constraint, "constraint is null");
81+
this.dynamicFilteringTimeoutMillis = getDynamicFilteringWaitTimeout(session).toMillis();
4882
}
4983

5084
@Override
51-
public CompletableFuture<ConnectorSplitBatch> getNextBatch(int maxSize)
85+
public long getRequestedDynamicFilterWaitTimeoutMillis()
5286
{
53-
if (!isEligibleForDynamicFilter(tableHandle)) {
54-
return delegateSplitSource.getNextBatch(maxSize);
55-
}
56-
return delegateSplitSource.getNextBatch(maxSize)
57-
.thenApply(batch -> {
58-
TupleDomain<JdbcColumnHandle> dynamicFilterPredicate = dynamicFilter.getCurrentPredicate()
59-
.transformKeys(JdbcColumnHandle.class::cast);
60-
return new ConnectorSplitBatch(
61-
batch.getSplits().stream()
62-
// attach dynamic filter constraint to JdbcSplit
63-
.map(split -> {
64-
JdbcSplit jdbcSplit = (JdbcSplit) split;
65-
// If split was a subclass of JdbcSplit, there would be additional information
66-
// that we would need to pass further on.
67-
verify(jdbcSplit.getClass() == JdbcSplit.class, "Unexpected split type %s", jdbcSplit);
68-
return jdbcSplit.withDynamicFilter(dynamicFilterPredicate);
69-
})
70-
.collect(toImmutableList()),
71-
batch.isNoMoreSplits());
72-
});
87+
return dynamicFilteringTimeoutMillis;
88+
}
89+
90+
@Override
91+
public CompletableFuture<List<ConnectorSplit>> getNextBatch(int maxSize, DynamicFilterSnapshot dynamicFilterSnapshot)
92+
{
93+
log.debug(
94+
"Enumerating splits (query %s, table: %s, completed: %s)",
95+
session.getQueryId(),
96+
table,
97+
dynamicFilterSnapshot.isComplete());
98+
TupleDomain<JdbcColumnHandle> dynamicFilterPredicate = dynamicFilterSnapshot.currentPredicate()
99+
.transformKeys(JdbcColumnHandle.class::cast);
100+
return getDelegateSplitSource(dynamicFilterSnapshot).getNextBatch(maxSize, dynamicFilterSnapshot)
101+
.thenApply(splits -> splits.stream()
102+
// attach dynamic filter constraint to JdbcSplit
103+
.map(split -> {
104+
JdbcSplit jdbcSplit = (JdbcSplit) split;
105+
// If split was a subclass of JdbcSplit, there would be additional information
106+
// that we would need to pass further on.
107+
verify(jdbcSplit.getClass() == JdbcSplit.class, "Unexpected split type %s", jdbcSplit);
108+
return (ConnectorSplit) jdbcSplit.withDynamicFilter(dynamicFilterPredicate);
109+
})
110+
.collect(toImmutableList()));
73111
}
74112

75113
@Override
76114
public void close()
77115
{
78-
delegateSplitSource.close();
116+
getOptionalDelegateSplitSource().ifPresent(ConnectorSplitSource::close);
79117
}
80118

81119
@Override
82120
public boolean isFinished()
83121
{
84-
return delegateSplitSource.isFinished();
122+
return getOptionalDelegateSplitSource()
123+
.map(ConnectorSplitSource::isFinished)
124+
.orElse(false);
125+
}
126+
127+
private synchronized ConnectorSplitSource getDelegateSplitSource(DynamicFilterSnapshot dynamicFilterSnapshot)
128+
{
129+
if (delegateSplitSource.isPresent()) {
130+
return delegateSplitSource.get();
131+
}
132+
133+
JdbcTableHandle intersected = table.intersectedWithConstraint(dynamicFilterSnapshot.currentPredicate());
134+
if (intersected.getConstraint().isNone()) {
135+
delegateSplitSource = Optional.of(new FixedSplitSource(ImmutableList.of()));
136+
return delegateSplitSource.get();
137+
}
138+
139+
delegateSplitSource = Optional.of(delegateSplitManager.getSplits(
140+
transaction,
141+
session,
142+
intersected,
143+
dynamicFilterColumns,
144+
constraint));
145+
return delegateSplitSource.get();
85146
}
86147

87-
public static boolean isEligibleForDynamicFilter(JdbcTableHandle tableHandle)
148+
private synchronized Optional<ConnectorSplitSource> getOptionalDelegateSplitSource()
88149
{
89-
// don't pushdown predicate through limit as it could reduce performance
90-
return tableHandle.getLimit().isEmpty();
150+
return delegateSplitSource;
91151
}
92152
}

0 commit comments

Comments
 (0)