Skip to content

Commit 3481bd1

Browse files
authored
More quartz methods (#27)
* More quartz methods * fix
1 parent 9d22e8b commit 3481bd1

File tree

2 files changed

+50
-16
lines changed

2 files changed

+50
-16
lines changed
Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.chilipiper.quartz
22

3-
import org.quartz.{CronExpression, JobDetail, JobKey, Trigger, TriggerBuilder}
3+
import org.quartz.impl.matchers.GroupMatcher
4+
import org.quartz.{CronExpression, JobBuilder, JobDetail, JobKey, Trigger, TriggerBuilder}
45

56
import java.time.Instant
67

@@ -9,14 +10,23 @@ trait Scheduler[A, F[_]] {
910
def scheduleJob(jobDetail: JobDetail, trigger: Trigger): F[Instant]
1011
def checkExists(jobKey: JobKey): F[Boolean]
1112
def deleteJob(jobKey: JobKey): F[Boolean]
13+
def addJob(jobDetail: JobDetail, replace: Boolean): F[Unit]
14+
def triggerJob(jobKey: JobKey): F[Unit]
15+
def getJobKeys(matcher: GroupMatcher[JobKey]): F[Set[JobKey]]
16+
def getJobDetail(jobKey: JobKey): F[JobDetail]
1217
}
1318

1419
trait SchedulerCustom[A, F[_]] extends Scheduler[A, F] {
20+
21+
/** Use the method instead of manually calling `JobBuilder.newJob` to ensure [this] can work with the job correctly.
22+
*/
23+
def newJobDetail(jobKey: JobKey, jobData: A, customize: JobBuilder => JobBuilder = identity): JobDetail
1524
def scheduleJobCustom(jobKey: JobKey, jobData: A, cronExpression: CronExpression): F[Unit]
1625
def scheduleJobCustom(jobKey: JobKey, jobData: A, instant: Instant): F[Unit]
1726
def scheduleJobCustom(
1827
jobKey: JobKey,
1928
jobData: A,
2029
configure: TriggerBuilder[Trigger] => TriggerBuilder[? <: Trigger],
30+
customizeJob: JobBuilder => JobBuilder = identity,
2131
): F[Instant]
2232
}

src/main/scala/com/chilipiper/quartz/SchedulerQuartz.scala

Lines changed: 39 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import io.circe.syntax._
1515
import io.circe.{Decoder, Encoder}
1616
import org.quartz._
1717
import org.quartz.impl.StdSchedulerFactory
18+
import org.quartz.impl.matchers.GroupMatcher
1819
import org.quartz.spi.TriggerFiredBundle
1920
import org.quartz.utils._
2021

@@ -60,6 +61,30 @@ class SchedulerQuartz[A: Encoder: Decoder, F[_]: MonadThrow, G[_]: Sync](
6061
underlying.deleteJob(jobKey)
6162
}
6263

64+
override def addJob(jobDetail: JobDetail, replace: Boolean): G[Unit] = Sync[G].interruptible {
65+
underlying.addJob(jobDetail, replace)
66+
}
67+
68+
override def triggerJob(jobKey: JobKey): G[Unit] = Sync[G].interruptible {
69+
underlying.triggerJob(jobKey)
70+
}
71+
72+
override def getJobKeys(matcher: GroupMatcher[JobKey]): G[Set[JobKey]] = Sync[G].interruptible {
73+
underlying.getJobKeys(matcher).asScala.toSet
74+
}
75+
76+
override def getJobDetail(jobKey: JobKey): G[JobDetail] = Sync[G].interruptible {
77+
underlying.getJobDetail(jobKey)
78+
}
79+
80+
override def newJobDetail(jobKey: JobKey, jobData: A, customize: JobBuilder => JobBuilder = identity): JobDetail =
81+
JobBuilder
82+
.newJob(classOf[SchedulerQuartz[A, F, G]])
83+
.pipe(customize)
84+
.withIdentity(jobKey)
85+
.usingJobData(jobDataMapKey, jobData.asJson.spaces2SortKeys)
86+
.build()
87+
6388
override def scheduleJobCustom(jobKey: JobKey, jobData: A, cronExpression: CronExpression): G[Unit] =
6489
scheduleJobCustom(
6590
jobKey,
@@ -79,21 +104,17 @@ class SchedulerQuartz[A: Encoder: Decoder, F[_]: MonadThrow, G[_]: Sync](
79104
jobKey: JobKey,
80105
jobData: A,
81106
configure: TriggerBuilder[Trigger] => TriggerBuilder[? <: Trigger],
107+
customizeJob: JobBuilder => JobBuilder = identity,
82108
): G[Instant] = for {
83109
jobDetail <- Sync[G].blocking {
84-
JobBuilder
85-
.newJob(classOf[SchedulerQuartz[A, F, G]])
86-
.withIdentity(jobKey)
87-
.usingJobData(jobDataMapKey, jobData.asJson.spaces2SortKeys)
88-
.requestRecovery(true)
89-
.build()
110+
newJobDetail(jobKey, jobData, _.requestRecovery(true).pipe(customizeJob))
90111
}
91112
trigger <- Sync[G].blocking {
92113
TriggerBuilder
93114
.newTrigger()
115+
.pipe(configure)
94116
.withIdentity(TriggerKey.triggerKey(jobKey.getName, jobKey.getGroup))
95117
.forJob(jobDetail)
96-
.pipe(configure)
97118
.build()
98119
}
99120
exists <- checkExists(jobKey)
@@ -169,14 +190,17 @@ object SchedulerQuartz {
169190
quartzConfig = quartzConfig0 ++ Map(dataSourceKey -> dataSourceValue)
170191

171192
dbInitScript <- dbInitScriptName.traverse(getBbInitScript[F]).toResource
172-
_ <- (
173-
for {
174-
dbIsInitialized <- isDbInitialized(quartzConfig(tablePrefixKey))
175-
_ <- MonadCancelThrow[ConnectionIO].unlessA(dbIsInitialized)(
176-
dbInitScript.traverse(_.updateWithLabel("SchedulerQuartzDbInit").run),
177-
)
178-
} yield ()
179-
).transact(transactor).toResource
193+
_ <- dbInitScript
194+
.traverse { dbInitScript =>
195+
for {
196+
dbIsInitialized <- isDbInitialized(quartzConfig(tablePrefixKey))
197+
_ <- MonadCancelThrow[ConnectionIO].unlessA(dbIsInitialized)(
198+
dbInitScript.updateWithLabel("SchedulerQuartzDbInit").run,
199+
)
200+
} yield ()
201+
}
202+
.transact(transactor)
203+
.toResource
180204

181205
scheduler <- Resource[F, SchedulerQuartz[A, F, G]](Sync[F].interruptible {
182206
DBConnectionManager

0 commit comments

Comments
 (0)