Skip to content

Commit ba09de8

Browse files
committed
chore: Add FutureOps which with an await style.
1 parent 2b6d2dc commit ba09de8

File tree

7 files changed

+48
-24
lines changed

7 files changed

+48
-24
lines changed

actor/src/main/scala/org/apache/pekko/actor/LightArrayRevolverScheduler.scala

+3-2
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import java.util.concurrent.atomic.{ AtomicLong, AtomicReference }
1919

2020
import scala.annotation.{ nowarn, tailrec }
2121
import scala.collection.immutable
22-
import scala.concurrent.{ Await, ExecutionContext, Future, Promise }
22+
import scala.concurrent.{ ExecutionContext, Future, Promise }
2323
import scala.concurrent.duration._
2424
import scala.util.control.NonFatal
2525

@@ -178,7 +178,8 @@ class LightArrayRevolverScheduler(config: Config, log: LoggingAdapter, threadFac
178178
}
179179
}
180180

181-
Await.result(stop(), getShutdownTimeout).foreach {
181+
import pekko.util.Helpers._
182+
stop().await(getShutdownTimeout).foreach {
182183
case task: Scheduler.TaskRunOnClose =>
183184
runTask(task)
184185
case holder: TaskHolder => // don't run

actor/src/main/scala/org/apache/pekko/pattern/CircuitBreaker.scala

+10-8
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger, AtomicLong }
1919
import java.util.function.BiFunction
2020
import java.util.function.Consumer
2121
import scala.annotation.nowarn
22-
import scala.concurrent.{ Await, ExecutionContext, Future, Promise }
22+
import scala.concurrent.{ ExecutionContext, Future, Promise }
2323
import scala.concurrent.TimeoutException
2424
import scala.concurrent.duration._
2525
import scala.util.{ Failure, Success, Try }
@@ -446,13 +446,15 @@ class CircuitBreaker(
446446
* @param defineFailureFn function that define what should be consider failure and thus increase failure count
447447
* @return The result of the call
448448
*/
449-
def withSyncCircuitBreaker[T](body: => T, defineFailureFn: Try[T] => Boolean): T =
450-
Await.result(
451-
withCircuitBreaker(
452-
try Future.successful(body)
453-
catch { case NonFatal(t) => Future.failed(t) },
454-
defineFailureFn),
455-
callTimeout)
449+
def withSyncCircuitBreaker[T](body: => T, defineFailureFn: Try[T] => Boolean): T = {
450+
import pekko.util.Helpers._
451+
withCircuitBreaker(
452+
try Future.successful(body)
453+
catch {
454+
case NonFatal(t) => Future.failed(t)
455+
},
456+
defineFailureFn).await(callTimeout)
457+
}
456458

457459
/**
458460
* Java API for [[#withSyncCircuitBreaker]]. Throws [[java.util.concurrent.TimeoutException]] if the call timed out.

actor/src/main/scala/org/apache/pekko/serialization/AsyncSerializer.scala

+5-4
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,7 @@ package org.apache.pekko.serialization
1515

1616
import java.util.concurrent.CompletionStage
1717

18-
import scala.concurrent.{ Await, Future }
19-
import scala.concurrent.duration.Duration
18+
import scala.concurrent.Future
2019

2120
import org.apache.pekko
2221
import pekko.actor.ExtendedActorSystem
@@ -59,14 +58,16 @@ abstract class AsyncSerializerWithStringManifest(system: ExtendedActorSystem)
5958
log.warning(
6059
"Async serializer called synchronously. This will block. Async serializers should only be used for pekko persistence plugins that support them. Class: {}",
6160
o.getClass)
62-
Await.result(toBinaryAsync(o), Duration.Inf)
61+
import pekko.util.Helpers._
62+
toBinaryAsync(o).await()
6363
}
6464

6565
final override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = {
6666
log.warning(
6767
"Async serializer called synchronously. This will block. Async serializers should only be used for Pekko persistence plugins that support them. Manifest: [{}]",
6868
manifest)
69-
Await.result(fromBinaryAsync(bytes, manifest), Duration.Inf)
69+
import pekko.util.Helpers._
70+
fromBinaryAsync(bytes, manifest).await()
7071
}
7172
}
7273

actor/src/main/scala/org/apache/pekko/util/Helpers.scala

+17
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ import scala.annotation.tailrec
3535
import scala.concurrent.duration.{ Duration, FiniteDuration }
3636
import com.typesafe.config.{ Config, ConfigRenderOptions }
3737

38+
import scala.concurrent.{ Await, Future }
39+
3840
object Helpers {
3941

4042
def toRootLowerCase(s: String): String = s.toLowerCase(Locale.ROOT)
@@ -195,4 +197,19 @@ object Helpers {
195197
Duration(config.getDuration(path, unit), unit)
196198
}
197199

200+
/**
201+
* INTERNAL API
202+
*/
203+
private[pekko] final implicit class FutureOps[T](val future: Future[T]) extends AnyVal {
204+
205+
/**
206+
* Wait for the future to complete and return the result, or throw an exception if the future failed.
207+
* Optimize for the case when the future is already completed.
208+
* @since 1.2.0
209+
*/
210+
def await(atMost: Duration = Duration.Inf): T = future.value match {
211+
case Some(value) => value.get
212+
case None => Await.result(future, atMost)
213+
}
214+
}
198215
}

stream/src/main/scala/org/apache/pekko/stream/SystemMaterializer.scala

+6-4
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
package org.apache.pekko.stream
1515

1616
import scala.annotation.nowarn
17-
import scala.concurrent.Await
1817
import scala.concurrent.Promise
1918

2019
import org.apache.pekko
@@ -77,7 +76,8 @@ final class SystemMaterializer(system: ExtendedActorSystem) extends Extension {
7776
private[pekko] def createAdditionalSystemMaterializer(): Materializer = {
7877
val started =
7978
(materializerGuardian ? MaterializerGuardian.StartMaterializer).mapTo[MaterializerGuardian.MaterializerStarted]
80-
Await.result(started, materializerTimeout.duration).materializer
79+
import pekko.util.Helpers._
80+
started.await(materializerTimeout.duration).materializer
8181
}
8282

8383
/**
@@ -91,12 +91,14 @@ final class SystemMaterializer(system: ExtendedActorSystem) extends Extension {
9191
val started =
9292
(materializerGuardian ? MaterializerGuardian.LegacyStartMaterializer(namePrefix, settings))
9393
.mapTo[MaterializerGuardian.MaterializerStarted]
94-
Await.result(started, materializerTimeout.duration).materializer
94+
import pekko.util.Helpers._
95+
started.await(materializerTimeout.duration).materializer
9596
}
9697

9798
val materializer: Materializer = {
9899
// block on async creation to make it effectively final
99-
Await.result(systemMaterializerPromise.future, materializerTimeout.duration)
100+
import pekko.util.Helpers._
101+
systemMaterializerPromise.future.await(materializerTimeout.duration)
100102
}
101103

102104
}

stream/src/main/scala/org/apache/pekko/stream/impl/io/OutputStreamSourceStage.scala

+4-3
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ package org.apache.pekko.stream.impl.io
1616
import java.io.{ IOException, OutputStream }
1717
import java.util.concurrent.{ Semaphore, TimeUnit }
1818

19-
import scala.concurrent.Await
2019
import scala.concurrent.duration.FiniteDuration
2120
import scala.util.control.NonFatal
2221

@@ -87,7 +86,8 @@ private[pekko] class OutputStreamAdapter(
8786
}
8887

8988
try {
90-
Await.result(sendToStage.invokeWithFeedback(Send(data)), writeTimeout)
89+
import pekko.util.Helpers._
90+
sendToStage.invokeWithFeedback(Send(data)).await(writeTimeout)
9191
} catch {
9292
case NonFatal(e) => throw new IOException(e)
9393
}
@@ -115,7 +115,8 @@ private[pekko] class OutputStreamAdapter(
115115
@scala.throws(classOf[IOException])
116116
override def close(): Unit = {
117117
try {
118-
Await.result(sendToStage.invokeWithFeedback(Close), writeTimeout)
118+
import pekko.util.Helpers._
119+
sendToStage.invokeWithFeedback(Close).await(writeTimeout)
119120
} catch {
120121
case NonFatal(e) => throw new IOException(e)
121122
}

stream/src/main/scala/org/apache/pekko/stream/scaladsl/StreamConverters.scala

+3-3
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,8 @@ import java.io.{ InputStream, OutputStream }
1717
import java.util.Spliterators
1818
import java.util.stream.{ Collector, StreamSupport }
1919

20-
import scala.concurrent.{ Await, Future }
20+
import scala.concurrent.Future
2121
import scala.concurrent.duration._
22-
import scala.concurrent.duration.Duration._
2322

2423
import org.apache.pekko
2524
import pekko.NotUsed
@@ -197,7 +196,8 @@ object StreamConverters {
197196
var nextElement: Option[T] = _
198197

199198
override def hasNext: Boolean = {
200-
nextElement = Await.result(nextElementFuture, Inf)
199+
import pekko.util.Helpers._
200+
nextElement = nextElementFuture.await()
201201
nextElement.isDefined
202202
}
203203

0 commit comments

Comments
 (0)