Skip to content

Commit c337831

Browse files
JAVA-3166: Enhanced exception handling of C* driver executeAsync() (#1370)
1 parent d2d13b2 commit c337831

File tree

2 files changed

+27
-5
lines changed

2 files changed

+27
-5
lines changed

connector/src/main/scala/com/datastax/spark/connector/writer/AsyncExecutor.scala

+9-5
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
package com.datastax.spark.connector.writer
22

3-
import java.util.concurrent.{CompletionStage, Semaphore}
3+
import java.util.concurrent.{CompletableFuture, CompletionStage, Semaphore}
44
import java.util.function.BiConsumer
5-
65
import com.datastax.spark.connector.util.Logging
76

87
import scala.jdk.CollectionConverters._
@@ -41,9 +40,14 @@ class AsyncExecutor[T, R](asyncAction: T => CompletionStage[R], maxConcurrentTas
4140
val executionTimestamp = System.nanoTime()
4241

4342
def tryFuture(): Future[R] = {
44-
val value = asyncAction(task)
45-
46-
value.whenComplete(new BiConsumer[R, Throwable] {
43+
val value = Try(asyncAction(task)) recover {
44+
case e =>
45+
val future = new CompletableFuture[R]()
46+
future.completeExceptionally(e)
47+
future
48+
}
49+
50+
value.get.whenComplete(new BiConsumer[R, Throwable] {
4751
private def release() {
4852
semaphore.release()
4953
pendingFutures.remove(promise.future)

connector/src/test/scala/com/datastax/spark/connector/writer/AsyncExecutorTest.scala

+18
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package com.datastax.spark.connector.writer
22

3+
import com.datastax.oss.driver.api.core.cql.{AsyncResultSet, SimpleStatement, Statement}
4+
35
import java.util.concurrent.atomic.AtomicInteger
46
import java.util.concurrent.{Callable, CompletableFuture, CompletionStage}
57

@@ -55,4 +57,20 @@ class AsyncExecutorTest {
5557
totalFinishedExecutionsCounter.get() shouldBe taskCount
5658
asyncExecutor.getLatestException() shouldBe None
5759
}
60+
61+
@Test
62+
def testGracefullyHandleCqlSessionExecuteExceptions() {
63+
val executor = new AsyncExecutor[Statement[_], AsyncResultSet](
64+
_ => {
65+
// simulate exception returned by session.executeAsync() (not future)
66+
throw new IllegalStateException("something bad happened")
67+
}, 10, None, None
68+
)
69+
val stmt = SimpleStatement.newInstance("INSERT INTO table1 (key, value) VALUES (1, '100')");
70+
val future = executor.executeAsync(stmt)
71+
assertTrue(future.isCompleted)
72+
val value = future.value.get
73+
assertTrue(value.isInstanceOf[Failure[_]])
74+
assertTrue(value.asInstanceOf[Failure[_]].exception.isInstanceOf[IllegalStateException])
75+
}
5876
}

0 commit comments

Comments
 (0)