Skip to content

Commit 0c3e3ff

Browse files
committed
Allow BatchStatements to be LWT
Previously all DefaultBatchStatements would always return `false` when `isLWT()` was called. This would cause the driver to route the batch based on the first non null routing information found in a batch but using regular rules rather than rules for LWT queries, even if a LWT query was inside the batch. Now LWT routing will be used for DefaultBatchStatements if somewhere along the way an LWT query was added to the batch. This can also be controlled explicitly regardless of batch contents with `BatchStatementBuilder#setIsLWT(boolean)`.
1 parent 3b4609b commit 0c3e3ff

File tree

5 files changed

+160
-29
lines changed

5 files changed

+160
-29
lines changed

core/src/main/java/com/datastax/oss/driver/api/core/cql/BatchStatement.java

+6-3
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,8 @@ static BatchStatement newInstance(@NonNull BatchType batchType) {
6969
null,
7070
null,
7171
null,
72-
Statement.NO_NOW_IN_SECONDS);
72+
Statement.NO_NOW_IN_SECONDS,
73+
null);
7374
}
7475

7576
/**
@@ -100,7 +101,8 @@ static BatchStatement newInstance(
100101
null,
101102
null,
102103
null,
103-
Statement.NO_NOW_IN_SECONDS);
104+
Statement.NO_NOW_IN_SECONDS,
105+
null);
104106
}
105107

106108
/**
@@ -131,7 +133,8 @@ static BatchStatement newInstance(
131133
null,
132134
null,
133135
null,
134-
Statement.NO_NOW_IN_SECONDS);
136+
Statement.NO_NOW_IN_SECONDS,
137+
null);
135138
}
136139

137140
/**

core/src/main/java/com/datastax/oss/driver/api/core/cql/BatchStatementBuilder.java

+26-2
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import edu.umd.cs.findbugs.annotations.NonNull;
2525
import edu.umd.cs.findbugs.annotations.Nullable;
2626
import java.util.Arrays;
27+
import java.util.List;
2728
import net.jcip.annotations.NotThreadSafe;
2829

2930
/**
@@ -38,6 +39,7 @@ public class BatchStatementBuilder extends StatementBuilder<BatchStatementBuilde
3839
@Nullable private CqlIdentifier keyspace;
3940
@NonNull private ImmutableList.Builder<BatchableStatement<?>> statementsBuilder;
4041
private int statementsCount;
42+
@Nullable private Boolean isLWT;
4143

4244
public BatchStatementBuilder(@NonNull BatchType batchType) {
4345
this.batchType = batchType;
@@ -74,6 +76,18 @@ public BatchStatementBuilder setKeyspace(@NonNull String keyspaceName) {
7476
return setKeyspace(CqlIdentifier.fromCql(keyspaceName));
7577
}
7678

79+
/**
80+
* Forces driver to see this query as LWT or not. If never explicitly set the builder will use
81+
* {@code true} if any of the added queries is LWT on its own and {@code false} otherwise.
82+
*
83+
* @return this builder; never {@code null}.
84+
*/
85+
@NonNull
86+
public BatchStatementBuilder setIsLWT(boolean newIsLWT) {
87+
this.isLWT = newIsLWT;
88+
return this;
89+
}
90+
7791
/**
7892
* Adds a new statement to the batch.
7993
*
@@ -136,9 +150,10 @@ public BatchStatementBuilder clearStatements() {
136150
@Override
137151
@NonNull
138152
public BatchStatement build() {
153+
List<BatchableStatement<?>> statements = statementsBuilder.build();
139154
return new DefaultBatchStatement(
140155
batchType,
141-
statementsBuilder.build(),
156+
statements,
142157
executionProfileName,
143158
executionProfile,
144159
keyspace,
@@ -155,10 +170,19 @@ public BatchStatement build() {
155170
serialConsistencyLevel,
156171
timeout,
157172
node,
158-
nowInSeconds);
173+
nowInSeconds,
174+
resolveIsLWT(statements));
159175
}
160176

161177
public int getStatementsCount() {
162178
return this.statementsCount;
163179
}
180+
181+
private boolean resolveIsLWT(List<BatchableStatement<?>> statements) {
182+
if (isLWT != null) return isLWT;
183+
for (BatchableStatement<?> statement : statements) {
184+
if (statement.isLWT()) return true;
185+
}
186+
return false;
187+
}
164188
}

core/src/main/java/com/datastax/oss/driver/internal/core/cql/DefaultBatchStatement.java

+80-23
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import com.datastax.oss.driver.api.core.cql.BatchType;
3131
import com.datastax.oss.driver.api.core.cql.BatchableStatement;
3232
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
33+
import com.datastax.oss.driver.api.core.cql.Statement;
3334
import com.datastax.oss.driver.api.core.metadata.Node;
3435
import com.datastax.oss.driver.api.core.metadata.token.Token;
3536
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList;
@@ -41,6 +42,7 @@
4142
import java.util.Iterator;
4243
import java.util.List;
4344
import java.util.Map;
45+
import java.util.stream.StreamSupport;
4446
import net.jcip.annotations.Immutable;
4547
import org.slf4j.Logger;
4648
import org.slf4j.LoggerFactory;
@@ -68,6 +70,7 @@ public class DefaultBatchStatement implements BatchStatement {
6870
private final Duration timeout;
6971
private final Node node;
7072
private final int nowInSeconds;
73+
private final boolean isLWT;
7174

7275
public DefaultBatchStatement(
7376
BatchType batchType,
@@ -88,7 +91,8 @@ public DefaultBatchStatement(
8891
ConsistencyLevel serialConsistencyLevel,
8992
Duration timeout,
9093
Node node,
91-
int nowInSeconds) {
94+
int nowInSeconds,
95+
Boolean isLWT) {
9296
for (BatchableStatement<?> statement : statements) {
9397
if (statement != null
9498
&& (statement.getConsistencyLevel() != null
@@ -120,6 +124,11 @@ public DefaultBatchStatement(
120124
this.timeout = timeout;
121125
this.node = node;
122126
this.nowInSeconds = nowInSeconds;
127+
if (isLWT != null) {
128+
this.isLWT = isLWT;
129+
} else {
130+
this.isLWT = StreamSupport.stream(statements.spliterator(), false).anyMatch(Statement::isLWT);
131+
}
123132
}
124133

125134
@NonNull
@@ -150,7 +159,8 @@ public BatchStatement setBatchType(@NonNull BatchType newBatchType) {
150159
serialConsistencyLevel,
151160
timeout,
152161
node,
153-
nowInSeconds);
162+
nowInSeconds,
163+
isLWT);
154164
}
155165

156166
@NonNull
@@ -175,7 +185,8 @@ public BatchStatement setKeyspace(@Nullable CqlIdentifier newKeyspace) {
175185
serialConsistencyLevel,
176186
timeout,
177187
node,
178-
nowInSeconds);
188+
nowInSeconds,
189+
isLWT);
179190
}
180191

181192
@NonNull
@@ -204,7 +215,8 @@ public BatchStatement add(@NonNull BatchableStatement<?> statement) {
204215
serialConsistencyLevel,
205216
timeout,
206217
node,
207-
nowInSeconds);
218+
nowInSeconds,
219+
isLWT || statement.isLWT());
208220
}
209221
}
210222

@@ -237,7 +249,10 @@ public BatchStatement addAll(@NonNull Iterable<? extends BatchableStatement<?>>
237249
serialConsistencyLevel,
238250
timeout,
239251
node,
240-
nowInSeconds);
252+
nowInSeconds,
253+
isLWT
254+
|| StreamSupport.stream(newStatements.spliterator(), false)
255+
.anyMatch(Statement::isLWT));
241256
}
242257
}
243258

@@ -268,7 +283,8 @@ public BatchStatement clear() {
268283
serialConsistencyLevel,
269284
timeout,
270285
node,
271-
nowInSeconds);
286+
nowInSeconds,
287+
isLWT);
272288
}
273289

274290
@NonNull
@@ -304,7 +320,8 @@ public BatchStatement setPagingState(ByteBuffer newPagingState) {
304320
serialConsistencyLevel,
305321
timeout,
306322
node,
307-
nowInSeconds);
323+
nowInSeconds,
324+
isLWT);
308325
}
309326

310327
@Override
@@ -334,7 +351,8 @@ public BatchStatement setPageSize(int newPageSize) {
334351
serialConsistencyLevel,
335352
timeout,
336353
node,
337-
nowInSeconds);
354+
nowInSeconds,
355+
isLWT);
338356
}
339357

340358
@Nullable
@@ -365,7 +383,8 @@ public BatchStatement setConsistencyLevel(@Nullable ConsistencyLevel newConsiste
365383
serialConsistencyLevel,
366384
timeout,
367385
node,
368-
nowInSeconds);
386+
nowInSeconds,
387+
isLWT);
369388
}
370389

371390
@Nullable
@@ -397,7 +416,8 @@ public BatchStatement setSerialConsistencyLevel(
397416
newSerialConsistencyLevel,
398417
timeout,
399418
node,
400-
nowInSeconds);
419+
nowInSeconds,
420+
isLWT);
401421
}
402422

403423
@Override
@@ -427,7 +447,8 @@ public BatchStatement setExecutionProfileName(@Nullable String newConfigProfileN
427447
serialConsistencyLevel,
428448
timeout,
429449
node,
430-
nowInSeconds);
450+
nowInSeconds,
451+
isLWT);
431452
}
432453

433454
@Override
@@ -457,7 +478,8 @@ public DefaultBatchStatement setExecutionProfile(@Nullable DriverExecutionProfil
457478
serialConsistencyLevel,
458479
timeout,
459480
node,
460-
nowInSeconds);
481+
nowInSeconds,
482+
isLWT);
461483
}
462484

463485
@Override
@@ -522,7 +544,8 @@ public BatchStatement setRoutingKeyspace(CqlIdentifier newRoutingKeyspace) {
522544
serialConsistencyLevel,
523545
timeout,
524546
node,
525-
nowInSeconds);
547+
nowInSeconds,
548+
isLWT);
526549
}
527550

528551
@NonNull
@@ -547,7 +570,8 @@ public BatchStatement setNode(@Nullable Node newNode) {
547570
serialConsistencyLevel,
548571
timeout,
549572
newNode,
550-
nowInSeconds);
573+
nowInSeconds,
574+
isLWT);
551575
}
552576

553577
@Nullable
@@ -593,7 +617,8 @@ public BatchStatement setRoutingKey(ByteBuffer newRoutingKey) {
593617
serialConsistencyLevel,
594618
timeout,
595619
node,
596-
nowInSeconds);
620+
nowInSeconds,
621+
isLWT);
597622
}
598623

599624
@Override
@@ -633,7 +658,8 @@ public BatchStatement setRoutingToken(Token newRoutingToken) {
633658
serialConsistencyLevel,
634659
timeout,
635660
node,
636-
nowInSeconds);
661+
nowInSeconds,
662+
isLWT);
637663
}
638664

639665
@NonNull
@@ -664,7 +690,8 @@ public DefaultBatchStatement setCustomPayload(@NonNull Map<String, ByteBuffer> n
664690
serialConsistencyLevel,
665691
timeout,
666692
node,
667-
nowInSeconds);
693+
nowInSeconds,
694+
isLWT);
668695
}
669696

670697
@Override
@@ -700,7 +727,8 @@ public DefaultBatchStatement setIdempotent(Boolean newIdempotence) {
700727
serialConsistencyLevel,
701728
timeout,
702729
node,
703-
nowInSeconds);
730+
nowInSeconds,
731+
isLWT);
704732
}
705733

706734
@Override
@@ -730,7 +758,8 @@ public BatchStatement setTracing(boolean newTracing) {
730758
serialConsistencyLevel,
731759
timeout,
732760
node,
733-
nowInSeconds);
761+
nowInSeconds,
762+
isLWT);
734763
}
735764

736765
@Override
@@ -760,7 +789,8 @@ public BatchStatement setQueryTimestamp(long newTimestamp) {
760789
serialConsistencyLevel,
761790
timeout,
762791
node,
763-
nowInSeconds);
792+
nowInSeconds,
793+
isLWT);
764794
}
765795

766796
@NonNull
@@ -785,7 +815,8 @@ public BatchStatement setTimeout(@Nullable Duration newTimeout) {
785815
serialConsistencyLevel,
786816
newTimeout,
787817
node,
788-
nowInSeconds);
818+
nowInSeconds,
819+
isLWT);
789820
}
790821

791822
@Override
@@ -815,11 +846,37 @@ public BatchStatement setNowInSeconds(int newNowInSeconds) {
815846
serialConsistencyLevel,
816847
timeout,
817848
node,
818-
newNowInSeconds);
849+
newNowInSeconds,
850+
isLWT);
851+
}
852+
853+
@NonNull
854+
public BatchStatement setIsLWT(boolean newIsLWT) {
855+
return new DefaultBatchStatement(
856+
batchType,
857+
statements,
858+
executionProfileName,
859+
executionProfile,
860+
keyspace,
861+
routingKeyspace,
862+
routingKey,
863+
routingToken,
864+
customPayload,
865+
idempotent,
866+
tracing,
867+
timestamp,
868+
pagingState,
869+
pageSize,
870+
consistencyLevel,
871+
serialConsistencyLevel,
872+
timeout,
873+
node,
874+
nowInSeconds,
875+
newIsLWT);
819876
}
820877

821878
@Override
822879
public boolean isLWT() {
823-
return false;
880+
return isLWT;
824881
}
825882
}

0 commit comments

Comments
 (0)