Skip to content

Commit 41d3afe

Browse files
committed
The initial retrieval of jobs does not return jobs, only the change id of the oldest job, so the application catches up the changes.
1 parent a3c96fe commit 41d3afe

1 file changed

Lines changed: 23 additions & 18 deletions

File tree

dqops/src/main/java/com/dqops/core/jobqueue/monitoring/DqoJobQueueMonitoringServiceImpl.java

Lines changed: 23 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -319,13 +319,21 @@ public void publishFolderSynchronizationStatus(String dataDomainName, CloudSynch
319319
public Mono<DqoJobQueueInitialSnapshotModel> getInitialJobList(String dataDomain) {
320320
Mono<DqoJobQueueInitialSnapshotModel> jobsMono = Mono.defer(() -> {
321321
long changeSequence;
322-
List<DqoJobHistoryEntryModel> jobs;
322+
List<DqoJobHistoryEntryModel> jobs = new ArrayList<>();
323323

324324
synchronized (this.lock) {
325-
changeSequence = this.dqoJobIdGenerator.generateNextIncrementalId();
326-
jobs = this.allJobs.values().stream()
327-
.filter(dqoJobHistoryEntryModel -> dqoJobHistoryEntryModel != null && Objects.equals(dqoJobHistoryEntryModel.getDataDomain(), dataDomain))
328-
.collect(Collectors.toList());
325+
if (!this.jobChanges.isEmpty()) {
326+
changeSequence = this.jobChanges.firstKey() - 1L;
327+
328+
//// disabled, we will let the web application just download the whole history
329+
// jobs = this.allJobs.values().stream()
330+
// .filter(dqoJobHistoryEntryModel -> dqoJobHistoryEntryModel != null && Objects.equals(dqoJobHistoryEntryModel.getDataDomain(), dataDomain))
331+
// .collect(Collectors.toList());
332+
333+
}
334+
else {
335+
changeSequence = this.dqoJobIdGenerator.generateNextIncrementalId();
336+
}
329337
}
330338

331339
return Mono.just(new DqoJobQueueInitialSnapshotModel(jobs, this.currentSynchronizationStatus, changeSequence));
@@ -350,9 +358,9 @@ public Mono<DqoJobQueueIncrementalSnapshotModel> getIncrementalJobChanges(long l
350358

351359
synchronized (this.lock) {
352360
changeSequence = this.dqoJobIdGenerator.generateNextIncrementalId();
353-
changesList = new ArrayList<>(this.jobChanges
361+
changesList = this.jobChanges
354362
.tailMap(lastChangeId, false)
355-
.values())
363+
.values()
356364
.stream()
357365
.filter(dqoJobChangeModel -> Objects.equals(dqoJobChangeModel.getDomainName(), domainName))
358366
.collect(Collectors.toList());
@@ -373,15 +381,15 @@ public Mono<DqoJobQueueIncrementalSnapshotModel> getIncrementalJobChanges(long l
373381
synchronized (this.lock) {
374382
domainAwaitingClientsFinal.remove(changeSequence);
375383
}
376-
if (result == null) {
377-
return new DqoJobQueueIncrementalSnapshotModel(null, this.currentSynchronizationStatus, changeSequence);
384+
if (result == null) { // timeout, with no results
385+
return new DqoJobQueueIncrementalSnapshotModel(new ArrayList<>(), this.currentSynchronizationStatus, changeSequence);
378386
}
379387
else {
380388
synchronized (this.lock) {
381389
long nextChangeId = this.dqoJobIdGenerator.generateNextIncrementalId();
382-
List<DqoJobChangeModel> newChangesList = new ArrayList<>(this.jobChanges
390+
List<DqoJobChangeModel> newChangesList = this.jobChanges
383391
.tailMap(lastChangeId, false)
384-
.values())
392+
.values()
385393
.stream()
386394
.filter(dqoJobChangeModel -> Objects.equals(dqoJobChangeModel.getDomainName(), domainName))
387395
.collect(Collectors.toList());
@@ -456,8 +464,8 @@ public void onJobChange(DqoChangeNotificationEntry changeNotificationEntry) {
456464
if (jobChange.getJobId().getJobBusinessKey() != null) {
457465
this.businessKeyToJobIdMap.put(jobChange.getJobId().getJobBusinessKey(), jobChange.getJobId());
458466
}
459-
DqoJobChangeModel dqoNewJobChangeModel = new DqoJobChangeModel(jobChange, changeSequence);
460-
this.jobChanges.put(changeSequence, dqoNewJobChangeModel);
467+
// DqoJobChangeModel dqoNewJobChangeModel = new DqoJobChangeModel(jobChange, changeSequence);
468+
// this.jobChanges.put(changeSequence, dqoNewJobChangeModel);
461469
}
462470
}
463471

@@ -498,16 +506,13 @@ public void onJobChange(DqoChangeNotificationEntry changeNotificationEntry) {
498506
* @param jobIdToKeep Job id to keep. It's child jobs (when it is a parent job) are also preserved.
499507
*/
500508
public void removeOlderSynchronizeMultipleFoldersJobs(final DqoQueueJobId jobIdToKeep) {
501-
Instant oldestJobToRemove = Instant.now().minusSeconds(10 * 60);
502-
503509
Set<DqoQueueJobId> oldJobIdsToDelete = this.allJobs.entrySet()
504510
.stream()
505511
.filter(e -> e.getValue().getJobType() == DqoJobType.synchronize_multiple_folders ||
506512
e.getValue().getJobType() == DqoJobType.synchronize_folder)
507513
.filter(e -> (e.getValue().getJobType() == DqoJobType.synchronize_multiple_folders && e.getKey().getJobId() != jobIdToKeep.getJobId()) ||
508514
(e.getValue().getJobType() == DqoJobType.synchronize_folder && e.getKey().getParentJobId() != null &&
509515
e.getKey().getParentJobId().getJobId() != jobIdToKeep.getJobId()))
510-
.filter(e -> e.getValue().getStatusChangedAt() != null && e.getValue().getStatusChangedAt().isBefore(oldestJobToRemove))
511516
.map(e -> e.getKey())
512517
.collect(Collectors.toSet());
513518

@@ -544,7 +549,7 @@ public void removeOldFinishedJobs() {
544549
List<DqoQueueJobId> oldJobIdsToDelete = this.allJobs.entrySet()
545550
.stream()
546551
.filter(e -> e.getValue() != null)
547-
.takeWhile(e -> e.getValue().getStatusChangedAt().compareTo(oldJobsHistoryThresholdTimestamp) < 1)
552+
.takeWhile(e -> e.getValue().getStatusChangedAt().isBefore(oldJobsHistoryThresholdTimestamp))
548553
.filter(e -> e.getValue().getStatus() == DqoJobStatus.finished ||
549554
e.getValue().getStatus() == DqoJobStatus.failed ||
550555
e.getValue().getStatus() == DqoJobStatus.cancelled)
@@ -571,7 +576,7 @@ public void removeOldJobChanges() {
571576
List<Long> oldChangeIdsToDelete = this.jobChanges.entrySet()
572577
.stream()
573578
.filter(e -> e.getValue() != null)
574-
.takeWhile(e -> e.getValue().getStatusChangedAt().compareTo(oldJobChangesThresholdTimestamp) < 1)
579+
.takeWhile(e -> e.getValue().getStatusChangedAt().isBefore(oldJobChangesThresholdTimestamp))
575580
.map(e -> e.getKey())
576581
.collect(Collectors.toList());
577582

0 commit comments

Comments
 (0)