diff --git a/data-migrator/core/src/main/java/io/camunda/migration/data/impl/history/migrator/IncidentMigrator.java b/data-migrator/core/src/main/java/io/camunda/migration/data/impl/history/migrator/IncidentMigrator.java index 009f81da1..9b9b65538 100644 --- a/data-migrator/core/src/main/java/io/camunda/migration/data/impl/history/migrator/IncidentMigrator.java +++ b/data-migrator/core/src/main/java/io/camunda/migration/data/impl/history/migrator/IncidentMigrator.java @@ -81,18 +81,19 @@ public Long migrateTransactionally(HistoricIncident c7Incident) { if (shouldMigrate(c7IncidentId, HISTORY_INCIDENT)) { HistoryMigratorLogs.migratingHistoricIncident(c7IncidentId); var c7ProcessInstance = findProcessInstanceByC7Id(c7Incident.getProcessInstanceId()); + Long processInstanceKey = null; + Long flowNodeInstanceKey = null; var builder = new Builder(); var processDefinitionKey = findProcessDefinitionKey(c7Incident.getProcessDefinitionId()); builder.processDefinitionKey(processDefinitionKey); if (c7ProcessInstance != null) { - var processInstanceKey = c7ProcessInstance.processInstanceKey(); + processInstanceKey = c7ProcessInstance.processInstanceKey(); builder.processInstanceKey(processInstanceKey); if (processInstanceKey != null) { - var flowNodeInstanceKey = findFlowNodeInstanceKey(c7Incident.getActivityId(), c7Incident.getProcessInstanceId()); + flowNodeInstanceKey = findFlowNodeInstanceKey(c7Incident.getActivityId(), c7Incident.getProcessInstanceId()); builder.flowNodeInstanceKey(flowNodeInstanceKey); -// .jobKey(jobDefinitionKey) // TODO when jobs are migrated - + // .jobKey(jobDefinitionKey) // TODO when jobs are migrated String c7RootProcessInstanceId = c7Incident.getRootProcessInstanceId(); if (c7RootProcessInstanceId != null && isMigrated(c7RootProcessInstanceId, HISTORY_PROCESS_INSTANCE)) { @@ -101,32 +102,34 @@ public Long migrateTransactionally(HistoricIncident c7Incident) { builder.rootProcessInstanceKey(rootProcessInstance.processInstanceKey()); } } + builder.treePath(generateTreePath(processInstanceKey, flowNodeInstanceKey)); } } IncidentDbModel dbModel = convert(C7Entity.of(c7Incident), builder); - if (dbModel.processDefinitionKey() == null) { - throw new EntitySkippedException(c7Incident, SKIP_REASON_MISSING_PROCESS_DEFINITION); - } + if (dbModel.processDefinitionKey() == null) { + throw new EntitySkippedException(c7Incident, SKIP_REASON_MISSING_PROCESS_DEFINITION); + } - if (dbModel.processInstanceKey() == null) { + if (dbModel.processInstanceKey() == null) { throw new EntitySkippedException(c7Incident, SKIP_REASON_MISSING_PROCESS_INSTANCE); - } + } - if (dbModel.rootProcessInstanceKey() == null) { - throw new EntitySkippedException(c7Incident, SKIP_REASON_MISSING_ROOT_PROCESS_INSTANCE); - } + if (dbModel.rootProcessInstanceKey() == null) { + throw new EntitySkippedException(c7Incident, SKIP_REASON_MISSING_ROOT_PROCESS_INSTANCE); + } - if (dbModel.flowNodeInstanceKey() == null) { - if (!c7Client.hasWaitingExecution(c7Incident.getProcessInstanceId(), c7Incident.getActivityId())) { // Activities on async before waiting state will not have a flow node instance key, but should not be skipped - throw new EntitySkippedException(c7Incident, SKIP_REASON_MISSING_FLOW_NODE); + if (dbModel.flowNodeInstanceKey() == null) { + if (!c7Client.hasWaitingExecution(c7Incident.getProcessInstanceId(), c7Incident.getActivityId())) { + // Activities on async before waiting state will not have a flow node instance key, but should not be skipped + throw new EntitySkippedException(c7Incident, SKIP_REASON_MISSING_FLOW_NODE); + } } - } - if (dbModel.jobKey() == null) { -// throw new EntitySkippedException(c7Incident, SKIP_REASON_MISSING_JOB_REFERENCE); // TODO when jobs are migrated - } + if (dbModel.jobKey() == null) { + // throw new EntitySkippedException(c7Incident, SKIP_REASON_MISSING_JOB_REFERENCE); // TODO when jobs are migrated + } c8Client.insertIncident(dbModel); return dbModel.incidentKey(); @@ -141,11 +144,9 @@ protected Long findFlowNodeInstanceKey(String activityId, String processInstance return null; } - List flowNodes = c8Client.searchFlowNodeInstances( - FlowNodeInstanceDbQuery.of(builder -> builder.filter( - FlowNodeInstanceFilter.of(filter -> filter.flowNodeIds(activityId).processInstanceKeys(processInstanceKey)) - )) - ); + List flowNodes = c8Client.searchFlowNodeInstances(FlowNodeInstanceDbQuery.of( + builder -> builder.filter(FlowNodeInstanceFilter.of( + filter -> filter.flowNodeIds(activityId).processInstanceKeys(processInstanceKey))))); if (!flowNodes.isEmpty()) { return flowNodes.getFirst().flowNodeInstanceKey(); @@ -154,5 +155,18 @@ protected Long findFlowNodeInstanceKey(String activityId, String processInstance } } + /** + * Generates a tree path for incidents in the format: PI_processInstanceKey/FNI_elementInstanceKey (if the + * elementInstanceKey exists, otherwise PI_processInstanceKey) + * + * @param processInstanceKey the process instance key + * @param elementInstanceKey the flow node instance key + * @return the tree path string + */ + public static String generateTreePath(Long processInstanceKey, Long elementInstanceKey) { + return elementInstanceKey == null ? + "PI_" + processInstanceKey : + "PI_" + processInstanceKey + "/FNI_" + elementInstanceKey; + } } diff --git a/data-migrator/core/src/main/java/io/camunda/migration/data/impl/interceptor/history/entity/IncidentTransformer.java b/data-migrator/core/src/main/java/io/camunda/migration/data/impl/interceptor/history/entity/IncidentTransformer.java index f8d82222e..b84f1de7c 100644 --- a/data-migrator/core/src/main/java/io/camunda/migration/data/impl/interceptor/history/entity/IncidentTransformer.java +++ b/data-migrator/core/src/main/java/io/camunda/migration/data/impl/interceptor/history/entity/IncidentTransformer.java @@ -47,7 +47,6 @@ public void execute(HistoricIncident entity, Builder builder) { .errorType(determineErrorType(entity)) .errorMessage(entity.getIncidentMessage()) .creationDate(convertDate(entity.getCreateTime())) - .treePath(null) .errorMessageHash(null) .partitionId(C7_HISTORY_PARTITION_ID) .jobKey(null) diff --git a/data-migrator/qa/integration-tests/src/test/java/io/camunda/migration/data/qa/history/HistoryMigrationAbstractTest.java b/data-migrator/qa/integration-tests/src/test/java/io/camunda/migration/data/qa/history/HistoryMigrationAbstractTest.java index adeeb7239..2d4577df8 100644 --- a/data-migrator/qa/integration-tests/src/test/java/io/camunda/migration/data/qa/history/HistoryMigrationAbstractTest.java +++ b/data-migrator/qa/integration-tests/src/test/java/io/camunda/migration/data/qa/history/HistoryMigrationAbstractTest.java @@ -21,6 +21,7 @@ import io.camunda.db.rdbms.sql.FlowNodeInstanceMapper; import io.camunda.db.rdbms.sql.PurgeMapper; import io.camunda.db.rdbms.write.domain.FlowNodeInstanceDbModel; +import io.camunda.db.rdbms.write.domain.IncidentDbModel; import io.camunda.db.rdbms.write.service.RdbmsPurger; import io.camunda.migration.data.HistoryMigrator; import io.camunda.migration.data.MigratorMode; @@ -29,6 +30,7 @@ import io.camunda.migration.data.impl.clients.DbClient; import io.camunda.migration.data.impl.util.ConverterUtil; import io.camunda.migration.data.qa.AbstractMigratorTest; +import io.camunda.migration.data.qa.extension.RdbmsQueryExtension; import io.camunda.migration.data.qa.util.WithSpringProfile; import io.camunda.search.entities.AuditLogEntity; import io.camunda.search.entities.DecisionDefinitionEntity; @@ -64,6 +66,7 @@ import org.camunda.bpm.engine.impl.util.ClockUtil; import org.camunda.bpm.engine.task.Task; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.extension.RegisterExtension; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Conditional; @@ -93,6 +96,10 @@ public abstract class HistoryMigrationAbstractTest extends AbstractMigratorTest @Autowired protected FlowNodeInstanceMapper flowNodeInstanceMapper; + @RegisterExtension + @Autowired + protected RdbmsQueryExtension rdbmsQuery = new RdbmsQueryExtension(); + // C7 --------------------------------------- @Autowired @@ -232,6 +239,20 @@ public List searchHistoricIncidents(String processDefinitionId) .items(); } + public List searchIncidentsByProcessInstanceKeyAndReturnAsDbModel(Long processInstanceKey) { + String sql = "SELECT INCIDENT_KEY, TREE_PATH, PROCESS_INSTANCE_KEY, FLOW_NODE_INSTANCE_KEY " + + "FROM INCIDENT WHERE PROCESS_INSTANCE_KEY = ?"; + + return rdbmsQuery.query(sql, (rs, rowNum) -> { + IncidentDbModel.Builder builder = new IncidentDbModel.Builder(); + builder.incidentKey(rs.getLong("INCIDENT_KEY")); + builder.treePath(rs.getString("TREE_PATH")); + builder.processInstanceKey(rs.getLong("PROCESS_INSTANCE_KEY")); + builder.flowNodeInstanceKey(rs.getLong("FLOW_NODE_INSTANCE_KEY")); + return builder.build(); + }, processInstanceKey); + } + public List searchHistoricVariables(String varName) { return rdbmsService.getVariableReader() .search(VariableQuery.of(queryBuilder -> diff --git a/data-migrator/qa/integration-tests/src/test/java/io/camunda/migration/data/qa/history/entity/HistoryIncidentTest.java b/data-migrator/qa/integration-tests/src/test/java/io/camunda/migration/data/qa/history/entity/HistoryIncidentTest.java index 95ab14211..90758c2cf 100644 --- a/data-migrator/qa/integration-tests/src/test/java/io/camunda/migration/data/qa/history/entity/HistoryIncidentTest.java +++ b/data-migrator/qa/integration-tests/src/test/java/io/camunda/migration/data/qa/history/entity/HistoryIncidentTest.java @@ -16,12 +16,19 @@ import static io.camunda.search.entities.IncidentEntity.ErrorType.RESOURCE_NOT_FOUND; import static io.camunda.search.entities.IncidentEntity.ErrorType.UNHANDLED_ERROR_EVENT; import static io.camunda.search.entities.IncidentEntity.ErrorType.UNKNOWN; +import static io.camunda.search.entities.FlowNodeInstanceEntity.FlowNodeType.END_EVENT; +import static io.camunda.search.entities.FlowNodeInstanceEntity.FlowNodeType.START_EVENT; +import static io.camunda.search.entities.FlowNodeInstanceEntity.FlowNodeType.USER_TASK; import static org.assertj.core.api.Assertions.assertThat; +import static org.camunda.bpm.engine.impl.incident.IncidentHandling.createIncident; +import static org.camunda.bpm.engine.impl.jobexecutor.ExecuteJobHelper.executeJob; -import io.camunda.migration.data.MigratorMode; +import io.camunda.db.rdbms.write.domain.FlowNodeInstanceDbModel; +import io.camunda.db.rdbms.write.domain.IncidentDbModel; import io.camunda.migration.data.qa.history.HistoryMigrationAbstractTest; import io.camunda.search.entities.IncidentEntity; import java.util.Collections; +import io.camunda.search.entities.ProcessInstanceEntity; import java.util.List; import org.camunda.bpm.engine.history.HistoricIncident; import org.camunda.bpm.engine.impl.cfg.ProcessEngineConfigurationImpl; @@ -52,7 +59,9 @@ public void shouldMigrateIncidentTenant() { // then List incidentsDefaultTenant = searchHistoricIncidents("incidentProcessId"); List incidentsTenant1 = searchHistoricIncidents("incidentProcessId2"); - assertThat(incidentsDefaultTenant).singleElement().extracting(IncidentEntity::tenantId).isEqualTo(C8_DEFAULT_TENANT); + assertThat(incidentsDefaultTenant).singleElement() + .extracting(IncidentEntity::tenantId) + .isEqualTo(C8_DEFAULT_TENANT); assertThat(incidentsTenant1).singleElement().extracting(IncidentEntity::tenantId).isEqualTo("tenant1"); } @@ -110,13 +119,19 @@ public void shouldMigrateIncidentForNestedProcessInstance() { deployer.deployCamunda7Process("callActivityProcess.bpmn"); deployer.deployCamunda7Process("calledActivitySubprocess.bpmn"); ProcessInstance parentProcess = runtimeService.startProcessInstanceByKey("callingProcessId"); - ProcessInstance childProcess = runtimeService.createProcessInstanceQuery().processDefinitionKey("calledProcessInstanceId").singleResult(); + ProcessInstance childProcess = runtimeService.createProcessInstanceQuery() + .processDefinitionKey("calledProcessInstanceId") + .singleResult(); createIncident("userTaskId"); // create incident in child's task - HistoricIncident c7ChildIncident = historyService.createHistoricIncidentQuery().processInstanceId(childProcess.getProcessInstanceId()).singleResult(); + HistoricIncident c7ChildIncident = historyService.createHistoricIncidentQuery() + .processInstanceId(childProcess.getProcessInstanceId()) + .singleResult(); assertThat(c7ChildIncident).isNotNull(); - HistoricIncident c7ParentIncident = historyService.createHistoricIncidentQuery().processInstanceId(parentProcess.getProcessInstanceId()).singleResult(); + HistoricIncident c7ParentIncident = historyService.createHistoricIncidentQuery() + .processInstanceId(parentProcess.getProcessInstanceId()) + .singleResult(); assertThat(c7ParentIncident).isNotNull(); // when @@ -127,7 +142,8 @@ public void shouldMigrateIncidentForNestedProcessInstance() { // child incident is migrated List childIncidents = searchHistoricIncidents(childProcess.getProcessDefinitionKey()); assertThat(childIncidents).hasSize(1); - assertOnIncidentBasicFields(childIncidents.getFirst(), c7ChildIncident, childProcess, parentProcess, UNKNOWN, false); + assertOnIncidentBasicFields(childIncidents.getFirst(), c7ChildIncident, childProcess, parentProcess, UNKNOWN, + false); // parent incident is migrated List parentIncidents = searchHistoricIncidents(parentProcess.getProcessDefinitionKey()); @@ -282,7 +298,8 @@ public void shouldMigrateIncidentWithDecisionEvaluationErrorType() { // given deployer.deployCamunda7Process("ruleTaskProcess.bpmn"); deployer.deployCamunda7Decision("mappingFailureDmn.dmn"); - ProcessInstance c7ProcessInstance = runtimeService.startProcessInstanceByKey("ruleTaskProcessId", Collections.singletonMap("input", "single entry list")); + ProcessInstance c7ProcessInstance = runtimeService.startProcessInstanceByKey("ruleTaskProcessId", + Collections.singletonMap("input", "single entry list")); triggerIncident(c7ProcessInstance.getId()); HistoricIncident c7Incident = historyService.createHistoricIncidentQuery() @@ -368,19 +385,91 @@ public void shouldMigrateIncidentWithNoJobRetriesErrorType() { assertOnIncidentBasicFields(c8Incident, c7Incident, c7ProcessInstance, null, JOB_NO_RETRIES, true); } - protected void assertOnIncidentBasicFields(IncidentEntity c8Incident, HistoricIncident c7Incident, ProcessInstance c7ChildInstance, ProcessInstance c7ParentInstance) { + @Test + public void shouldGenerateTreePathForIncidentsWithFlowNodeInstanceKey() { + // given + deployer.deployCamunda7Process("userTaskProcessAsyncAfter.bpmn"); + ProcessInstance c7ProcessInstance = runtimeService.startProcessInstanceByKey("userTaskProcessId"); + + createIncident("userTaskId"); + String userTaskId = taskService.createTaskQuery().taskDefinitionKey("userTaskId").singleResult().getId(); + taskService.complete(userTaskId); + + HistoricIncident c7Incident = historyService.createHistoricIncidentQuery() + .processInstanceId(c7ProcessInstance.getId()) + .singleResult(); + assertThat(c7Incident).isNotNull(); + + // when + historyMigrator.migrate(); + + // then + List processInstances = searchHistoricProcessInstances("userTaskProcessId"); + assertThat(processInstances).hasSize(1); + Long processInstanceKey = processInstances.getFirst().processInstanceKey(); + List flowNodes = searchFlowNodeInstancesByName("UserTaskName"); + assertThat(flowNodes).hasSize(1); + Long flownodeInstanceKey = flowNodes.getFirst().flowNodeInstanceKey(); + + List incidents = searchIncidentsByProcessInstanceKeyAndReturnAsDbModel(processInstanceKey); + assertThat(incidents).singleElement() + .extracting(IncidentDbModel::treePath) + .isNotNull() + .isEqualTo("PI_" + processInstanceKey + "/FNI_" + flownodeInstanceKey); + } + + @Test + public void shouldGenerateTreePathForIncidentsWithoutFlowNodeInstanceKey() { + // given + deployer.deployCamunda7Process("incidentProcess.bpmn"); + ProcessInstance c7ProcessInstance = runtimeService.startProcessInstanceByKey("incidentProcessId"); + triggerIncident(c7ProcessInstance.getId()); + + HistoricIncident c7Incident = historyService.createHistoricIncidentQuery() + .processInstanceId(c7ProcessInstance.getId()) + .singleResult(); + assertThat(c7Incident).isNotNull(); + + // when + historyMigrator.migrate(); + + // then + List processInstances = searchHistoricProcessInstances("incidentProcessId"); + assertThat(processInstances).hasSize(1); + Long processInstanceKey = processInstances.getFirst().processInstanceKey(); + + List incidents = searchIncidentsByProcessInstanceKeyAndReturnAsDbModel(processInstanceKey); + assertThat(incidents).singleElement() + .extracting(IncidentDbModel::treePath) + .isNotNull() + .isEqualTo("PI_" + processInstanceKey); + } + + protected void assertOnIncidentBasicFields(IncidentEntity c8Incident, + HistoricIncident c7Incident, + ProcessInstance c7ChildInstance, + ProcessInstance c7ParentInstance) { assertOnIncidentBasicFields(c8Incident, c7Incident, c7ChildInstance, c7ParentInstance, UNKNOWN, false); } - protected void assertOnIncidentBasicFields(IncidentEntity c8Incident, HistoricIncident c7Incident, ProcessInstance c7ChildInstance, ProcessInstance c7ParentInstance, IncidentEntity.ErrorType errorType, boolean waitingExecution) { + protected void assertOnIncidentBasicFields(IncidentEntity c8Incident, + HistoricIncident c7Incident, + ProcessInstance c7ChildInstance, + ProcessInstance c7ParentInstance, + IncidentEntity.ErrorType errorType, + boolean waitingExecution) { // specific values assertThat(c8Incident.tenantId()).isEqualTo(C8_DEFAULT_TENANT); - assertThat(c8Incident.processDefinitionId()).isEqualTo(prefixDefinitionId(c7ChildInstance.getProcessDefinitionKey())); + assertThat(c8Incident.processDefinitionId()).isEqualTo( + prefixDefinitionId(c7ChildInstance.getProcessDefinitionKey())); assertThat(c8Incident.flowNodeId()).isEqualTo(c7Incident.getActivityId()); assertThat(c8Incident.state()).isEqualTo(IncidentEntity.IncidentState.RESOLVED); assertThat(c8Incident.errorMessage()).isEqualTo(c7Incident.getIncidentMessage()); - assertThat(c8Incident.processInstanceKey()).isEqualTo(findMigratedProcessInstanceKey(c7ChildInstance.getProcessDefinitionKey())); - String expectedRootProcessKey = c7ParentInstance != null ? c7ParentInstance.getProcessDefinitionKey() : c7ChildInstance.getProcessDefinitionKey(); + assertThat(c8Incident.processInstanceKey()).isEqualTo( + findMigratedProcessInstanceKey(c7ChildInstance.getProcessDefinitionKey())); + String expectedRootProcessKey = c7ParentInstance != null ? + c7ParentInstance.getProcessDefinitionKey() : + c7ChildInstance.getProcessDefinitionKey(); assertThat(c8Incident.rootProcessInstanceKey()).isEqualTo(findMigratedProcessInstanceKey(expectedRootProcessKey)); assertThat(c8Incident.errorType()).isEqualTo(errorType); @@ -400,10 +489,7 @@ protected void assertOnIncidentBasicFields(IncidentEntity c8Incident, HistoricIn } protected void executeJob(ProcessInstance c7ProcessInstance) { - Job job = managementService - .createJobQuery() - .processInstanceId(c7ProcessInstance.getId()) - .singleResult(); + Job job = managementService.createJobQuery().processInstanceId(c7ProcessInstance.getId()).singleResult(); if (job != null) { managementService.executeJob(job.getId());