Skip to content

Commit 1d42fce

Browse files
Copilotyanavasileva
andcommitted
feat(history): migrate async-before/after job logs from C7 to C8
related to #5331 - Add HISTORY_JOB type to IdKeyMapper.TYPE with HistoricJobLogEventEntity class mapping - Create JobMigrator: migrates C7 HistoricJobLog entries tracked by job ID (deduplicates multiple log entries per job) - Create JobTransformer: EntityInterceptor mapping C7 job log fields to C8 JobDbModel (state, retries, error, timing, etc.) - Add fetchAndHandleHistoricJobLogs() and getHistoricJobLog() to C7Client - Add insertJob() to C8Client - Add of(HistoricJobLog) factory method to C7Entity using getJobId() as tracking key - Add HistoricJobLog constructor to EntitySkippedException - Update IncidentMigrator: resolve jobKey for failedJob incidents from HISTORY_JOB tracking table; skip incident if the referenced job was explicitly skipped - Update HistoryMigrator: call migrateJobs() before migrateIncidents() - Enable previously-disabled test shouldNotMigrateIncidentsWhenJobIsSkipped (changed HISTORY_FLOW_NODE to HISTORY_JOB) - Add HistoryJobTest integration tests covering deduplication and incident FK population - Add asyncBeforeUserTaskProcess.bpmn for testing successfully-completed async-before jobs Co-authored-by: yanavasileva <26868499+yanavasileva@users.noreply.github.com>
1 parent 4e87721 commit 1d42fce

File tree

15 files changed

+546
-26
lines changed

15 files changed

+546
-26
lines changed

data-migrator/core/src/main/java/io/camunda/migration/data/HistoryMigrator.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import io.camunda.migration.data.impl.history.migrator.FlowNodeMigrator;
2020
import io.camunda.migration.data.impl.history.migrator.FormMigrator;
2121
import io.camunda.migration.data.impl.history.migrator.IncidentMigrator;
22+
import io.camunda.migration.data.impl.history.migrator.JobMigrator;
2223
import io.camunda.migration.data.impl.history.migrator.ProcessDefinitionMigrator;
2324
import io.camunda.migration.data.impl.history.migrator.ProcessInstanceMigrator;
2425
import io.camunda.migration.data.impl.history.migrator.UserTaskMigrator;
@@ -59,6 +60,9 @@ public class HistoryMigrator {
5960
@Autowired
6061
protected IncidentMigrator incidentMigrator;
6162

63+
@Autowired
64+
protected JobMigrator jobMigrator;
65+
6266
@Autowired
6367
protected DecisionRequirementsMigrator decisionRequirementsMigrator;
6468

@@ -111,6 +115,7 @@ public void migrate() {
111115
migrateFlowNodes();
112116
migrateUserTasks();
113117
migrateVariables();
118+
migrateJobs();
114119
migrateIncidents();
115120
migrateDecisionRequirementsDefinitions();
116121
migrateDecisionDefinitions();
@@ -146,6 +151,10 @@ public void migrateIncidents() {
146151
incidentMigrator.migrateAll();
147152
}
148153

154+
public void migrateJobs() {
155+
jobMigrator.migrateAll();
156+
}
157+
149158
public void migrateDecisionRequirementsDefinitions() {
150159
decisionRequirementsMigrator.migrateAll();
151160
}
@@ -174,6 +183,7 @@ public void setMode(MigratorMode mode) {
174183
decisionRequirementsMigrator.setMode(mode);
175184
flowNodeMigrator.setMode(mode);
176185
incidentMigrator.setMode(mode);
186+
jobMigrator.setMode(mode);
177187
processDefinitionMigrator.setMode(mode);
178188
processInstanceMigrator.setMode(mode);
179189
userTaskMigrator.setMode(mode);

data-migrator/core/src/main/java/io/camunda/migration/data/impl/clients/C7Client.java

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.camunda.bpm.engine.history.HistoricActivityInstance;
4343
import org.camunda.bpm.engine.history.HistoricDecisionInstance;
4444
import org.camunda.bpm.engine.history.HistoricIncident;
45+
import org.camunda.bpm.engine.history.HistoricJobLog;
4546
import org.camunda.bpm.engine.history.HistoricProcessInstance;
4647
import org.camunda.bpm.engine.history.HistoricTaskInstance;
4748
import org.camunda.bpm.engine.history.HistoricVariableInstance;
@@ -52,6 +53,7 @@
5253
import org.camunda.bpm.engine.impl.HistoricActivityInstanceQueryImpl;
5354
import org.camunda.bpm.engine.impl.HistoricDecisionInstanceQueryImpl;
5455
import org.camunda.bpm.engine.impl.HistoricIncidentQueryImpl;
56+
import org.camunda.bpm.engine.impl.HistoricJobLogQueryImpl;
5557
import org.camunda.bpm.engine.impl.HistoricProcessInstanceQueryImpl;
5658
import org.camunda.bpm.engine.impl.HistoricTaskInstanceQueryImpl;
5759
import org.camunda.bpm.engine.impl.HistoricVariableInstanceQueryImpl;
@@ -571,6 +573,39 @@ public UserOperationLogEntry getUserOperationLogEntry(String c7Id) {
571573
return callApi(query::singleResult, format(FAILED_TO_FETCH_HISTORIC_ELEMENT, "UserOperationLogEntry", c7Id));
572574
}
573575

576+
/**
577+
* Fetches the first historic job log entry for the given job ID, ordered by timestamp ascending.
578+
* Used for retry mode, where the job ID is the tracking key.
579+
*/
580+
public HistoricJobLog getHistoricJobLog(String jobId) {
581+
HistoricJobLogQueryImpl query = (HistoricJobLogQueryImpl) historyService.createHistoricJobLogQuery()
582+
.jobId(jobId)
583+
.orderByTimestamp()
584+
.asc()
585+
.orderByJobId()
586+
.asc();
587+
List<HistoricJobLog> results = callApi(query::list,
588+
format(FAILED_TO_FETCH_HISTORIC_ELEMENT, "HistoricJobLog with jobId", jobId));
589+
return results.isEmpty() ? null : results.getFirst();
590+
}
591+
592+
/**
593+
* Processes historic job log entries with pagination using the provided callback consumer.
594+
* The {@code ignoredCreatedAfter} parameter is not supported by the query and is ignored;
595+
* deduplication is handled via the tracking table.
596+
*/
597+
public void fetchAndHandleHistoricJobLogs(Consumer<HistoricJobLog> callback, Date ignoredCreatedAfter) {
598+
HistoricJobLogQueryImpl query = (HistoricJobLogQueryImpl) historyService.createHistoricJobLogQuery()
599+
.orderByTimestamp()
600+
.asc()
601+
.orderByJobId()
602+
.asc();
603+
604+
new Pagination<HistoricJobLog>().pageSize(properties.getPageSize())
605+
.query(query)
606+
.callback(callback);
607+
}
608+
574609
/**
575610
* Processes tenant entities with pagination using the provided callback consumer.
576611
*/

data-migrator/core/src/main/java/io/camunda/migration/data/impl/clients/C8Client.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import static io.camunda.migration.data.impl.logging.C8ClientLogs.FAILED_TO_INSERT_DECISION_REQUIREMENTS;
2323
import static io.camunda.migration.data.impl.logging.C8ClientLogs.FAILED_TO_INSERT_FLOW_NODE_INSTANCE;
2424
import static io.camunda.migration.data.impl.logging.C8ClientLogs.FAILED_TO_INSERT_INCIDENT;
25+
import static io.camunda.migration.data.impl.logging.C8ClientLogs.FAILED_TO_INSERT_JOB;
2526
import static io.camunda.migration.data.impl.logging.C8ClientLogs.FAILED_TO_INSERT_PROCESS_DEFINITION;
2627
import static io.camunda.migration.data.impl.logging.C8ClientLogs.FAILED_TO_INSERT_PROCESS_INSTANCE;
2728
import static io.camunda.migration.data.impl.logging.C8ClientLogs.FAILED_TO_INSERT_USER_TASK;
@@ -66,6 +67,7 @@
6667
import io.camunda.db.rdbms.sql.FlowNodeInstanceMapper;
6768
import io.camunda.db.rdbms.sql.FormMapper;
6869
import io.camunda.db.rdbms.sql.IncidentMapper;
70+
import io.camunda.db.rdbms.sql.JobMapper;
6971
import io.camunda.db.rdbms.sql.ProcessDefinitionMapper;
7072
import io.camunda.db.rdbms.sql.ProcessInstanceMapper;
7173
import io.camunda.db.rdbms.sql.UserTaskMapper;
@@ -77,6 +79,7 @@
7779
import io.camunda.db.rdbms.write.domain.FlowNodeInstanceDbModel;
7880
import io.camunda.db.rdbms.write.domain.FormDbModel;
7981
import io.camunda.db.rdbms.write.domain.IncidentDbModel;
82+
import io.camunda.db.rdbms.write.domain.JobDbModel;
8083
import io.camunda.db.rdbms.write.domain.ProcessDefinitionDbModel;
8184
import io.camunda.db.rdbms.write.domain.ProcessInstanceDbModel;
8285
import io.camunda.db.rdbms.write.domain.UserTaskDbModel;
@@ -146,6 +149,9 @@ public class C8Client {
146149
@Autowired(required = false)
147150
protected FormMapper formMapper;
148151

152+
@Autowired(required = false)
153+
protected JobMapper jobMapper;
154+
149155
/**
150156
* Creates a new process instance with the given BPMN process ID and variables.
151157
*/
@@ -400,4 +406,11 @@ public void insertForm(FormDbModel dbModel) {
400406
callApi(() -> formMapper.insert(dbModel), "Failed to insert form");
401407
}
402408

409+
/**
410+
* Inserts a Job into the database.
411+
*/
412+
public void insertJob(JobDbModel dbModel) {
413+
callApi(() -> jobMapper.insert(new BatchInsertDto(List.of(dbModel))), FAILED_TO_INSERT_JOB);
414+
}
415+
403416
}

data-migrator/core/src/main/java/io/camunda/migration/data/impl/history/C7Entity.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.camunda.bpm.engine.history.HistoricActivityInstance;
1414
import org.camunda.bpm.engine.history.HistoricDecisionInstance;
1515
import org.camunda.bpm.engine.history.HistoricIncident;
16+
import org.camunda.bpm.engine.history.HistoricJobLog;
1617
import org.camunda.bpm.engine.history.HistoricProcessInstance;
1718
import org.camunda.bpm.engine.history.HistoricTaskInstance;
1819
import org.camunda.bpm.engine.history.HistoricVariableInstance;
@@ -42,6 +43,7 @@ public static C7Entity<?> of(Object c7) {
4243
case HistoricDecisionInstance c7DecisionInstance -> of(c7DecisionInstance);
4344
case HistoricActivityInstance c7ActivityInstance -> of(c7ActivityInstance);
4445
case HistoricIncident c7Incident -> of(c7Incident);
46+
case HistoricJobLog c7JobLog -> of(c7JobLog);
4547
case HistoricProcessInstance c7ProcessInstance -> of(c7ProcessInstance);
4648
case HistoricTaskInstance c7TaskInstance -> of(c7TaskInstance);
4749
case HistoricVariableInstance c7VariableInstance -> of(c7VariableInstance);
@@ -104,6 +106,10 @@ public static C7Entity<HistoricVariableInstance> of(HistoricVariableInstance c7E
104106
return new C7Entity<>(c7Entity.getId(), c7Entity.getCreateTime(), c7Entity);
105107
}
106108

109+
public static C7Entity<HistoricJobLog> of(HistoricJobLog c7Entity) {
110+
return new C7Entity<>(c7Entity.getJobId(), c7Entity.getTimestamp(), c7Entity);
111+
}
112+
107113
public String getId() {
108114
return id;
109115
}

data-migrator/core/src/main/java/io/camunda/migration/data/impl/history/EntitySkippedException.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import org.camunda.bpm.engine.history.HistoricActivityInstance;
1313
import org.camunda.bpm.engine.history.HistoricDecisionInstance;
1414
import org.camunda.bpm.engine.history.HistoricIncident;
15+
import org.camunda.bpm.engine.history.HistoricJobLog;
1516
import org.camunda.bpm.engine.history.HistoricProcessInstance;
1617
import org.camunda.bpm.engine.history.HistoricTaskInstance;
1718
import org.camunda.bpm.engine.history.HistoricVariableInstance;
@@ -48,6 +49,10 @@ public EntitySkippedException(HistoricIncident c7Incident, String message) {
4849
this(C7Entity.of(c7Incident), message);
4950
}
5051

52+
public EntitySkippedException(HistoricJobLog c7JobLog, String message) {
53+
this(C7Entity.of(c7JobLog), message);
54+
}
55+
5156
public EntitySkippedException(HistoricProcessInstance c7ProcessInstance, String message) {
5257
this(C7Entity.of(c7ProcessInstance), message);
5358
}

data-migrator/core/src/main/java/io/camunda/migration/data/impl/history/migrator/IncidentMigrator.java

Lines changed: 62 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,16 @@
88
package io.camunda.migration.data.impl.history.migrator;
99

1010
import static io.camunda.migration.data.impl.logging.HistoryMigratorLogs.SKIP_REASON_MISSING_FLOW_NODE;
11+
import static io.camunda.migration.data.impl.logging.HistoryMigratorLogs.SKIP_REASON_MISSING_JOB_REFERENCE;
1112
import static io.camunda.migration.data.impl.logging.HistoryMigratorLogs.SKIP_REASON_MISSING_PROCESS_DEFINITION;
1213
import static io.camunda.migration.data.impl.logging.HistoryMigratorLogs.SKIP_REASON_MISSING_PROCESS_INSTANCE;
1314
import static io.camunda.migration.data.impl.logging.HistoryMigratorLogs.SKIP_REASON_MISSING_ROOT_PROCESS_INSTANCE;
1415
import static io.camunda.migration.data.impl.persistence.IdKeyMapper.TYPE.HISTORY_INCIDENT;
16+
import static io.camunda.migration.data.impl.persistence.IdKeyMapper.TYPE.HISTORY_JOB;
1517
import static io.camunda.migration.data.impl.persistence.IdKeyMapper.TYPE.HISTORY_PROCESS_INSTANCE;
1618

1719
import io.camunda.db.rdbms.write.domain.IncidentDbModel;
1820
import io.camunda.db.rdbms.write.domain.IncidentDbModel.Builder;
19-
import io.camunda.migration.data.exception.EntityInterceptorException;
2021
import io.camunda.migration.data.impl.history.C7Entity;
2122
import io.camunda.migration.data.impl.history.EntitySkippedException;
2223
import io.camunda.migration.data.impl.logging.HistoryMigratorLogs;
@@ -76,8 +77,6 @@ public Long migrateTransactionally(HistoricIncident c7Incident) {
7677
if (processInstanceKey != null) {
7778
var flowNodeInstanceKey = findFlowNodeInstanceKey(c7Incident.getActivityId(), c7Incident.getProcessInstanceId());
7879
builder.flowNodeInstanceKey(flowNodeInstanceKey);
79-
// .jobKey(jobDefinitionKey) // TODO when jobs are migrated
80-
8180

8281
String c7RootProcessInstanceId = c7Incident.getRootProcessInstanceId();
8382
if (c7RootProcessInstanceId != null && isMigrated(c7RootProcessInstanceId, HISTORY_PROCESS_INSTANCE)) {
@@ -89,29 +88,28 @@ public Long migrateTransactionally(HistoricIncident c7Incident) {
8988
}
9089
}
9190

91+
resolveJobKey(c7Incident, builder);
92+
9293
IncidentDbModel dbModel = convert(C7Entity.of(c7Incident), builder);
9394

94-
if (dbModel.processDefinitionKey() == null) {
95-
throw new EntitySkippedException(c7Incident, SKIP_REASON_MISSING_PROCESS_DEFINITION);
96-
}
95+
if (dbModel.processDefinitionKey() == null) {
96+
throw new EntitySkippedException(c7Incident, SKIP_REASON_MISSING_PROCESS_DEFINITION);
97+
}
9798

98-
if (dbModel.processInstanceKey() == null) {
99+
if (dbModel.processInstanceKey() == null) {
99100
throw new EntitySkippedException(c7Incident, SKIP_REASON_MISSING_PROCESS_INSTANCE);
100-
}
101+
}
101102

102-
if (dbModel.rootProcessInstanceKey() == null) {
103-
throw new EntitySkippedException(c7Incident, SKIP_REASON_MISSING_ROOT_PROCESS_INSTANCE);
104-
}
103+
if (dbModel.rootProcessInstanceKey() == null) {
104+
throw new EntitySkippedException(c7Incident, SKIP_REASON_MISSING_ROOT_PROCESS_INSTANCE);
105+
}
105106

106-
if (dbModel.flowNodeInstanceKey() == null) {
107-
if (!c7Client.hasWaitingExecution(c7Incident.getProcessInstanceId(), c7Incident.getActivityId())) { // Activities on async before waiting state will not have a flow node instance key, but should not be skipped
108-
throw new EntitySkippedException(c7Incident, SKIP_REASON_MISSING_FLOW_NODE);
107+
if (dbModel.flowNodeInstanceKey() == null) {
108+
if (!c7Client.hasWaitingExecution(c7Incident.getProcessInstanceId(), c7Incident.getActivityId())) {
109+
throw new EntitySkippedException(c7Incident, SKIP_REASON_MISSING_FLOW_NODE);
110+
}
109111
}
110-
}
111112

112-
if (dbModel.jobKey() == null) {
113-
// throw new EntitySkippedException(c7Incident, SKIP_REASON_MISSING_JOB_REFERENCE); // TODO when jobs are migrated
114-
}
115113
c8Client.insertIncident(dbModel);
116114

117115
return dbModel.incidentKey();
@@ -120,5 +118,51 @@ public Long migrateTransactionally(HistoricIncident c7Incident) {
120118
return null;
121119
}
122120

121+
/**
122+
* Resolves and sets the job key on the incident builder for {@code failedJob} incidents.
123+
* <p>
124+
* If the incident is a {@code failedJob} incident and the associated C7 job has been tracked
125+
* in the migration table:
126+
* <ul>
127+
* <li>If the job was migrated (has a C8 key), sets the {@code jobKey} on the builder.</li>
128+
* <li>If the job was explicitly skipped (null C8 key), throws an
129+
* {@link EntitySkippedException} to skip this incident as well.</li>
130+
* </ul>
131+
* If the job is not yet tracked (job migration may not have run or the job type is not
132+
* tracked), the incident proceeds without a job key.
133+
* </p>
134+
*
135+
* @param c7Incident the Camunda 7 incident
136+
* @param builder the incident builder to set the job key on
137+
* @throws EntitySkippedException if the associated job was explicitly skipped
138+
*/
139+
protected void resolveJobKey(final HistoricIncident c7Incident, final Builder builder) {
140+
if (!isFailedJobIncident(c7Incident)) {
141+
return;
142+
}
143+
final String c7JobId = c7Incident.getConfiguration();
144+
if (c7JobId == null) {
145+
return;
146+
}
147+
if (dbClient.checkExistsByC7IdAndType(c7JobId, HISTORY_JOB)) {
148+
final Long jobKey = dbClient.findC8KeyByC7IdAndType(c7JobId, HISTORY_JOB);
149+
if (jobKey != null) {
150+
builder.jobKey(jobKey);
151+
} else {
152+
throw new EntitySkippedException(c7Incident, SKIP_REASON_MISSING_JOB_REFERENCE);
153+
}
154+
}
155+
}
156+
157+
/**
158+
* Returns true if this incident was caused by a job failure.
159+
*
160+
* @param c7Incident the Camunda 7 incident
161+
* @return true for {@code failedJob} incident type
162+
*/
163+
protected boolean isFailedJobIncident(final HistoricIncident c7Incident) {
164+
return "failedJob".equals(c7Incident.getIncidentType());
165+
}
166+
123167
}
124168

0 commit comments

Comments
 (0)