Skip to content

Commit e5cb7d6

Browse files
authored
Allow BatchStatements to be LWT (#489)
Previously all DefaultBatchStatements would always return `false` when `isLWT()` was called. This would cause the driver to route the batch based on the first non null routing information found in a batch but using regular rules rather than rules for LWT queries, even if a LWT query was inside the batch. Now LWT routing will be used for DefaultBatchStatements if somewhere along the way an LWT query was added to the batch. This can also be controlled explicitly regardless of batch contents with `BatchStatementBuilder#setIsLWT(boolean)`.
1 parent 263f09d commit e5cb7d6

File tree

6 files changed

+220
-28
lines changed

6 files changed

+220
-28
lines changed

Diff for: core/src/main/java/com/datastax/oss/driver/api/core/cql/BatchStatement.java

+15-3
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,8 @@ static BatchStatement newInstance(@NonNull BatchType batchType) {
6969
null,
7070
null,
7171
null,
72-
Statement.NO_NOW_IN_SECONDS);
72+
Statement.NO_NOW_IN_SECONDS,
73+
null);
7374
}
7475

7576
/**
@@ -100,7 +101,8 @@ static BatchStatement newInstance(
100101
null,
101102
null,
102103
null,
103-
Statement.NO_NOW_IN_SECONDS);
104+
Statement.NO_NOW_IN_SECONDS,
105+
null);
104106
}
105107

106108
/**
@@ -131,7 +133,8 @@ static BatchStatement newInstance(
131133
null,
132134
null,
133135
null,
134-
Statement.NO_NOW_IN_SECONDS);
136+
Statement.NO_NOW_IN_SECONDS,
137+
null);
135138
}
136139

137140
/**
@@ -277,4 +280,13 @@ default int computeSizeInBytes(@NonNull DriverContext context) {
277280

278281
return size;
279282
}
283+
284+
/**
285+
* Overrides LWT state to a specific value. If unset or set to {@code null} the {@link
286+
* Statement#isLWT()} method will infer result from the statments in the batch.
287+
*
288+
* @param newIsLWT new Boolean to set
289+
* @return new BatchStatement with updated isLWT field.
290+
*/
291+
BatchStatement setIsLWT(Boolean newIsLWT);
280292
}

Diff for: core/src/main/java/com/datastax/oss/driver/api/core/cql/BatchStatementBuilder.java

+19-2
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import edu.umd.cs.findbugs.annotations.NonNull;
2525
import edu.umd.cs.findbugs.annotations.Nullable;
2626
import java.util.Arrays;
27+
import java.util.List;
2728
import net.jcip.annotations.NotThreadSafe;
2829

2930
/**
@@ -38,6 +39,7 @@ public class BatchStatementBuilder extends StatementBuilder<BatchStatementBuilde
3839
@Nullable private CqlIdentifier keyspace;
3940
@NonNull private ImmutableList.Builder<BatchableStatement<?>> statementsBuilder;
4041
private int statementsCount;
42+
@Nullable private Boolean isLWT = null;
4143

4244
public BatchStatementBuilder(@NonNull BatchType batchType) {
4345
this.batchType = batchType;
@@ -74,6 +76,19 @@ public BatchStatementBuilder setKeyspace(@NonNull String keyspaceName) {
7476
return setKeyspace(CqlIdentifier.fromCql(keyspaceName));
7577
}
7678

79+
/**
80+
* Forces driver to see this batch as LWT or non-LWT. Note that if never explicitly set or set to
81+
* {@code null}, the resulting {@code DefaultBatchStatement} will decide its LWT state based on
82+
* contained statements.
83+
*
84+
* @return this builder; never {@code null}.
85+
*/
86+
@NonNull
87+
public BatchStatementBuilder setIsLWT(Boolean newIsLWT) {
88+
this.isLWT = newIsLWT;
89+
return this;
90+
}
91+
7792
/**
7893
* Adds a new statement to the batch.
7994
*
@@ -136,9 +151,10 @@ public BatchStatementBuilder clearStatements() {
136151
@Override
137152
@NonNull
138153
public BatchStatement build() {
154+
List<BatchableStatement<?>> statements = statementsBuilder.build();
139155
return new DefaultBatchStatement(
140156
batchType,
141-
statementsBuilder.build(),
157+
statements,
142158
executionProfileName,
143159
executionProfile,
144160
keyspace,
@@ -155,7 +171,8 @@ public BatchStatement build() {
155171
serialConsistencyLevel,
156172
timeout,
157173
node,
158-
nowInSeconds);
174+
nowInSeconds,
175+
isLWT);
159176
}
160177

161178
public int getStatementsCount() {

Diff for: core/src/main/java/com/datastax/oss/driver/api/core/cql/Statement.java

+3
Original file line numberDiff line numberDiff line change
@@ -533,6 +533,9 @@ default SelfT setNowInSeconds(int nowInSeconds) {
533533
* Scylla, using too old Scylla version, future changes in driver allowing channels to be created
534534
* without sending OPTIONS request.
535535
*
536+
* <p>Provided implementations of BatchStatements will be considered by driver as LWT if any of
537+
* the statements in a batch is LWT on its own.
538+
*
536539
* <p>More information about LWT:
537540
*
538541
* @see <a href="https://docs.scylladb.com/using-scylla/lwt/">Docs about LWT</a>

Diff for: core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultBatchStatement.java

+75-23
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import com.datastax.oss.driver.api.core.cql.BatchType;
3131
import com.datastax.oss.driver.api.core.cql.BatchableStatement;
3232
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
33+
import com.datastax.oss.driver.api.core.cql.Statement;
3334
import com.datastax.oss.driver.api.core.metadata.Node;
3435
import com.datastax.oss.driver.api.core.metadata.token.Token;
3536
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList;
@@ -68,6 +69,7 @@ public class DefaultBatchStatement implements BatchStatement {
6869
private final Duration timeout;
6970
private final Node node;
7071
private final int nowInSeconds;
72+
private final Boolean isLWT;
7173

7274
public DefaultBatchStatement(
7375
BatchType batchType,
@@ -88,7 +90,8 @@ public DefaultBatchStatement(
8890
ConsistencyLevel serialConsistencyLevel,
8991
Duration timeout,
9092
Node node,
91-
int nowInSeconds) {
93+
int nowInSeconds,
94+
Boolean isLWT) {
9295
for (BatchableStatement<?> statement : statements) {
9396
if (statement != null
9497
&& (statement.getConsistencyLevel() != null
@@ -120,6 +123,7 @@ public DefaultBatchStatement(
120123
this.timeout = timeout;
121124
this.node = node;
122125
this.nowInSeconds = nowInSeconds;
126+
this.isLWT = isLWT;
123127
}
124128

125129
@NonNull
@@ -150,7 +154,8 @@ public BatchStatement setBatchType(@NonNull BatchType newBatchType) {
150154
serialConsistencyLevel,
151155
timeout,
152156
node,
153-
nowInSeconds);
157+
nowInSeconds,
158+
isLWT);
154159
}
155160

156161
@NonNull
@@ -175,7 +180,8 @@ public BatchStatement setKeyspace(@Nullable CqlIdentifier newKeyspace) {
175180
serialConsistencyLevel,
176181
timeout,
177182
node,
178-
nowInSeconds);
183+
nowInSeconds,
184+
isLWT);
179185
}
180186

181187
@NonNull
@@ -204,7 +210,8 @@ public BatchStatement add(@NonNull BatchableStatement<?> statement) {
204210
serialConsistencyLevel,
205211
timeout,
206212
node,
207-
nowInSeconds);
213+
nowInSeconds,
214+
isLWT);
208215
}
209216
}
210217

@@ -237,7 +244,8 @@ public BatchStatement addAll(@NonNull Iterable<? extends BatchableStatement<?>>
237244
serialConsistencyLevel,
238245
timeout,
239246
node,
240-
nowInSeconds);
247+
nowInSeconds,
248+
isLWT);
241249
}
242250
}
243251

@@ -268,7 +276,8 @@ public BatchStatement clear() {
268276
serialConsistencyLevel,
269277
timeout,
270278
node,
271-
nowInSeconds);
279+
nowInSeconds,
280+
isLWT);
272281
}
273282

274283
@NonNull
@@ -304,7 +313,8 @@ public BatchStatement setPagingState(ByteBuffer newPagingState) {
304313
serialConsistencyLevel,
305314
timeout,
306315
node,
307-
nowInSeconds);
316+
nowInSeconds,
317+
isLWT);
308318
}
309319

310320
@Override
@@ -334,7 +344,8 @@ public BatchStatement setPageSize(int newPageSize) {
334344
serialConsistencyLevel,
335345
timeout,
336346
node,
337-
nowInSeconds);
347+
nowInSeconds,
348+
isLWT);
338349
}
339350

340351
@Nullable
@@ -365,7 +376,8 @@ public BatchStatement setConsistencyLevel(@Nullable ConsistencyLevel newConsiste
365376
serialConsistencyLevel,
366377
timeout,
367378
node,
368-
nowInSeconds);
379+
nowInSeconds,
380+
isLWT);
369381
}
370382

371383
@Nullable
@@ -397,7 +409,8 @@ public BatchStatement setSerialConsistencyLevel(
397409
newSerialConsistencyLevel,
398410
timeout,
399411
node,
400-
nowInSeconds);
412+
nowInSeconds,
413+
isLWT);
401414
}
402415

403416
@Override
@@ -427,7 +440,8 @@ public BatchStatement setExecutionProfileName(@Nullable String newConfigProfileN
427440
serialConsistencyLevel,
428441
timeout,
429442
node,
430-
nowInSeconds);
443+
nowInSeconds,
444+
isLWT);
431445
}
432446

433447
@Override
@@ -457,7 +471,8 @@ public DefaultBatchStatement setExecutionProfile(@Nullable DriverExecutionProfil
457471
serialConsistencyLevel,
458472
timeout,
459473
node,
460-
nowInSeconds);
474+
nowInSeconds,
475+
isLWT);
461476
}
462477

463478
@Override
@@ -522,7 +537,8 @@ public BatchStatement setRoutingKeyspace(CqlIdentifier newRoutingKeyspace) {
522537
serialConsistencyLevel,
523538
timeout,
524539
node,
525-
nowInSeconds);
540+
nowInSeconds,
541+
isLWT);
526542
}
527543

528544
@NonNull
@@ -547,7 +563,8 @@ public BatchStatement setNode(@Nullable Node newNode) {
547563
serialConsistencyLevel,
548564
timeout,
549565
newNode,
550-
nowInSeconds);
566+
nowInSeconds,
567+
isLWT);
551568
}
552569

553570
@Nullable
@@ -593,7 +610,8 @@ public BatchStatement setRoutingKey(ByteBuffer newRoutingKey) {
593610
serialConsistencyLevel,
594611
timeout,
595612
node,
596-
nowInSeconds);
613+
nowInSeconds,
614+
isLWT);
597615
}
598616

599617
@Override
@@ -633,7 +651,8 @@ public BatchStatement setRoutingToken(Token newRoutingToken) {
633651
serialConsistencyLevel,
634652
timeout,
635653
node,
636-
nowInSeconds);
654+
nowInSeconds,
655+
isLWT);
637656
}
638657

639658
@NonNull
@@ -664,7 +683,8 @@ public DefaultBatchStatement setCustomPayload(@NonNull Map<String, ByteBuffer> n
664683
serialConsistencyLevel,
665684
timeout,
666685
node,
667-
nowInSeconds);
686+
nowInSeconds,
687+
isLWT);
668688
}
669689

670690
@Override
@@ -700,7 +720,8 @@ public DefaultBatchStatement setIdempotent(Boolean newIdempotence) {
700720
serialConsistencyLevel,
701721
timeout,
702722
node,
703-
nowInSeconds);
723+
nowInSeconds,
724+
isLWT);
704725
}
705726

706727
@Override
@@ -730,7 +751,8 @@ public BatchStatement setTracing(boolean newTracing) {
730751
serialConsistencyLevel,
731752
timeout,
732753
node,
733-
nowInSeconds);
754+
nowInSeconds,
755+
isLWT);
734756
}
735757

736758
@Override
@@ -760,7 +782,8 @@ public BatchStatement setQueryTimestamp(long newTimestamp) {
760782
serialConsistencyLevel,
761783
timeout,
762784
node,
763-
nowInSeconds);
785+
nowInSeconds,
786+
isLWT);
764787
}
765788

766789
@NonNull
@@ -785,7 +808,8 @@ public BatchStatement setTimeout(@Nullable Duration newTimeout) {
785808
serialConsistencyLevel,
786809
newTimeout,
787810
node,
788-
nowInSeconds);
811+
nowInSeconds,
812+
isLWT);
789813
}
790814

791815
@Override
@@ -815,11 +839,39 @@ public BatchStatement setNowInSeconds(int newNowInSeconds) {
815839
serialConsistencyLevel,
816840
timeout,
817841
node,
818-
newNowInSeconds);
842+
newNowInSeconds,
843+
isLWT);
844+
}
845+
846+
@NonNull
847+
@Override
848+
public BatchStatement setIsLWT(Boolean newIsLWT) {
849+
return new DefaultBatchStatement(
850+
batchType,
851+
statements,
852+
executionProfileName,
853+
executionProfile,
854+
keyspace,
855+
routingKeyspace,
856+
routingKey,
857+
routingToken,
858+
customPayload,
859+
idempotent,
860+
tracing,
861+
timestamp,
862+
pagingState,
863+
pageSize,
864+
consistencyLevel,
865+
serialConsistencyLevel,
866+
timeout,
867+
node,
868+
nowInSeconds,
869+
newIsLWT);
819870
}
820871

821872
@Override
822873
public boolean isLWT() {
823-
return false;
874+
if (isLWT != null) return isLWT;
875+
return statements.stream().anyMatch(Statement::isLWT);
824876
}
825877
}

0 commit comments

Comments
 (0)