31
31
import com .datastax .oss .driver .api .core .config .DriverExecutionProfile ;
32
32
import com .datastax .oss .driver .api .core .connection .FrameTooLongException ;
33
33
import com .datastax .oss .driver .api .core .cql .AsyncResultSet ;
34
+ import com .datastax .oss .driver .api .core .cql .BoundStatement ;
35
+ import com .datastax .oss .driver .api .core .cql .ColumnDefinitions ;
34
36
import com .datastax .oss .driver .api .core .cql .ExecutionInfo ;
37
+ import com .datastax .oss .driver .api .core .cql .PreparedStatement ;
35
38
import com .datastax .oss .driver .api .core .cql .Statement ;
39
+ import com .datastax .oss .driver .api .core .metadata .HostShardPair ;
40
+ import com .datastax .oss .driver .api .core .metadata .KeyspaceTableNamePair ;
36
41
import com .datastax .oss .driver .api .core .metadata .Node ;
42
+ import com .datastax .oss .driver .api .core .metadata .Tablet ;
43
+ import com .datastax .oss .driver .api .core .metadata .TabletMap ;
37
44
import com .datastax .oss .driver .api .core .metadata .TokenMap ;
38
45
import com .datastax .oss .driver .api .core .metadata .token .Partitioner ;
39
46
import com .datastax .oss .driver .api .core .metadata .token .Token ;
58
65
import com .datastax .oss .driver .internal .core .channel .ResponseCallback ;
59
66
import com .datastax .oss .driver .internal .core .context .InternalDriverContext ;
60
67
import com .datastax .oss .driver .internal .core .metadata .DefaultNode ;
68
+ import com .datastax .oss .driver .internal .core .metadata .DefaultTabletMap ;
61
69
import com .datastax .oss .driver .internal .core .metadata .token .DefaultTokenMap ;
70
+ import com .datastax .oss .driver .internal .core .metadata .token .TokenLong64 ;
62
71
import com .datastax .oss .driver .internal .core .metrics .NodeMetricUpdater ;
63
72
import com .datastax .oss .driver .internal .core .metrics .SessionMetricUpdater ;
73
+ import com .datastax .oss .driver .internal .core .protocol .TabletInfo ;
64
74
import com .datastax .oss .driver .internal .core .session .DefaultSession ;
65
75
import com .datastax .oss .driver .internal .core .session .RepreparePayload ;
66
76
import com .datastax .oss .driver .internal .core .tracker .NoopRequestTracker ;
90
100
import java .util .AbstractMap ;
91
101
import java .util .List ;
92
102
import java .util .Map ;
103
+ import java .util .NavigableSet ;
93
104
import java .util .Queue ;
94
105
import java .util .Set ;
106
+ import java .util .UUID ;
95
107
import java .util .concurrent .CancellationException ;
96
108
import java .util .concurrent .CompletableFuture ;
97
109
import java .util .concurrent .CompletionStage ;
@@ -284,6 +296,103 @@ private Token getRoutingToken(Statement statement) {
284
296
return tokenMap == null ? null : ((DefaultTokenMap ) tokenMap ).getTokenFactory ().hash (key );
285
297
}
286
298
299
+ private CqlIdentifier getTabletRoutingKeyspace (Statement statement ) {
300
+ if (statement == null ) {
301
+ if (initialStatement == null ) {
302
+ return null ;
303
+ }
304
+ statement = initialStatement ;
305
+ }
306
+ ColumnDefinitions cdefs = null ;
307
+ CqlIdentifier result = null ;
308
+ if (statement instanceof BoundStatement ) {
309
+ cdefs = ((BoundStatement ) statement ).getPreparedStatement ().getVariableDefinitions ();
310
+ } else if (statement instanceof PreparedStatement ) {
311
+ cdefs = ((PreparedStatement ) statement ).getVariableDefinitions ();
312
+ }
313
+ if (cdefs != null && cdefs .size () > 0 ) {
314
+ result = cdefs .get (0 ).getKeyspace ();
315
+ }
316
+ if (result == null ) {
317
+ return keyspace ;
318
+ } else {
319
+ return result ;
320
+ }
321
+ }
322
+
323
+ private CqlIdentifier getTabletRoutingTable (Statement statement ) {
324
+ if (statement == null ) {
325
+ if (initialStatement == null ) {
326
+ return null ;
327
+ }
328
+ statement = initialStatement ;
329
+ }
330
+ if (statement instanceof BoundStatement ) {
331
+ return ((BoundStatement ) statement ).getPreparedStatement ().getTable ();
332
+ } else if (statement instanceof PreparedStatement ) {
333
+ return ((PreparedStatement ) statement ).getTable ();
334
+ } else {
335
+ return null ;
336
+ }
337
+ }
338
+
339
+ public Integer getShardFromTabletMap (Statement statement , Node node , Token token ) {
340
+ TabletMap tabletMap = context .getMetadataManager ().getMetadata ().getTabletMap ();
341
+ if (!(token instanceof TokenLong64 )) {
342
+ LOG .trace (
343
+ "Token ({}) is not a TokenLong64. Not performing tablet shard lookup for statement {}." ,
344
+ token ,
345
+ statement );
346
+ return null ;
347
+ }
348
+ if (tabletMap == null ) {
349
+ LOG .trace (
350
+ "Could not determine shard for token {} on host {} because tablets metadata is currently null. "
351
+ + "Returning null." ,
352
+ token ,
353
+ node );
354
+ return null ;
355
+ }
356
+ if ((getTabletRoutingKeyspace (statement ) == null && keyspace == null )
357
+ || getTabletRoutingTable (statement ) == null ) {
358
+ return null ;
359
+ }
360
+ UUID targetHostUuid = node .getHostId ();
361
+ long tokenValue = ((TokenLong64 ) token ).getValue ();
362
+ String statementKeyspace =
363
+ getTabletRoutingKeyspace (statement ) != null
364
+ ? getTabletRoutingKeyspace (statement ).asInternal ()
365
+ : keyspace .asInternal ();
366
+ String statementTable = getTabletRoutingTable (statement ).asInternal ();
367
+ KeyspaceTableNamePair key = new KeyspaceTableNamePair (statementKeyspace , statementTable );
368
+ NavigableSet <Tablet > targetTablets = tabletMap .getMapping ().get (key );
369
+ if (targetTablets == null ) {
370
+ LOG .trace (
371
+ "Could not determine shard for token {} on host {} because table {}.{} is not present in tablets "
372
+ + "metadata. Returning null." ,
373
+ token ,
374
+ node ,
375
+ statementKeyspace ,
376
+ statementTable );
377
+ return null ;
378
+ }
379
+ Tablet row = targetTablets .ceiling (DefaultTabletMap .DefaultTablet .malformedTablet (tokenValue ));
380
+ if (row != null && row .getFirstToken () < tokenValue ) {
381
+ for (HostShardPair hostShardPair : row .getReplicas ()) {
382
+ if (hostShardPair .getHost ().equals (targetHostUuid )) {
383
+ return hostShardPair .getShard ();
384
+ }
385
+ }
386
+ }
387
+ LOG .trace (
388
+ "Could not find tablet corresponding to token {} on host {} for table {} in keyspace {}. Returning null." ,
389
+ token ,
390
+ node ,
391
+ statementTable ,
392
+ statementKeyspace );
393
+ return null ;
394
+ }
395
+
287
396
/**
288
397
* Sends the request to the next available node.
289
398
*
@@ -309,9 +418,20 @@ private void sendRequest(
309
418
Node node = retriedNode ;
310
419
DriverChannel channel = null ;
311
420
if (node == null
312
- || (channel = session .getChannel (node , logPrefix , getRoutingToken (statement ))) == null ) {
421
+ || (channel =
422
+ session .getChannel (
423
+ node ,
424
+ logPrefix ,
425
+ getRoutingToken (statement ),
426
+ getShardFromTabletMap (statement , node , getRoutingToken (statement ))))
427
+ == null ) {
313
428
while (!result .isDone () && (node = queryPlan .poll ()) != null ) {
314
- channel = session .getChannel (node , logPrefix , getRoutingToken (statement ));
429
+ channel =
430
+ session .getChannel (
431
+ node ,
432
+ logPrefix ,
433
+ getRoutingToken (statement ),
434
+ getShardFromTabletMap (statement , node , getRoutingToken (statement )));
315
435
if (channel != null ) {
316
436
break ;
317
437
} else {
@@ -420,6 +540,18 @@ private void setFinalResult(
420
540
totalLatencyNanos ,
421
541
TimeUnit .NANOSECONDS );
422
542
}
543
+ if (resultSet .getColumnDefinitions ().size () > 0
544
+ && resultSet
545
+ .getExecutionInfo ()
546
+ .getIncomingPayload ()
547
+ .containsKey (TabletInfo .TABLETS_ROUTING_V1_CUSTOM_PAYLOAD_KEY )) {
548
+ context
549
+ .getMetadataManager ()
550
+ .addTabletsFromPayload (
551
+ resultSet .getColumnDefinitions ().get (0 ).getKeyspace (),
552
+ resultSet .getColumnDefinitions ().get (0 ).getTable (),
553
+ resultSet .getExecutionInfo ().getIncomingPayload ());
554
+ }
423
555
}
424
556
// log the warnings if they have NOT been disabled
425
557
if (!executionInfo .getWarnings ().isEmpty ()
0 commit comments