Skip to content

Commit b7ba3a9

Browse files
committed
Introduce Task monad
This commit introduces Task, a data structure that represents a recipe, or program, for producing a value of type T (or failing with an exception). It is similar in semantics to RunnableGraph[T], but intended as first-class building block. It has the following properties: - A task can have resources associated to it, which are guaranteed to be released if the task is cancelled or fails - Tasks can be forked so multiple ones can run concurrently - Such forked tasks can be cancelled A Task can be created from a RunnableGraph which has a KillSwitch, by connecting a Source and a Sink through a KillSwitch, or by direct lambda functions.
1 parent 3aacafe commit b7ba3a9

File tree

8 files changed

+877
-2
lines changed

8 files changed

+877
-2
lines changed

actor/src/main/scala/org/apache/pekko/japi/function/Function.scala

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,17 @@ import scala.annotation.nowarn
2525
@nowarn("msg=@SerialVersionUID has no effect")
2626
@SerialVersionUID(1L)
2727
@FunctionalInterface
28-
trait Function[-T, +R] extends java.io.Serializable {
28+
trait Function[-T, +R] extends java.io.Serializable { outer =>
2929
@throws(classOf[Exception])
3030
def apply(param: T): R
31+
32+
/** Returns a function that applies [fn] to the result of this function. */
33+
def andThen[U](fn: Function[R, U]): Function[T, U] = new Function[T,U] {
34+
override def apply(param: T) = fn(outer.apply(param))
35+
}
36+
37+
/** Returns a Scala function representation for this function. */
38+
def toScala[T1 <: T, R1 >: R]: T1 => R1 = t => apply(t)
3139
}
3240

3341
object Function {
@@ -98,11 +106,19 @@ trait Predicate[-T] extends java.io.Serializable {
98106
@nowarn("msg=@SerialVersionUID has no effect")
99107
@SerialVersionUID(1L)
100108
@FunctionalInterface
101-
trait Creator[+T] extends Serializable {
109+
trait Creator[+T] extends Serializable { outer =>
102110

103111
/**
104112
* This method must return a different instance upon every call.
105113
*/
106114
@throws(classOf[Exception])
107115
def create(): T
116+
117+
/** Returns a function that applies [fn] to the result of this function. */
118+
def andThen[U](fn: Function[T, U]): Creator[U] = new Creator[U] {
119+
override def create() = fn(outer.create())
120+
}
121+
122+
/** Returns a Scala function representation for this function. */
123+
def toScala[T1 >: T]: () => T1 = () => create()
108124
}
Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.pekko.task.javadsl;
19+
20+
import org.apache.pekko.stream.StreamTest;
21+
import org.apache.pekko.testkit.PekkoJUnitActorSystemResource;
22+
import org.apache.pekko.testkit.PekkoSpec;
23+
import org.apache.pekko.stream.Materializer;
24+
import org.apache.pekko.Done;
25+
26+
import org.junit.ClassRule;
27+
import org.junit.Test;
28+
29+
import org.apache.pekko.task.RuntimeDef;
30+
import org.apache.pekko.japi.function.Creator;
31+
import org.apache.pekko.stream.javadsl.Sink;
32+
import org.apache.pekko.stream.javadsl.Source;
33+
34+
import java.util.concurrent.TimeUnit;
35+
import java.util.concurrent.ExecutionException;
36+
import java.util.concurrent.atomic.AtomicLong;
37+
38+
import java.util.Optional;
39+
import java.time.Duration;
40+
41+
import static org.junit.Assert.assertEquals;
42+
import static org.junit.Assert.assertTrue;
43+
44+
public class TaskTest extends StreamTest{
45+
private final RuntimeDef runtime = new RuntimeDef(Materializer.createMaterializer(system));
46+
47+
public TaskTest() {
48+
super(actorSystemResource);
49+
}
50+
51+
@ClassRule
52+
public static PekkoJUnitActorSystemResource actorSystemResource =
53+
new PekkoJUnitActorSystemResource("TaskTest", PekkoSpec.testConf());
54+
55+
private <T> T run(Task<T> task) throws Exception {
56+
return runtime.runAsync(task).get(2, TimeUnit.SECONDS);
57+
}
58+
59+
@Test
60+
public void can_run_task_from_lambda() throws Exception {
61+
assertEquals("Hello", run(Task.run(() -> "Hello")));
62+
}
63+
64+
@Test
65+
public void can_map() throws Exception {
66+
assertEquals(25, run(Task.run(() -> "25").map(Integer::parseInt)).intValue());
67+
}
68+
69+
@Test
70+
public void can_flatMap_to_run() throws Exception {
71+
assertEquals(25, run(Task.run(() -> "25").flatMap(s -> Task.run(() -> Integer.parseInt(s)))).intValue());
72+
}
73+
74+
@Test
75+
public void can_zipPar_two_tasks() throws Exception {
76+
Task<String> task = Task.run(() -> {
77+
return "Hello";
78+
});
79+
assertEquals("HelloHello", run(task.zipPar(task, (s1,s2) -> s1 + s2)));
80+
}
81+
82+
@Test
83+
public void can_interrupt_forked_task() throws Exception {
84+
AtomicLong check = new AtomicLong();
85+
Task<Long> task = Task.run(() -> {
86+
Thread.sleep(100); // TODO replace with .delay() once available
87+
}).map(d -> check.incrementAndGet());
88+
run(task.forkDaemon().flatMap(fiber ->
89+
fiber.interrupt().map(cancelled ->
90+
"cancelled"
91+
)
92+
));
93+
assertEquals(0, check.get());
94+
}
95+
96+
@Test(expected=ExecutionException.class)
97+
public void joining_interrupted_fiber_yields_exception() throws Exception {
98+
Task<Long> task = Task.run(() -> Thread.sleep(100)).map(d -> 42L);
99+
run(task.forkDaemon().flatMap(fiber ->
100+
fiber.interrupt().flatMap(cancelled ->
101+
fiber.join()
102+
)
103+
));
104+
}
105+
106+
@Test
107+
public void can_run_graph() throws Exception {
108+
assertEquals(Optional.of("hello"),
109+
run(Task.connectCancellable(Source.single("hello"), Sink.headOption()).flatMap(fiber -> fiber.join())));
110+
}
111+
112+
@Test
113+
public void can_interrupt_graph() throws Exception {
114+
AtomicLong check = new AtomicLong();
115+
assertEquals(Done.getInstance(), run(
116+
Task.connectCancellable(
117+
Source.tick(Duration.ofMillis(1), Duration.ofMillis(1), ""),
118+
Sink.foreach(s -> check.incrementAndGet())
119+
).flatMap(fiber -> fiber.interrupt())
120+
));
121+
Thread.sleep(100);
122+
assertTrue(check.get() < 10);
123+
}
124+
125+
@Test
126+
public void resource_is_acquired_and_released() throws Exception {
127+
AtomicLong check = new AtomicLong();
128+
Resource<Long> res = Resource.acquireRelease(Task.run(() -> check.incrementAndGet()), i -> Task.run(() -> check.decrementAndGet()));
129+
Task<Long> task = res.use(i -> Task.succeed(i));
130+
assertEquals(1L, run(task).longValue());
131+
assertEquals(0L, check.get());
132+
}
133+
134+
@Test
135+
public void resource_is_released_on_failure() throws Exception {
136+
AtomicLong check = new AtomicLong();
137+
Resource<Long> res = Resource.acquireRelease(Task.run(() -> check.incrementAndGet()), i -> Task.run(() -> check.decrementAndGet()));
138+
Task<Long> task = res.use(i -> Task.fail(new RuntimeException("Simulated failure")));
139+
try { run(task); } catch (Exception ignored) {}
140+
assertEquals(0L, check.get());
141+
}
142+
143+
@Test
144+
public void resource_is_released_when_interrupted() throws Exception {
145+
AtomicLong check = new AtomicLong();
146+
AtomicLong started = new AtomicLong();
147+
148+
Resource<Long> res = Resource.acquireRelease(Task.run(() -> {
149+
return check.incrementAndGet();
150+
}), i -> Task.run(() -> {
151+
return check.decrementAndGet();
152+
}));
153+
154+
Task<Done> task = res.use(i -> Task.run(() -> {
155+
started.incrementAndGet();
156+
Thread.sleep(100); // TODO replace with .delay() once available
157+
}).flatMap(d -> Task.run(() -> {
158+
Thread.sleep(100); // TODO replace with .delay() once available
159+
})));
160+
161+
run(task.forkDaemon().flatMap(fiber -> {
162+
Thread.sleep(50); // TODO replace with .delay() once available
163+
return fiber.interrupt();
164+
}));
165+
166+
assertEquals(0L, check.get());
167+
assertEquals(1L, started.get());
168+
}
169+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.pekko.task.javadsl;
19+
20+
import org.apache.pekko.task.FiberDef;
21+
import org.apache.pekko.task.JoinDef;
22+
import org.apache.pekko.task.InterruptDef;
23+
import org.apache.pekko.Done;
24+
25+
/**
26+
* A fiber represents the ongoing execution of a Task, eventually resulting in a value T or failing
27+
* with an exception.
28+
*/
29+
public class Fiber<T> {
30+
private final FiberDef<T> impl;
31+
32+
public Fiber(FiberDef<T> impl) {
33+
this.impl = impl;
34+
}
35+
36+
/** Returns a Task that will complete when this fiber does. */
37+
public Task<T> join() {
38+
return new Task<>(new JoinDef<>(impl));
39+
}
40+
41+
/**
42+
* Returns a Task that will interrupt this fiber, causing its execution to stop. The task will
43+
* complete with Done when this fiber has fully stopped.
44+
*/
45+
public Task<Done> interrupt() {
46+
return new Task<>(new InterruptDef<>(impl)).asDone();
47+
}
48+
}
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.pekko.task.javadsl;
19+
20+
import org.apache.pekko.japi.function.Function;
21+
import org.apache.pekko.task.AbstractTask;
22+
23+
import static org.apache.pekko.task.javadsl.Task.*;
24+
25+
/**
26+
* A Resource represents a value that can be created, but needs to be cleaned up after use. The use
27+
* of Resource guarantees that values aren't leaked even in case of failure or interrupted fibers.
28+
*/
29+
public class Resource<T> {
30+
/**
31+
* Creates a Resource that creates a value using [acquire], invoking [release] when such value
32+
* needs to be cleaned up.
33+
*/
34+
public static <T> Resource<T> acquireRelease(
35+
Task<T> acquire, Function<T, ? extends AbstractTask<?>> release) {
36+
return new Resource<>(acquire, release);
37+
}
38+
39+
private final Task<T> create;
40+
private final Function<T, ? extends AbstractTask<?>> destroy;
41+
42+
private Resource(Task<T> create, Function<T, ? extends AbstractTask<?>> destroy) {
43+
this.create = create;
44+
this.destroy = destroy;
45+
}
46+
47+
/**
48+
* Returns a Task that can safely use this resource in the given function, guaranteeing cleanup
49+
* even if the function fails or the current fiber is interrupted.
50+
*/
51+
public <U> Task<U> use(Function<T, ? extends AbstractTask<U>> fn) {
52+
return Task.<U>uninterruptableMask(
53+
restore ->
54+
create.flatMap(
55+
t ->
56+
runTask(() -> task(restore.<U>apply(task(fn.apply(t)))))
57+
.onComplete((r, x) -> runTask(() -> task(destroy.apply(t))))));
58+
}
59+
}

0 commit comments

Comments
 (0)