WIP: Add JobManager #4287

Draft
wants to merge 3 commits into
base: series/3.x
116 changes: 116 additions & 0 deletions std/shared/src/main/scala/cats/effect/std/JobManager.scala
@@ -0,0 +1,116 @@
/*
* Copyright 2020-2025 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cats.effect.std

import cats.effect.kernel._
import cats.syntax.all._

/**
 * A `JobManager` allows you to launch `Jobs` in the background using a unique identifier. Then
 * you can use the identifier to query for the status of the job or cancel it.
 */
trait JobManager[F[_], Id, S] {

  /**
   * Creates and launches the given `Job` in the background. If another Job with the same id was
   * already running, it will be cancelled before starting this one.
   *
   * Note: This waits until the actual job's `run` process has started. In other words, you can
   * query the `status` of the associated id immediately after.
   */
  def startJob(id: Id, job: Resource[F, JobManager.Job[F, S]]): F[Unit]
Contributor Author

On one hand, I like the idea of users being able to use their own Ids.
On the other, I think most users would benefit from a default using automatically generated UUIDs.

Should we provide such default in some way?


  /**
   * Gets the status of the `Job` associated with the given `id`. If `id` doesn't exist or the
   * `Job` already finished then the returned value will be a `None`.
   */
  def getJobStatus(id: Id): F[Option[S]]
Comment on lines +37 to +41
Contributor Author

I somewhat dislike the idea that both a bad id and an already finished job result in a None.
But, otherwise, the map will grow indefinitely.

Any ideas?

Contributor

By fully controlling IDs, we could have startJob return a fresh ID, so "bad ID" would never happen. (But then we'd lose the ability of users to have their own IDs...)
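
One way to get both behaviours is to keep the user-supplied `Id` in the trait and layer fresh IDs on top. The following is a minimal sketch, not part of this PR: `startFreshJob` and `JobManagerOps` are hypothetical names, and the two traits are trimmed-down copies of the ones in this diff so the snippet compiles on its own. It assumes `cats.effect.std.UUIDGen` is used for ID generation.

```scala
import cats.FlatMap
import cats.effect.kernel.Resource
import cats.effect.std.UUIDGen
import cats.syntax.all._
import java.util.UUID

// Trimmed-down copies of the traits from this diff (assumption: only the
// members needed for this sketch are reproduced).
trait Job[F[_], S] {
  def run: F[Unit]
  def getStatus: F[S]
}

trait JobManager[F[_], Id, S] {
  def startJob(id: Id, job: Resource[F, Job[F, S]]): F[Unit]
}

object JobManagerOps {
  // Hypothetical helper: generates a random UUID, starts the job under it,
  // and returns the UUID so the caller can query or cancel the job later.
  def startFreshJob[F[_]: FlatMap: UUIDGen, S](
      manager: JobManager[F, UUID, S],
      job: Resource[F, Job[F, S]]
  ): F[UUID] =
    UUIDGen[F].randomUUID.flatTap(id => manager.startJob(id, job))
}
```

With a helper like this, callers who never invent their own IDs cannot pass an unknown one by accident, while the trait still accepts custom `Id` types. It does not resolve the ambiguity between "unknown id" and "already finished" for callers who do supply their own IDs.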


  /**
   * Signals cancellation of the `Job` associated with the given `id`, and waits for its
   * completion.
   */
  def cancelJob(id: Id): F[Unit]
Comment on lines +43 to +47
Contributor Author

Should we provide a cancelAndForget variant where we don't wait on cancellation?

Contributor

Would that be essentially cancelJob(...).start? If yes, I don't think we should add it (everyone can .start for themselves).

Contributor Author

Conceptually yes, but also, since we already have a Dispatcher in place, it could be used to perform that rather than raw start.

Contributor

Okay, so I think my point is: if it's somehow better (performance, safety, whatever) than just .start-ing, then yeah, maybe add it. If it's not better, then definitely not.

Contributor Author

AFAIK, it is safer since its life cycle is attached to the Supervisor.
But I am not sure what canceling a cancel does.

Contributor

It does "nothing", as a cancel is uncancelable.

Contributor Author

Then I guess as you said, it provides no value and users who don't want to wait on the cancellation can just start.
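
For reference, the caller-side version this thread settles on is a one-liner. The following is a minimal sketch under stated assumptions: `JobCanceller` is a hypothetical trimmed-down stand-in for the `JobManager` trait in this diff (only `cancelJob` is reproduced), and `cancelAndForget` is a hypothetical helper name, not something this PR adds.

```scala
import cats.effect.kernel.Concurrent
import cats.syntax.all._

// Hypothetical stand-in for `JobManager`, reduced to the one method used here.
trait JobCanceller[F[_], Id] {
  def cancelJob(id: Id): F[Unit]
}

object CancelSyntax {
  // Runs `cancelJob` on a new fiber and returns immediately, discarding the
  // fiber handle. The caller does not wait for the job's cancellation to finish.
  def cancelAndForget[F[_], Id](manager: JobCanceller[F, Id], id: Id)(
      implicit F: Concurrent[F]
  ): F[Unit] =
    F.start(manager.cancelJob(id)).void
}
```

Note that the fiber started here is not supervised, so its lifetime is not tied to the manager's `Supervisor`.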

}

object JobManager {

  /**
   * Represents a job managed by a `JobManager`.
   */
  trait Job[F[_], S] {

    /**
     * Starts the logic of this `Job`.
     */
    def run: F[Unit]

    /**
     * Gets the status of this `Job`.
     */
    def getStatus: F[S]
  }
Comment on lines +55 to +66
Contributor Author

Users would then implement this trait for their own Jobs.
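
As an illustration of what such an implementation might look like, here is a minimal sketch. `CountingJob` is a hypothetical example, not part of this PR, and the `Job` trait is copied from the diff so the snippet compiles on its own. The status type `S` is assumed to be an `Int` progress counter kept in a `Ref`.

```scala
import cats.effect.{IO, Ref}

// Copy of the `JobManager.Job` trait from this diff.
trait Job[F[_], S] {
  def run: F[Unit]
  def getStatus: F[S]
}

// Hypothetical job that performs `total` steps and reports how many are done.
final class CountingJob(total: Int, done: Ref[IO, Int]) extends Job[IO, Int] {

  // Each counter increment stands in for one unit of real work.
  override def run: IO[Unit] =
    done.update(_ + 1).replicateA_(total)

  // The status is simply the number of completed steps so far.
  override def getStatus: IO[Int] =
    done.get
}

object CountingJob {
  // Allocates the progress counter and builds the job.
  def create(total: Int): IO[CountingJob] =
    Ref.of[IO, Int](0).map(new CountingJob(total, _))
}
```

Since `startJob` takes a `Resource`, a job like this would be passed as `Resource.eval(CountingJob.create(10))`, or wrapped in a `Resource` that also acquires whatever the job needs.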


  private final case class RunningJob[F[_], S](
      status: F[S],
      cancel: F[Unit]
  )

  def apply[F[_], Id, S](implicit F: Concurrent[F]): Resource[F, JobManager[F, Id, S]] =
    for {
      supervisor <- Supervisor[F](await = true)
      jobsMap <- Resource.eval(MapRef[F, Id, RunningJob[F, S]])
    } yield new JobManager[F, Id, S] {
      override def startJob(id: Id, jobR: Resource[F, Job[F, S]]): F[Unit] = {
        Deferred[F, Unit].flatMap { waitForJobRegistration =>
          // Running a job has the following steps:
          // 1. Run the job setup (Resource acquire).
          // 2. Launch the job in the background.
          // 3. Register the job in the jobsMap.
          // 4. In case there was another job already registered with the same id, cancel it.
          // 5. Wait for the job to finish.
          // 6. Run the job cleanup (Resource release).
          val runJob = jobR.use { job =>
            supervisor.supervise(job.run).flatMap { jobFiber =>
              val registerJob =
                F.guarantee(
                  jobsMap(id).getAndSet(
                    RunningJob(
                      status = job.getStatus,
                      cancel = jobFiber.cancel
                    ).some
                  ),
                  fin = waitForJobRegistration.complete(()).void
                )

              registerJob.flatMap(_.traverse_(_.cancel)) >> jobFiber.join
            }
          }

          // Starts the run job process in the background,
          // but waits until it has been registered in the jobsMap.
          supervisor.supervise(runJob) >> waitForJobRegistration.get
        }
      }

      override def getJobStatus(id: Id): F[Option[S]] =
        jobsMap(id).get.flatMap(_.traverse(_.status))

      override def cancelJob(id: Id): F[Unit] =
        jobsMap(id).getAndSet(None).flatMap(_.traverse_(_.cancel))
    }
}