Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions docs/src/main/sphinx/connector/exasol.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,49 @@ specify the certificate's fingerprint in the JDBC URL using parameter
``fingerprint``, e.g.: ``jdbc:exa:exasol.example.com:8563;fingerprint=ABC123``.
:::

### Parallel connections

To speed up importing data from an Exasol cluster with multiple nodes,
you can enable parallel connections by specifying property
``exasol.parallel-connections.worker-count`` with value 2 or higher.
This will enable a custom page source that uses Exasol's
[parallel connections](https://exasol.my.site.com/s/article/Parallel-connections-with-JDBC)
to read query results in parallel. The actual number of parallel connections
depends on the number of nodes in the Exasol cluster.
Parallel connections are deactivated by default.

Property value 0 will deactivate parallel connection explicitly. A value
of 1 will use the custom page source with a single connection. This is only
useful for testing.

You can override the setting for an Exasol catalog using session property
``parallel_connections_worker_count`` by running the following SQL statement:

```sql
set session catalog.parallel_connections_worker_count = 4;
```

:::{important}
There is a known issue with the Exasol JDBC driver when loading small
result sets using parallel connections. This operation may block forever
when the query result only contains few rows (around 25 rows per connection),
i.e. when at least one subconnection returns an empty result set.

When this happens you need to restart the Trino cluster.

We recommend using parallel connections only with large result sets
containing more than 25 rows per connection.
:::

:::{note}
* Even with parallel connections, Trino will read the query result only
on a single Trino node, but in multiple threads.
* The actual number of parallel connections depends on the Exasol cluster.
The database may decide to use fewer connections than specified.
* Parallel connections cause overhead for each query. They only make sense
if the query reads a large amount of data.
:::

```{include} jdbc-authentication.fragment
```

Expand Down
31 changes: 25 additions & 6 deletions plugin/trino-exasol/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,26 @@
<classifier>classes</classifier>
</dependency>

<dependency>
<groupId>dev.failsafe</groupId>
<artifactId>failsafe</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>concurrent</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>configuration</artifactId>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>log</artifactId>
</dependency>

<dependency>
<groupId>io.trino</groupId>
<artifactId>trino-base-jdbc</artifactId>
Expand All @@ -50,6 +65,16 @@
<artifactId>trino-plugin-toolkit</artifactId>
</dependency>

<dependency>
<groupId>jakarta.annotation</groupId>
<artifactId>jakarta.annotation-api</artifactId>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
Expand Down Expand Up @@ -86,12 +111,6 @@
<scope>provided</scope>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>log</artifactId>
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>io.airlift</groupId>
<artifactId>log-manager</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.exasol;

import io.trino.spi.connector.SourcePage;

import java.util.Map;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;

final class BufferedSourcePageQueue
{
private final BlockingQueue<SourcePage> queue;
private final AtomicReference<CompletableFuture<Void>> dataAvailableFuture = new AtomicReference<>(new CompletableFuture<>());
private final Map<Integer, ProducerState> producerStates;

BufferedSourcePageQueue(int producerCount)
{
queue = new LinkedBlockingQueue<>(Math.max(8, producerCount * 4));
producerStates = IntStream.range(0, producerCount)
.boxed()
.collect(ConcurrentHashMap::new, (states, producerId) -> states.put(producerId, ProducerState.active()), ConcurrentHashMap::putAll);
}

void add(SourcePage page)
throws InterruptedException
{
queue.put(page);
signalDataAvailable();
}

void finish(int producerId)
{
producerStates.compute(producerId, (_, currentState) -> requireTrackedProducer(producerId, currentState).finish());
signalDataAvailable();
}

void fail(int producerId, Throwable throwable)
{
producerStates.compute(producerId, (_, currentState) -> requireTrackedProducer(producerId, currentState).fail(throwable));
signalDataAvailable();
}

Throwable getFailure()
{
return producerStates.values().stream()
.map(ProducerState::failure)
.filter(Objects::nonNull)
.findFirst()
.orElse(null);
}

SourcePage poll()
{
SourcePage page = queue.poll();
if (page != null) {
signalDataAvailable();
}
return page;
}

int size()
{
return queue.size();
}

boolean isEmpty()
{
return queue.isEmpty();
}

CompletableFuture<?> isBlocked()
{
if (!queue.isEmpty() || getFailure() != null || allProducersFinished()) {
return CompletableFuture.completedFuture(null);
}
CompletableFuture<Void> blocked = dataAvailableFuture.get();
if (blocked.isDone()) {
CompletableFuture<Void> newBlocked = new CompletableFuture<>();
if (dataAvailableFuture.compareAndSet(blocked, newBlocked)) {
blocked = newBlocked;
}
else {
blocked = dataAvailableFuture.get();
}
}

if (!queue.isEmpty() || getFailure() != null || allProducersFinished()) {
blocked.complete(null);
return CompletableFuture.completedFuture(null);
}
return blocked;
}

private boolean allProducersFinished()
{
return producerStates.values().stream().allMatch(ProducerState::isTerminal);
}

boolean isFinished()
{
boolean producersFinished = allProducersFinished();
boolean queueEmpty = queue.isEmpty();
boolean noFailure = getFailure() == null;
boolean finished = queueEmpty && noFailure && producersFinished;
return finished;
}

private void signalDataAvailable()
{
dataAvailableFuture.get().complete(null);
}

private static ProducerState requireTrackedProducer(int producerId, ProducerState currentState)
{
if (currentState == null) {
throw new IllegalArgumentException("Unknown producer: " + producerId);
}
return currentState;
}

private record ProducerState(boolean finished, Throwable failure)
{
private static ProducerState active()
{
return new ProducerState(false, null);
}

private ProducerState finish()
{
return new ProducerState(true, failure);
}

private ProducerState fail(Throwable throwable)
{
return new ProducerState(true, throwable);
}

private boolean isTerminal()
{
return finished;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,15 @@
import io.trino.plugin.jdbc.JdbcStatisticsConfig;
import io.trino.plugin.jdbc.credential.CredentialProvider;
import io.trino.plugin.jdbc.ptf.Query;
import io.trino.spi.connector.ConnectorPageSourceProvider;
import io.trino.spi.function.table.ConnectorTableFunction;

import java.util.Properties;

import static com.google.inject.multibindings.Multibinder.newSetBinder;
import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder;
import static io.airlift.configuration.ConfigBinder.configBinder;
import static io.trino.plugin.jdbc.JdbcModule.bindSessionPropertiesProvider;

public class ExasolClientModule
extends AbstractConfigurationAwareModule
Expand All @@ -42,8 +45,13 @@ public class ExasolClientModule
protected void setup(Binder binder)
{
binder.bind(JdbcClient.class).annotatedWith(ForBaseJdbc.class).to(ExasolClient.class).in(Scopes.SINGLETON);
bindSessionPropertiesProvider(binder, ExasolSessionProperties.class);
configBinder(binder).bindConfig(JdbcStatisticsConfig.class);
configBinder(binder).bindConfig(ExasolConfig.class);
newSetBinder(binder, ConnectorTableFunction.class).addBinding().toProvider(Query.class).in(Scopes.SINGLETON);
newOptionalBinder(binder, ConnectorPageSourceProvider.class).setBinding().to(ExasolParallelPageSourceProvider.class).in(Scopes.SINGLETON);
binder.bind(ExasolTlsCertificateFingerprintProvider.class).in(Scopes.SINGLETON);
binder.bind(ParallelConnectionFactory.class).in(Scopes.SINGLETON);
}

@Provides
Expand All @@ -55,6 +63,7 @@ public static ConnectionFactory connectionFactory(BaseJdbcConfig config, Credent
// Deactivate SNAPSHOT_MODE (https://docs.exasol.com/db/latest/database_concepts/snapshot_mode.htm)
// to ensure that {@link Connection#getMetaData()} always returns up-to-date data.
connectionProperties.setProperty("snapshottransactions", "1");

return DriverConnectionFactory.builder(
new EXADriver(),
config.getConnectionUrl(),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.exasol;

import io.airlift.configuration.Config;
import io.airlift.configuration.ConfigDescription;

public class ExasolConfig
{
private int parallelConnectionsWorkerCount;

public int getParallelConnectionsWorkerCount()
{
return parallelConnectionsWorkerCount;
}

@ConfigDescription("Maximum number of workers to use for parallel JDBC import. Set to 0 to deactivate parallel import")
@Config("exasol.parallel-connections.worker-count")
public void setParallelConnectionsWorkerCount(int parallelConnectionsWorkerCount)
{
this.parallelConnectionsWorkerCount = parallelConnectionsWorkerCount;
}
}
Loading