Skip to content

Commit a445a4b

Browse files
authored
Merge pull request #1005: [proxima-direct-io-cassandra] improve AllNodesFailedException handling
2 parents dbb6d63 + 2df1b78 commit a445a4b

File tree

5 files changed

+123
-57
lines changed

5 files changed

+123
-57
lines changed

direct/io-cassandra/src/main/java/cz/o2/proxima/direct/io/cassandra/CassandraDBAccessor.java

Lines changed: 47 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package cz.o2.proxima.direct.io.cassandra;
1717

18+
import com.datastax.oss.driver.api.core.AllNodesFailedException;
1819
import com.datastax.oss.driver.api.core.ConsistencyLevel;
1920
import com.datastax.oss.driver.api.core.CqlSession;
2021
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
@@ -168,21 +169,54 @@ private void initializeCqlFactory() {
168169
});
169170
}
170171

171-
ResultSet execute(Statement statement) {
172-
statement = statement.setConsistencyLevel(consistencyLevel);
173-
if (log.isDebugEnabled()) {
174-
if (statement instanceof BoundStatement) {
175-
final BoundStatement s = (BoundStatement) statement;
176-
log.debug("Executing BoundStatement {}", s.getPreparedStatement().getQuery());
177-
} else {
178-
log.debug(
179-
"Executing {} {} with payload {}",
180-
statement.getClass().getSimpleName(),
181-
statement,
182-
statement.getCustomPayload());
172+
@Nullable
173+
ResultSet executeOptionally(UnaryFunction<CqlSession, Optional<Statement<?>>> statementFn) {
174+
int i = 0;
175+
while (true) {
176+
try {
177+
final CqlSession session = ensureSession();
178+
final Optional<Statement<?>> maybeStatement =
179+
statementFn.apply(session).map(s -> s.setConsistencyLevel(consistencyLevel));
180+
if (maybeStatement.isEmpty()) {
181+
return null;
182+
}
183+
Statement<?> statement = maybeStatement.get();
184+
if (log.isDebugEnabled()) {
185+
if (statement instanceof BoundStatement) {
186+
final BoundStatement s = (BoundStatement) statement;
187+
log.debug("Executing BoundStatement {}", s.getPreparedStatement().getQuery());
188+
} else {
189+
log.debug(
190+
"Executing {} {} with payload {}",
191+
statement.getClass().getSimpleName(),
192+
statement,
193+
statement.getCustomPayload());
194+
}
195+
}
196+
return session.execute(statement);
197+
} catch (AllNodesFailedException ex) {
198+
int tryN = i;
199+
closeSession();
200+
if (tryN < 2) {
201+
log.warn(
202+
"Got {}, retry {}, closing session and retrying.",
203+
ex.getClass().getSimpleName(),
204+
tryN + 1,
205+
ex);
206+
ExceptionUtils.ignoringInterrupted(
207+
() -> TimeUnit.MILLISECONDS.sleep((long) (100 * Math.pow(2, tryN))));
208+
} else {
209+
log.warn("Got {}. Closing session and rethrowing.", ex.getClass().getSimpleName());
210+
throw ex;
211+
}
183212
}
213+
i++;
184214
}
185-
return ensureSession().execute(statement);
215+
}
216+
217+
ResultSet execute(UnaryFunction<CqlSession, Statement<?>> statementFn) {
218+
return Objects.requireNonNull(
219+
executeOptionally(session -> Optional.of(statementFn.apply(session))));
186220
}
187221

188222
private CqlSession getSession(URI uri) {

direct/io-cassandra/src/main/java/cz/o2/proxima/direct/io/cassandra/CassandraLogReader.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
*/
1616
package cz.o2.proxima.direct.io.cassandra;
1717

18-
import com.datastax.oss.driver.api.core.CqlSession;
1918
import com.datastax.oss.driver.api.core.cql.ResultSet;
2019
import com.datastax.oss.driver.api.core.cql.Row;
2120
import cz.o2.proxima.core.repository.AttributeDescriptor;
@@ -110,9 +109,9 @@ private boolean processSinglePartition(
110109
BatchLogObserver observer) {
111110

112111
ResultSet result;
113-
CqlSession session = accessor.ensureSession();
114112
result =
115-
accessor.execute(accessor.getCqlFactory().scanPartition(attributes, partition, session));
113+
accessor.execute(
114+
session -> accessor.getCqlFactory().scanPartition(attributes, partition, session));
116115
for (Row row : result) {
117116
if (terminationContext.isCancelled()) {
118117
return false;

direct/io-cassandra/src/main/java/cz/o2/proxima/direct/io/cassandra/CassandraRandomReader.java

Lines changed: 17 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,7 @@
1515
*/
1616
package cz.o2.proxima.direct.io.cassandra;
1717

18-
import com.datastax.oss.driver.api.core.AllNodesFailedException;
1918
import com.datastax.oss.driver.api.core.CqlSession;
20-
import com.datastax.oss.driver.api.core.cql.BoundStatement;
2119
import com.datastax.oss.driver.api.core.cql.ResultSet;
2220
import com.datastax.oss.driver.api.core.cql.Row;
2321
import cz.o2.proxima.core.functional.Consumer;
@@ -49,16 +47,11 @@ class CassandraRandomReader extends AbstractStorage implements RandomAccessReade
4947
public <T> Optional<KeyValue<T>> get(
5048
String key, String attribute, AttributeDescriptor<T> desc, long stamp) {
5149

52-
CqlSession session = accessor.ensureSession();
53-
BoundStatement statement =
54-
accessor.getCqlFactory().getReadStatement(key, attribute, desc, session);
5550
final ResultSet result;
5651
try {
57-
result = accessor.execute(statement);
58-
} catch (AllNodesFailedException ex) {
59-
log.warn("Got {}, closing session.", AllNodesFailedException.class.getSimpleName(), ex);
60-
accessor.closeSession();
61-
throw ex;
52+
result =
53+
accessor.execute(
54+
session -> accessor.getCqlFactory().getReadStatement(key, attribute, desc, session));
6255
} catch (Exception ex) {
6356
throw new RuntimeException("Unable to execute query.", ex);
6457
}
@@ -118,13 +111,12 @@ public <T> void scanWildcard(
118111
Consumer<KeyValue<T>> consumer) {
119112

120113
try {
121-
CqlSession session = accessor.ensureSession();
122-
BoundStatement statement =
123-
accessor
124-
.getCqlFactory()
125-
.getListStatement(key, wildcard, (Offsets.Raw) offset, limit, session);
126-
127-
ResultSet result = accessor.execute(statement);
114+
ResultSet result =
115+
accessor.execute(
116+
session ->
117+
accessor
118+
.getCqlFactory()
119+
.getListStatement(key, wildcard, (Offsets.Raw) offset, limit, session));
128120
// the row has to have the format (attribute, value)
129121
for (Row row : result) {
130122
Object attribute = row.getObject(0);
@@ -166,14 +158,13 @@ public <T> void scanWildcard(
166158
public void listEntities(
167159
RandomOffset offset, int limit, Consumer<Pair<RandomOffset, String>> consumer) {
168160

169-
CqlSession session = accessor.ensureSession();
170-
BoundStatement statement =
171-
accessor
172-
.getCqlFactory()
173-
.getListEntitiesStatement((Offsets.TokenOffset) offset, limit, session);
174-
175161
try {
176-
ResultSet result = accessor.execute(statement);
162+
ResultSet result =
163+
accessor.execute(
164+
session ->
165+
accessor
166+
.getCqlFactory()
167+
.getListEntitiesStatement((Offsets.TokenOffset) offset, limit, session));
177168
for (Row row : result) {
178169
String key = row.getString(0);
179170
long token = row.getLong(1);
@@ -203,9 +194,9 @@ public RandomOffset fetchOffset(Listing type, String key) {
203194
return new Offsets.Raw(key);
204195

205196
case ENTITY:
206-
CqlSession session = accessor.ensureSession();
207197
ResultSet res =
208-
accessor.execute(accessor.getCqlFactory().getFetchTokenStatement(key, session));
198+
accessor.execute(
199+
session -> accessor.getCqlFactory().getFetchTokenStatement(key, session));
209200
if (res.isFullyFetched()) {
210201
return new Offsets.TokenOffset(Long.MIN_VALUE);
211202
}

direct/io-cassandra/src/main/java/cz/o2/proxima/direct/io/cassandra/CassandraWriter.java

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717

1818
import com.datastax.oss.driver.api.core.CqlSession;
1919
import com.datastax.oss.driver.api.core.cql.BoundStatement;
20+
import com.datastax.oss.driver.api.core.cql.Statement;
2021
import cz.o2.proxima.core.annotations.DeclaredThreadSafe;
22+
import cz.o2.proxima.core.functional.UnaryFunction;
2123
import cz.o2.proxima.core.storage.StreamElement;
2224
import cz.o2.proxima.direct.core.AbstractOnlineAttributeWriter;
2325
import cz.o2.proxima.direct.core.CommitCallback;
@@ -40,19 +42,24 @@ class CassandraWriter extends AbstractOnlineAttributeWriter implements OnlineAtt
4042
@Override
4143
public void write(StreamElement data, CommitCallback statusCallback) {
4244
try {
43-
CqlSession session = accessor.ensureSession();
44-
Optional<BoundStatement> cql = accessor.getCqlFactory().getWriteStatement(data, session);
45-
if (cql.isPresent()) {
46-
if (log.isDebugEnabled()) {
47-
log.debug(
48-
"Executing statement {} to write {}",
49-
cql.get().getPreparedStatement().getQuery(),
50-
data);
51-
}
52-
accessor.execute(cql.get());
53-
} else {
54-
log.warn("Missing CQL statement to write {}. Discarding.", data);
55-
}
45+
UnaryFunction<CqlSession, Optional<Statement<?>>> fn =
46+
session -> {
47+
Optional<BoundStatement> maybeWriteStatement =
48+
accessor.getCqlFactory().getWriteStatement(data, session);
49+
if (maybeWriteStatement.isPresent()) {
50+
if (log.isDebugEnabled()) {
51+
log.debug(
52+
"Executing statement {} to write {}",
53+
maybeWriteStatement.get().getPreparedStatement().getQuery(),
54+
data);
55+
}
56+
return Optional.of(maybeWriteStatement.get());
57+
} else {
58+
log.warn("Missing CQL statement to write {}. Discarding.", data);
59+
}
60+
return Optional.empty();
61+
};
62+
accessor.executeOptionally(fn);
5663
statusCallback.commit(true, null);
5764
} catch (Exception ex) {
5865
log.error("Failed to ingest record {} into cassandra", data, ex);

direct/io-cassandra/src/test/java/cz/o2/proxima/direct/io/cassandra/CassandraDBAccessorTest.java

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static org.mockito.ArgumentMatchers.eq;
2121
import static org.mockito.Mockito.*;
2222

23+
import com.datastax.oss.driver.api.core.AllNodesFailedException;
2324
import com.datastax.oss.driver.api.core.ConsistencyLevel;
2425
import com.datastax.oss.driver.api.core.CqlSession;
2526
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
@@ -28,6 +29,7 @@
2829
import com.datastax.oss.driver.api.core.cql.ResultSet;
2930
import com.datastax.oss.driver.api.core.cql.Row;
3031
import com.datastax.oss.driver.api.core.cql.Statement;
32+
import cz.o2.proxima.core.functional.UnaryFunction;
3133
import cz.o2.proxima.core.repository.AttributeDescriptor;
3234
import cz.o2.proxima.core.repository.AttributeDescriptorBase;
3335
import cz.o2.proxima.core.repository.ConfigRepository;
@@ -67,6 +69,7 @@
6769
import org.junit.After;
6870
import org.junit.Test;
6971
import org.mockito.Mockito;
72+
import org.mockito.stubbing.Answer;
7073

7174
/** Test suite for {@link CassandraDBAccessor}. */
7275
public class CassandraDBAccessorTest {
@@ -82,8 +85,8 @@ public TestDBAccessor(EntityDescriptor entityDesc, URI uri, Map<String, Object>
8285
}
8386

8487
@Override
85-
ResultSet execute(Statement statement) {
86-
executed.add(statement);
88+
ResultSet execute(UnaryFunction<CqlSession, Statement<?>> fn) {
89+
executed.add(fn.apply(ensureSession()));
8790
return res;
8891
}
8992

@@ -892,6 +895,38 @@ public void testCreateAccessorWithoutAuthentication() {
892895
assertNotNull(accessor);
893896
}
894897

898+
@Test
899+
public void testExecute() {
900+
@SuppressWarnings("rawtypes")
901+
Statement statement = mock(Statement.class);
902+
CqlSession session = mock(CqlSession.class);
903+
AtomicInteger tryN = new AtomicInteger();
904+
when(statement.setConsistencyLevel(any())).thenReturn(statement);
905+
when(session.execute(any(Statement.class)))
906+
.thenAnswer(
907+
(Answer<ResultSet>)
908+
invocationOnMock -> {
909+
if (tryN.incrementAndGet() < 3) {
910+
throw AllNodesFailedException.fromErrors(Collections.emptyList());
911+
}
912+
return mock(ResultSet.class);
913+
});
914+
Map<String, Object> cfg =
915+
ImmutableMap.<String, Object>builder()
916+
.put(CassandraDBAccessor.CONSISTENCY_LEVEL_CFG, ConsistencyLevel.LOCAL_ONE.name())
917+
.build();
918+
CassandraDBAccessor accessor =
919+
new CassandraDBAccessor(
920+
entity, URI.create("cassandra://host:9042/table/?primary=data"), cfg) {
921+
@Override
922+
CqlSession ensureSession() {
923+
return session;
924+
}
925+
};
926+
accessor.execute(s -> statement);
927+
assertEquals(3, tryN.get());
928+
}
929+
895930
private Map<String, Object> getCfg(Class<?> cls, Class<? extends StringConverter<?>> converter) {
896931
Map<String, Object> m = new HashMap<>();
897932
m.put(CassandraDBAccessor.CQL_FACTORY_CFG, cls.getName());

0 commit comments

Comments
 (0)