Skip to content

4.x: Allow BatchStatements to be LWT #489

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ static BatchStatement newInstance(@NonNull BatchType batchType) {
null,
null,
null,
Statement.NO_NOW_IN_SECONDS);
Statement.NO_NOW_IN_SECONDS,
null);
}

/**
Expand Down Expand Up @@ -100,7 +101,8 @@ static BatchStatement newInstance(
null,
null,
null,
Statement.NO_NOW_IN_SECONDS);
Statement.NO_NOW_IN_SECONDS,
null);
}

/**
Expand Down Expand Up @@ -131,7 +133,8 @@ static BatchStatement newInstance(
null,
null,
null,
Statement.NO_NOW_IN_SECONDS);
Statement.NO_NOW_IN_SECONDS,
null);
}

/**
Expand Down Expand Up @@ -277,4 +280,13 @@ default int computeSizeInBytes(@NonNull DriverContext context) {

return size;
}

/**
* Overrides LWT state to a specific value. If unset or set to {@code null} the {@link
* Statement#isLWT()} method will infer result from the statments in the batch.
*
* @param newIsLWT new Boolean to set
* @return new BatchStatement with updated isLWT field.
*/
BatchStatement setIsLWT(Boolean newIsLWT);
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.util.Arrays;
import java.util.List;
import net.jcip.annotations.NotThreadSafe;

/**
Expand All @@ -38,6 +39,7 @@ public class BatchStatementBuilder extends StatementBuilder<BatchStatementBuilde
@Nullable private CqlIdentifier keyspace;
@NonNull private ImmutableList.Builder<BatchableStatement<?>> statementsBuilder;
private int statementsCount;
@Nullable private Boolean isLWT = null;

public BatchStatementBuilder(@NonNull BatchType batchType) {
this.batchType = batchType;
Expand Down Expand Up @@ -74,6 +76,19 @@ public BatchStatementBuilder setKeyspace(@NonNull String keyspaceName) {
return setKeyspace(CqlIdentifier.fromCql(keyspaceName));
}

/**
* Forces driver to see this batch as LWT or non-LWT. Note that if never explicitly set or set to
* {@code null}, the resulting {@code DefaultBatchStatement} will decide its LWT state based on
* contained statements.
*
* @return this builder; never {@code null}.
*/
@NonNull
public BatchStatementBuilder setIsLWT(Boolean newIsLWT) {
this.isLWT = newIsLWT;
return this;
}

/**
* Adds a new statement to the batch.
*
Expand Down Expand Up @@ -136,9 +151,10 @@ public BatchStatementBuilder clearStatements() {
@Override
@NonNull
public BatchStatement build() {
List<BatchableStatement<?>> statements = statementsBuilder.build();
return new DefaultBatchStatement(
batchType,
statementsBuilder.build(),
statements,
executionProfileName,
executionProfile,
keyspace,
Expand All @@ -155,7 +171,8 @@ public BatchStatement build() {
serialConsistencyLevel,
timeout,
node,
nowInSeconds);
nowInSeconds,
isLWT);
}

public int getStatementsCount() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,9 @@ default SelfT setNowInSeconds(int nowInSeconds) {
* Scylla, using too old Scylla version, future changes in driver allowing channels to be created
* without sending OPTIONS request.
*
* <p>Provided implementations of BatchStatements will be considered by driver as LWT if any of
* the statements in a batch is LWT on its own.
*
* <p>More information about LWT:
*
* @see <a href="https://docs.scylladb.com/using-scylla/lwt/">Docs about LWT</a>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.datastax.oss.driver.api.core.cql.BatchType;
import com.datastax.oss.driver.api.core.cql.BatchableStatement;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import com.datastax.oss.driver.api.core.cql.Statement;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.core.metadata.token.Token;
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList;
Expand Down Expand Up @@ -68,6 +69,7 @@ public class DefaultBatchStatement implements BatchStatement {
private final Duration timeout;
private final Node node;
private final int nowInSeconds;
private final Boolean isLWT;

public DefaultBatchStatement(
BatchType batchType,
Expand All @@ -88,7 +90,8 @@ public DefaultBatchStatement(
ConsistencyLevel serialConsistencyLevel,
Duration timeout,
Node node,
int nowInSeconds) {
int nowInSeconds,
Boolean isLWT) {
for (BatchableStatement<?> statement : statements) {
if (statement != null
&& (statement.getConsistencyLevel() != null
Expand Down Expand Up @@ -120,6 +123,7 @@ public DefaultBatchStatement(
this.timeout = timeout;
this.node = node;
this.nowInSeconds = nowInSeconds;
this.isLWT = isLWT;
}

@NonNull
Expand Down Expand Up @@ -150,7 +154,8 @@ public BatchStatement setBatchType(@NonNull BatchType newBatchType) {
serialConsistencyLevel,
timeout,
node,
nowInSeconds);
nowInSeconds,
isLWT);
}

@NonNull
Expand All @@ -175,7 +180,8 @@ public BatchStatement setKeyspace(@Nullable CqlIdentifier newKeyspace) {
serialConsistencyLevel,
timeout,
node,
nowInSeconds);
nowInSeconds,
isLWT);
}

@NonNull
Expand Down Expand Up @@ -204,7 +210,8 @@ public BatchStatement add(@NonNull BatchableStatement<?> statement) {
serialConsistencyLevel,
timeout,
node,
nowInSeconds);
nowInSeconds,
isLWT);
}
}

Expand Down Expand Up @@ -237,7 +244,8 @@ public BatchStatement addAll(@NonNull Iterable<? extends BatchableStatement<?>>
serialConsistencyLevel,
timeout,
node,
nowInSeconds);
nowInSeconds,
isLWT);
}
}

Expand Down Expand Up @@ -268,7 +276,8 @@ public BatchStatement clear() {
serialConsistencyLevel,
timeout,
node,
nowInSeconds);
nowInSeconds,
isLWT);
}

@NonNull
Expand Down Expand Up @@ -304,7 +313,8 @@ public BatchStatement setPagingState(ByteBuffer newPagingState) {
serialConsistencyLevel,
timeout,
node,
nowInSeconds);
nowInSeconds,
isLWT);
}

@Override
Expand Down Expand Up @@ -334,7 +344,8 @@ public BatchStatement setPageSize(int newPageSize) {
serialConsistencyLevel,
timeout,
node,
nowInSeconds);
nowInSeconds,
isLWT);
}

@Nullable
Expand Down Expand Up @@ -365,7 +376,8 @@ public BatchStatement setConsistencyLevel(@Nullable ConsistencyLevel newConsiste
serialConsistencyLevel,
timeout,
node,
nowInSeconds);
nowInSeconds,
isLWT);
}

@Nullable
Expand Down Expand Up @@ -397,7 +409,8 @@ public BatchStatement setSerialConsistencyLevel(
newSerialConsistencyLevel,
timeout,
node,
nowInSeconds);
nowInSeconds,
isLWT);
}

@Override
Expand Down Expand Up @@ -427,7 +440,8 @@ public BatchStatement setExecutionProfileName(@Nullable String newConfigProfileN
serialConsistencyLevel,
timeout,
node,
nowInSeconds);
nowInSeconds,
isLWT);
}

@Override
Expand Down Expand Up @@ -457,7 +471,8 @@ public DefaultBatchStatement setExecutionProfile(@Nullable DriverExecutionProfil
serialConsistencyLevel,
timeout,
node,
nowInSeconds);
nowInSeconds,
isLWT);
}

@Override
Expand Down Expand Up @@ -522,7 +537,8 @@ public BatchStatement setRoutingKeyspace(CqlIdentifier newRoutingKeyspace) {
serialConsistencyLevel,
timeout,
node,
nowInSeconds);
nowInSeconds,
isLWT);
}

@NonNull
Expand All @@ -547,7 +563,8 @@ public BatchStatement setNode(@Nullable Node newNode) {
serialConsistencyLevel,
timeout,
newNode,
nowInSeconds);
nowInSeconds,
isLWT);
}

@Nullable
Expand Down Expand Up @@ -593,7 +610,8 @@ public BatchStatement setRoutingKey(ByteBuffer newRoutingKey) {
serialConsistencyLevel,
timeout,
node,
nowInSeconds);
nowInSeconds,
isLWT);
}

@Override
Expand Down Expand Up @@ -633,7 +651,8 @@ public BatchStatement setRoutingToken(Token newRoutingToken) {
serialConsistencyLevel,
timeout,
node,
nowInSeconds);
nowInSeconds,
isLWT);
}

@NonNull
Expand Down Expand Up @@ -664,7 +683,8 @@ public DefaultBatchStatement setCustomPayload(@NonNull Map<String, ByteBuffer> n
serialConsistencyLevel,
timeout,
node,
nowInSeconds);
nowInSeconds,
isLWT);
}

@Override
Expand Down Expand Up @@ -700,7 +720,8 @@ public DefaultBatchStatement setIdempotent(Boolean newIdempotence) {
serialConsistencyLevel,
timeout,
node,
nowInSeconds);
nowInSeconds,
isLWT);
}

@Override
Expand Down Expand Up @@ -730,7 +751,8 @@ public BatchStatement setTracing(boolean newTracing) {
serialConsistencyLevel,
timeout,
node,
nowInSeconds);
nowInSeconds,
isLWT);
}

@Override
Expand Down Expand Up @@ -760,7 +782,8 @@ public BatchStatement setQueryTimestamp(long newTimestamp) {
serialConsistencyLevel,
timeout,
node,
nowInSeconds);
nowInSeconds,
isLWT);
}

@NonNull
Expand All @@ -785,7 +808,8 @@ public BatchStatement setTimeout(@Nullable Duration newTimeout) {
serialConsistencyLevel,
newTimeout,
node,
nowInSeconds);
nowInSeconds,
isLWT);
}

@Override
Expand Down Expand Up @@ -815,11 +839,39 @@ public BatchStatement setNowInSeconds(int newNowInSeconds) {
serialConsistencyLevel,
timeout,
node,
newNowInSeconds);
newNowInSeconds,
isLWT);
}

@NonNull
@Override
public BatchStatement setIsLWT(Boolean newIsLWT) {
return new DefaultBatchStatement(
batchType,
statements,
executionProfileName,
executionProfile,
keyspace,
routingKeyspace,
routingKey,
routingToken,
customPayload,
idempotent,
tracing,
timestamp,
pagingState,
pageSize,
consistencyLevel,
serialConsistencyLevel,
timeout,
node,
nowInSeconds,
newIsLWT);
}

@Override
public boolean isLWT() {
return false;
if (isLWT != null) return isLWT;
return statements.stream().anyMatch(Statement::isLWT);
}
}
Loading
Loading