Skip to content

Commit e0fbffd

Browse files
committed
Tablets support
Introduces basic tablets support for version 4.x of the driver. Metadata about tablets will be kept in TabletMap that gets continuously updated through the tablets-routing-v1 extension. Each time the BoundStatement targets the wrong node and shard combination the server supporting tablets should respond with tablet metadata inside custom payload of its response. This metadata will be transparently processed and used for future queries. Tablets metadata will by enabled by default. Until now driver was using TokenMaps to choose replicas and appropriate shards. Having a token was enough information to do that. Now driver will first attempt tablet-based lookup and only after failing to find corresponding tablet it will defer to TokenMap lookup. Since to find a correct tablet besides the token we need the keyspace and table names, many of the methods were extended to also accept those as parameters. RequestHandlerTestHarness was adjusted to mock also MetadataManager. Before it used to mock only `session.getMetadata()` call but the same can be obtained by `context.getMetadataManager().getMetadata()`. Using the second method was causing test failures.
1 parent 7acacbb commit e0fbffd

File tree

21 files changed

+967
-18
lines changed

21 files changed

+967
-18
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package com.datastax.oss.driver.api.core.metadata;
2+
3+
import com.datastax.oss.driver.api.core.CqlIdentifier;
4+
import edu.umd.cs.findbugs.annotations.NonNull;
5+
import java.util.Objects;
6+
7+
/** Simple keyspace name and table name pair. */
8+
public class KeyspaceTableNamePair {
9+
@NonNull private final CqlIdentifier keyspace;
10+
@NonNull private final CqlIdentifier tableName;
11+
12+
public KeyspaceTableNamePair(@NonNull CqlIdentifier keyspace, @NonNull CqlIdentifier tableName) {
13+
this.keyspace = keyspace;
14+
this.tableName = tableName;
15+
}
16+
17+
@NonNull
18+
public CqlIdentifier getKeyspace() {
19+
return keyspace;
20+
}
21+
22+
@NonNull
23+
public CqlIdentifier getTableName() {
24+
return tableName;
25+
}
26+
27+
@Override
28+
public String toString() {
29+
return "KeyspaceTableNamePair{"
30+
+ "keyspace='"
31+
+ keyspace
32+
+ '\''
33+
+ ", tableName='"
34+
+ tableName
35+
+ '\''
36+
+ '}';
37+
}
38+
39+
@Override
40+
public boolean equals(Object o) {
41+
if (this == o) return true;
42+
if (o == null || !(o instanceof KeyspaceTableNamePair)) return false;
43+
KeyspaceTableNamePair that = (KeyspaceTableNamePair) o;
44+
return keyspace.equals(that.keyspace) && tableName.equals(that.tableName);
45+
}
46+
47+
@Override
48+
public int hashCode() {
49+
return Objects.hash(keyspace, tableName);
50+
}
51+
}

Diff for: core/src/main/java/com/datastax/oss/driver/api/core/metadata/Metadata.java

+8
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,14 @@ default Optional<KeyspaceMetadata> getKeyspace(@NonNull String keyspaceName) {
115115
@NonNull
116116
Optional<TokenMap> getTokenMap();
117117

118+
/**
119+
* The tablet map for this cluster.
120+
*
121+
* <p>Starts as an empty map that will gradually receive updates on each query of a yet unknown
122+
* tablet.
123+
*/
124+
TabletMap getTabletMap();
125+
118126
/**
119127
* The cluster name to which this session is connected. The Optional returned should contain the
120128
* value from the server for <b>system.local.cluster_name</b>.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package com.datastax.oss.driver.api.core.metadata;
2+
3+
import com.datastax.oss.driver.shaded.guava.common.annotations.Beta;
4+
import java.util.Set;
5+
6+
/**
7+
* Represents a tablet as described in tablets-routing-v1 protocol extension with some additional
8+
* fields for ease of use.
9+
*/
10+
@Beta
11+
public interface Tablet extends Comparable<Tablet> {
12+
/**
13+
* Returns left endpoint of an interval. This interval is left-open, meaning the tablet does not
14+
* own the token equal to the first token.
15+
*
16+
* @return {@code long} value representing first token.
17+
*/
18+
public long getFirstToken();
19+
20+
/**
21+
* Returns right endpoint of an interval. This interval is right-closed, which means that last
22+
* token is owned by this tablet.
23+
*
24+
* @return {@code long} value representing last token.
25+
*/
26+
public long getLastToken();
27+
28+
public Set<Node> getReplicaNodes();
29+
30+
/**
31+
* Looks up the shard number for specific replica Node.
32+
*
33+
* @param node one of the replica nodes of this tablet.
34+
* @return Shard number for the replica or -1 if no such Node found.
35+
*/
36+
public int getShardForNode(Node node);
37+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package com.datastax.oss.driver.api.core.metadata;
2+
3+
import com.datastax.oss.driver.api.core.CqlIdentifier;
4+
import com.datastax.oss.driver.shaded.guava.common.annotations.Beta;
5+
import java.util.concurrent.ConcurrentMap;
6+
import java.util.concurrent.ConcurrentSkipListSet;
7+
8+
/** Holds all currently known tablet metadata. */
9+
@Beta
10+
public interface TabletMap {
11+
/**
12+
* Returns mapping from tables to the sets of their tablets.
13+
*
14+
* @return the Map keyed by (keyspace,table) pairs with Set of tablets as value type.
15+
*/
16+
public ConcurrentMap<KeyspaceTableNamePair, ConcurrentSkipListSet<Tablet>> getMapping();
17+
18+
/**
19+
* Adds a single tablet to the map. Handles removal of overlapping tablets.
20+
*
21+
* @param keyspace target keyspace
22+
* @param table target table
23+
* @param tablet tablet instance to add
24+
*/
25+
public void addTablet(CqlIdentifier keyspace, CqlIdentifier table, Tablet tablet);
26+
27+
/**
28+
* Returns {@link Tablet} instance
29+
*
30+
* @param keyspace tablet's keyspace
31+
* @param table tablet's table
32+
* @param token target token
33+
* @return {@link Tablet} responsible for provided token or {@code null} if no such tablet is
34+
* present.
35+
*/
36+
public Tablet getTablet(CqlIdentifier keyspace, CqlIdentifier table, long token);
37+
}

Diff for: core/src/main/java/com/datastax/oss/driver/api/core/session/Request.java

+11
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,17 @@ public interface Request {
9595
@Nullable
9696
CqlIdentifier getRoutingKeyspace();
9797

98+
/**
99+
* The table to use for tablet-aware routing. Infers the table from available ColumnDefinitions or
100+
* {@code null} if it is not possible.
101+
*
102+
* @return
103+
*/
104+
@Nullable
105+
default CqlIdentifier getRoutingTable() {
106+
return null;
107+
}
108+
98109
/**
99110
* The partition key to use for token-aware routing.
100111
*

Diff for: core/src/main/java/com/datastax/oss/driver/internal/core/channel/ProtocolInitHandler.java

+6
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import com.datastax.oss.driver.internal.core.protocol.SegmentToFrameDecoder;
4444
import com.datastax.oss.driver.internal.core.protocol.ShardingInfo;
4545
import com.datastax.oss.driver.internal.core.protocol.ShardingInfo.ConnectionShardingInfo;
46+
import com.datastax.oss.driver.internal.core.protocol.TabletInfo;
4647
import com.datastax.oss.driver.internal.core.util.ProtocolUtils;
4748
import com.datastax.oss.driver.internal.core.util.concurrent.UncaughtExceptions;
4849
import com.datastax.oss.protocol.internal.Message;
@@ -94,6 +95,7 @@ class ProtocolInitHandler extends ConnectInitHandler {
9495
private ChannelHandlerContext ctx;
9596
private final boolean querySupportedOptions;
9697
private LwtInfo lwtInfo;
98+
private TabletInfo tabletInfo;
9799

98100
/**
99101
* @param querySupportedOptions whether to send OPTIONS as the first message, to request which
@@ -191,6 +193,9 @@ Message getRequest() {
191193
if (lwtInfo != null) {
192194
lwtInfo.addOption(startupOptions);
193195
}
196+
if (tabletInfo != null && tabletInfo.isEnabled()) {
197+
TabletInfo.addOption(startupOptions);
198+
}
194199
return request = new Startup(startupOptions);
195200
case GET_CLUSTER_NAME:
196201
return request = CLUSTER_NAME_QUERY;
@@ -230,6 +235,7 @@ void onResponse(Message response) {
230235
if (lwtInfo != null) {
231236
channel.attr(LWT_INFO_KEY).set(lwtInfo);
232237
}
238+
tabletInfo = TabletInfo.parseTabletInfo(res.options);
233239
step = Step.STARTUP;
234240
send();
235241
} else if (step == Step.STARTUP && response instanceof Ready) {

Diff for: core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java

+74-2
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@
3434
import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
3535
import com.datastax.oss.driver.api.core.cql.Statement;
3636
import com.datastax.oss.driver.api.core.metadata.Node;
37+
import com.datastax.oss.driver.api.core.metadata.Tablet;
38+
import com.datastax.oss.driver.api.core.metadata.TabletMap;
3739
import com.datastax.oss.driver.api.core.metadata.TokenMap;
3840
import com.datastax.oss.driver.api.core.metadata.token.Partitioner;
3941
import com.datastax.oss.driver.api.core.metadata.token.Token;
@@ -59,8 +61,10 @@
5961
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
6062
import com.datastax.oss.driver.internal.core.metadata.DefaultNode;
6163
import com.datastax.oss.driver.internal.core.metadata.token.DefaultTokenMap;
64+
import com.datastax.oss.driver.internal.core.metadata.token.TokenLong64;
6265
import com.datastax.oss.driver.internal.core.metrics.NodeMetricUpdater;
6366
import com.datastax.oss.driver.internal.core.metrics.SessionMetricUpdater;
67+
import com.datastax.oss.driver.internal.core.protocol.TabletInfo;
6468
import com.datastax.oss.driver.internal.core.session.DefaultSession;
6569
import com.datastax.oss.driver.internal.core.session.RepreparePayload;
6670
import com.datastax.oss.driver.internal.core.tracker.NoopRequestTracker;
@@ -284,6 +288,51 @@ private Token getRoutingToken(Statement statement) {
284288
return tokenMap == null ? null : ((DefaultTokenMap) tokenMap).getTokenFactory().hash(key);
285289
}
286290

291+
public Integer getShardFromTabletMap(Statement statement, Node node, Token token) {
292+
TabletMap tabletMap = context.getMetadataManager().getMetadata().getTabletMap();
293+
if (!(token instanceof TokenLong64)) {
294+
LOG.trace(
295+
"Token ({}) is not a TokenLong64. Not performing tablet shard lookup for statement {}.",
296+
token,
297+
statement);
298+
return null;
299+
}
300+
CqlIdentifier statementKeyspace = statement.getKeyspace();
301+
if (statementKeyspace == null) {
302+
statementKeyspace = statement.getRoutingKeyspace();
303+
}
304+
if (statementKeyspace == null) {
305+
statementKeyspace = this.keyspace;
306+
}
307+
CqlIdentifier statementTable = statement.getRoutingTable();
308+
if (statementKeyspace == null || statementTable == null) {
309+
return null;
310+
}
311+
long tokenValue = ((TokenLong64) token).getValue();
312+
313+
Tablet targetTablet = tabletMap.getTablet(statementKeyspace, statementTable, tokenValue);
314+
if (targetTablet == null) {
315+
LOG.trace(
316+
"Could not determine shard for token {} and table {}.{} on Node {}: Could not find corresponding tablet, returning null.",
317+
token,
318+
statementKeyspace,
319+
statementTable,
320+
node);
321+
return null;
322+
}
323+
int shard = targetTablet.getShardForNode(node);
324+
if (shard == -1) {
325+
LOG.trace(
326+
"Could not find shard corresponding to token {} and Node {} for table {} in keyspace {}. Returning null.",
327+
token,
328+
node,
329+
statementTable,
330+
statementKeyspace);
331+
return null;
332+
}
333+
return shard;
334+
}
335+
287336
/**
288337
* Sends the request to the next available node.
289338
*
@@ -309,9 +358,20 @@ private void sendRequest(
309358
Node node = retriedNode;
310359
DriverChannel channel = null;
311360
if (node == null
312-
|| (channel = session.getChannel(node, logPrefix, getRoutingToken(statement))) == null) {
361+
|| (channel =
362+
session.getChannel(
363+
node,
364+
logPrefix,
365+
getRoutingToken(statement),
366+
getShardFromTabletMap(statement, node, getRoutingToken(statement))))
367+
== null) {
313368
while (!result.isDone() && (node = queryPlan.poll()) != null) {
314-
channel = session.getChannel(node, logPrefix, getRoutingToken(statement));
369+
channel =
370+
session.getChannel(
371+
node,
372+
logPrefix,
373+
getRoutingToken(statement),
374+
getShardFromTabletMap(statement, node, getRoutingToken(statement)));
315375
if (channel != null) {
316376
break;
317377
} else {
@@ -420,6 +480,18 @@ private void setFinalResult(
420480
totalLatencyNanos,
421481
TimeUnit.NANOSECONDS);
422482
}
483+
if (resultSet.getColumnDefinitions().size() > 0
484+
&& resultSet
485+
.getExecutionInfo()
486+
.getIncomingPayload()
487+
.containsKey(TabletInfo.TABLETS_ROUTING_V1_CUSTOM_PAYLOAD_KEY)) {
488+
context
489+
.getMetadataManager()
490+
.addTabletFromPayload(
491+
resultSet.getColumnDefinitions().get(0).getKeyspace(),
492+
resultSet.getColumnDefinitions().get(0).getTable(),
493+
resultSet.getExecutionInfo().getIncomingPayload());
494+
}
423495
}
424496
// log the warnings if they have NOT been disabled
425497
if (!executionInfo.getWarnings().isEmpty()

Diff for: core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultBatchStatement.java

+11
Original file line numberDiff line numberDiff line change
@@ -472,6 +472,17 @@ public CqlIdentifier getRoutingKeyspace() {
472472
return null;
473473
}
474474

475+
@Override
476+
public CqlIdentifier getRoutingTable() {
477+
for (BatchableStatement<?> statement : statements) {
478+
CqlIdentifier ks = statement.getRoutingTable();
479+
if (ks != null) {
480+
return ks;
481+
}
482+
}
483+
return null;
484+
}
485+
475486
@NonNull
476487
@Override
477488
public BatchStatement setRoutingKeyspace(CqlIdentifier newRoutingKeyspace) {

Diff for: core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultBoundStatement.java

+6
Original file line numberDiff line numberDiff line change
@@ -301,6 +301,12 @@ public CqlIdentifier getRoutingKeyspace() {
301301
}
302302
}
303303

304+
@Override
305+
public CqlIdentifier getRoutingTable() {
306+
ColumnDefinitions definitions = preparedStatement.getVariableDefinitions();
307+
return (definitions.size() == 0) ? null : definitions.get(0).getTable();
308+
}
309+
304310
@NonNull
305311
@Override
306312
public BoundStatement setRoutingKeyspace(@Nullable CqlIdentifier newRoutingKeyspace) {

0 commit comments

Comments
 (0)