Skip to content

Commit c24d841

Browse files
committed
Tablets support
Introduces tablets support for version 4.x of the driver. Metadata about tablets will be kept in TabletMap, which gets continuously updated through the tablets-routing-v1 extension. Each time a PreparedStatement or BoundStatement targets the wrong node and shard combination, a server supporting tablets should respond with tablet metadata inside the custom payload of its response. This metadata will be transparently processed and used for future queries. Tablets metadata will be enabled by default. Until now the driver was using TokenMaps to choose replicas and appropriate shards. Having a token was enough information to do that. Now the driver will first attempt a tablet-based lookup, and only after failing to find the corresponding tablet will it fall back to the TokenMap lookup. Since finding the correct tablet requires the keyspace and table names in addition to the token, many of the methods were extended to also accept those as parameters. RequestHandlerTestHarness was adjusted to also mock MetadataManager. Previously it mocked only the `session.getMetadata()` call, but the same metadata can be obtained through `context.getMetadataManager().getMetadata()`. Using the second method was causing test failures.
1 parent e561a2d commit c24d841

File tree

19 files changed

+903
-13
lines changed

19 files changed

+903
-13
lines changed

Diff for: core/src/main/java/com/datastax/oss/driver/api/core/cql/PreparedStatement.java

+5
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
*/
2222
package com.datastax.oss.driver.api.core.cql;
2323

24+
import com.datastax.oss.driver.api.core.CqlIdentifier;
2425
import com.datastax.oss.driver.api.core.CqlSession;
2526
import com.datastax.oss.driver.api.core.DefaultProtocolVersion;
2627
import com.datastax.oss.driver.api.core.metadata.token.Partitioner;
@@ -58,6 +59,10 @@ public interface PreparedStatement {
5859
@NonNull
5960
ColumnDefinitions getVariableDefinitions();
6061

62+
/** Table name inferred from {@link PreparedStatement#getVariableDefinitions()}. */
63+
@Nullable
64+
CqlIdentifier getTable();
65+
6166
/**
6267
* The partitioner to use for token-aware routing. If {@code null}, the cluster-wide partitioner
6368
* will be used.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package com.datastax.oss.driver.api.core.metadata;
2+
3+
import java.util.UUID;
4+
5+
/**
6+
* Simple class to hold UUID of a host and a shard number. Class itself makes no checks or
7+
* guarantees about existence of a shard on corresponding host.
8+
*/
9+
public class HostShardPair {
10+
private final UUID host;
11+
private final int shard;
12+
13+
public HostShardPair(UUID host, int shard) {
14+
this.host = host;
15+
this.shard = shard;
16+
}
17+
18+
public UUID getHost() {
19+
return host;
20+
}
21+
22+
public int getShard() {
23+
return shard;
24+
}
25+
26+
@Override
27+
public String toString() {
28+
return "HostShardPair{" + "host=" + host + ", shard=" + shard + '}';
29+
}
30+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package com.datastax.oss.driver.api.core.metadata;
2+
3+
import java.util.Objects;
4+
5+
/** Simple keyspace name and table name pair. */
6+
public class KeyspaceTableNamePair {
7+
private final String keyspace;
8+
private final String tableName;
9+
10+
public KeyspaceTableNamePair(String keyspace, String tableName) {
11+
this.keyspace = keyspace;
12+
this.tableName = tableName;
13+
}
14+
15+
public String getKeyspace() {
16+
return keyspace;
17+
}
18+
19+
public String getTableName() {
20+
return tableName;
21+
}
22+
23+
@Override
24+
public String toString() {
25+
return "KeyspaceTableNamePair{"
26+
+ "keyspace='"
27+
+ keyspace
28+
+ '\''
29+
+ ", tableName='"
30+
+ tableName
31+
+ '\''
32+
+ '}';
33+
}
34+
35+
@Override
36+
public boolean equals(Object o) {
37+
if (this == o) return true;
38+
if (o == null || !(o instanceof KeyspaceTableNamePair)) return false;
39+
KeyspaceTableNamePair that = (KeyspaceTableNamePair) o;
40+
return keyspace.equals(that.keyspace) && tableName.equals(that.tableName);
41+
}
42+
43+
@Override
44+
public int hashCode() {
45+
return Objects.hash(keyspace, tableName);
46+
}
47+
}

Diff for: core/src/main/java/com/datastax/oss/driver/api/core/metadata/Metadata.java

+8
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,14 @@ default Optional<KeyspaceMetadata> getKeyspace(@NonNull String keyspaceName) {
115115
@NonNull
116116
Optional<TokenMap> getTokenMap();
117117

118+
/**
119+
* The tablet map for this cluster.
120+
*
121+
* <p>Starts as an empty map that will gradually receive updates on each query of a yet unknown
122+
* tablet.
123+
*/
124+
TabletMap getTabletMap();
125+
118126
/**
119127
* The cluster name to which this session is connected. The Optional returned should contain the
120128
* value from the server for <b>system.local.cluster_name</b>.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package com.datastax.oss.driver.api.core.metadata;
2+
3+
import java.util.Set;
4+
import java.util.UUID;
5+
6+
/** Represents a tablet as described in tablets-routing-v1 protocol extension. */
7+
public interface Tablet extends Comparable<Tablet> {
8+
public String getKeyspaceName();
9+
10+
public UUID getTableId();
11+
12+
public String getTableName();
13+
14+
public long getFirstToken();
15+
16+
public long getLastToken();
17+
18+
public Set<HostShardPair> getReplicas();
19+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package com.datastax.oss.driver.api.core.metadata;
2+
3+
import com.datastax.oss.driver.api.core.data.TupleValue;
4+
import java.util.Map;
5+
import java.util.NavigableSet;
6+
import java.util.Set;
7+
import java.util.UUID;
8+
9+
/** Holds all currently known tablet metadata. */
10+
public interface TabletMap {
11+
/**
12+
* Returns mapping from tables to the sets of their tablets.
13+
*
14+
* @return the Map keyed by (keyspace,table) pairs with Set of tablets as value type.
15+
*/
16+
public Map<KeyspaceTableNamePair, NavigableSet<Tablet>> getMapping();
17+
18+
/**
19+
* Adds a singular tablet to the map. Provided tuple has to match format described for
20+
* tablets-routing-v1 described in Scylla protocol extensions.
21+
*
22+
* @param keyspace tablet's keyspace
23+
* @param table tablet's table
24+
* @param tupleValue tablet
25+
*/
26+
public void addTablet(String keyspace, String table, TupleValue tupleValue);
27+
28+
/**
29+
* Returns UUIDs of replicas holding tablets for given token. May contain stale data that
30+
* eventually will be updated. To get actual Node instances from UUIDs see {@link Metadata#getNodes()}.
31+
*
32+
* @param keyspace tablet's keyspace
33+
* @param table tablet's table
34+
* @param token target token
35+
* @return Set of {@link Node} that holds target tablets.
36+
*/
37+
public Set<UUID> getReplicas(String keyspace, String table, long token);
38+
}

Diff for: core/src/main/java/com/datastax/oss/driver/internal/core/channel/ProtocolInitHandler.java

+6
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import com.datastax.oss.driver.internal.core.protocol.SegmentToFrameDecoder;
4444
import com.datastax.oss.driver.internal.core.protocol.ShardingInfo;
4545
import com.datastax.oss.driver.internal.core.protocol.ShardingInfo.ConnectionShardingInfo;
46+
import com.datastax.oss.driver.internal.core.protocol.TabletInfo;
4647
import com.datastax.oss.driver.internal.core.util.ProtocolUtils;
4748
import com.datastax.oss.driver.internal.core.util.concurrent.UncaughtExceptions;
4849
import com.datastax.oss.protocol.internal.Message;
@@ -94,6 +95,7 @@ class ProtocolInitHandler extends ConnectInitHandler {
9495
private ChannelHandlerContext ctx;
9596
private final boolean querySupportedOptions;
9697
private LwtInfo lwtInfo;
98+
private TabletInfo tabletInfo;
9799

98100
/**
99101
* @param querySupportedOptions whether to send OPTIONS as the first message, to request which
@@ -191,6 +193,9 @@ Message getRequest() {
191193
if (lwtInfo != null) {
192194
lwtInfo.addOption(startupOptions);
193195
}
196+
if (tabletInfo != null && tabletInfo.isEnabled()) {
197+
TabletInfo.addOption(startupOptions);
198+
}
194199
return request = new Startup(startupOptions);
195200
case GET_CLUSTER_NAME:
196201
return request = CLUSTER_NAME_QUERY;
@@ -230,6 +235,7 @@ void onResponse(Message response) {
230235
if (lwtInfo != null) {
231236
channel.attr(LWT_INFO_KEY).set(lwtInfo);
232237
}
238+
tabletInfo = TabletInfo.parseTabletInfo(res.options);
233239
step = Step.STARTUP;
234240
send();
235241
} else if (step == Step.STARTUP && response instanceof Ready) {

Diff for: core/src/main/java/com/datastax/oss/driver/internal/core/cql/CqlRequestHandler.java

+134-2
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,16 @@
3131
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
3232
import com.datastax.oss.driver.api.core.connection.FrameTooLongException;
3333
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
34+
import com.datastax.oss.driver.api.core.cql.BoundStatement;
35+
import com.datastax.oss.driver.api.core.cql.ColumnDefinitions;
3436
import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
37+
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
3538
import com.datastax.oss.driver.api.core.cql.Statement;
39+
import com.datastax.oss.driver.api.core.metadata.HostShardPair;
40+
import com.datastax.oss.driver.api.core.metadata.KeyspaceTableNamePair;
3641
import com.datastax.oss.driver.api.core.metadata.Node;
42+
import com.datastax.oss.driver.api.core.metadata.Tablet;
43+
import com.datastax.oss.driver.api.core.metadata.TabletMap;
3744
import com.datastax.oss.driver.api.core.metadata.TokenMap;
3845
import com.datastax.oss.driver.api.core.metadata.token.Partitioner;
3946
import com.datastax.oss.driver.api.core.metadata.token.Token;
@@ -58,9 +65,12 @@
5865
import com.datastax.oss.driver.internal.core.channel.ResponseCallback;
5966
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
6067
import com.datastax.oss.driver.internal.core.metadata.DefaultNode;
68+
import com.datastax.oss.driver.internal.core.metadata.DefaultTabletMap;
6169
import com.datastax.oss.driver.internal.core.metadata.token.DefaultTokenMap;
70+
import com.datastax.oss.driver.internal.core.metadata.token.TokenLong64;
6271
import com.datastax.oss.driver.internal.core.metrics.NodeMetricUpdater;
6372
import com.datastax.oss.driver.internal.core.metrics.SessionMetricUpdater;
73+
import com.datastax.oss.driver.internal.core.protocol.TabletInfo;
6474
import com.datastax.oss.driver.internal.core.session.DefaultSession;
6575
import com.datastax.oss.driver.internal.core.session.RepreparePayload;
6676
import com.datastax.oss.driver.internal.core.tracker.NoopRequestTracker;
@@ -90,8 +100,10 @@
90100
import java.util.AbstractMap;
91101
import java.util.List;
92102
import java.util.Map;
103+
import java.util.NavigableSet;
93104
import java.util.Queue;
94105
import java.util.Set;
106+
import java.util.UUID;
95107
import java.util.concurrent.CancellationException;
96108
import java.util.concurrent.CompletableFuture;
97109
import java.util.concurrent.CompletionStage;
@@ -284,6 +296,103 @@ private Token getRoutingToken(Statement statement) {
284296
return tokenMap == null ? null : ((DefaultTokenMap) tokenMap).getTokenFactory().hash(key);
285297
}
286298

299+
private CqlIdentifier getTabletRoutingKeyspace(Statement statement) {
300+
if (statement == null) {
301+
if (initialStatement == null) {
302+
return null;
303+
}
304+
statement = initialStatement;
305+
}
306+
ColumnDefinitions cdefs = null;
307+
CqlIdentifier result = null;
308+
if (statement instanceof BoundStatement) {
309+
cdefs = ((BoundStatement) statement).getPreparedStatement().getVariableDefinitions();
310+
} else if (statement instanceof PreparedStatement) {
311+
cdefs = ((PreparedStatement) statement).getVariableDefinitions();
312+
}
313+
if (cdefs != null && cdefs.size() > 0) {
314+
result = cdefs.get(0).getKeyspace();
315+
}
316+
if (result == null) {
317+
return keyspace;
318+
} else {
319+
return result;
320+
}
321+
}
322+
323+
private CqlIdentifier getTabletRoutingTable(Statement statement) {
324+
if (statement == null) {
325+
if (initialStatement == null) {
326+
return null;
327+
}
328+
statement = initialStatement;
329+
}
330+
if (statement instanceof BoundStatement) {
331+
return ((BoundStatement) statement).getPreparedStatement().getTable();
332+
} else if (statement instanceof PreparedStatement) {
333+
return ((PreparedStatement) statement).getTable();
334+
} else {
335+
return null;
336+
}
337+
}
338+
339+
public Integer getShardFromTabletMap(Statement statement, Node node, Token token) {
340+
TabletMap tabletMap = context.getMetadataManager().getMetadata().getTabletMap();
341+
if (!(token instanceof TokenLong64)) {
342+
LOG.trace(
343+
"Token ({}) is not a TokenLong64. Not performing tablet shard lookup for statement {}.",
344+
token,
345+
statement);
346+
return null;
347+
}
348+
if (tabletMap == null) {
349+
LOG.trace(
350+
"Could not determine shard for token {} on host {} because tablets metadata is currently null. "
351+
+ "Returning null.",
352+
token,
353+
node);
354+
return null;
355+
}
356+
if ((getTabletRoutingKeyspace(statement) == null && keyspace == null)
357+
|| getTabletRoutingTable(statement) == null) {
358+
return null;
359+
}
360+
UUID targetHostUuid = node.getHostId();
361+
long tokenValue = ((TokenLong64) token).getValue();
362+
String statementKeyspace =
363+
getTabletRoutingKeyspace(statement) != null
364+
? getTabletRoutingKeyspace(statement).asInternal()
365+
: keyspace.asInternal();
366+
String statementTable = getTabletRoutingTable(statement).asInternal();
367+
KeyspaceTableNamePair key = new KeyspaceTableNamePair(statementKeyspace, statementTable);
368+
NavigableSet<Tablet> targetTablets = tabletMap.getMapping().get(key);
369+
if (targetTablets == null) {
370+
LOG.trace(
371+
"Could not determine shard for token {} on host {} because table {}.{} is not present in tablets "
372+
+ "metadata. Returning null.",
373+
token,
374+
node,
375+
statementKeyspace,
376+
statementTable);
377+
return null;
378+
}
379+
Tablet row = targetTablets.ceiling(DefaultTabletMap.DefaultTablet.malformedTablet(tokenValue));
380+
if (row != null && row.getFirstToken() < tokenValue) {
381+
for (HostShardPair hostShardPair : row.getReplicas()) {
382+
if (hostShardPair.getHost().equals(targetHostUuid)) {
383+
return hostShardPair.getShard();
384+
}
385+
}
386+
}
387+
LOG.trace(
388+
"Could not find tablet corresponding to token {} on host {} for table {} in keyspace {}. Returning null.",
389+
token,
390+
node,
391+
statementTable,
392+
statementKeyspace);
393+
return null;
394+
}
395+
287396
/**
288397
* Sends the request to the next available node.
289398
*
@@ -309,9 +418,20 @@ private void sendRequest(
309418
Node node = retriedNode;
310419
DriverChannel channel = null;
311420
if (node == null
312-
|| (channel = session.getChannel(node, logPrefix, getRoutingToken(statement))) == null) {
421+
|| (channel =
422+
session.getChannel(
423+
node,
424+
logPrefix,
425+
getRoutingToken(statement),
426+
getShardFromTabletMap(statement, node, getRoutingToken(statement))))
427+
== null) {
313428
while (!result.isDone() && (node = queryPlan.poll()) != null) {
314-
channel = session.getChannel(node, logPrefix, getRoutingToken(statement));
429+
channel =
430+
session.getChannel(
431+
node,
432+
logPrefix,
433+
getRoutingToken(statement),
434+
getShardFromTabletMap(statement, node, getRoutingToken(statement)));
315435
if (channel != null) {
316436
break;
317437
} else {
@@ -420,6 +540,18 @@ private void setFinalResult(
420540
totalLatencyNanos,
421541
TimeUnit.NANOSECONDS);
422542
}
543+
if (resultSet.getColumnDefinitions().size() > 0
544+
&& resultSet
545+
.getExecutionInfo()
546+
.getIncomingPayload()
547+
.containsKey(TabletInfo.TABLETS_ROUTING_V1_CUSTOM_PAYLOAD_KEY)) {
548+
context
549+
.getMetadataManager()
550+
.addTabletsFromPayload(
551+
resultSet.getColumnDefinitions().get(0).getKeyspace(),
552+
resultSet.getColumnDefinitions().get(0).getTable(),
553+
resultSet.getExecutionInfo().getIncomingPayload());
554+
}
423555
}
424556
// log the warnings if they have NOT been disabled
425557
if (!executionInfo.getWarnings().isEmpty()

0 commit comments

Comments
 (0)