6
6
import com .datastax .oss .driver .api .core .cql .PreparedStatement ;
7
7
import com .datastax .oss .driver .api .core .cql .ResultSet ;
8
8
import com .datastax .oss .driver .api .core .cql .SimpleStatement ;
9
+ import com .datastax .oss .driver .api .core .cql .Statement ;
9
10
import com .datastax .oss .driver .api .core .metadata .KeyspaceTableNamePair ;
11
+ import com .datastax .oss .driver .api .core .metadata .Node ;
10
12
import com .datastax .oss .driver .api .core .metadata .Tablet ;
11
13
import com .datastax .oss .driver .api .testinfra .CassandraSkip ;
12
14
import com .datastax .oss .driver .api .testinfra .ScyllaRequirement ;
13
15
import com .datastax .oss .driver .api .testinfra .ccm .CustomCcmRule ;
14
16
import com .datastax .oss .driver .api .testinfra .session .SessionRule ;
15
17
import com .datastax .oss .driver .api .testinfra .session .SessionUtils ;
16
18
import com .datastax .oss .driver .internal .core .protocol .TabletInfo ;
19
+
17
20
import java .nio .ByteBuffer ;
18
21
import java .time .Duration ;
22
+ import java .util .ArrayList ;
23
+ import java .util .HashMap ;
24
+ import java .util .List ;
19
25
import java .util .Map ;
20
26
import java .util .Set ;
21
27
import java .util .concurrent .ConcurrentMap ;
22
28
import java .util .concurrent .ConcurrentSkipListSet ;
29
+ import java .util .function .Function ;
30
+ import java .util .function .Supplier ;
31
+
23
32
import org .junit .Assert ;
33
+ import org .junit .BeforeClass ;
24
34
import org .junit .ClassRule ;
25
35
import org .junit .Test ;
26
36
import org .junit .rules .RuleChain ;
@@ -55,9 +65,9 @@ public class DefaultMetadataTabletMapIT {
55
65
private static final int INITIAL_TABLETS = 32 ;
56
66
private static final int QUERIES = 1600 ;
57
67
private static final int REPLICATION_FACTOR = 2 ;
58
- private static String KEYSPACE_NAME = "tabletsTest" ;
59
- private static String TABLE_NAME = "tabletsTable" ;
60
- private static String CREATE_KEYSPACE_QUERY =
68
+ private static final String KEYSPACE_NAME = "tabletsTest" ;
69
+ private static final String TABLE_NAME = "tabletsTable" ;
70
+ private static final String CREATE_KEYSPACE_QUERY =
61
71
"CREATE KEYSPACE IF NOT EXISTS "
62
72
+ KEYSPACE_NAME
63
73
+ " WITH replication = {'class': "
@@ -68,49 +78,208 @@ public class DefaultMetadataTabletMapIT {
68
78
+ "{'initial': "
69
79
+ INITIAL_TABLETS
70
80
+ "};" ;
71
- private static String CREATE_TABLE_QUERY =
81
+ private static final String CREATE_TABLE_QUERY =
72
82
"CREATE TABLE IF NOT EXISTS "
73
83
+ KEYSPACE_NAME
74
84
+ "."
75
85
+ TABLE_NAME
76
- + " (pk int, ck int, PRIMARY KEY(pk, ck));" ;
86
+ + " (pk int, ck int, val int, PRIMARY KEY(pk, ck));" ;
77
87
78
- @ Test
79
- public void should_receive_each_tablet_exactly_once () {
80
- CqlSession session = SESSION_RULE .session ();
88
+ private static final SimpleStatement STMT_INSERT = SimpleStatement .builder (
89
+ "INSERT INTO "
90
+ + KEYSPACE_NAME
91
+ + "."
92
+ + TABLE_NAME
93
+ + " (pk,ck) VALUES (?,?);" )
94
+ .setTracing (true )
95
+ .build ();
81
96
82
- session .execute (CREATE_KEYSPACE_QUERY );
83
- session .execute (CREATE_TABLE_QUERY );
97
+ private static final SimpleStatement STMT_INSERT_NO_KS = SimpleStatement .builder (
98
+ "INSERT INTO "
99
+ + TABLE_NAME
100
+ + " (pk,ck) VALUES (?,?);" )
101
+ .setTracing (true )
102
+ .build ();
84
103
85
- for (int i = 1 ; i <= QUERIES ; i ++) {
86
- session .execute (
104
+ private static final SimpleStatement STMT_INSERT_CONCRETE = SimpleStatement .builder (
87
105
"INSERT INTO "
88
106
+ KEYSPACE_NAME
89
107
+ "."
90
108
+ TABLE_NAME
91
- + " (pk,ck) VALUES ("
92
- + i
93
- + ","
94
- + i
95
- + ");" );
109
+ + " (pk,ck) VALUES (1,1);" )
110
+ .setTracing (true )
111
+ .build ();
112
+
113
+ private static final SimpleStatement STMT_SELECT = SimpleStatement .builder (
114
+ "SELECT pk,ck FROM "
115
+ + KEYSPACE_NAME
116
+ + "."
117
+ + TABLE_NAME
118
+ + " WHERE pk = ? AND ck = ?" )
119
+ .setTracing (true )
120
+ .build ();
121
+
122
+ private static final SimpleStatement STMT_SELECT_NO_KS = SimpleStatement .builder (
123
+ "SELECT pk,ck FROM "
124
+ + TABLE_NAME
125
+ + " WHERE pk = ? AND ck = ?" )
126
+ .setTracing (true )
127
+ .build ();
128
+
129
+ private static final SimpleStatement STMT_SELECT_CONCRETE = SimpleStatement .builder (
130
+ "SELECT pk,ck FROM "
131
+ + KEYSPACE_NAME
132
+ + "."
133
+ + TABLE_NAME
134
+ + " WHERE pk = 1 AND ck = 1" )
135
+ .setTracing (true )
136
+ .build ();
137
+
138
+ private static final SimpleStatement STMT_UPDATE = SimpleStatement .builder (
139
+ "UPDATE "
140
+ + KEYSPACE_NAME
141
+ + "."
142
+ + TABLE_NAME
143
+ + " SET val = 1"
144
+ + " WHERE pk = ? AND ck = ?" )
145
+ .setTracing (true )
146
+ .build ();
147
+
148
+ private static final SimpleStatement STMT_UPDATE_NO_KS = SimpleStatement .builder (
149
+ "UPDATE "
150
+ + TABLE_NAME
151
+ + " SET val = 1"
152
+ + " WHERE pk = ? AND ck = ?" )
153
+ .setTracing (true )
154
+ .build ();
155
+
156
+ private static final SimpleStatement STMT_UPDATE_CONCRETE = SimpleStatement .builder (
157
+ "UPDATE "
158
+ + KEYSPACE_NAME
159
+ + "."
160
+ + TABLE_NAME
161
+ + " SET val = 1"
162
+ + " WHERE pk = 1 AND ck = 1" )
163
+ .setTracing (true )
164
+ .build ();
165
+
166
+ private static final SimpleStatement STMT_DELETE = SimpleStatement .builder (
167
+ "DELETE FROM "
168
+ + KEYSPACE_NAME
169
+ + "."
170
+ + TABLE_NAME
171
+ + " WHERE pk = ? AND ck = ?" )
172
+ .setTracing (true )
173
+ .build ();
174
+
175
+ private static final SimpleStatement STMT_DELETE_NO_KS = SimpleStatement .builder (
176
+ "DELETE FROM "
177
+ + TABLE_NAME
178
+ + " WHERE pk = ? AND ck = ?" )
179
+ .setTracing (true )
180
+ .build ();
181
+
182
+ private static final SimpleStatement STMT_DELETE_CONCRETE = SimpleStatement .builder (
183
+ "DELETE FROM "
184
+ + KEYSPACE_NAME
185
+ + "."
186
+ + TABLE_NAME
187
+ + " WHERE pk = 1 AND ck = 1" )
188
+ .setTracing (true )
189
+ .build ();
190
+
191
+ @ BeforeClass
192
+ public static void setup () {
193
+ CqlSession session = SESSION_RULE .session ();
194
+ session .execute (CREATE_KEYSPACE_QUERY );
195
+ session .execute (CREATE_TABLE_QUERY );
196
+ }
197
+
198
+ @ Test
199
+ public void every_statement_should_get_tablet_info () {
200
+ Map <String , Supplier <CqlSession >> sessions = new HashMap <>();
201
+ sessions .put ("REGULAR" , () -> CqlSession .builder ().addContactEndPoints (CCM_RULE .getContactPoints ()).build ());
202
+ sessions .put ("WITH_KEYSPACE" , () -> CqlSession .builder ().addContactEndPoints (CCM_RULE .getContactPoints ()).withKeyspace (KEYSPACE_NAME ).build ());
203
+ sessions .put ("USE_KEYSPACE" , () -> {
204
+ CqlSession s = CqlSession .builder ().addContactEndPoints (CCM_RULE .getContactPoints ()).build ();
205
+ s .execute ("USE " + KEYSPACE_NAME );
206
+ return s ;
207
+ });
208
+
209
+ Map <String , Function <CqlSession , Statement >> statements = new HashMap <>();
210
+ statements .put ("SELECT_CONCRETE" , s -> STMT_SELECT_CONCRETE );
211
+ statements .put ("SELECT_PREPARED" , s -> s .prepare (STMT_SELECT ).bind (2 , 2 ));
212
+ statements .put ("SELECT_NO_KS_PREPARED" , s -> s .prepare (STMT_SELECT_NO_KS ).bind (2 , 2 ));
213
+ statements .put ("SELECT_CONCRETE_PREPARED" , s -> s .prepare (STMT_SELECT_CONCRETE ).bind ());
214
+ statements .put ("INSERT_CONCRETE" , s -> STMT_INSERT_CONCRETE );
215
+ statements .put ("INSERT_PREPARED" , s -> s .prepare (STMT_INSERT ).bind (2 , 2 ));
216
+ statements .put ("INSERT_NO_KS_PREPARED" , s -> s .prepare (STMT_INSERT_NO_KS ).bind (2 , 2 ));
217
+ statements .put ("INSERT_CONCRETE_PREPARED" , s -> s .prepare (STMT_INSERT_CONCRETE ).bind ());
218
+ statements .put ("UPDATE_CONCRETE" , s -> STMT_UPDATE_CONCRETE );
219
+ statements .put ("UPDATE_PREPARED" , s -> s .prepare (STMT_UPDATE ).bind (2 , 2 ));
220
+ statements .put ("UPDATE_NO_KS_PREPARED" , s -> s .prepare (STMT_UPDATE_NO_KS ).bind (2 , 2 ));
221
+ statements .put ("UPDATE_CONCRETE_PREPARED" , s -> s .prepare (STMT_UPDATE_CONCRETE ).bind ());
222
+ statements .put ("DELETE_CONCRETE" , s -> STMT_DELETE_CONCRETE );
223
+ statements .put ("DELETE_PREPARED" , s -> s .prepare (STMT_DELETE ).bind (2 , 2 ));
224
+ statements .put ("DELETE_NO_KS_PREPARED" , s -> s .prepare (STMT_DELETE_NO_KS ).bind (2 , 2 ));
225
+ statements .put ("DELETE_CONCRETE_PREPARED" , s -> s .prepare (STMT_DELETE_CONCRETE ).bind ());
226
+
227
+ List <String > noTabletQueries = new ArrayList <>();
228
+ for (Map .Entry <String , Supplier <CqlSession >> sessionEntry : sessions .entrySet ()) {
229
+ for (Map .Entry <String , Function <CqlSession , Statement >> stmtEntry : statements .entrySet ()) {
230
+ if (stmtEntry .getKey ().contains ("CONCRETE" )) {
231
+ // Broken on server side
232
+ continue ;
233
+ }
234
+ if (sessionEntry .getKey ().equals ("REGULAR" ) && stmtEntry .getKey ().contains ("NO_KS" )) {
235
+ // Statements without KS will fail on the session with no ks specified
236
+ continue ;
237
+ }
238
+ CqlSession session = sessionEntry .getValue ().get ();
239
+ Statement stmt ;
240
+ try {
241
+ stmt = stmtEntry .getValue ().apply (session );
242
+ } catch (Exception e ) {
243
+ RuntimeException ex = new RuntimeException (String .format ("Failed to build statement %s on session %s" , stmtEntry .getKey (), sessionEntry .getKey ()));
244
+ ex .addSuppressed (e );
245
+ throw ex ;
246
+ }
247
+ if (!executeOnAllHostsAndReturnIfResultHasTabletsInfo (session , stmt )) {
248
+ noTabletQueries .add (String .format ("Statement %s on session %s got no tablet into" , stmtEntry .getKey (), sessionEntry .getKey ()));
249
+ continue ;
250
+ }
251
+ if (!isSessionLearnedTabletInfo (session )) {
252
+ noTabletQueries .add (String .format ("Statement %s on session %s did not trigger session tablets update" , stmtEntry .getKey (), sessionEntry .getKey ()));
253
+ }
254
+ }
96
255
}
97
256
98
- PreparedStatement preparedStatement =
99
- session .prepare (
100
- SimpleStatement .builder (
101
- "select pk,ck from "
102
- + KEYSPACE_NAME
103
- + "."
104
- + TABLE_NAME
105
- + " WHERE pk = ? AND ck = ?" )
106
- .setTracing (true )
107
- .build ());
108
- // preparedStatement.enableTracing();
257
+ if (!noTabletQueries .isEmpty ()) {
258
+ throw new AssertionError (String .format ("Found queries that got no tablet info: \n %s" , String .join ("\n " , noTabletQueries )));
259
+ }
260
+ }
261
+
262
+ @ Test
263
+ public void should_receive_each_tablet_exactly_once () {
264
+ CqlSession session = CqlSession .builder ().addContactEndPoints (CCM_RULE .getContactPoints ()).build ();
109
265
int counter = 0 ;
266
+ PreparedStatement preparedStatement =
267
+ session .prepare (STMT_INSERT );
110
268
for (int i = 1 ; i <= QUERIES ; i ++) {
111
- ResultSet rs = session .execute (preparedStatement .bind (i , i ).setTracing (true ));
112
- Map <String , ByteBuffer > payload = rs .getExecutionInfo ().getIncomingPayload ();
113
- if (payload .containsKey (TabletInfo .TABLETS_ROUTING_V1_CUSTOM_PAYLOAD_KEY )) {
269
+ if (executeAndReturnIfResultHasTabletsInfo (session , preparedStatement .bind (i , i ))) {
270
+ counter ++;
271
+ }
272
+ }
273
+ Assert .assertEquals (INITIAL_TABLETS , counter );
274
+ assertSessionTabletMapIsFilled (session );
275
+ session .close ();
276
+
277
+ session = CqlSession .builder ().addContactEndPoints (CCM_RULE .getContactPoints ()).build ();
278
+ counter = 0 ;
279
+ preparedStatement =
280
+ session .prepare (STMT_SELECT );
281
+ for (int i = 1 ; i <= QUERIES ; i ++) {
282
+ if (executeAndReturnIfResultHasTabletsInfo (session , preparedStatement .bind (i , i ))) {
114
283
counter ++;
115
284
}
116
285
}
@@ -119,7 +288,44 @@ public void should_receive_each_tablet_exactly_once() {
119
288
120
289
// With enough queries we should hit a wrong node for each tablet exactly once.
121
290
Assert .assertEquals (INITIAL_TABLETS , counter );
291
+ assertSessionTabletMapIsFilled (session );
292
+
293
+ // All tablet information should be available by now (unless for some reason cluster did sth on
294
+ // its own)
295
+ // We should not receive any tablet payloads now, since they are sent only on mismatch.
296
+ for (int i = 1 ; i <= QUERIES ; i ++) {
297
+
298
+ ResultSet rs = session .execute (preparedStatement .bind (i , i ));
299
+ Map <String , ByteBuffer > payload = rs .getExecutionInfo ().getIncomingPayload ();
300
+
301
+ if (payload .containsKey (TabletInfo .TABLETS_ROUTING_V1_CUSTOM_PAYLOAD_KEY )) {
302
+ throw new RuntimeException (
303
+ "Received non empty payload with tablets routing information: " + payload );
304
+ }
305
+ }
306
+ }
122
307
308
+ private static boolean isSessionLearnedTabletInfo (CqlSession session ) {
309
+ ConcurrentMap <KeyspaceTableNamePair , ConcurrentSkipListSet <Tablet >> tabletMapping =
310
+ session .getMetadata ().getTabletMap ().getMapping ();
311
+ KeyspaceTableNamePair ktPair =
312
+ new KeyspaceTableNamePair (
313
+ CqlIdentifier .fromCql (KEYSPACE_NAME ), CqlIdentifier .fromCql (TABLE_NAME ));
314
+
315
+ Set <Tablet > tablets = tabletMapping .get (ktPair );
316
+ if (tablets == null || tablets .isEmpty ()) {
317
+ return false ;
318
+ }
319
+
320
+ for (Tablet tab : tablets ) {
321
+ if (tab .getReplicaNodes ().size () >= REPLICATION_FACTOR ) {
322
+ return true ;
323
+ };
324
+ }
325
+ return false ;
326
+ }
327
+
328
+ private static void assertSessionTabletMapIsFilled (CqlSession session ) {
123
329
ConcurrentMap <KeyspaceTableNamePair , ConcurrentSkipListSet <Tablet >> tabletMapping =
124
330
session .getMetadata ().getTabletMap ().getMapping ();
125
331
KeyspaceTableNamePair ktPair =
@@ -133,19 +339,20 @@ public void should_receive_each_tablet_exactly_once() {
133
339
for (Tablet tab : tablets ) {
134
340
Assert .assertEquals (REPLICATION_FACTOR , tab .getReplicaNodes ().size ());
135
341
}
342
+ }
136
343
137
- // All tablet information should be available by now (unless for some reason cluster did sth on
138
- // its own)
139
- // We should not receive any tablet payloads now, since they are sent only on mismatch.
140
- for (int i = 1 ; i <= QUERIES ; i ++) {
141
-
142
- ResultSet rs = session .execute (preparedStatement .bind (i , i ));
143
- Map <String , ByteBuffer > payload = rs .getExecutionInfo ().getIncomingPayload ();
144
-
145
- if (payload .containsKey (TabletInfo .TABLETS_ROUTING_V1_CUSTOM_PAYLOAD_KEY )) {
146
- throw new RuntimeException (
147
- "Received non empty payload with tablets routing information: " + payload );
344
+ private static boolean executeOnAllHostsAndReturnIfResultHasTabletsInfo (CqlSession session , Statement stmt ) {
345
+ session .refreshSchema ();
346
+ for (Node node : session .getMetadata ().getNodes ().values ()) {
347
+ if (executeAndReturnIfResultHasTabletsInfo (session , stmt .setNode (node ))) {
348
+ return true ;
148
349
}
149
350
}
351
+ return false ;
352
+ }
353
+
354
+ private static boolean executeAndReturnIfResultHasTabletsInfo (CqlSession session , Statement statement ) {
355
+ ResultSet rs = session .execute (statement );
356
+ return rs .getExecutionInfo ().getIncomingPayload ().containsKey (TabletInfo .TABLETS_ROUTING_V1_CUSTOM_PAYLOAD_KEY );
150
357
}
151
358
}
0 commit comments