diff --git a/src/main/scala/com/evolution/resourcepool/ResourcePool.scala b/src/main/scala/com/evolution/resourcepool/ResourcePool.scala index f5af9ec..c25b29b 100644 --- a/src/main/scala/com/evolution/resourcepool/ResourcePool.scala +++ b/src/main/scala/com/evolution/resourcepool/ResourcePool.scala @@ -1,6 +1,6 @@ package com.evolution.resourcepool -import cats.{Functor, Parallel} +import cats.Functor import cats.effect.{Async, Deferred, MonadCancel, MonadCancelThrow, Poll, Ref, Resource, Sync, Temporal} import cats.effect.syntax.all._ import cats.syntax.all._ @@ -37,9 +37,10 @@ object ResourcePool { * determined automatically by taking into account the number of available * processors and expected pool size. */ - def of[F[_]: Async: Parallel, A]( + def of[F[_]: Async, A]( maxSize: Int, expireAfter: FiniteDuration, + discardTasksOnRelease: Boolean, resource: Id => Resource[F, A] ): Resource[F, ResourcePool[F, A]] = { @@ -52,6 +53,7 @@ object ResourcePool { maxSize = maxSize, partitions = (maxSize / 100).min(cpus), expireAfter, + discardTasksOnRelease, resource) } yield result } @@ -73,10 +75,11 @@ object ResourcePool { * Factory for creating the new resources. `Id` is a unique identifier of a * resource that could be used, for example, for logging purposes. */ - def of[F[_]: Async: Parallel, A]( + def of[F[_]: Async, A]( maxSize: Int, partitions: Int, expireAfter: FiniteDuration, + discardTasksOnRelease: Boolean, resource: Id => Resource[F, A] ): Resource[F, ResourcePool[F, A]] = { @@ -86,6 +89,7 @@ object ResourcePool { of0( maxSize, expireAfter, + discardTasksOnRelease, resource) } @@ -131,6 +135,7 @@ object ResourcePool { private def of0[F[_]: Async, A]( maxSize: Int, expireAfter: FiniteDuration, + discardTasksOnRelease: Boolean, resource: Id => Resource[F, A] ): Resource[F, ResourcePool[F, A]] = { @@ -303,12 +308,28 @@ object ResourcePool { } case stage: State.Allocated.Stage.Busy => - apply( - allocated = state.entries.keySet, - releasing = state.releasing, - stage.tasks - ) { - ().pure[F] + if (discardTasksOnRelease) { + apply( + allocated = state.entries.keySet, + releasing = state.releasing, + Queue.empty + ) { + stage + .tasks + .foldMapM { task => + task + .complete(ReleasedError.asLeft) + .void + } + } + } else { + apply( + allocated = state.entries.keySet, + releasing = state.releasing, + stage.tasks + ) { + ().pure[F] + } } } } @@ -775,9 +796,25 @@ object ResourcePool { expireAfter: FiniteDuration, )(implicit F: Async[F], - P: Parallel[F] ): Resource[F, ResourcePool[F, A]] = { - ResourcePool.of(maxSize, expireAfter, _ => self) + toResourcePool(maxSize, expireAfter, discardTasksOnRelease = false) + } + + /** Same as [[of[F[_],A](maxSize:Int,expireAfter*]], but provides a + * shorter syntax to create a pool out of existing resource. + */ + def toResourcePool( + maxSize: Int, + expireAfter: FiniteDuration, + discardTasksOnRelease: Boolean, + )(implicit + F: Async[F], + ): Resource[F, ResourcePool[F, A]] = { + ResourcePool.of( + maxSize, + expireAfter, + discardTasksOnRelease, + _ => self) } /** Same as [[of[F[_],A](maxSize:Int,partitions:Int*]], but provides a @@ -787,11 +824,16 @@ object ResourcePool { maxSize: Int, partitions: Int, expireAfter: FiniteDuration, + discardTasksOnRelease: Boolean, )(implicit F: Async[F], - P: Parallel[F] ): Resource[F, ResourcePool[F, A]] = { - ResourcePool.of(maxSize = maxSize, partitions = partitions, expireAfter, _ => self) + ResourcePool.of( + maxSize, + partitions, + expireAfter, + discardTasksOnRelease, + _ => self) } } } diff --git a/src/test/scala/com/evolution/resourcepool/ResourcePoolTest.scala b/src/test/scala/com/evolution/resourcepool/ResourcePoolTest.scala index c922334..6ffadf7 100644 --- a/src/test/scala/com/evolution/resourcepool/ResourcePoolTest.scala +++ b/src/test/scala/com/evolution/resourcepool/ResourcePoolTest.scala @@ -46,6 +46,7 @@ class ResourcePoolTest extends AsyncFunSuite with Matchers { .of( maxSize = 1, expireAfter = 1.day, + discardTasksOnRelease = false, resource = _ => Resource.make { add(Action.Acquire) } { _ => @@ -80,21 +81,21 @@ class ResourcePoolTest extends AsyncFunSuite with Matchers { deferred2 <- Deferred[IO, Unit].toResource deferreds = List(deferred0, deferred1) ref <- Ref[IO].of(deferreds).toResource - pool <- ResourcePool.of( - maxSize = 2, - expireAfter = 1.day, - resource = _ => { - val result = for { - result <- ref.modify { - case a :: as => (as, a.some) - case as => (as, none) - } - result <- result.foldMapM { _.complete(()).void } - _ <- deferred2.get - } yield result - result.toResource - } - ) + pool <- { + val result = for { + result <- ref.modify { + case a :: as => (as, a.some) + case as => (as, none) + } + result <- result.foldMapM { _.complete(()).void } + _ <- deferred2.get + } yield result + result + .toResource + .toResourcePool( + maxSize = 2, + expireAfter = 1.day) + } } yield { for { fiber0 <- pool.resource.use { _.pure[IO] }.start @@ -113,12 +114,11 @@ class ResourcePoolTest extends AsyncFunSuite with Matchers { test("fail after being released") { val result = for { - result <- ResourcePool - .of( + result <- () + .pure[Resource[IO, *]] + .toResourcePool( maxSize = 2, - expireAfter = 1.day, - resource = _ => ().pure[Resource[IO, *]] - ) + expireAfter = 1.day) .allocated (pool, release) = result _ <- release @@ -131,12 +131,11 @@ class ResourcePoolTest extends AsyncFunSuite with Matchers { test("release gracefully") { val result = for { ref <- Ref[IO].of(0) - result <- ResourcePool - .of( + result <- Resource + .release { ref.update { _ + 1 } } + .toResourcePool( maxSize = 2, - expireAfter = 1.day, - resource = _ => Resource.release { ref.update { _ + 1 } } - ) + expireAfter = 1.day) .allocated (pool, release0) = result result <- pool.get @@ -171,12 +170,12 @@ class ResourcePoolTest extends AsyncFunSuite with Matchers { test("release empty pool") { val result = for { ref <- Ref[IO].of(0) - _ <- ResourcePool - .of( + _ <- ref + .update { _ + 1 } + .toResource + .toResourcePool( maxSize = 2, - expireAfter = 1.day, - resource = _ => ref.update { _ + 1 }.toResource - ) + expireAfter = 1.day) .use { _ => ().pure[IO] } result <- ref.get _ <- IO { result shouldEqual 0 } @@ -189,19 +188,18 @@ class ResourcePoolTest extends AsyncFunSuite with Matchers { val result = for { deferred <- Deferred[IO, Unit] ref <- Ref[IO].of(List(error.raiseError[IO, Unit], deferred.complete(()).void)) - result <- ResourcePool - .of( + result <- Resource + .release { + ref + .modify { + case a :: as => (as, a) + case as => (as, ().pure[IO]) + } + .flatten + } + .toResourcePool( maxSize = 2, - expireAfter = 1.day, - resource = _ => Resource.release { - ref - .modify { - case a :: as => (as, a) - case as => (as, ().pure[IO]) - } - .flatten - } - ) + expireAfter = 1.day) .allocated (pool, release0) = result result <- pool.get @@ -223,10 +221,8 @@ class ResourcePoolTest extends AsyncFunSuite with Matchers { ref1 <- Ref[IO].of(0).toResource deferred0 <- Deferred[IO, Unit].toResource deferred1 <- Deferred[IO, Unit].toResource - pool <- ResourcePool.of( - maxSize = 5, - expireAfter = 10.millis, - resource = _ => Resource.make { + pool <- Resource + .make { for { a <- ref0.update { _ + 1 } _ <- deferred0.complete(()) @@ -237,7 +233,9 @@ class ResourcePoolTest extends AsyncFunSuite with Matchers { _ <- deferred1.complete(()) } yield a } - ) + .toResourcePool( + maxSize = 5, + expireAfter = 10.millis) _ <- Concurrent[IO].background { pool .resource @@ -265,11 +263,12 @@ class ResourcePoolTest extends AsyncFunSuite with Matchers { val maxSize = 2 val resource = for { ref <- Ref[IO].of(0).toResource - pool <- ResourcePool.of( - maxSize = maxSize, - expireAfter = 1.day, - resource = _ => ref.update { _ + 1 }.toResource - ) + pool <- ref + .update { _ + 1 } + .toResource + .toResourcePool( + maxSize = maxSize, + expireAfter = 1.day) } yield { for { _ <- pool @@ -295,6 +294,7 @@ class ResourcePoolTest extends AsyncFunSuite with Matchers { maxSize = maxSize, partitions = 3, expireAfter = 1.day, + discardTasksOnRelease = false, resource = id => ref.update { id :: _ }.toResource ) } yield { @@ -323,6 +323,7 @@ class ResourcePoolTest extends AsyncFunSuite with Matchers { pool <- ResourcePool.of( maxSize = 1, expireAfter = 1.day, + discardTasksOnRelease = false, resource = _ => { for { a <- deferred.get.toResource @@ -360,14 +361,13 @@ class ResourcePoolTest extends AsyncFunSuite with Matchers { val result = for { deferred0 <- Deferred[IO, Unit].toResource deferred1 <- Deferred[IO, Unit].toResource - pool <- ResourcePool.of( - maxSize = 1, - expireAfter = 1.day, - resource = _ => deferred0 - .complete(()) - .productR { deferred1.get } - .toResource - ) + pool <- deferred0 + .complete(()) + .productR { deferred1.get } + .toResource + .toResourcePool( + maxSize = 1, + expireAfter = 1.day) } yield { for { fiber0 <- pool @@ -398,11 +398,11 @@ class ResourcePoolTest extends AsyncFunSuite with Matchers { test("cancel `get` while waiting in queue") { val result = for { deferred0 <- Deferred[IO, Unit].toResource - pool <- ResourcePool.of( + pool <- () + .pure[Resource[IO, *]] + .toResourcePool( maxSize = 1, - expireAfter = 1.day, - resource = _ => ().pure[Resource[IO, *]] - ) + expireAfter = 1.day) } yield { for { fiber0 <- pool @@ -435,15 +435,13 @@ class ResourcePoolTest extends AsyncFunSuite with Matchers { val result = for { deferred0 <- Deferred[IO, Unit] deferred1 <- Deferred[IO, Unit] - result <- ResourcePool - .of( + result <- deferred0 + .complete(()) + .productR { deferred1.get } + .toResource + .toResourcePool( maxSize = 1, - expireAfter = 1.day, - resource = _ => deferred0 - .complete(()) - .productR { deferred1.get } - .toResource - ) + expireAfter = 1.day) .allocated (pool, release) = result fiber0 <- pool @@ -467,12 +465,12 @@ class ResourcePoolTest extends AsyncFunSuite with Matchers { test("release before resource allocation is completed") { val result = for { deferred <- Deferred[IO, Unit] - result <- ResourcePool - .of( + result <- deferred + .get + .toResource + .toResourcePool( maxSize = 1, - expireAfter = 1.day, - resource = _ => deferred.get.toResource - ) + expireAfter = 1.day) .allocated (pool, release) = result fiber0 <- pool @@ -510,15 +508,15 @@ class ResourcePoolTest extends AsyncFunSuite with Matchers { val result = for { ref <- Ref[IO].of(List.empty[Action]).toResource add = (action: Action) => ref.update { action :: _ } - pool <- ResourcePool.of( - maxSize = 5, - expireAfter = 10.millis, - resource = _ => Resource.make { + pool <- Resource + .make { add(Action.Allocate) } { _ => add(Action.Release) } - ) + .toResourcePool( + maxSize = 5, + expireAfter = 10.millis) } yield { val job = pool .resource @@ -560,4 +558,41 @@ class ResourcePoolTest extends AsyncFunSuite with Matchers { .use(identity) .run() } + + test("discard tasks on release") { + val result = for { + result <- () + .pure[Resource[IO, *]] + .toResourcePool( + maxSize = 1, + expireAfter = 1.day, + discardTasksOnRelease = true) + .allocated + (pool, release) = result + fiber = pool + .resource + .use { _ => IO.never[Unit] } + .start + fiber0 <- fiber + result <- fiber0 + .joinWithNever + .timeout(10.millis) + .attempt + _ <- IO { result should matchPattern { case Left(_: TimeoutException) => } } + fiber1 <- fiber + result <- fiber1 + .joinWithNever + .timeout(10.millis) + .attempt + _ <- IO { result should matchPattern { case Left(_: TimeoutException) => } } + fiber2 <- release.start + result <- fiber1 + .joinWithNever + .attempt + _ <- IO { result shouldEqual ResourcePool.ReleasedError.asLeft } + _ <- fiber0.cancel + _ <- fiber2.joinWithNever + } yield {} + result.run() + } }