Skip to content

Commit 99bd01e

Browse files
committed
Rearrange ExecutionContexts
1 parent dfe6b82 commit 99bd01e

File tree

6 files changed

+58
-47
lines changed

6 files changed

+58
-47
lines changed

aws-s3/aws-sdk-v2/src/main/scala/com/gu/etagcaching/aws/sdkv2/s3/S3ObjectFetching.scala

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,15 @@ import scala.concurrent.{ExecutionContext, Future}
1515

1616
case class S3ObjectFetching[Response](s3Client: S3AsyncClient, transformer: Transformer[Response])
1717
extends Fetching[ObjectId, Response] {
18+
implicit val ec: ExecutionContext = ExecutionContext.parasitic // Only if suitable...
19+
1820
private def wrapWithETag(resp: Response): ETaggedData[Response] =
1921
ETaggedData(transformer.rawResponseObjectOf(resp).eTag(), resp)
2022

2123
private def performFetch(
2224
resourceId: ObjectId,
2325
reqModifier: Endo[GetObjectRequest.Builder] = identity,
24-
)(implicit ec: ExecutionContext): Future[MissingOrETagged[Response]] = {
26+
): Future[MissingOrETagged[Response]] = {
2527
val requestBuilder = GetObjectRequest.builder().bucket(resourceId.bucket).key(resourceId.key)
2628
val request = reqModifier(requestBuilder).build()
2729
s3Client.getObject(request, transformer.asyncResponseTransformer())
@@ -35,9 +37,9 @@ case class S3ObjectFetching[Response](s3Client: S3AsyncClient, transformer: Tran
3537
}
3638
}
3739

38-
def fetch(key: ObjectId)(implicit ec: ExecutionContext): Future[MissingOrETagged[Response]] = performFetch(key)
40+
def fetch(key: ObjectId): Future[MissingOrETagged[Response]] = performFetch(key)
3941

40-
def fetchOnlyIfETagChanged(key: ObjectId, eTag: String)(implicit ec: ExecutionContext): Future[Option[MissingOrETagged[Response]]] =
42+
def fetchOnlyIfETagChanged(key: ObjectId, eTag: String): Future[Option[MissingOrETagged[Response]]] =
4143
performFetch(key, _.ifNoneMatch(eTag)).map(Some(_)).recover {
4244
case e: S3Exception if e.statusCode == HTTP_NOT_MODIFIED => None // no fresh download because the ETag matched!
4345
}
@@ -48,5 +50,5 @@ object S3ObjectFetching {
4850
* Convenience method for creating a fetcher that just returns a simple byte array.
4951
*/
5052
def byteArraysWith(s3AsyncClient: S3AsyncClient): S3ByteArrayFetching =
51-
S3ObjectFetching(s3AsyncClient, Bytes).mapResponse(_.asByteArray())
53+
S3ObjectFetching(s3AsyncClient, Bytes).mapResponse(_.asByteArray())(ExecutionContext.parasitic)
5254
}

core/src/main/scala/com/gu/etagcaching/ETagCache.scala

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import scala.concurrent.{ExecutionContext, Future}
1717
* ETagCache caches resolved-key-values along with their `ETag`, a content hash supplied & recognised by the
1818
* remote service. ETagCache (when used with a [[AlwaysWaitForRefreshedValue]] policy)
1919
* ''will'' always connect to the service for each request for a key-value, but where it already holds a
20-
* cached `ETaggedData`, it will send the `ETag` with it's request, and the service will return a
20+
* cached `ETaggedData`, it will send the `ETag` with its request, and the service will return a
2121
* response indicating if the content has changed or not. If the content is unchanged, the response will
2222
* be blank, saving network bandwidth, and the old value can be used, saving CPU by not having to parse the
2323
* data again.
@@ -32,18 +32,17 @@ class ETagCache[K, V](
3232
loading: Loading[K, V],
3333
freshnessPolicy: FreshnessPolicy,
3434
configureCache: ConfigCache
35-
)(implicit ec: ExecutionContext) {
36-
35+
) {
3736
private val cache: AsyncLoadingCache[K, MissingOrETagged[V]] = configureCache(Scaffeine()).buildAsyncFuture[K, MissingOrETagged[V]](
3837
loader = loading.fetchAndParse,
3938
reloadLoader = Some(
4039
(key: K, old: MissingOrETagged[V]) => old match {
4140
case Missing => loading.fetchAndParse(key)
42-
case oldETaggedData: ETaggedData[V] => loading.fetchThenParseIfNecessary(key, oldETaggedData)
41+
case oldETaggedData: ETaggedData[V] @unchecked => loading.fetchThenParseIfNecessary(key, oldETaggedData)
4342
}
4443
))
4544

4645
private val read = freshnessPolicy.on(cache)
4746

48-
def get(key: K): Future[Option[V]] = read(key).map(_.toOption)
47+
def get(key: K): Future[Option[V]] = read(key).map(_.toOption)(ExecutionContext.parasitic)
4948
}

core/src/main/scala/com/gu/etagcaching/Loading.scala

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,27 +18,27 @@ import scala.concurrent.{ExecutionContext, Future}
1818
* @tparam V The 'value' for the key - a parsed representation of whatever was in the resource data.
1919
*/
2020
trait Loading[K, V] {
21-
def fetchAndParse(k: K)(implicit ec: ExecutionContext): Future[MissingOrETagged[V]]
21+
def fetchAndParse(k: K): Future[MissingOrETagged[V]]
2222

2323
/**
2424
* When we have ''old'' `ETaggedData`, we can send the `ETag` with the Fetch request, and the server will return
2525
* a blank HTTP 304 `Not Modified` response if the content hasn't changed - which means we DO NOT need to parse
2626
* any new data, and can just reuse our old data, saving us CPU time and network bandwidth.
2727
*/
28-
def fetchThenParseIfNecessary(k: K, oldV: ETaggedData[V])(implicit ec: ExecutionContext): Future[MissingOrETagged[V]]
28+
def fetchThenParseIfNecessary(k: K, oldV: ETaggedData[V]): Future[MissingOrETagged[V]]
2929

3030
/**
3131
* Add a handler for doing side-effectful logging of updates.
3232
*/
33-
def onUpdate(handler: Update[K,V] => Unit): Loading[K, V] = OnUpdate(this)(handler)
33+
def onUpdate(handler: Update[K,V] => Unit)(implicit ec: ExecutionContext): Loading[K, V] = OnUpdate(this)(handler)
3434
}
3535

3636
object Loading {
37-
def by[K, Response, V](fetching: Fetching[K, Response])(parse: Response => V): Loading[K, V] = new Loading[K, V] {
38-
def fetchAndParse(key: K)(implicit ec: ExecutionContext): Future[MissingOrETagged[V]] =
37+
def by[K, Response, V](fetching: Fetching[K, Response])(parse: Response => V)(implicit parsingEC: ExecutionContext): Loading[K, V] = new Loading[K, V] {
38+
def fetchAndParse(key: K): Future[MissingOrETagged[V]] =
3939
fetching.fetch(key).map(_.map(parse))
4040

41-
def fetchThenParseIfNecessary(key: K, oldV: ETaggedData[V])(implicit ec: ExecutionContext): Future[MissingOrETagged[V]] =
41+
def fetchThenParseIfNecessary(key: K, oldV: ETaggedData[V]): Future[MissingOrETagged[V]] =
4242
fetching.fetchOnlyIfETagChanged(key, oldV.eTag).map {
4343
case None => oldV // we got HTTP 304 'NOT MODIFIED': there's no new data - old data is still valid
4444
case Some(freshResponse) => freshResponse.map(parse)
@@ -52,17 +52,19 @@ object Loading {
5252

5353
/**
5454
* Wrapper round an underlying instance of Loading which adds handler for doing side-effectful logging of updates.
55+
*
56+
* @param loggingEC ExecutionContext for the `handler` function to run in
5557
*/
56-
case class OnUpdate[K, V](underlying: Loading[K, V])(handler: Update[K,V] => Unit)
58+
case class OnUpdate[K, V](underlying: Loading[K, V])(handler: Update[K,V] => Unit)(implicit loggingEC: ExecutionContext)
5759
extends Loading[K, V] {
5860

59-
override def fetchAndParse(key: K)(implicit ec: ExecutionContext): Future[MissingOrETagged[V]] =
61+
override def fetchAndParse(key: K): Future[MissingOrETagged[V]] =
6062
handle(key, None, underlying.fetchAndParse(key))
6163

62-
override def fetchThenParseIfNecessary(key: K, oldV: ETaggedData[V])(implicit ec: ExecutionContext): Future[MissingOrETagged[V]] =
64+
override def fetchThenParseIfNecessary(key: K, oldV: ETaggedData[V]): Future[MissingOrETagged[V]] =
6365
handle(key, oldV.toOption, underlying.fetchThenParseIfNecessary(key, oldV))
6466

65-
private def handle(key: K, oldV: Option[V], fut: Future[MissingOrETagged[V]])(implicit ec: ExecutionContext): Future[MissingOrETagged[V]] = {
67+
private def handle(key: K, oldV: Option[V], fut: Future[MissingOrETagged[V]]): Future[MissingOrETagged[V]] = {
6668
for {
6769
wrappedNewV <- fut
6870
} {

core/src/main/scala/com/gu/etagcaching/fetching/Fetching.scala

Lines changed: 31 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -9,33 +9,35 @@ import scala.concurrent.{ExecutionContext, Future}
99
import scala.util.Try
1010

1111
trait Fetching[K, Response] {
12-
def fetch(key: K)(implicit ec: ExecutionContext): Future[MissingOrETagged[Response]]
12+
def fetch(key: K): Future[MissingOrETagged[Response]]
1313

14-
def fetchOnlyIfETagChanged(key: K, eTag: String)(implicit ec: ExecutionContext): Future[Option[MissingOrETagged[Response]]]
14+
def fetchOnlyIfETagChanged(key: K, eTag: String): Future[Option[MissingOrETagged[Response]]]
1515

16+
/**
17+
* @param recordingEC the ExecutionContext used for
18+
*/
1619
def timing(
1720
attemptWith: Duration => Unit = _ => (),
1821
successWith: Duration => Unit = _ => (),
1922
fullFetchWith: Duration => Unit = _ => (),
2023
notModifiedWith: Duration => Unit = _ => ()
21-
): Fetching[K, Response] =
22-
DurationRecorder(this) { recorded =>
23-
val duration = recorded.duration
24-
attemptWith(duration)
25-
recorded.result.foreach { successfulResponse =>
26-
successWith(duration)
27-
successfulResponse match {
28-
case FullFetch => fullFetchWith(duration)
29-
case NotModified => notModifiedWith(duration)
30-
}
24+
)(implicit recordingEC: ExecutionContext): Fetching[K, Response] = DurationRecorder(this) { recorded =>
25+
val duration = recorded.duration
26+
attemptWith(duration)
27+
recorded.result.foreach { successfulResponse =>
28+
successWith(duration)
29+
successfulResponse match {
30+
case FullFetch => fullFetchWith(duration)
31+
case NotModified => notModifiedWith(duration)
3132
}
3233
}
34+
}
3335

3436
def keyOn[K2](f: K2 => K): Fetching[K2, Response] = KeyAdapter(this)(f)
3537

36-
def mapResponse[Response2](f: Response => Response2): Fetching[K, Response2] = ResponseMapper(this)(f)
38+
def mapResponse[Response2](f: Response => Response2)(implicit ec: ExecutionContext): Fetching[K, Response2] = ResponseMapper(this)(f)
3739

38-
def thenParsing[V](parse: Response => V): Loading[K, V] = Loading.by(this)(parse)
40+
def thenParsing[V](parse: Response => V)(implicit parsingEC: ExecutionContext): Loading[K, V] = Loading.by(this)(parse)
3941
}
4042

4143
object Fetching {
@@ -47,38 +49,44 @@ object Fetching {
4749
object DurationRecorder {
4850
case class Result(duration: Duration, result: Try[SuccessfulFetch])
4951
}
50-
case class DurationRecorder[K, Response](underlying: Fetching[K, Response])(recorder: Result => Unit)
52+
53+
/**
54+
* @param ec should recording a result happen on the same thread that
55+
*/
56+
case class DurationRecorder[K, Response](underlying: Fetching[K, Response])(recorder: Result => Unit)(implicit ec: ExecutionContext)
5157
extends Fetching[K, Response] {
5258

53-
private def time[V](block: => Future[V])(f: V => SuccessfulFetch)(implicit ec: ExecutionContext): Future[V] = {
59+
private def time[V](block: => Future[V])(f: V => SuccessfulFetch): Future[V] = {
5460
val start = Instant.now()
5561
val resultF = block
5662
resultF.onComplete(resultTry => recorder(Result(Duration.between(start, Instant.now()), resultTry.map(f))))
5763
resultF
5864
}
5965

60-
override def fetch(key: K)(implicit ec: ExecutionContext): Future[MissingOrETagged[Response]] =
66+
override def fetch(key: K): Future[MissingOrETagged[Response]] =
6167
time(underlying.fetch(key))(_ => FullFetch)
6268

63-
override def fetchOnlyIfETagChanged(key: K, eTag: String)(implicit ec: ExecutionContext): Future[Option[MissingOrETagged[Response]]] =
69+
override def fetchOnlyIfETagChanged(key: K, eTag: String): Future[Option[MissingOrETagged[Response]]] =
6470
time(underlying.fetchOnlyIfETagChanged(key, eTag))(_.map(_ => FullFetch).getOrElse(NotModified))
6571
}
6672

6773
private case class KeyAdapter[K, UnderlyingK, Response](underlying: Fetching[UnderlyingK, Response])(f: K => UnderlyingK)
6874
extends Fetching[K, Response] {
69-
override def fetch(key: K)(implicit ec: ExecutionContext): Future[MissingOrETagged[Response]] =
75+
override def fetch(key: K): Future[MissingOrETagged[Response]] =
7076
underlying.fetch(f(key))
7177

72-
override def fetchOnlyIfETagChanged(key: K, eTag: String)(implicit ec: ExecutionContext): Future[Option[MissingOrETagged[Response]]] =
78+
override def fetchOnlyIfETagChanged(key: K, eTag: String): Future[Option[MissingOrETagged[Response]]] =
7379
underlying.fetchOnlyIfETagChanged(f(key), eTag)
7480
}
7581

76-
private case class ResponseMapper[K, UnderlyingResponse, Response](underlying: Fetching[K, UnderlyingResponse])(f: UnderlyingResponse => Response)
82+
private case class ResponseMapper[K, UnderlyingResponse, Response](underlying: Fetching[K, UnderlyingResponse])(
83+
f: UnderlyingResponse => Response
84+
)(implicit responseMappingEC: ExecutionContext) // the function `f` _may_ be resource intensive, and require an appropriate ExecutionContext
7785
extends Fetching[K, Response] {
78-
override def fetch(key: K)(implicit ec: ExecutionContext): Future[MissingOrETagged[Response]] =
86+
override def fetch(key: K): Future[MissingOrETagged[Response]] =
7987
underlying.fetch(key).map(_.map(f))
8088

81-
override def fetchOnlyIfETagChanged(key: K, eTag: String)(implicit ec: ExecutionContext): Future[Option[MissingOrETagged[Response]]] =
89+
override def fetchOnlyIfETagChanged(key: K, eTag: String): Future[Option[MissingOrETagged[Response]]] =
8290
underlying.fetchOnlyIfETagChanged(key, eTag).map(_.map(_.map(f)))
8391
}
8492
}

core/src/test/scala/com/gu/etagcaching/FreshnessPolicyTest.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,10 +63,10 @@ class FreshnessPolicyTest extends AnyFlatSpec with Matchers with ScalaFutures wi
6363
val fetching: Fetching[String, Int] = TestFetching.withIncrementingValues
6464

6565
val eTagCache = new ETagCache[String, Int](
66-
fetching.thenParsing(identity),
66+
fetching.thenParsing(identity)(ExecutionContext.parasitic),
6767
TolerateOldValueWhileRefreshing,
6868
_.maximumSize(1).expireAfterWrite(100.millis)
69-
)(ExecutionContext.Implicits.global)
69+
)
7070

7171
eTagCache.get("KEY").futureValue.value shouldBe 0
7272
eTagCache.get("KEY").futureValue.value shouldBe 0

core/src/test/scala/com/gu/etagcaching/TestFetching.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,11 @@ object TestFetching {
1010
def withIncrementingValues: Fetching[String, Int] = new Fetching[String, Int] {
1111
val counter = new AtomicInteger()
1212

13-
override def fetch(key: String)(implicit ec: ExecutionContext): Future[MissingOrETagged[Int]] = {
13+
override def fetch(key: String): Future[MissingOrETagged[Int]] = {
1414
val count = counter.getAndIncrement()
1515
Future.successful(ETaggedData(count.toString, count))
1616
}
17-
override def fetchOnlyIfETagChanged(key: String, eTag: String)(implicit ec: ExecutionContext): Future[Option[MissingOrETagged[Int]]] =
18-
fetch(key)(ec).map(Some(_))
17+
override def fetchOnlyIfETagChanged(key: String, eTag: String): Future[Option[MissingOrETagged[Int]]] =
18+
fetch(key).map(Some(_))(ExecutionContext.parasitic)
1919
}
2020
}

0 commit comments

Comments
 (0)