Skip to content

Commit abfbcc6

Browse files
authored
Merge pull request #5049 from jackchenjc/issue-5048
fix: Fix the ScheduleJob out-of-sync issue
2 parents 2801c4a + 9100ac2 commit abfbcc6

File tree

1 file changed

+53
-55
lines changed
  • internal/support/scheduler/infrastructure

1 file changed

+53
-55
lines changed

internal/support/scheduler/infrastructure/manager.go

Lines changed: 53 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -253,70 +253,68 @@ func (m *manager) addNewJob(job models.ScheduleJob) errors.EdgeX {
253253
if endOption != nil {
254254
jobOptions = append(jobOptions, endOption)
255255
}
256-
} else {
257-
// If the scheduled job should not be triggered, skip adding the job into the scheduler manager
258-
return nil
259-
}
260256

261-
for _, a := range job.Actions {
262-
copiedAction := a
263-
task, edgeXerr := action.ToGocronTask(m.lc, m.dic, m.secretProvider, a)
264-
if edgeXerr != nil {
265-
return errors.NewCommonEdgeXWrapper(edgeXerr)
257+
// If toTrigger is true, the ScheduleAction will be added to the scheduler and ready to be triggered
258+
for _, a := range job.Actions {
259+
copiedAction := a
260+
task, edgeXerr := action.ToGocronTask(m.lc, m.dic, m.secretProvider, a)
261+
if edgeXerr != nil {
262+
return errors.NewCommonEdgeXWrapper(edgeXerr)
263+
}
264+
265+
// Add event listeners to the job options for recording the schedule action records
266+
jobOptions = append(jobOptions, gocron.WithEventListeners(
267+
gocron.AfterJobRuns(
268+
func(jobID uuid.UUID, jobName string) {
269+
gocronJob := getGocronJobByID(scheduler.Jobs(), jobID)
270+
lastRun, err := gocronJob.LastRun()
271+
if err != nil {
272+
m.lc.Errorf("failed to get the last run time for job: %s, Correlation-ID: %s, err: %v", job.Name, correlationId, err)
273+
}
274+
275+
record := models.ScheduleActionRecord{
276+
JobName: job.Name,
277+
Action: copiedAction,
278+
Status: models.Succeeded,
279+
ScheduledAt: lastRun.UnixMilli(),
280+
}
281+
m.addScheduleActionRecord(ctx, record, nil)
282+
}),
283+
gocron.AfterJobRunsWithError(
284+
func(jobID uuid.UUID, jobName string, err error) {
285+
gocronJob := getGocronJobByID(scheduler.Jobs(), jobID)
286+
lastRun, timeErr := gocronJob.LastRun()
287+
if timeErr != nil {
288+
m.lc.Errorf("failed to get the last run time for job: %s, Correlation-ID: %s, err: %v", job.Name, correlationId, timeErr)
289+
}
290+
291+
record := models.ScheduleActionRecord{
292+
JobName: job.Name,
293+
Action: copiedAction,
294+
Status: models.Failed,
295+
ScheduledAt: lastRun.UnixMilli(),
296+
}
297+
m.addScheduleActionRecord(ctx, record, err)
298+
}),
299+
))
300+
301+
// A "ScheduleAction" will be treated as a "Job" in gocron scheduler
302+
_, err := scheduler.NewJob(definition, task, jobOptions...)
303+
if err != nil {
304+
return errors.NewCommonEdgeX(errors.KindServerError,
305+
fmt.Sprintf("failed to create new scheduled aciton for job: %s", job.Name), err)
306+
}
266307
}
267308

268-
// Add event listeners to the job options for recording the schedule action records
269-
jobOptions = append(jobOptions, gocron.WithEventListeners(
270-
gocron.AfterJobRuns(
271-
func(jobID uuid.UUID, jobName string) {
272-
gocronJob := getGocronJobByID(scheduler.Jobs(), jobID)
273-
lastRun, err := gocronJob.LastRun()
274-
if err != nil {
275-
m.lc.Errorf("failed to get the last run time for job: %s, Correlation-ID: %s, err: %v", job.Name, correlationId, err)
276-
}
277-
278-
record := models.ScheduleActionRecord{
279-
JobName: job.Name,
280-
Action: copiedAction,
281-
Status: models.Succeeded,
282-
ScheduledAt: lastRun.UnixMilli(),
283-
}
284-
m.addScheduleActionRecord(ctx, record, nil)
285-
}),
286-
gocron.AfterJobRunsWithError(
287-
func(jobID uuid.UUID, jobName string, err error) {
288-
gocronJob := getGocronJobByID(scheduler.Jobs(), jobID)
289-
lastRun, timeErr := gocronJob.LastRun()
290-
if timeErr != nil {
291-
m.lc.Errorf("failed to get the last run time for job: %s, Correlation-ID: %s, err: %v", job.Name, correlationId, timeErr)
292-
}
293-
294-
record := models.ScheduleActionRecord{
295-
JobName: job.Name,
296-
Action: copiedAction,
297-
Status: models.Failed,
298-
ScheduledAt: lastRun.UnixMilli(),
299-
}
300-
m.addScheduleActionRecord(ctx, record, err)
301-
}),
302-
))
303-
304-
// A "ScheduleAction" will be treated as a "Job" in gocron scheduler
305-
_, err := scheduler.NewJob(definition, task, jobOptions...)
306-
if err != nil {
307-
return errors.NewCommonEdgeX(errors.KindServerError,
308-
fmt.Sprintf("failed to create new scheduled aciton for job: %s", job.Name), err)
309-
}
309+
scheduler.Start()
310+
m.lc.Debugf("The scheduled job %s was started. Correlation-ID: %s", job.Name, correlationId)
310311
}
311312

313+
// Whether the job is going to be triggered or not, the scheduler will be added to the manager to sync with the database
312314
m.mu.Lock()
313315
m.schedulers[job.Name] = scheduler
314316
m.mu.Unlock()
315317

316-
if err := m.StartScheduleJobByName(job.Name, correlationId); err != nil {
317-
return errors.NewCommonEdgeXWrapper(err)
318-
}
319-
320318
return nil
321319
}
322320

0 commit comments

Comments
 (0)