Implements #26762 : adds support for connecting to Astra DB #26767
Reviewer's Guide
Adds Astra DB support to the Cassandra connector by introducing a new configuration property for secure connect bundles, updating session creation to handle either contact points or Astra bundles, and adding focused tests and dependencies to validate the new behavior.
Sequence diagram for Cassandra session creation with Astra DB support
sequenceDiagram
participant Connector as CassandraConnector
participant Module as CassandraClientModule
participant Config as CassandraClientConfig
participant Builder as Cluster_Builder
participant Cluster as Cluster
participant Session as NativeCassandraSession
participant RCluster as ReopeningCluster
Connector->>Module: createCassandraSession(connectorId, config, extraColumnMetadataCodec)
Module->>Cluster: builder()
Cluster-->>Module: Builder
Module->>Builder: withProtocolVersion(config.getProtocolVersion())
Module->>Config: getAstraSecureConnectBundlePath()
Config-->>Module: astraSecureConnectBundlePath
alt astraSecureConnectBundlePath is null
Module->>Config: getContactPoints()
Config-->>Module: contactPoints
Module->>Module: check contactPoints not empty
loop for each contactPoint
Module->>Builder: addContactPoint(contactPoint)
end
else astraSecureConnectBundlePath is set
Module->>Module: assert contactPoints is null or empty
Module->>Builder: withCloudSecureConnectBundle(File(astraSecureConnectBundlePath))
end
Module->>Builder: withPort(config.getNativeProtocolPort())
Module->>Builder: withReconnectionPolicy(ExponentialReconnectionPolicy)
Module->>Builder: withRetryPolicy(config.getRetryPolicy().getPolicy())
Module->>Builder: withLoadBalancingPolicy(RoundRobinPolicy or DCAwareRoundRobinPolicy)
Module->>Builder: withQueryOptions(QueryOptions)
Module->>Builder: build()
Builder-->>Module: Cluster
Module->>RCluster: new ReopeningCluster(() -> Cluster)
Module->>Session: new NativeCassandraSession(connectorId, extraColumnMetadataCodec, RCluster, noHostAvailableRetryTimeout, caseSensitiveNameMatchingEnabled)
Module-->>Connector: CassandraSession
Class diagram for updated Cassandra client configuration and session creation
classDiagram
class CassandraClientConfig {
- ConsistencyLevel consistencyLevel
- int fetchSize
- List~String~ contactPoints
- String astraSecureConnectBundlePath
- int nativeProtocolPort
- int partitionSizeForBatchSelect
- int splitSize
- boolean caseSensitiveNameMatchingEnabled
+ List~String~ getContactPoints()
+ CassandraClientConfig setContactPoints(String contactPoints)
+ String getAstraSecureConnectBundlePath()
+ setAstraSecureConnectBundlePath(String astraSecureConnectBundlePath)
+ int getNativeProtocolPort()
}
class CassandraClientModule {
+ configure(Binder binder)
+ createCassandraSession(CassandraConnectorId connectorId, CassandraClientConfig config, JsonCodec extraColumnMetadataCodec) CassandraSession
+ createCassandraSession(ClusterBuilder clusterBuilder, CassandraConnectorId connectorId, CassandraClientConfig config, JsonCodec extraColumnMetadataCodec) CassandraSession
}
class CassandraSession {
}
class NativeCassandraSession {
+ NativeCassandraSession(String connectorId, JsonCodec extraColumnMetadataCodec, ReopeningCluster reopeningCluster, Duration noHostAvailableRetryTimeout, boolean caseSensitiveNameMatchingEnabled)
}
class ReopeningCluster {
+ ReopeningCluster(ClusterFactory clusterFactory)
}
class Cluster {
+ static ClusterBuilder builder()
}
class ClusterBuilder {
+ ClusterBuilder withProtocolVersion(ProtocolVersion protocolVersion)
+ ClusterBuilder addContactPoint(String contactPoint)
+ ClusterBuilder withCloudSecureConnectBundle(File secureConnectBundle)
+ ClusterBuilder withPort(int port)
+ ClusterBuilder withReconnectionPolicy(ReconnectionPolicy policy)
+ ClusterBuilder withRetryPolicy(RetryPolicy policy)
+ ClusterBuilder withLoadBalancingPolicy(LoadBalancingPolicy policy)
+ ClusterBuilder withQueryOptions(QueryOptions options)
+ Cluster build()
}
CassandraClientModule --> CassandraClientConfig : uses
CassandraClientModule --> CassandraSession : creates
CassandraClientModule --> NativeCassandraSession : instantiates
CassandraClientModule --> ReopeningCluster : wraps Cluster
CassandraClientModule --> ClusterBuilder : configures
CassandraClientConfig --> ClusterBuilder : provides connection configuration
NativeCassandraSession --> ReopeningCluster : depends on
ReopeningCluster --> Cluster : builds
Hey there - I've reviewed your changes and found some issues that need to be addressed.
- The change from creating a new `Cluster` inside `ReopeningCluster` to building it once and reusing the same instance (`new ReopeningCluster(() -> cluster)`) appears to alter the original semantics around reconnect/reopen and may break scenarios that relied on a fresh `Cluster` being created on each reopen; consider preserving the lazy-rebuild behavior while still allowing the builder to be injected for tests.
- The `checkArgument` guarding simultaneous use of contact points and the Astra secure connect bundle is hard to read due to the negated compound condition; consider rewriting it in a positive, simpler form (e.g., `checkArgument(!(hasBundle && hasContactPoints), ...)` or extracting helpers) to make the intent clearer.
- The tests add both `mockito-core` and `mockito-inline` with explicit versions to the module POM, which may be unnecessary and could conflict with the parent dependency management; if inline mocking is not required, prefer using only the existing/mockito dependency aligned with the project’s BOM.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The change from creating a new `Cluster` inside `ReopeningCluster` to building it once and reusing the same instance (`new ReopeningCluster(() -> cluster)`) appears to alter the original semantics around reconnect/reopen and may break scenarios that relied on a fresh `Cluster` being created on each reopen; consider preserving the lazy-rebuild behavior while still allowing the builder to be injected for tests.
- The `checkArgument` guarding simultaneous use of contact points and the Astra secure connect bundle is hard to read due to the negated compound condition; consider rewriting it in a positive, simpler form (e.g., `checkArgument(!(hasBundle && hasContactPoints), ...)` or extracting helpers) to make the intent clearer.
- The tests add both `mockito-core` and `mockito-inline` with explicit versions to the module POM, which may be unnecessary and could conflict with the parent dependency management; if inline mocking is not required, prefer using only the existing/mockito dependency aligned with the project’s BOM.
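One possible reading of the `checkArgument` suggestion is to name the two conditions and check them separately. The sketch below is illustrative only: `ConnectionSettingsSketch` and `validate` are hypothetical names, and a plain `IllegalArgumentException` stands in for Guava's `checkArgument` so the example has no dependencies. The error messages are the ones quoted elsewhere in this review.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for the validation in createCassandraSession; names are illustrative.
final class ConnectionSettingsSketch {
    private ConnectionSettingsSketch() {}

    // Returns "bundle" or "contactPoints" to indicate which connection mode applies.
    static String validate(List<String> contactPoints, String bundlePath) {
        boolean hasContactPoints = contactPoints != null && !contactPoints.isEmpty();
        boolean hasBundle = bundlePath != null;

        // Mutual exclusion: at most one of the two options may be set.
        if (hasBundle && hasContactPoints) {
            throw new IllegalArgumentException(
                    "Contact points and Astra Secure Connect Bundle cannot both be specified!");
        }
        // At least one of the two options must be set.
        if (!hasBundle && !hasContactPoints) {
            throw new IllegalArgumentException("empty contactPoints");
        }
        return hasBundle ? "bundle" : "contactPoints";
    }

    public static void main(String[] args) {
        System.out.println(validate(Arrays.asList("127.0.0.1"), null));
        System.out.println(validate(Collections.emptyList(), "/path/to/bundle.zip"));
    }
}
```

Splitting the check this way also gives each misconfiguration its own message, which the single negated condition cannot do.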
## Individual Comments
### Comment 1
<location> `presto-cassandra/src/main/java/com/facebook/presto/cassandra/CassandraClientModule.java:170-175` </location>
<code_context>
config.getSpeculativeExecutionLimit())); // maximum number of executions
}
+ Cluster cluster = clusterBuilder.build();
+
return new NativeCassandraSession(
connectorId.toString(),
extraColumnMetadataCodec,
- new ReopeningCluster(() -> {
- contactPoints.forEach(clusterBuilder::addContactPoint);
- return clusterBuilder.build();
- }),
+ new ReopeningCluster(() -> cluster ),
config.getNoHostAvailableRetryTimeout(), config.isCaseSensitiveNameMatchingEnabled());
}
</code_context>
<issue_to_address>
**issue (bug_risk):** ReopeningCluster now reuses a single Cluster instance instead of recreating it, which likely breaks the reopening behavior.
The original supplier rebuilt the `Cluster` (and re-added contact points) on each invocation, while the new version always returns the same pre-built `cluster` instance. After that instance is closed, `ReopeningCluster` will keep returning a closed `Cluster`, breaking the reopening behavior and likely causing runtime errors. The supplier should instead construct a new `Cluster` on each call (with the necessary configuration) or otherwise guarantee it never returns a previously closed instance.
</issue_to_address>
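The difference between the two suppliers can be seen in a simplified model. This is a sketch under stated assumptions, not the Presto implementation: `FakeCluster` and `ReopeningSketch` are hypothetical classes, and the real `ReopeningCluster` may decide when to call its supplier differently.

```java
import java.util.function.Supplier;

// Simplified model of the pattern; FakeCluster and ReopeningSketch are hypothetical,
// not the Presto or driver classes.
final class ReopenSketch {
    private ReopenSketch() {}

    static final class FakeCluster {
        private boolean closed;

        void close() { closed = true; }

        boolean isClosed() { return closed; }
    }

    // Hands out the current cluster, asking the supplier for a replacement once it is closed.
    static final class ReopeningSketch {
        private final Supplier<FakeCluster> supplier;
        private FakeCluster delegate;

        ReopeningSketch(Supplier<FakeCluster> supplier) {
            this.supplier = supplier;
        }

        synchronized FakeCluster get() {
            if (delegate == null || delegate.isClosed()) {
                delegate = supplier.get();
            }
            return delegate;
        }
    }

    // Closes the current cluster, then reports whether the next one handed out is usable.
    static boolean usableAfterClose(ReopeningSketch reopening) {
        reopening.get().close();
        return !reopening.get().isClosed();
    }

    public static void main(String[] args) {
        FakeCluster prebuilt = new FakeCluster();
        // Supplier captures one pre-built instance, as in the changed code.
        System.out.println(usableAfterClose(new ReopeningSketch(() -> prebuilt)));
        // Supplier builds a new instance on every call, as in the original code.
        System.out.println(usableAfterClose(new ReopeningSketch(FakeCluster::new)));
    }
}
```

In this model the captured-instance supplier prints `false` (the closed cluster is handed out again), while the rebuilding supplier prints `true`.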
### Comment 2
<location> `presto-cassandra/src/test/java/com/facebook/presto/cassandra/CassandraClientModuleTest.java:50-51` </location>
<code_context>
+ mockExtraColumnMetadataCodec = Mockito.mock(JsonCodec.class);
+ }
+
+ @Test
+ public void testContactPointsOnly() {
+ ccConfig.setContactPoints("127.0.0.1", "127.0.0.2", "127.0.0.3");
+ CassandraClientModule.createCassandraSession(mockClusterBuilder, ccId, ccConfig, mockExtraColumnMetadataCodec);
</code_context>
<issue_to_address>
**suggestion (testing):** Add a test for the case where neither contact points nor an Astra bundle are configured
Since `createCassandraSession` now throws `IllegalArgumentException` when `astraSecureConnectBundlePath` is null and `contactPoints` is empty, please add a test (e.g. `testNoContactPointsAndNoAstraBundleThrowsError`) that uses the default config and asserts that this exception is thrown. This will lock in the validation behavior and document the expected configuration contract.
</issue_to_address>
Hey there - I've reviewed your changes and found some issues that need to be addressed.
- Changing `ReopeningCluster` to capture a single built `Cluster` instance (`new ReopeningCluster(() -> cluster)`) removes the ability to reopen with a freshly built cluster and may defeat its purpose; consider keeping a supplier that rebuilds from `clusterBuilder` instead of reusing one instance.
- The mutual-exclusion check for contact points and `astraSecureConnectBundlePath` is hard to read (`!(config.getAstraSecureConnectBundlePath() != null && ...)`); consider simplifying this into two straightforward `checkArgument` calls or extracting it into a small helper to make the configuration constraints clearer.
- The new overload of `createCassandraSession` that accepts a `Cluster.Builder` should be documented in a brief Javadoc (especially around expected usage and lifecycle of the builder) to avoid accidental misuse outside of tests.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- Changing `ReopeningCluster` to capture a single built `Cluster` instance (`new ReopeningCluster(() -> cluster)`) removes the ability to reopen with a freshly built cluster and may defeat its purpose; consider keeping a supplier that rebuilds from `clusterBuilder` instead of reusing one instance.
- The mutual-exclusion check for contact points and `astraSecureConnectBundlePath` is hard to read (`!(config.getAstraSecureConnectBundlePath() != null && ...)`); consider simplifying this into two straightforward `checkArgument` calls or extracting it into a small helper to make the configuration constraints clearer.
- The new overload of `createCassandraSession` that accepts a `Cluster.Builder` should be documented in a brief Javadoc (especially around expected usage and lifecycle of the builder) to avoid accidental misuse outside of tests.
## Individual Comments
### Comment 1
<location> `presto-cassandra/src/main/java/com/facebook/presto/cassandra/CassandraClientModule.java:170-175` </location>
<code_context>
config.getSpeculativeExecutionLimit())); // maximum number of executions
}
+ Cluster cluster = clusterBuilder.build();
+
return new NativeCassandraSession(
connectorId.toString(),
extraColumnMetadataCodec,
- new ReopeningCluster(() -> {
- contactPoints.forEach(clusterBuilder::addContactPoint);
- return clusterBuilder.build();
- }),
+ new ReopeningCluster(() -> cluster ),
config.getNoHostAvailableRetryTimeout(), config.isCaseSensitiveNameMatchingEnabled());
}
</code_context>
<issue_to_address>
**issue (bug_risk):** ReopeningCluster now always returns the same Cluster instance, which defeats the purpose and risks using a closed Cluster.
With the original code, `ReopeningCluster` rebuilt a fresh `Cluster` on each call via `clusterBuilder.build()`. After this change, the same `Cluster` instance is always returned. Once that instance is closed, all subsequent "reopens" will still return a closed `Cluster`, leading to runtime errors.
Keep the `Cluster` construction inside the supplier so each reopen creates a new instance, e.g.:
```java
return new NativeCassandraSession(
connectorId.toString(),
extraColumnMetadataCodec,
new ReopeningCluster(clusterBuilder::build),
config.getNoHostAvailableRetryTimeout(),
config.isCaseSensitiveNameMatchingEnabled());
```
Any conditional logic around contact points vs. secure bundle can still be handled inside that supplier while ensuring a new `Cluster` is built each time.
</issue_to_address>
### Comment 2
<location> `presto-cassandra/src/test/java/com/facebook/presto/cassandra/CassandraClientModuleTest.java:31` </location>
<code_context>
+import static org.mockito.Mockito.*;
+
+
+public class CassandraClientModuleTest {
+
+ private CassandraConnectorId ccId;
</code_context>
<issue_to_address>
**issue (testing):** Add a test for the case where neither contact points nor Astra bundle are configured
The updated `createCassandraSession` now throws `IllegalArgumentException("empty contactPoints")` when `astraSecureConnectBundlePath` is null and `contactPoints` is empty, but this path isn’t tested. Please add a test that uses the default empty `contactPoints` and a null `astraSecureConnectBundlePath`, and asserts that `createCassandraSession` throws this exception to cover this configuration gap.
</issue_to_address>
### Comment 3
<location> `presto-cassandra/src/test/java/com/facebook/presto/cassandra/CassandraClientModuleTest.java:50-51` </location>
<code_context>
+ mockExtraColumnMetadataCodec = Mockito.mock(JsonCodec.class);
+ }
+
+ @Test
+ public void testContactPointsOnly() {
+ ccConfig.setContactPoints("127.0.0.1", "127.0.0.2", "127.0.0.3");
+ CassandraClientModule.createCassandraSession(mockClusterBuilder, ccId, ccConfig, mockExtraColumnMetadataCodec);
</code_context>
<issue_to_address>
**suggestion (testing):** Also assert that the Astra bundle is not used when only contact points are configured
In `testContactPointsOnly`, in addition to verifying the three `addContactPoint` calls, please also assert that `withCloudSecureConnectBundle` is never invoked, e.g. `verify(mockClusterBuilder, never()).withCloudSecureConnectBundle(any(File.class));`. This will explicitly validate the mutual exclusion between contact points and the Astra bundle path.
Suggested implementation:
```java
@Test
public void testContactPointsOnly() {
ccConfig.setContactPoints("127.0.0.1", "127.0.0.2", "127.0.0.3");
CassandraClientModule.createCassandraSession(mockClusterBuilder, ccId, ccConfig, mockExtraColumnMetadataCodec);
verify(mockClusterBuilder, times(3)).addContactPoint(anyString());
verify(mockClusterBuilder, never()).withCloudSecureConnectBundle(any(java.io.File.class));
}
```
1. If you prefer to avoid the fully-qualified `java.io.File` in the test body, add `import java.io.File;` at the top of the file and change the new line to `withCloudSecureConnectBundle(any(File.class));`.
2. Ensure there is a static import for `never` and `any` if not already present, e.g. `import static org.mockito.Mockito.*;` or the specific static imports you use elsewhere in the test file.
</issue_to_address>
98842c5 to e8cd1b1
config.getContactPoints() != null &&
!config.getContactPoints().isEmpty()) , "Contact points and Astra Secure Connect Bundle cannot both be specified!");

if( config.getAstraSecureConnectBundlePath() == null ) {
Nitpick: should this check for null only, or also for empty and whitespace-only values?
tdcmeehan left a comment
Thanks for this contribution, I have some comments and questions.
private ConsistencyLevel consistencyLevel = ConsistencyLevel.ONE;
private int fetchSize = 5_000;
private List<String> contactPoints = ImmutableList.of();
private String astraSecureConnectBundlePath = null;
Although not evident in this file, our typical convention is to express this as an Optional and for the default value to be Optional.empty(). See our guidelines on nullable objects: https://github.com/prestodb/presto/blob/master/CONTRIBUTING.md#java
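A minimal sketch of what that convention could look like for this property, assuming a trimmed-down class: `AstraConfigSketch` is hypothetical, the real `CassandraClientConfig` has many more properties, and its configuration annotations are omitted here. Treating blank values as "not configured" is also an assumption, prompted by the earlier nitpick about empty or whitespace-only values.

```java
import java.util.Optional;

// Hypothetical, trimmed-down config class; not the real CassandraClientConfig.
final class AstraConfigSketch {
    // Default is Optional.empty() rather than null.
    private Optional<String> astraSecureConnectBundlePath = Optional.empty();

    Optional<String> getAstraSecureConnectBundlePath() {
        return astraSecureConnectBundlePath;
    }

    // Assumption: null, empty, and whitespace-only values all mean "not configured".
    AstraConfigSketch setAstraSecureConnectBundlePath(String path) {
        this.astraSecureConnectBundlePath = Optional.ofNullable(path)
                .map(String::trim)
                .filter(value -> !value.isEmpty());
        return this;
    }

    public static void main(String[] args) {
        AstraConfigSketch config = new AstraConfigSketch();
        System.out.println(config.getAstraSecureConnectBundlePath().isPresent());
        config.setAstraSecureConnectBundlePath("/path/to/secure-connect.zip");
        System.out.println(config.getAstraSecureConnectBundlePath().get());
    }
}
```

With this shape, callers can branch on `isPresent()` instead of comparing against null.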
import static org.mockito.Mockito.*;


public class CassandraClientModuleTest {
Our convention is to put Test first in the class name, i.e. TestCassandraClientModule. Also, it's not typical to test Guice modules, as they are more or less configuration. Would it be possible to create an end-to-end test that proves the configuration is being used?
<artifactId>mockito-inline</artifactId>
<version>5.2.0</version>
<scope>test</scope>
</dependency>
We don't use Mockito in this project--is there any other way to test this?
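One Mockito-free option, besides the end-to-end test suggested above, is a hand-written recording fake. The sketch below is only an illustration under assumptions: it presumes the code under test could depend on a small interface (`ClusterConfigurer`, hypothetical) rather than on the driver's builder directly, and `configure` is a simplified stand-in for the branch in `createCassandraSession`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical seam and fake; none of these types exist in the PR or the driver.
final class RecordingBuilderSketch {
    private RecordingBuilderSketch() {}

    interface ClusterConfigurer {
        void addContactPoint(String contactPoint);

        void withCloudSecureConnectBundle(String bundlePath);
    }

    // Hand-written fake that records calls, playing the role of the Mockito mock.
    static final class RecordingConfigurer implements ClusterConfigurer {
        final List<String> contactPoints = new ArrayList<>();
        final List<String> bundles = new ArrayList<>();

        @Override
        public void addContactPoint(String contactPoint) {
            contactPoints.add(contactPoint);
        }

        @Override
        public void withCloudSecureConnectBundle(String bundlePath) {
            bundles.add(bundlePath);
        }
    }

    // Simplified stand-in for the contact-points vs. bundle branch.
    static void configure(ClusterConfigurer configurer, List<String> contactPoints, String bundlePath) {
        if (bundlePath == null) {
            contactPoints.forEach(configurer::addContactPoint);
        }
        else {
            configurer.withCloudSecureConnectBundle(bundlePath);
        }
    }

    public static void main(String[] args) {
        RecordingConfigurer fake = new RecordingConfigurer();
        configure(fake, Arrays.asList("127.0.0.1", "127.0.0.2", "127.0.0.3"), null);
        System.out.println(fake.contactPoints.size() + " contact points, " + fake.bundles.size() + " bundles");
    }
}
```

A test can then assert on the recorded lists directly, which covers the same checks as `verify(..., times(3))` and `verify(..., never())` in the Mockito version.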
Description
Adds a new configuration parameter for the Astra secure connect bundle path to facilitate connecting to Astra DB.
Motivation and Context
Ability to query data in Astra DB
Impact
none
Test Plan
Unit test: used a mocked Cluster.Builder to verify that the parameter is handled properly.
Functional test (manual): verified ability to query data in Astra DB via presto-cli