Skip to content

Commit 9f74250

Browse files
committed
Don't use Scala Futures in Java APIs
Sketching out #1417 - incomplete and notably not bothering with binary compatibility yet, just to illustrate the idea.
1 parent 684fec9 commit 9f74250

File tree

12 files changed

+107
-96
lines changed

12 files changed

+107
-96
lines changed

actor-tests/src/test/java/org/apache/pekko/dispatch/JavaFutureTests.java

+1
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636

3737
import org.apache.pekko.testkit.PekkoSpec;
3838

39+
@SuppressWarnings("deprecation")
3940
public class JavaFutureTests extends JUnitSuite {
4041

4142
@ClassRule

actor-tests/src/test/java/org/apache/pekko/pattern/PatternsTest.java

+27-29
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import org.apache.pekko.testkit.PekkoSpec;
2020
import org.apache.pekko.testkit.TestProbe;
2121
import org.apache.pekko.util.Timeout;
22+
import org.apache.pekko.util.FutureConverters;
2223
import org.junit.ClassRule;
2324
import org.junit.Test;
2425
import org.scalatestplus.junit.JUnitSuite;
@@ -219,15 +220,15 @@ public void testAskWithReplyToTimeout() throws Exception {
219220
@Test
220221
public void usePipe() throws Exception {
221222
TestProbe probe = new TestProbe(system);
222-
pipe(Futures.successful("ho!"), system.dispatcher()).to(probe.ref());
223+
pipe(CompletableFuture.completedFuture("ho!"), system.dispatcher()).to(probe.ref());
223224
probe.expectMsg("ho!");
224225
}
225226

226227
@Test
227228
public void usePipeWithActorSelection() throws Exception {
228229
TestProbe probe = new TestProbe(system);
229230
ActorSelection selection = system.actorSelection(probe.ref().path());
230-
pipe(Futures.successful("hi!"), system.dispatcher()).to(selection);
231+
pipe(CompletableFuture.completedFuture("hi!"), system.dispatcher()).to(selection);
231232
probe.expectMsg("hi!");
232233
}
233234

@@ -291,15 +292,11 @@ public void testRetryCompletionStageRandomDelay() throws Exception {
291292
public void testRetry() throws Exception {
292293
final String expected = "hello";
293294

294-
Future<String> retriedFuture =
295+
CompletionStage<String> retriedFuture =
295296
Patterns.retry(
296-
() -> Futures.successful(expected),
297-
3,
298-
scala.concurrent.duration.Duration.apply(200, "millis"),
299-
system.scheduler(),
300-
ec);
297+
() -> CompletableFuture.completedFuture(expected), 3, Duration.ofMillis(200), system);
301298

302-
String actual = Await.result(retriedFuture, FiniteDuration.apply(3, SECONDS));
299+
String actual = retriedFuture.toCompletableFuture().get(3, SECONDS);
303300
assertEquals(expected, actual);
304301
}
305302

@@ -317,21 +314,21 @@ public void testCSRetry() throws Exception {
317314
}
318315

319316
@Test(expected = IllegalStateException.class)
320-
public void testAfterFailedCallable() throws Exception {
321-
Callable<Future<String>> failedCallable =
322-
() -> Futures.failed(new IllegalStateException("Illegal!"));
317+
public void testAfterFailedCallable() throws Throwable {
318+
Callable<CompletionStage<String>> failedCallable =
319+
() -> Futures.failedCompletionStage(new IllegalStateException("Illegal!"));
323320

324-
Future<String> delayedFuture =
325-
Patterns.after(
326-
scala.concurrent.duration.Duration.create(200, "millis"),
327-
system.scheduler(),
328-
ec,
329-
failedCallable);
321+
CompletionStage<String> delayedFuture =
322+
Patterns.after(Duration.ofMillis(200), system, failedCallable);
330323

331-
Future<String> resultFuture = Futures.firstCompletedOf(Arrays.asList(delayedFuture), ec);
332-
Await.result(resultFuture, scala.concurrent.duration.FiniteDuration.apply(3, SECONDS));
324+
try {
325+
delayedFuture.toCompletableFuture().get(3, SECONDS);
326+
} catch (ExecutionException e) {
327+
throw e.getCause();
328+
}
333329
}
334330

331+
@SuppressWarnings("deprecation")
335332
@Test(expected = IllegalStateException.class)
336333
public void testAfterFailedFuture() throws Exception {
337334

@@ -340,7 +337,9 @@ public void testAfterFailedFuture() throws Exception {
340337
scala.concurrent.duration.Duration.create(200, "millis"),
341338
system.scheduler(),
342339
ec,
343-
() -> Futures.failed(new IllegalStateException("Illegal!")));
340+
() ->
341+
FutureConverters.asScala(
342+
Futures.failedCompletionStage(new IllegalStateException("Illegal!"))));
344343

345344
Future<String> resultFuture = Futures.firstCompletedOf(Arrays.asList(delayedFuture), ec);
346345
Await.result(resultFuture, FiniteDuration.apply(3, SECONDS));
@@ -350,19 +349,16 @@ public void testAfterFailedFuture() throws Exception {
350349
public void testAfterSuccessfulCallable() throws Exception {
351350
final String expected = "Hello";
352351

353-
Future<String> delayedFuture =
352+
CompletionStage<String> delayedFuture =
354353
Patterns.after(
355-
scala.concurrent.duration.Duration.create(200, "millis"),
356-
system.scheduler(),
357-
ec,
358-
() -> Futures.successful(expected));
354+
Duration.ofMillis(200), system, () -> CompletableFuture.completedFuture(expected));
359355

360-
Future<String> resultFuture = Futures.firstCompletedOf(Arrays.asList(delayedFuture), ec);
361-
final String actual = Await.result(resultFuture, FiniteDuration.apply(3, SECONDS));
356+
String actual = delayedFuture.toCompletableFuture().get(3, SECONDS);
362357

363358
assertEquals(expected, actual);
364359
}
365360

361+
@SuppressWarnings("deprecation")
366362
@Test
367363
public void testAfterSuccessfulFuture() throws Exception {
368364
final String expected = "Hello";
@@ -380,6 +376,7 @@ public void testAfterSuccessfulFuture() throws Exception {
380376
assertEquals(expected, actual);
381377
}
382378

379+
@SuppressWarnings("deprecation")
383380
@Test
384381
public void testAfterFiniteDuration() throws Exception {
385382
final String expected = "Hello";
@@ -391,7 +388,8 @@ public void testAfterFiniteDuration() throws Exception {
391388
ec,
392389
() -> Futures.successful("world"));
393390

394-
Future<String> immediateFuture = Futures.future(() -> expected, ec);
391+
Future<String> immediateFuture =
392+
FutureConverters.asScala(CompletableFuture.completedFuture(expected));
395393

396394
Future<String> resultFuture =
397395
Futures.firstCompletedOf(Arrays.asList(delayedFuture, immediateFuture), ec);

actor/src/main/scala/org/apache/pekko/dispatch/Future.scala

+7
Original file line numberDiff line numberDiff line change
@@ -130,27 +130,33 @@ object Futures {
130130
* @param executor the execution context on which the future is run
131131
* @return the `Future` holding the result of the computation
132132
*/
133+
@deprecated("Use CompletionStage / CompletableFuture-based APIs instead", "1.1.0")
133134
def future[T](body: Callable[T], executor: ExecutionContext): Future[T] = Future(body.call)(executor)
134135

135136
/**
136137
* Creates a promise object which can be completed with a value.
137138
*
138139
* @return the newly created `Promise` object
139140
*/
141+
@deprecated("Use CompletableFuture instead", "1.1.0")
140142
def promise[T](): Promise[T] = Promise[T]()
141143

142144
/**
143145
* creates an already completed Promise with the specified exception
144146
*/
147+
@deprecated("Use CompletionStage / CompletableFuture-based APIs instead", "1.1.0")
145148
def failed[T](exception: Throwable): Future[T] = Future.failed(exception)
146149

147150
/**
148151
* Creates an already completed Promise with the specified result
149152
*/
153+
@deprecated("Use CompletionStage / CompletableFuture-based APIs instead", "1.1.0")
150154
def successful[T](result: T): Future[T] = Future.successful(result)
151155

152156
/**
153157
* Creates an already completed CompletionStage with the specified exception
158+
*
159+
* Note: prefer CompletableFuture.failedStage(ex) from Java 9 onwards
154160
*/
155161
def failedCompletionStage[T](ex: Throwable): CompletionStage[T] = {
156162
val f = CompletableFuture.completedFuture[T](null.asInstanceOf[T])
@@ -172,6 +178,7 @@ object Futures {
172178
/**
173179
* Returns a Future to the result of the first future in the list that is completed
174180
*/
181+
@deprecated("Use CompletableFuture.anyOf instead", "1.1.0")
175182
def firstCompletedOf[T <: AnyRef](futures: JIterable[Future[T]], executor: ExecutionContext): Future[T] =
176183
Future.firstCompletedOf(futures.asScala)(executor)
177184

docs/src/test/java/jdocs/future/ActorWithFuture.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,12 @@
1515

1616
// #context-dispatcher
1717
import org.apache.pekko.actor.AbstractActor;
18-
import org.apache.pekko.dispatch.Futures;
18+
19+
import java.util.concurrent.CompletableFuture;
1920

2021
public class ActorWithFuture extends AbstractActor {
2122
ActorWithFuture() {
22-
Futures.future(() -> "hello", getContext().dispatcher());
23+
CompletableFuture.supplyAsync(() -> "hello", getContext().dispatcher());
2324
}
2425

2526
@Override

docs/src/test/java/jdocs/future/FutureDocTest.java

+20-22
Original file line numberDiff line numberDiff line change
@@ -14,27 +14,18 @@
1414
package jdocs.future;
1515

1616
import org.apache.pekko.actor.typed.ActorSystem;
17-
import org.apache.pekko.dispatch.Futures;
1817
import org.apache.pekko.pattern.Patterns;
1918
import org.apache.pekko.testkit.PekkoJUnitActorSystemResource;
2019
import org.apache.pekko.testkit.PekkoSpec;
2120
import org.apache.pekko.util.Timeout;
22-
import org.apache.pekko.util.FutureConverters;
2321
import jdocs.AbstractJavaTest;
2422
import org.junit.ClassRule;
2523
import org.junit.Test;
26-
import scala.concurrent.Await;
27-
import scala.concurrent.ExecutionContext;
28-
import scala.concurrent.Future;
2924

3025
import java.time.Duration;
31-
import java.util.Arrays;
32-
import java.util.concurrent.Callable;
33-
import java.util.concurrent.CompletableFuture;
34-
import java.util.concurrent.CompletionStage;
26+
import java.util.concurrent.*;
3527

3628
import static org.apache.pekko.actor.typed.javadsl.Adapter.toTyped;
37-
import static org.apache.pekko.dispatch.Futures.future;
3829
// #imports
3930

4031
// #imports
@@ -48,9 +39,9 @@ public class FutureDocTest extends AbstractJavaTest {
4839

4940
private final ActorSystem<Void> system = toTyped(actorSystemResource.getSystem());
5041

51-
@Test(expected = java.util.concurrent.CompletionException.class)
52-
public void useAfter() throws Exception {
53-
final ExecutionContext ec = system.executionContext();
42+
@Test(expected = IllegalStateException.class)
43+
public void useAfter() throws Throwable {
44+
final Executor ex = system.executionContext();
5445
// #after
5546
CompletionStage<String> failWithException =
5647
CompletableFuture.supplyAsync(
@@ -60,18 +51,25 @@ public void useAfter() throws Exception {
6051
CompletionStage<String> delayed =
6152
Patterns.after(Duration.ofMillis(200), system, () -> failWithException);
6253
// #after
63-
Future<String> future =
64-
future(
54+
CompletionStage<String> completionStage =
55+
CompletableFuture.supplyAsync(
6556
() -> {
66-
Thread.sleep(1000);
57+
try {
58+
Thread.sleep(1000);
59+
} catch (InterruptedException e) {
60+
throw new RuntimeException(e);
61+
}
6762
return "foo";
6863
},
69-
ec);
70-
Future<String> result =
71-
Futures.firstCompletedOf(
72-
Arrays.<Future<String>>asList(future, FutureConverters.asScala(delayed)), ec);
73-
Timeout timeout = Timeout.create(Duration.ofSeconds(2));
74-
Await.result(result, timeout.duration());
64+
ex);
65+
CompletableFuture<Object> result =
66+
CompletableFuture.anyOf(
67+
completionStage.toCompletableFuture(), delayed.toCompletableFuture());
68+
try {
69+
result.toCompletableFuture().get(2, SECONDS);
70+
} catch (ExecutionException e) {
71+
throw e.getCause();
72+
}
7573
}
7674

7775
@Test

docs/src/test/java/jdocs/persistence/LambdaPersistencePluginDocTest.java

+14-10
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,9 @@
3333
import org.junit.runner.RunWith;
3434
import org.scalatestplus.junit.JUnitRunner;
3535
import scala.concurrent.Future;
36+
37+
import java.util.concurrent.CompletableFuture;
38+
import java.util.concurrent.CompletionStage;
3639
import java.util.function.Consumer;
3740
import org.iq80.leveldb.util.FileUtils;
3841
import java.util.Optional;
@@ -81,45 +84,46 @@ public Receive createReceive() {
8184

8285
class MySnapshotStore extends SnapshotStore {
8386
@Override
84-
public Future<Optional<SelectedSnapshot>> doLoadAsync(
87+
public CompletionStage<Optional<SelectedSnapshot>> doLoadAsync(
8588
String persistenceId, SnapshotSelectionCriteria criteria) {
8689
return null;
8790
}
8891

8992
@Override
90-
public Future<Void> doSaveAsync(SnapshotMetadata metadata, Object snapshot) {
93+
public CompletionStage<Void> doSaveAsync(SnapshotMetadata metadata, Object snapshot) {
9194
return null;
9295
}
9396

9497
@Override
95-
public Future<Void> doDeleteAsync(SnapshotMetadata metadata) {
96-
return Futures.successful(null);
98+
public CompletionStage<Void> doDeleteAsync(SnapshotMetadata metadata) {
99+
return CompletableFuture.completedFuture(null);
97100
}
98101

99102
@Override
100-
public Future<Void> doDeleteAsync(String persistenceId, SnapshotSelectionCriteria criteria) {
101-
return Futures.successful(null);
103+
public CompletionStage<Void> doDeleteAsync(
104+
String persistenceId, SnapshotSelectionCriteria criteria) {
105+
return CompletableFuture.completedFuture(null);
102106
}
103107
}
104108

105109
class MyAsyncJournal extends AsyncWriteJournal {
106110
// #sync-journal-plugin-api
107111
@Override
108-
public Future<Iterable<Optional<Exception>>> doAsyncWriteMessages(
112+
public CompletionStage<Iterable<Optional<Exception>>> doAsyncWriteMessages(
109113
Iterable<AtomicWrite> messages) {
110114
try {
111115
Iterable<Optional<Exception>> result = new ArrayList<Optional<Exception>>();
112116
// blocking call here...
113117
// result.add(..)
114-
return Futures.successful(result);
118+
return CompletableFuture.completedFuture(result);
115119
} catch (Exception e) {
116-
return Futures.failed(e);
120+
return Futures.failedCompletionStage(e);
117121
}
118122
}
119123
// #sync-journal-plugin-api
120124

121125
@Override
122-
public Future<Void> doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr) {
126+
public CompletionStage<Void> doAsyncDeleteMessagesTo(String persistenceId, long toSequenceNr) {
123127
return null;
124128
}
125129

docs/src/test/java/jdocs/stream/FlowDocTest.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import org.apache.pekko.actor.ActorRef;
2020
import org.apache.pekko.actor.ActorSystem;
2121
import org.apache.pekko.actor.Cancellable;
22-
import org.apache.pekko.dispatch.Futures;
2322
import org.apache.pekko.japi.Pair;
2423
import org.apache.pekko.stream.*;
2524
import org.apache.pekko.stream.javadsl.*;
@@ -158,8 +157,8 @@ public void creatingSourcesSinks() throws Exception {
158157
list.add(3);
159158
Source.from(list);
160159

161-
// Create a source form a Future
162-
Source.future(Futures.successful("Hello Streams!"));
160+
// Create a source form a CompletionStage
161+
Source.completionStage(CompletableFuture.completedFuture("Hello Streams!"));
163162

164163
// Create a source from a single element
165164
Source.single("only one element");

0 commit comments

Comments
 (0)