Skip to content

Commit a7b04d0

Browse files
committed
Introduce Task monad
This commit introduces Task, a data structure that represents a recipe, or program, for producing a value of type T (or failing with an exception). It is similar in semantics to RunnableGraph[T], but intended as first-class building block. It has the following properties: - A task can have resources associated to it, which are guaranteed to be released if the task is cancelled or fails - Tasks can be forked so multiple ones can run concurrently - Such forked tasks can be cancelled A Task can be created from a RunnableGraph which has a KillSwitch, by connecting a Source and a Sink through a KillSwitch, or by direct lambda functions.
1 parent 3d48931 commit a7b04d0

File tree

24 files changed

+2142
-2
lines changed

24 files changed

+2142
-2
lines changed

actor/src/main/scala/org/apache/pekko/japi/function/Function.scala

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,17 @@ import scala.annotation.nowarn
2525
@nowarn("msg=@SerialVersionUID has no effect")
2626
@SerialVersionUID(1L)
2727
@FunctionalInterface
28-
trait Function[-T, +R] extends java.io.Serializable {
28+
trait Function[-T, +R] extends java.io.Serializable { outer =>
2929
@throws(classOf[Exception])
3030
def apply(param: T): R
31+
32+
/** Returns a function that applies [fn] to the result of this function. */
33+
def andThen[U](fn: Function[R, U]): Function[T, U] = new Function[T,U] {
34+
override def apply(param: T) = fn(outer.apply(param))
35+
}
36+
37+
/** Returns a Scala function representation for this function. */
38+
def toScala[T1 <: T, R1 >: R]: T1 => R1 = t => apply(t)
3139
}
3240

3341
object Function {
@@ -63,6 +71,21 @@ trait Function2[-T1, -T2, +R] extends java.io.Serializable {
6371
trait Procedure[-T] extends java.io.Serializable {
6472
@throws(classOf[Exception])
6573
def apply(param: T): Unit
74+
75+
def toScala[T1 <: T]: T1 => Unit = t => apply(t)
76+
}
77+
78+
/**
79+
* A BiProcedure is like a BiFunction, but it doesn't produce a return value.
80+
* `Serializable` is needed to be able to grab line number for Java 8 lambdas.
81+
* Supports throwing `Exception` in the apply, which the `java.util.function.Consumer` counterpart does not.
82+
*/
83+
@nowarn("msg=@SerialVersionUID has no effect")
84+
@SerialVersionUID(1L)
85+
@FunctionalInterface
86+
trait BiProcedure[-T1,-T2] extends java.io.Serializable {
87+
@throws(classOf[Exception])
88+
def apply(t1: T1, t2: T2): Unit
6689
}
6790

6891
/**
@@ -77,6 +100,9 @@ trait Effect extends java.io.Serializable {
77100

78101
@throws(classOf[Exception])
79102
def apply(): Unit
103+
104+
/** Returns a Scala function representation for this function. */
105+
def toScala: () => Unit = () => apply()
80106
}
81107

82108
/**
@@ -98,11 +124,19 @@ trait Predicate[-T] extends java.io.Serializable {
98124
@nowarn("msg=@SerialVersionUID has no effect")
99125
@SerialVersionUID(1L)
100126
@FunctionalInterface
101-
trait Creator[+T] extends Serializable {
127+
trait Creator[+T] extends Serializable { outer =>
102128

103129
/**
104130
* This method must return a different instance upon every call.
105131
*/
106132
@throws(classOf[Exception])
107133
def create(): T
134+
135+
/** Returns a function that applies [fn] to the result of this function. */
136+
def andThen[U](fn: Function[T, U]): Creator[U] = new Creator[U] {
137+
override def create() = fn(outer.create())
138+
}
139+
140+
/** Returns a Scala function representation for this function. */
141+
def toScala[T1 >: T]: () => T1 = () => create()
108142
}
Lines changed: 242 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,242 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.pekko.task.javadsl;
19+
20+
import org.apache.pekko.stream.StreamTest;
21+
import org.apache.pekko.testkit.PekkoJUnitActorSystemResource;
22+
import org.apache.pekko.testkit.PekkoSpec;
23+
import org.apache.pekko.stream.Materializer;
24+
import org.apache.pekko.Done;
25+
26+
import org.junit.ClassRule;
27+
import org.junit.Test;
28+
29+
import org.apache.pekko.japi.function.Creator;
30+
import org.apache.pekko.stream.javadsl.Sink;
31+
import org.apache.pekko.stream.javadsl.Source;
32+
33+
import java.util.concurrent.TimeUnit;
34+
import java.util.concurrent.ExecutionException;
35+
import java.util.concurrent.atomic.AtomicLong;
36+
37+
import java.util.Optional;
38+
import java.time.Duration;
39+
40+
import static org.junit.Assert.assertEquals;
41+
import static org.junit.Assert.assertTrue;
42+
43+
public class TaskTest extends StreamTest {
44+
private final Runtime runtime = Runtime.create(Materializer.createMaterializer(system));
45+
46+
public TaskTest() {
47+
super(actorSystemResource);
48+
}
49+
50+
@ClassRule
51+
public static PekkoJUnitActorSystemResource actorSystemResource =
52+
new PekkoJUnitActorSystemResource("TaskTest", PekkoSpec.testConf());
53+
54+
private <T> T run(Task<T> task) throws Throwable {
55+
return runtime.run(task.timeout(Duration.ofSeconds(2)));
56+
}
57+
58+
@Test
59+
public void can_run_task_from_lambda() throws Throwable {
60+
assertEquals("Hello", run(Task.run(() -> "Hello")));
61+
}
62+
63+
@Test
64+
public void can_map() throws Throwable {
65+
assertEquals(25, run(Task.run(() -> "25").map(Integer::parseInt)).intValue());
66+
}
67+
68+
@Test
69+
public void can_flatMap_to_run() throws Throwable {
70+
assertEquals(
71+
25, run(Task.run(() -> "25").flatMap(s -> Task.run(() -> Integer.parseInt(s)))).intValue());
72+
}
73+
74+
@Test
75+
public void can_zipPar_two_tasks() throws Throwable {
76+
Task<String> task =
77+
Task.run(
78+
() -> {
79+
return "Hello";
80+
});
81+
assertEquals("HelloHello", run(task.zipPar(task, (s1, s2) -> s1 + s2)));
82+
}
83+
84+
@Test
85+
public void zipPar_interrupts_first_on_error_in_second() throws Throwable {
86+
AtomicLong check = new AtomicLong();
87+
Task<String> task1 = Task.succeed("A").delayed(Duration.ofMillis(100)).before(Task.run(check::incrementAndGet));
88+
Task<String> task2 = Task.fail(new RuntimeException("simulated failure"));
89+
org.junit.Assert.assertThrows(RuntimeException.class, () -> run(task1.zipPar(task2, (a,b) -> a + b)));
90+
assertEquals(0, check.get());
91+
}
92+
93+
@Test
94+
public void zipPar_interrupts_second_on_error_in_first() throws Throwable {
95+
AtomicLong check = new AtomicLong();
96+
Task<String> task1 = Task.succeed("A").delayed(Duration.ofMillis(100)).before(Task.run(check::incrementAndGet));
97+
Task<String> task2 = Task.fail(new RuntimeException("simulated failure"));
98+
org.junit.Assert.assertThrows(RuntimeException.class, () -> run(task2.zipPar(task1, (a,b) -> a + b)));
99+
assertEquals(0, check.get());
100+
}
101+
102+
@Test
103+
public void can_interrupt_forked_task() throws Throwable {
104+
AtomicLong check = new AtomicLong();
105+
Task<Long> task = Task.run(() -> check.incrementAndGet()).delayed(Duration.ofMillis(100));
106+
run(task.forkDaemon().flatMap(fiber -> fiber.interrupt().map(cancelled -> "cancelled")));
107+
assertEquals(0, check.get());
108+
}
109+
110+
@Test(expected = InterruptedException.class)
111+
public void joining_interrupted_fiber_yields_exception() throws Throwable {
112+
Task<Long> task = Task.succeed(42L).delayed(Duration.ofMillis(100));
113+
run(task.forkDaemon().flatMap(fiber -> fiber.interrupt().flatMap(cancelled -> fiber.join())));
114+
}
115+
116+
@Test
117+
public void can_run_graph() throws Throwable {
118+
assertEquals(
119+
Optional.of("hello"), run(Task.connect(Source.single("hello"), Sink.headOption())));
120+
}
121+
122+
@Test
123+
public void can_interrupt_graph() throws Throwable {
124+
AtomicLong check = new AtomicLong();
125+
assertEquals(
126+
Done.getInstance(),
127+
run(
128+
Task.connect(
129+
Source.tick(Duration.ofMillis(1), Duration.ofMillis(1), ""),
130+
Sink.foreach(s -> check.incrementAndGet()))
131+
.forkDaemon()
132+
.flatMap(fiber -> fiber.interrupt())));
133+
Thread.sleep(100);
134+
assertTrue(check.get() < 10);
135+
}
136+
137+
@Test
138+
public void resource_is_acquired_and_released() throws Throwable {
139+
AtomicLong check = new AtomicLong();
140+
Resource<Long> res =
141+
Resource.acquireRelease(
142+
Task.run(() -> check.incrementAndGet()), i -> Task.run(() -> check.decrementAndGet()));
143+
Task<Long> task = res.use(i -> Task.succeed(i));
144+
assertEquals(1L, run(task).longValue());
145+
assertEquals(0L, check.get());
146+
}
147+
148+
@Test
149+
public void resource_is_released_on_failure() throws Throwable {
150+
AtomicLong check = new AtomicLong();
151+
Resource<Long> res =
152+
Resource.acquireRelease(
153+
Task.run(() -> check.incrementAndGet()), i -> Task.run(() -> check.decrementAndGet()));
154+
Task<Long> task = res.use(i -> Task.fail(new RuntimeException("Simulated failure")));
155+
try {
156+
run(task);
157+
} catch (Exception ignored) {
158+
}
159+
assertEquals(0L, check.get());
160+
}
161+
162+
@Test
163+
public void resource_is_released_when_interrupted() throws Throwable {
164+
AtomicLong check = new AtomicLong();
165+
AtomicLong started = new AtomicLong();
166+
167+
Resource<Long> res =
168+
Resource.acquireRelease(
169+
Task.run(
170+
() -> {
171+
return check.incrementAndGet();
172+
}),
173+
i ->
174+
Task.run(
175+
() -> {
176+
return check.decrementAndGet();
177+
}));
178+
179+
Task<Long> task =
180+
res.use(
181+
i ->
182+
Task.run(() -> started.incrementAndGet())
183+
.before(Clock.sleep(Duration.ofMillis(100))));
184+
run(task.forkDaemon().flatMap(fiber -> fiber.interrupt().delayed(Duration.ofMillis(50))));
185+
186+
assertEquals(0L, check.get());
187+
assertEquals(1L, started.get());
188+
}
189+
190+
@Test
191+
public void resource_can_fork() throws Throwable {
192+
AtomicLong check = new AtomicLong();
193+
Resource<Long> res =
194+
Resource.acquireRelease(Task.run(() -> check.incrementAndGet()), i -> Task.done);
195+
Task<Long> task = res.fork().use(fiber -> fiber.join());
196+
run(task);
197+
assertEquals(1L, check.get());
198+
}
199+
200+
@Test
201+
public void resource_is_released_when_fork_is_interrupted() throws Throwable {
202+
AtomicLong check = new AtomicLong();
203+
Resource<Long> res =
204+
Resource.acquireRelease(
205+
Task.run(() -> check.incrementAndGet()), i -> Task.run(() -> check.decrementAndGet()));
206+
Task<Done> task = res.fork().use(fiber -> fiber.interrupt());
207+
run(task);
208+
assertEquals(0L, check.get());
209+
}
210+
211+
@Test
212+
public void resource_is_released_when_fork_is_completed() throws Throwable {
213+
AtomicLong check = new AtomicLong();
214+
Resource<Long> res =
215+
Resource.acquireRelease(
216+
Task.run(() -> check.incrementAndGet()), i -> Task.run(() -> check.decrementAndGet()));
217+
Task<Long> task = res.fork().use(fiber -> fiber.join());
218+
run(task);
219+
assertEquals(0L, check.get());
220+
}
221+
222+
@Test
223+
public void can_create_and_complete_promise() throws Throwable {
224+
Task<Integer> task =
225+
Promise.<Integer>make()
226+
.flatMap(
227+
promise ->
228+
promise
229+
.await()
230+
.forkDaemon()
231+
.flatMap(fiber -> promise.succeed(42).andThen(fiber.join())));
232+
assertEquals(42, run(task).intValue());
233+
}
234+
235+
@Test
236+
public void can_race_two_tasks() throws Throwable {
237+
Task<Integer> task1 = Task.succeed(0).delayed(Duration.ofMillis(100));
238+
Task<Integer> task2 = Task.succeed(42);
239+
Task<Integer> task = Task.raceAll(task1, task2);
240+
assertEquals(42, run(task).intValue());
241+
}
242+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.pekko.task.javadsl;
19+
20+
import java.time.Duration;
21+
import java.time.Instant;
22+
23+
import org.apache.pekko.task.GetRuntimeDef$;
24+
import org.apache.pekko.task.ClockDef$;
25+
import org.apache.pekko.task.TaskDef;
26+
import org.apache.pekko.Done;
27+
28+
public class Clock {
29+
public static final Task<Long> nanoTime = new Task<>(ClockDef$.MODULE$.nanoTime());
30+
public static final Task<Instant> now = new Task<>(ClockDef$.MODULE$.now());
31+
32+
public static Task<Done> sleep(Duration duration) {
33+
return new Task<>(ClockDef$.MODULE$.sleep(duration)).asDone();
34+
}
35+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.pekko.task.javadsl;
19+
20+
import java.util.ArrayList;
21+
import java.util.function.Function;
22+
23+
/**
24+
* Functional helpers for collections. Prefixed with "c" so they don't conflict with local methods
25+
* of the same name (in Java 8's limited name resolution)
26+
*/
27+
public class CollectionHelpers {
28+
public static <T, U> ArrayList<U> cmap(
29+
Iterable<? extends T> src, Function<? super T, ? extends U> fn) {
30+
ArrayList<U> list = new ArrayList<>();
31+
for (T t : src) {
32+
list.add(fn.apply(t));
33+
}
34+
return list;
35+
}
36+
}

0 commit comments

Comments
 (0)