Skip to content

Commit 58cf613

Browse files
committed
Migrate built-in connectors to new split SPI
1 parent 8985a1a commit 58cf613

6 files changed

Lines changed: 41 additions & 42 deletions

File tree

core/trino-main/src/main/java/io/trino/connector/informationschema/InformationSchemaSplitManager.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,17 @@
1414
package io.trino.connector.informationschema;
1515

1616
import io.trino.spi.HostAddress;
17+
import io.trino.spi.connector.ColumnHandle;
1718
import io.trino.spi.connector.ConnectorSession;
1819
import io.trino.spi.connector.ConnectorSplitManager;
1920
import io.trino.spi.connector.ConnectorSplitSource;
2021
import io.trino.spi.connector.ConnectorTableHandle;
2122
import io.trino.spi.connector.ConnectorTransactionHandle;
2223
import io.trino.spi.connector.Constraint;
23-
import io.trino.spi.connector.DynamicFilter;
2424
import io.trino.spi.connector.FixedSplitSource;
2525

26+
import java.util.Set;
27+
2628
public class InformationSchemaSplitManager
2729
implements ConnectorSplitManager
2830
{
@@ -38,7 +40,7 @@ public ConnectorSplitSource getSplits(
3840
ConnectorTransactionHandle transaction,
3941
ConnectorSession session,
4042
ConnectorTableHandle table,
41-
DynamicFilter dynamicFilter,
43+
Set<ColumnHandle> dynamicFilterColumns,
4244
Constraint constraint)
4345
{
4446
return new FixedSplitSource(split);

core/trino-main/src/main/java/io/trino/connector/system/GlobalSystemConnector.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,14 @@
1919
import io.trino.operator.table.SequenceFunction.SequenceFunctionHandle;
2020
import io.trino.spi.catalog.CatalogName;
2121
import io.trino.spi.connector.CatalogVersion;
22+
import io.trino.spi.connector.ColumnHandle;
2223
import io.trino.spi.connector.ConnectorMetadata;
2324
import io.trino.spi.connector.ConnectorSession;
2425
import io.trino.spi.connector.ConnectorSplitManager;
2526
import io.trino.spi.connector.ConnectorSplitSource;
27+
import io.trino.spi.connector.ConnectorTableHandle;
2628
import io.trino.spi.connector.ConnectorTransactionHandle;
29+
import io.trino.spi.connector.Constraint;
2730
import io.trino.spi.connector.SystemTable;
2831
import io.trino.spi.function.table.ConnectorTableFunction;
2932
import io.trino.spi.function.table.ConnectorTableFunctionHandle;
@@ -91,6 +94,12 @@ public ConnectorSplitManager getSplitManager()
9194
{
9295
return new ConnectorSplitManager()
9396
{
97+
@Override
98+
public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorTableHandle table, Set<ColumnHandle> dynamicFilterColumns, Constraint constraint)
99+
{
100+
throw new UnsupportedOperationException();
101+
}
102+
94103
@Override
95104
public ConnectorSplitSource getSplits(ConnectorTransactionHandle transaction, ConnectorSession session, ConnectorTableFunctionHandle functionHandle)
96105
{

core/trino-main/src/main/java/io/trino/connector/system/SystemSplitManager.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
import io.trino.spi.connector.ConnectorTableHandle;
2727
import io.trino.spi.connector.ConnectorTransactionHandle;
2828
import io.trino.spi.connector.Constraint;
29-
import io.trino.spi.connector.DynamicFilter;
3029
import io.trino.spi.connector.FixedSplitSource;
3130
import io.trino.spi.connector.SystemTable;
3231
import io.trino.spi.connector.SystemTable.Distribution;
@@ -61,7 +60,7 @@ public ConnectorSplitSource getSplits(
6160
ConnectorTransactionHandle transaction,
6261
ConnectorSession session,
6362
ConnectorTableHandle tableHandle,
64-
DynamicFilter dynamicFilter,
63+
Set<ColumnHandle> dynamicFilterColumns,
6564
Constraint constraint)
6665
{
6766
SystemTableHandle table = (SystemTableHandle) tableHandle;

core/trino-main/src/main/java/io/trino/testing/TestingSplitManager.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,17 +14,18 @@
1414
package io.trino.testing;
1515

1616
import com.google.common.collect.ImmutableList;
17+
import io.trino.spi.connector.ColumnHandle;
1718
import io.trino.spi.connector.ConnectorSession;
1819
import io.trino.spi.connector.ConnectorSplit;
1920
import io.trino.spi.connector.ConnectorSplitManager;
2021
import io.trino.spi.connector.ConnectorSplitSource;
2122
import io.trino.spi.connector.ConnectorTableHandle;
2223
import io.trino.spi.connector.ConnectorTransactionHandle;
2324
import io.trino.spi.connector.Constraint;
24-
import io.trino.spi.connector.DynamicFilter;
2525
import io.trino.spi.connector.FixedSplitSource;
2626

2727
import java.util.List;
28+
import java.util.Set;
2829

2930
import static java.util.Objects.requireNonNull;
3031

@@ -43,7 +44,7 @@ public ConnectorSplitSource getSplits(
4344
ConnectorTransactionHandle transaction,
4445
ConnectorSession session,
4546
ConnectorTableHandle table,
46-
DynamicFilter dynamicFilter,
47+
Set<ColumnHandle> dynamicFilterColumns,
4748
Constraint constraint)
4849
{
4950
return new FixedSplitSource(splits);

core/trino-main/src/test/java/io/trino/connector/MockConnector.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -351,7 +351,7 @@ public ConnectorSplitSource getSplits(
351351
ConnectorTransactionHandle transaction,
352352
ConnectorSession session,
353353
ConnectorTableHandle table,
354-
DynamicFilter dynamicFilter,
354+
Set<ColumnHandle> dynamicFilterColumns,
355355
Constraint constraint)
356356
{
357357
SchemaTableName tableName = ((MockConnectorTableHandle) table).getTableName();

testing/trino-tests/src/test/java/io/trino/execution/AbstractTestCoordinatorDynamicFiltering.java

Lines changed: 23 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@
3939
import io.trino.spi.connector.ConnectorTransactionHandle;
4040
import io.trino.spi.connector.Constraint;
4141
import io.trino.spi.connector.DynamicFilter;
42+
import io.trino.spi.connector.DynamicFilterSnapshot;
4243
import io.trino.spi.connector.EmptyPageSource;
4344
import io.trino.spi.predicate.Domain;
4445
import io.trino.spi.predicate.Range;
@@ -62,6 +63,7 @@
6263
import java.util.Set;
6364
import java.util.concurrent.CompletableFuture;
6465
import java.util.concurrent.atomic.AtomicBoolean;
66+
import java.util.concurrent.atomic.AtomicReference;
6567
import java.util.function.Consumer;
6668
import java.util.stream.LongStream;
6769

@@ -106,7 +108,7 @@ public abstract class AbstractTestCoordinatorDynamicFiltering
106108
public void setup()
107109
{
108110
// create lineitem table in test connector
109-
getQueryRunner().installPlugin(new TestingPlugin(getRetryPolicy() == RetryPolicy.TASK));
111+
getQueryRunner().installPlugin(new TestingPlugin());
110112
getQueryRunner().installPlugin(new TpchPlugin());
111113
getQueryRunner().installPlugin(new TpcdsPlugin());
112114
getQueryRunner().installPlugin(new MemoryPlugin());
@@ -474,12 +476,7 @@ protected void assertQueryDynamicFilters(
474476
private class TestingPlugin
475477
implements Plugin
476478
{
477-
private final boolean isTaskRetryMode;
478-
479-
public TestingPlugin(boolean isTaskRetryMode)
480-
{
481-
this.isTaskRetryMode = isTaskRetryMode;
482-
}
479+
public TestingPlugin() {}
483480

484481
@Override
485482
public Iterable<ConnectorFactory> getConnectorFactories()
@@ -497,7 +494,7 @@ public String getName()
497494
@Override
498495
public Connector create(String catalogName, Map<String, String> config, ConnectorContext context)
499496
{
500-
return new TestConnector(metadata, isTaskRetryMode);
497+
return new TestConnector(metadata);
501498
}
502499
});
503500
}
@@ -507,12 +504,10 @@ private class TestConnector
507504
implements Connector
508505
{
509506
private final ConnectorMetadata metadata;
510-
private final boolean isTaskRetryMode;
511507

512-
private TestConnector(ConnectorMetadata metadata, boolean isTaskRetryMode)
508+
private TestConnector(ConnectorMetadata metadata)
513509
{
514510
this.metadata = requireNonNull(metadata, "metadata is null");
515-
this.isTaskRetryMode = isTaskRetryMode;
516511
}
517512

518513
@Override
@@ -537,37 +532,32 @@ public ConnectorSplitSource getSplits(
537532
ConnectorTransactionHandle transaction,
538533
ConnectorSession session,
539534
ConnectorTableHandle table,
540-
DynamicFilter dynamicFilter,
535+
Set<ColumnHandle> dynamicFilterColumns,
541536
Constraint constraint)
542537
{
543-
if (!isTaskRetryMode) {
544-
// In task retry mode, dynamic filter collection is done outside the join stage,
545-
// so it's not necessary that dynamicFilter will be blocked initially.
546-
assertThat(dynamicFilter.isBlocked().isDone())
547-
.describedAs("Dynamic filter should be initially blocked")
548-
.isFalse();
549-
}
550-
assertThat(dynamicFilter.getColumnsCovered())
538+
assertThat(dynamicFilterColumns)
551539
.describedAs("columns covered")
552540
.isEqualTo(expectedDynamicFilterColumnsCovered);
553541

554542
AtomicBoolean splitProduced = new AtomicBoolean();
543+
AtomicReference<TupleDomain<ColumnHandle>> capturedPredicate = new AtomicReference<>();
555544
return new ConnectorSplitSource()
556545
{
557546
@Override
558-
public CompletableFuture<ConnectorSplitBatch> getNextBatch(int maxSize)
547+
public long getRequestedDynamicFilterWaitTimeoutMillis()
559548
{
560-
CompletableFuture<?> blocked = dynamicFilter.isBlocked();
549+
return Long.MAX_VALUE;
550+
}
561551

562-
if (blocked.isDone()) {
563-
splitProduced.set(true);
564-
return completedFuture(new ConnectorSplitBatch(ImmutableList.of(createRemoteSplit()), isFinished()));
552+
@Override
553+
public CompletableFuture<List<ConnectorSplit>> getNextBatch(int maxSize, DynamicFilterSnapshot dynamicFilterSnapshot)
554+
{
555+
if (!dynamicFilterSnapshot.isComplete()) {
556+
return completedFuture(ImmutableList.of());
565557
}
566-
567-
return blocked.thenApply(_ -> {
568-
// yield until dynamic filter is fully loaded
569-
return new ConnectorSplitBatch(ImmutableList.of(), false);
570-
});
558+
capturedPredicate.set(dynamicFilterSnapshot.currentPredicate());
559+
splitProduced.set(true);
560+
return completedFuture(ImmutableList.of(createRemoteSplit()));
571561
}
572562

573563
@Override
@@ -576,17 +566,15 @@ public void close() {}
576566
@Override
577567
public boolean isFinished()
578568
{
579-
assertThat(dynamicFilter.getColumnsCovered())
569+
assertThat(dynamicFilterColumns)
580570
.describedAs("columns covered")
581571
.isEqualTo(expectedDynamicFilterColumnsCovered);
582572

583-
if (!dynamicFilter.isComplete() || !splitProduced.get()) {
573+
if (!splitProduced.get()) {
584574
return false;
585575
}
586576

587-
assertThat(dynamicFilter.isBlocked().isDone()).isTrue();
588-
expectedCoordinatorDynamicFilterAssertion.accept(dynamicFilter.getCurrentPredicate());
589-
577+
expectedCoordinatorDynamicFilterAssertion.accept(capturedPredicate.get());
590578
return true;
591579
}
592580
};

0 commit comments

Comments
 (0)