From d3105885e837a3e058983dde5d3397e0918f7a4f Mon Sep 17 00:00:00 2001 From: Kyri Petrou Date: Tue, 2 Jul 2024 20:54:37 +1000 Subject: [PATCH 1/4] Add an abstract Synchronous cache and optimize BlockedRequests --- .../src/main/scala/zio/query/Cache.scala | 51 +++++++++++++------ .../src/main/scala/zio/query/ZQuery.scala | 4 +- .../zio/query/internal/BlockedRequests.scala | 30 +++++++---- 3 files changed, 57 insertions(+), 28 deletions(-) diff --git a/zio-query/shared/src/main/scala/zio/query/Cache.scala b/zio-query/shared/src/main/scala/zio/query/Cache.scala index b1dc027..f766124 100644 --- a/zio-query/shared/src/main/scala/zio/query/Cache.scala +++ b/zio-query/shared/src/main/scala/zio/query/Cache.scala @@ -74,33 +74,52 @@ object Cache { def empty(expectedNumOfElements: Int)(implicit trace: Trace): UIO[Cache] = ZIO.succeed(Cache.unsafeMake(expectedNumOfElements)) - private[query] final class Default(private val map: ConcurrentHashMap[Request[_, _], Promise[_, _]]) extends Cache { + /** + * A 'Synchronous' cache is one that doesn't require an effect to look up its + * value. Prefer extending this class when implementing a cache that doesn't + * perform any asynchronous IO. + */ + abstract class Synchronous extends Cache { + def getOrNull[E, A](request: Request[E, A]): Promise[E, A] + def lookupNow[E, A, B](request: Request[_, _]): Either[Promise[E, B], Promise[E, B]] + def putNow[E, A](request: Request[E, A], result: Promise[E, A]): Unit + def removeNow[E, A](request: Request[E, A]): Unit - def get[E, A](request: Request[E, A])(implicit trace: Trace): IO[Unit, Promise[E, A]] = + final def get[E, A](request: Request[E, A])(implicit trace: Trace): IO[Unit, Promise[E, A]] = ZIO.suspendSucceed { - val out = map.get(request).asInstanceOf[Promise[E, A]] - if (out eq null) Exit.fail(()) else Exit.succeed(out) + val p = getOrNull(request) + if (p eq null) Exit.fail(()) else Exit.succeed(p) } - def lookup[E, A, B](request: A)(implicit - ev: A <:< Request[E, B], - trace: Trace - ): UIO[Either[Promise[E, B], Promise[E, B]]] = - ZIO.succeed(lookupUnsafe(request)(Unsafe.unsafe)) + final def lookup[E, A, B]( + request: A + )(implicit ev: A <:< Request[E, B], trace: Trace): UIO[Either[Promise[E, B], Promise[E, B]]] = + ZIO.succeed(lookupNow(request)) + + final def put[E, A](request: Request[E, A], result: Promise[E, A])(implicit trace: Trace): UIO[Unit] = + ZIO.succeed(putNow(request, result)) + + final def remove[E, A](request: Request[E, A])(implicit trace: Trace): UIO[Unit] = + ZIO.succeed(removeNow(request)) + } + + private final class Default(map: ConcurrentHashMap[Request[_, _], Promise[_, _]]) extends Synchronous { + private implicit val unsafe: Unsafe = Unsafe.unsafe + + def getOrNull[E, A](request: Request[E, A]): Promise[E, A] = + map.get(request).asInstanceOf[Promise[E, A]] - def lookupUnsafe[E, A, B](request: Request[_, _])(implicit - unsafe: Unsafe - ): Either[Promise[E, B], Promise[E, B]] = { + def lookupNow[E, A, B](request: Request[_, _]): Either[Promise[E, B], Promise[E, B]] = { val newPromise = Promise.unsafe.make[E, B](FiberId.None) val existing = map.putIfAbsent(request, newPromise).asInstanceOf[Promise[E, B]] if (existing eq null) Left(newPromise) else Right(existing) } - def put[E, A](request: Request[E, A], result: Promise[E, A])(implicit trace: Trace): UIO[Unit] = - ZIO.succeed(map.put(request, result)) + def putNow[E, A](request: Request[E, A], result: Promise[E, A]): Unit = + map.put(request, result) - def remove[E, A](request: Request[E, A])(implicit trace: Trace): UIO[Unit] = - ZIO.succeed(map.remove(request)) + def removeNow[E, A](request: Request[E, A]): Unit = + map.remove(request) } // TODO: Initialize the map with a sensible default value. Default is 16, which seems way too small for a cache diff --git a/zio-query/shared/src/main/scala/zio/query/ZQuery.scala b/zio-query/shared/src/main/scala/zio/query/ZQuery.scala index ba19cb6..d9952b1 100644 --- a/zio-query/shared/src/main/scala/zio/query/ZQuery.scala +++ b/zio-query/shared/src/main/scala/zio/query/ZQuery.scala @@ -1559,8 +1559,8 @@ object ZQuery { } cache match { - case cache: Cache.Default => foldPromise(cache.lookupUnsafe(request)(Unsafe.unsafe)) - case cache => CachedResult.Effectful(cache.lookup(request).flatMap(foldPromise(_).toZIO)) + case cache: Cache.Synchronous => foldPromise(cache.lookupNow(request)) + case cache => CachedResult.Effectful(cache.lookup(request).flatMap(foldPromise(_).toZIO)) } } diff --git a/zio-query/shared/src/main/scala/zio/query/internal/BlockedRequests.scala b/zio-query/shared/src/main/scala/zio/query/internal/BlockedRequests.scala index a94836d..77cffd6 100644 --- a/zio-query/shared/src/main/scala/zio/query/internal/BlockedRequests.scala +++ b/zio-query/shared/src/main/scala/zio/query/internal/BlockedRequests.scala @@ -285,28 +285,38 @@ private[query] object BlockedRequests { private def completePromises( dataSource: DataSource[_, Any], sequential: Chunk[Chunk[BlockedRequest[Any]]] - )(get: Request[?, ?] => Option[Exit[Any, Any]]): Unit = - sequential.foreach { - _.foreach { br => - val req = br.request - val res = get(req) match { - case Some(exit) => exit.asInstanceOf[Exit[br.Failure, br.Success]] - case None => Exit.die(QueryFailure(dataSource, req)) - } + )(get: Request[?, ?] => Option[Exit[Any, Any]]): Unit = { + + def loopInner(c: Chunk[BlockedRequest[Any]]): Unit = { + val it = c.iterator + while (it.hasNext) { + val br = it.next() + val req = br.request + val exit = get(req) + val res = + if (exit.isEmpty) Exit.die(QueryFailure(dataSource, req)) + else exit.get.asInstanceOf[Exit[br.Failure, br.Success]] br.result.unsafe.done(res)(Unsafe.unsafe) } } + val it0 = sequential.iterator + while (it0.hasNext) { + val next = it0.next() + loopInner(next) + } + } + private def cacheLeftovers( cache: Cache, map: mutable.HashMap[Request[_, _], Exit[Any, Any]] )(implicit trace: Trace): UIO[Unit] = cache match { - case cache: Cache.Default => + case cache: Cache.Synchronous => ZIO.succeedUnsafe { implicit unsafe => map.foreach { case (request: Request[Any, Any], exit) => cache - .lookupUnsafe(request) + .lookupNow(request) .merge .unsafe .done(exit) From 0e74c5aea7134f77d0279a609408822171c6adb4 Mon Sep 17 00:00:00 2001 From: Kyri Petrou Date: Thu, 4 Jul 2024 09:16:08 +1000 Subject: [PATCH 2/4] Rename abstract class to Eager --- .../shared/src/main/scala/zio/query/Cache.scala | 16 ++++++++-------- .../shared/src/main/scala/zio/query/ZQuery.scala | 4 ++-- .../zio/query/internal/BlockedRequests.scala | 2 +- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/zio-query/shared/src/main/scala/zio/query/Cache.scala b/zio-query/shared/src/main/scala/zio/query/Cache.scala index f766124..7269427 100644 --- a/zio-query/shared/src/main/scala/zio/query/Cache.scala +++ b/zio-query/shared/src/main/scala/zio/query/Cache.scala @@ -75,13 +75,13 @@ object Cache { ZIO.succeed(Cache.unsafeMake(expectedNumOfElements)) /** - * A 'Synchronous' cache is one that doesn't require an effect to look up its + * An 'Eager' cache is one that doesn't require an effect to look up its * value. Prefer extending this class when implementing a cache that doesn't - * perform any asynchronous IO. + * perform any IO, such as a cache based on a Map. */ - abstract class Synchronous extends Cache { + abstract class Eager extends Cache { def getOrNull[E, A](request: Request[E, A]): Promise[E, A] - def lookupNow[E, A, B](request: Request[_, _]): Either[Promise[E, B], Promise[E, B]] + def lookupNow[E, A](request: Request[E, A]): Either[Promise[E, A], Promise[E, A]] def putNow[E, A](request: Request[E, A], result: Promise[E, A]): Unit def removeNow[E, A](request: Request[E, A]): Unit @@ -103,15 +103,15 @@ object Cache { ZIO.succeed(removeNow(request)) } - private final class Default(map: ConcurrentHashMap[Request[_, _], Promise[_, _]]) extends Synchronous { + private final class Default(map: ConcurrentHashMap[Request[_, _], Promise[_, _]]) extends Eager { private implicit val unsafe: Unsafe = Unsafe.unsafe def getOrNull[E, A](request: Request[E, A]): Promise[E, A] = map.get(request).asInstanceOf[Promise[E, A]] - def lookupNow[E, A, B](request: Request[_, _]): Either[Promise[E, B], Promise[E, B]] = { - val newPromise = Promise.unsafe.make[E, B](FiberId.None) - val existing = map.putIfAbsent(request, newPromise).asInstanceOf[Promise[E, B]] + def lookupNow[E, A](request: Request[E, A]): Either[Promise[E, A], Promise[E, A]] = { + val newPromise = Promise.unsafe.make[E, A](FiberId.None) + val existing = map.putIfAbsent(request, newPromise).asInstanceOf[Promise[E, A]] if (existing eq null) Left(newPromise) else Right(existing) } diff --git a/zio-query/shared/src/main/scala/zio/query/ZQuery.scala b/zio-query/shared/src/main/scala/zio/query/ZQuery.scala index d9952b1..b31a97a 100644 --- a/zio-query/shared/src/main/scala/zio/query/ZQuery.scala +++ b/zio-query/shared/src/main/scala/zio/query/ZQuery.scala @@ -1559,8 +1559,8 @@ object ZQuery { } cache match { - case cache: Cache.Synchronous => foldPromise(cache.lookupNow(request)) - case cache => CachedResult.Effectful(cache.lookup(request).flatMap(foldPromise(_).toZIO)) + case cache: Cache.Eager => foldPromise(cache.lookupNow(request)) + case cache => CachedResult.Effectful(cache.lookup(request).flatMap(foldPromise(_).toZIO)) } } diff --git a/zio-query/shared/src/main/scala/zio/query/internal/BlockedRequests.scala b/zio-query/shared/src/main/scala/zio/query/internal/BlockedRequests.scala index 77cffd6..9aa7c98 100644 --- a/zio-query/shared/src/main/scala/zio/query/internal/BlockedRequests.scala +++ b/zio-query/shared/src/main/scala/zio/query/internal/BlockedRequests.scala @@ -312,7 +312,7 @@ private[query] object BlockedRequests { map: mutable.HashMap[Request[_, _], Exit[Any, Any]] )(implicit trace: Trace): UIO[Unit] = cache match { - case cache: Cache.Synchronous => + case cache: Cache.Eager => ZIO.succeedUnsafe { implicit unsafe => map.foreach { case (request: Request[Any, Any], exit) => cache From ffda828cf261f1f3e36ce89fa8109addc20a7c5a Mon Sep 17 00:00:00 2001 From: Kyri Petrou Date: Fri, 5 Jul 2024 17:03:40 +1000 Subject: [PATCH 3/4] Change names and return type of getNow --- .../src/main/scala/zio/query/Cache.scala | 22 ++++++++++--------- .../src/main/scala/zio/query/ZQuery.scala | 4 ++-- .../zio/query/internal/BlockedRequests.scala | 2 +- 3 files changed, 15 insertions(+), 13 deletions(-) diff --git a/zio-query/shared/src/main/scala/zio/query/Cache.scala b/zio-query/shared/src/main/scala/zio/query/Cache.scala index 7269427..1a64d92 100644 --- a/zio-query/shared/src/main/scala/zio/query/Cache.scala +++ b/zio-query/shared/src/main/scala/zio/query/Cache.scala @@ -75,20 +75,22 @@ object Cache { ZIO.succeed(Cache.unsafeMake(expectedNumOfElements)) /** - * An 'Eager' cache is one that doesn't require an effect to look up its + * An 'InMemory' cache is one that doesn't require an effect to look up its * value. Prefer extending this class when implementing a cache that doesn't * perform any IO, such as a cache based on a Map. */ - abstract class Eager extends Cache { - def getOrNull[E, A](request: Request[E, A]): Promise[E, A] + abstract class InMemory extends Cache { + def getNow[E, A](request: Request[E, A]): Option[Promise[E, A]] def lookupNow[E, A](request: Request[E, A]): Either[Promise[E, A], Promise[E, A]] def putNow[E, A](request: Request[E, A], result: Promise[E, A]): Unit def removeNow[E, A](request: Request[E, A]): Unit final def get[E, A](request: Request[E, A])(implicit trace: Trace): IO[Unit, Promise[E, A]] = ZIO.suspendSucceed { - val p = getOrNull(request) - if (p eq null) Exit.fail(()) else Exit.succeed(p) + getNow(request) match { + case Some(p) => Exit.succeed(p) + case _ => Exit.fail(()) + } } final def lookup[E, A, B]( @@ -103,11 +105,11 @@ object Cache { ZIO.succeed(removeNow(request)) } - private final class Default(map: ConcurrentHashMap[Request[_, _], Promise[_, _]]) extends Eager { + private final class NonExpiringCache(map: ConcurrentHashMap[Request[_, _], Promise[_, _]]) extends InMemory { private implicit val unsafe: Unsafe = Unsafe.unsafe - def getOrNull[E, A](request: Request[E, A]): Promise[E, A] = - map.get(request).asInstanceOf[Promise[E, A]] + def getNow[E, A](request: Request[E, A]): Option[Promise[E, A]] = + Option(map.get(request).asInstanceOf[Promise[E, A]]) def lookupNow[E, A](request: Request[E, A]): Either[Promise[E, A], Promise[E, A]] = { val newPromise = Promise.unsafe.make[E, A](FiberId.None) @@ -123,10 +125,10 @@ object Cache { } // TODO: Initialize the map with a sensible default value. Default is 16, which seems way too small for a cache - private[query] def unsafeMake(): Cache = new Default(new ConcurrentHashMap()) + private[query] def unsafeMake(): Cache = new NonExpiringCache(new ConcurrentHashMap()) private[query] def unsafeMake(expectedNumOfElements: Int): Cache = { val initialSize = Math.ceil(expectedNumOfElements / 0.75d).toInt - new Default(new ConcurrentHashMap(initialSize)) + new NonExpiringCache(new ConcurrentHashMap(initialSize)) } } diff --git a/zio-query/shared/src/main/scala/zio/query/ZQuery.scala b/zio-query/shared/src/main/scala/zio/query/ZQuery.scala index b31a97a..458686b 100644 --- a/zio-query/shared/src/main/scala/zio/query/ZQuery.scala +++ b/zio-query/shared/src/main/scala/zio/query/ZQuery.scala @@ -1559,8 +1559,8 @@ object ZQuery { } cache match { - case cache: Cache.Eager => foldPromise(cache.lookupNow(request)) - case cache => CachedResult.Effectful(cache.lookup(request).flatMap(foldPromise(_).toZIO)) + case cache: Cache.InMemory => foldPromise(cache.lookupNow(request)) + case cache => CachedResult.Effectful(cache.lookup(request).flatMap(foldPromise(_).toZIO)) } } diff --git a/zio-query/shared/src/main/scala/zio/query/internal/BlockedRequests.scala b/zio-query/shared/src/main/scala/zio/query/internal/BlockedRequests.scala index 9aa7c98..9812dbc 100644 --- a/zio-query/shared/src/main/scala/zio/query/internal/BlockedRequests.scala +++ b/zio-query/shared/src/main/scala/zio/query/internal/BlockedRequests.scala @@ -312,7 +312,7 @@ private[query] object BlockedRequests { map: mutable.HashMap[Request[_, _], Exit[Any, Any]] )(implicit trace: Trace): UIO[Unit] = cache match { - case cache: Cache.Eager => + case cache: Cache.InMemory => ZIO.succeedUnsafe { implicit unsafe => map.foreach { case (request: Request[Any, Any], exit) => cache From 04f882be1672c9c6bbacf55a8808165b053aab49 Mon Sep 17 00:00:00 2001 From: Kyri Petrou Date: Fri, 5 Jul 2024 17:04:25 +1000 Subject: [PATCH 4/4] Rename once more --- zio-query/shared/src/main/scala/zio/query/Cache.scala | 4 ++-- zio-query/shared/src/main/scala/zio/query/ZQuery.scala | 4 ++-- .../src/main/scala/zio/query/internal/BlockedRequests.scala | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/zio-query/shared/src/main/scala/zio/query/Cache.scala b/zio-query/shared/src/main/scala/zio/query/Cache.scala index 1a64d92..b2387c8 100644 --- a/zio-query/shared/src/main/scala/zio/query/Cache.scala +++ b/zio-query/shared/src/main/scala/zio/query/Cache.scala @@ -79,7 +79,7 @@ object Cache { * value. Prefer extending this class when implementing a cache that doesn't * perform any IO, such as a cache based on a Map. */ - abstract class InMemory extends Cache { + abstract class InMemoryCache extends Cache { def getNow[E, A](request: Request[E, A]): Option[Promise[E, A]] def lookupNow[E, A](request: Request[E, A]): Either[Promise[E, A], Promise[E, A]] def putNow[E, A](request: Request[E, A], result: Promise[E, A]): Unit @@ -105,7 +105,7 @@ object Cache { ZIO.succeed(removeNow(request)) } - private final class NonExpiringCache(map: ConcurrentHashMap[Request[_, _], Promise[_, _]]) extends InMemory { + private final class NonExpiringCache(map: ConcurrentHashMap[Request[_, _], Promise[_, _]]) extends InMemoryCache { private implicit val unsafe: Unsafe = Unsafe.unsafe def getNow[E, A](request: Request[E, A]): Option[Promise[E, A]] = diff --git a/zio-query/shared/src/main/scala/zio/query/ZQuery.scala b/zio-query/shared/src/main/scala/zio/query/ZQuery.scala index 458686b..b72ddb1 100644 --- a/zio-query/shared/src/main/scala/zio/query/ZQuery.scala +++ b/zio-query/shared/src/main/scala/zio/query/ZQuery.scala @@ -1559,8 +1559,8 @@ object ZQuery { } cache match { - case cache: Cache.InMemory => foldPromise(cache.lookupNow(request)) - case cache => CachedResult.Effectful(cache.lookup(request).flatMap(foldPromise(_).toZIO)) + case cache: Cache.InMemoryCache => foldPromise(cache.lookupNow(request)) + case cache => CachedResult.Effectful(cache.lookup(request).flatMap(foldPromise(_).toZIO)) } } diff --git a/zio-query/shared/src/main/scala/zio/query/internal/BlockedRequests.scala b/zio-query/shared/src/main/scala/zio/query/internal/BlockedRequests.scala index 9812dbc..d7475c3 100644 --- a/zio-query/shared/src/main/scala/zio/query/internal/BlockedRequests.scala +++ b/zio-query/shared/src/main/scala/zio/query/internal/BlockedRequests.scala @@ -312,7 +312,7 @@ private[query] object BlockedRequests { map: mutable.HashMap[Request[_, _], Exit[Any, Any]] )(implicit trace: Trace): UIO[Unit] = cache match { - case cache: Cache.InMemory => + case cache: Cache.InMemoryCache => ZIO.succeedUnsafe { implicit unsafe => map.foreach { case (request: Request[Any, Any], exit) => cache