diff --git a/data-migrator/core/src/main/java/io/camunda/migration/data/HistoryMigrator.java b/data-migrator/core/src/main/java/io/camunda/migration/data/HistoryMigrator.java index ba0982ac8..76c7ece41 100644 --- a/data-migrator/core/src/main/java/io/camunda/migration/data/HistoryMigrator.java +++ b/data-migrator/core/src/main/java/io/camunda/migration/data/HistoryMigrator.java @@ -11,28 +11,27 @@ import static io.camunda.migration.data.impl.persistence.IdKeyMapper.getHistoryTypes; import io.camunda.migration.data.config.C8DataSourceConfigured; -import io.camunda.migration.data.impl.history.migrator.HistoryEntityMigrator; +import io.camunda.migration.data.impl.clients.DbClient; +import io.camunda.migration.data.impl.history.EntitySkippedException; +import io.camunda.migration.data.impl.history.migrator.AuditLogMigrator; import io.camunda.migration.data.impl.history.migrator.DecisionDefinitionMigrator; import io.camunda.migration.data.impl.history.migrator.DecisionInstanceMigrator; import io.camunda.migration.data.impl.history.migrator.DecisionRequirementsMigrator; import io.camunda.migration.data.impl.history.migrator.FlowNodeMigrator; import io.camunda.migration.data.impl.history.migrator.FormMigrator; +import io.camunda.migration.data.impl.history.migrator.HistoryEntityMigrator; import io.camunda.migration.data.impl.history.migrator.IncidentMigrator; +import io.camunda.migration.data.impl.history.migrator.JobMigrator; import io.camunda.migration.data.impl.history.migrator.ProcessDefinitionMigrator; import io.camunda.migration.data.impl.history.migrator.ProcessInstanceMigrator; import io.camunda.migration.data.impl.history.migrator.UserTaskMigrator; import io.camunda.migration.data.impl.history.migrator.VariableMigrator; -import io.camunda.migration.data.impl.history.migrator.AuditLogMigrator; -import io.camunda.migration.data.impl.clients.DbClient; -import io.camunda.migration.data.impl.history.EntitySkippedException; import io.camunda.migration.data.impl.logging.HistoryMigratorLogs; import io.camunda.migration.data.impl.persistence.IdKeyMapper; import io.camunda.migration.data.impl.util.ExceptionUtils; import io.camunda.migration.data.impl.util.PrintUtils; import java.util.ArrayList; -import java.util.HashSet; import java.util.List; -import java.util.Set; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Conditional; import org.springframework.stereotype.Component; @@ -62,6 +61,9 @@ public class HistoryMigrator { @Autowired protected IncidentMigrator incidentMigrator; + @Autowired + protected JobMigrator jobMigrator; + @Autowired protected DecisionRequirementsMigrator decisionRequirementsMigrator; @@ -85,6 +87,7 @@ public class HistoryMigrator { flowNodeMigrator, userTaskMigrator, variableMigrator, + jobMigrator, incidentMigrator, decisionRequirementsMigrator, decisionDefinitionMigrator, diff --git a/data-migrator/core/src/main/java/io/camunda/migration/data/config/InterceptorConfiguration.java b/data-migrator/core/src/main/java/io/camunda/migration/data/config/InterceptorConfiguration.java index 7916ad4b2..3edcd4d0e 100644 --- a/data-migrator/core/src/main/java/io/camunda/migration/data/config/InterceptorConfiguration.java +++ b/data-migrator/core/src/main/java/io/camunda/migration/data/config/InterceptorConfiguration.java @@ -31,6 +31,7 @@ import io.camunda.migration.data.impl.interceptor.history.entity.FlowNodeTransformer; import io.camunda.migration.data.impl.interceptor.history.entity.FormTransformer; import io.camunda.migration.data.impl.interceptor.history.entity.IncidentTransformer; +import io.camunda.migration.data.impl.interceptor.history.entity.JobTransformer; import io.camunda.migration.data.impl.interceptor.history.entity.ProcessDefinitionTransformer; import io.camunda.migration.data.impl.interceptor.history.entity.ProcessInstanceTransformer; import io.camunda.migration.data.impl.interceptor.history.entity.UserTaskTransformer; @@ -357,6 +358,11 @@ public FormTransformer formTransformer() { return new FormTransformer(); } + @Bean + public JobTransformer jobTransformer() { + return new JobTransformer(); + } + @Bean public IncidentTransformer incidentTransformer() { return new IncidentTransformer(); diff --git a/data-migrator/core/src/main/java/io/camunda/migration/data/config/MigratorAutoConfiguration.java b/data-migrator/core/src/main/java/io/camunda/migration/data/config/MigratorAutoConfiguration.java index 33a04d509..2c692838e 100644 --- a/data-migrator/core/src/main/java/io/camunda/migration/data/config/MigratorAutoConfiguration.java +++ b/data-migrator/core/src/main/java/io/camunda/migration/data/config/MigratorAutoConfiguration.java @@ -29,6 +29,7 @@ import io.camunda.migration.data.impl.history.migrator.DecisionInstanceMigrator; import io.camunda.migration.data.impl.history.migrator.DecisionRequirementsMigrator; import io.camunda.migration.data.impl.history.migrator.FormMigrator; +import io.camunda.migration.data.impl.history.migrator.JobMigrator; import io.camunda.migration.data.impl.history.migrator.ProcessDefinitionMigrator; import io.camunda.migration.data.impl.history.migrator.AuditLogMigrator; import io.camunda.migration.data.impl.identity.AuthorizationManager; @@ -75,6 +76,7 @@ DecisionInstanceMigrator.class, DecisionRequirementsMigrator.class, FlowNodeMigrator.class, + JobMigrator.class, IncidentMigrator.class, FormMigrator.class, ProcessDefinitionMigrator.class, diff --git a/data-migrator/core/src/main/java/io/camunda/migration/data/impl/clients/C7Client.java b/data-migrator/core/src/main/java/io/camunda/migration/data/impl/clients/C7Client.java index 201c2fdd8..d3827e541 100644 --- a/data-migrator/core/src/main/java/io/camunda/migration/data/impl/clients/C7Client.java +++ b/data-migrator/core/src/main/java/io/camunda/migration/data/impl/clients/C7Client.java @@ -42,6 +42,7 @@ import org.camunda.bpm.engine.history.HistoricActivityInstance; import org.camunda.bpm.engine.history.HistoricDecisionInstance; import org.camunda.bpm.engine.history.HistoricIncident; +import org.camunda.bpm.engine.history.HistoricJobLog; import org.camunda.bpm.engine.history.HistoricProcessInstance; import org.camunda.bpm.engine.history.HistoricTaskInstance; import org.camunda.bpm.engine.history.HistoricVariableInstance; @@ -56,6 +57,7 @@ import org.camunda.bpm.engine.impl.HistoricActivityInstanceQueryImpl; import org.camunda.bpm.engine.impl.HistoricDecisionInstanceQueryImpl; import org.camunda.bpm.engine.impl.HistoricIncidentQueryImpl; +import org.camunda.bpm.engine.impl.HistoricJobLogQueryImpl; import org.camunda.bpm.engine.impl.HistoricProcessInstanceQueryImpl; import org.camunda.bpm.engine.impl.HistoricTaskInstanceQueryImpl; import org.camunda.bpm.engine.impl.HistoricVariableInstanceQueryImpl; @@ -575,6 +577,41 @@ public UserOperationLogEntry getUserOperationLogEntry(String c7Id) { return callApi(query::singleResult, format(FAILED_TO_FETCH_HISTORIC_ELEMENT, "UserOperationLogEntry", c7Id)); } + /** + * Fetches the first historic job log entry for the given job ID, ordered by timestamp ascending. + * Used for retry mode, where the job ID is the tracking key. + */ + public HistoricJobLog getHistoricJobLog(String jobId) { + HistoricJobLogQueryImpl query = (HistoricJobLogQueryImpl) historyService.createHistoricJobLogQuery() + .jobId(jobId) + .orderByTimestamp() + .asc() + .orderByJobId() + .asc(); + List results = callApi(query::list, + format(FAILED_TO_FETCH_HISTORIC_ELEMENT, "HistoricJobLog with jobId", jobId)); + return results.isEmpty() ? null : results.getFirst(); + } + + /** + * Processes historic job log entries with pagination using the provided callback consumer. + */ + public void fetchAndHandleHistoricJobLogs(Consumer callback, Date timestampAfter) { + HistoricJobLogQueryImpl query = (HistoricJobLogQueryImpl) historyService.createHistoricJobLogQuery() + .orderByTimestamp() + .asc() + .orderByJobId() + .asc(); + + if (timestampAfter != null) { + query.timestampAfter(timestampAfter); + } + + new Pagination().pageSize(properties.getPageSize()) + .query(query) + .callback(callback); + } + /** * Processes tenant entities with pagination using the provided callback consumer. */ diff --git a/data-migrator/core/src/main/java/io/camunda/migration/data/impl/clients/C8Client.java b/data-migrator/core/src/main/java/io/camunda/migration/data/impl/clients/C8Client.java index 8f76103e1..d6293d187 100644 --- a/data-migrator/core/src/main/java/io/camunda/migration/data/impl/clients/C8Client.java +++ b/data-migrator/core/src/main/java/io/camunda/migration/data/impl/clients/C8Client.java @@ -24,6 +24,7 @@ import static io.camunda.migration.data.impl.logging.C8ClientLogs.FAILED_TO_INSERT_DECISION_REQUIREMENTS; import static io.camunda.migration.data.impl.logging.C8ClientLogs.FAILED_TO_INSERT_FLOW_NODE_INSTANCE; import static io.camunda.migration.data.impl.logging.C8ClientLogs.FAILED_TO_INSERT_INCIDENT; +import static io.camunda.migration.data.impl.logging.C8ClientLogs.FAILED_TO_INSERT_JOB; import static io.camunda.migration.data.impl.logging.C8ClientLogs.FAILED_TO_INSERT_PROCESS_DEFINITION; import static io.camunda.migration.data.impl.logging.C8ClientLogs.FAILED_TO_INSERT_PROCESS_INSTANCE; import static io.camunda.migration.data.impl.logging.C8ClientLogs.FAILED_TO_INSERT_USER_TASK; @@ -68,6 +69,7 @@ import io.camunda.db.rdbms.sql.FlowNodeInstanceMapper; import io.camunda.db.rdbms.sql.FormMapper; import io.camunda.db.rdbms.sql.IncidentMapper; +import io.camunda.db.rdbms.sql.JobMapper; import io.camunda.db.rdbms.sql.ProcessDefinitionMapper; import io.camunda.db.rdbms.sql.ProcessInstanceMapper; import io.camunda.db.rdbms.sql.UserTaskMapper; @@ -79,6 +81,7 @@ import io.camunda.db.rdbms.write.domain.FlowNodeInstanceDbModel; import io.camunda.db.rdbms.write.domain.FormDbModel; import io.camunda.db.rdbms.write.domain.IncidentDbModel; +import io.camunda.db.rdbms.write.domain.JobDbModel; import io.camunda.db.rdbms.write.domain.ProcessDefinitionDbModel; import io.camunda.db.rdbms.write.domain.ProcessInstanceDbModel; import io.camunda.db.rdbms.write.domain.UserTaskDbModel; @@ -149,6 +152,9 @@ public class C8Client { @Autowired(required = false) protected FormMapper formMapper; + @Autowired(required = false) + protected JobMapper jobMapper; + /** * Creates a new process instance with the given BPMN process ID and variables. */ @@ -452,4 +458,11 @@ public void insertForm(FormDbModel dbModel) { callApi(() -> formMapper.insert(dbModel), "Failed to insert form"); } + /** + * Inserts a Job into the database. + */ + public void insertJob(JobDbModel dbModel) { + callApi(() -> jobMapper.insert(new BatchInsertDto(List.of(dbModel))), FAILED_TO_INSERT_JOB); + } + } diff --git a/data-migrator/core/src/main/java/io/camunda/migration/data/impl/history/C7Entity.java b/data-migrator/core/src/main/java/io/camunda/migration/data/impl/history/C7Entity.java index ad8fe9ac8..1a597e634 100644 --- a/data-migrator/core/src/main/java/io/camunda/migration/data/impl/history/C7Entity.java +++ b/data-migrator/core/src/main/java/io/camunda/migration/data/impl/history/C7Entity.java @@ -7,18 +7,18 @@ */ package io.camunda.migration.data.impl.history; -import static io.camunda.migration.data.impl.persistence.IdKeyMapper.*; +import static io.camunda.migration.data.impl.persistence.IdKeyMapper.TYPE; import java.util.Date; import org.camunda.bpm.engine.history.HistoricActivityInstance; import org.camunda.bpm.engine.history.HistoricDecisionInstance; import org.camunda.bpm.engine.history.HistoricIncident; +import org.camunda.bpm.engine.history.HistoricJobLog; import org.camunda.bpm.engine.history.HistoricProcessInstance; import org.camunda.bpm.engine.history.HistoricTaskInstance; import org.camunda.bpm.engine.history.HistoricVariableInstance; -import org.camunda.bpm.engine.impl.persistence.entity.CamundaFormDefinitionEntity; -import org.camunda.bpm.engine.repository.CamundaFormDefinition; import org.camunda.bpm.engine.history.UserOperationLogEntry; +import org.camunda.bpm.engine.repository.CamundaFormDefinition; import org.camunda.bpm.engine.repository.DecisionDefinition; import org.camunda.bpm.engine.repository.DecisionRequirementsDefinition; import org.camunda.bpm.engine.repository.ProcessDefinition; @@ -42,6 +42,7 @@ public static C7Entity of(Object c7) { case HistoricDecisionInstance c7DecisionInstance -> of(c7DecisionInstance); case HistoricActivityInstance c7ActivityInstance -> of(c7ActivityInstance); case HistoricIncident c7Incident -> of(c7Incident); + case HistoricJobLog c7JobLog -> of(c7JobLog); case HistoricProcessInstance c7ProcessInstance -> of(c7ProcessInstance); case HistoricTaskInstance c7TaskInstance -> of(c7TaskInstance); case HistoricVariableInstance c7VariableInstance -> of(c7VariableInstance); @@ -104,6 +105,10 @@ public static C7Entity of(HistoricVariableInstance c7E return new C7Entity<>(c7Entity.getId(), c7Entity.getCreateTime(), c7Entity); } + public static C7Entity of(HistoricJobLog c7Entity) { + return new C7Entity<>(c7Entity.getJobId(), c7Entity.getTimestamp(), c7Entity); + } + public String getId() { return id; } diff --git a/data-migrator/core/src/main/java/io/camunda/migration/data/impl/history/EntitySkippedException.java b/data-migrator/core/src/main/java/io/camunda/migration/data/impl/history/EntitySkippedException.java index 4d68d7e90..f4b20c4d3 100644 --- a/data-migrator/core/src/main/java/io/camunda/migration/data/impl/history/EntitySkippedException.java +++ b/data-migrator/core/src/main/java/io/camunda/migration/data/impl/history/EntitySkippedException.java @@ -11,6 +11,7 @@ import org.camunda.bpm.engine.history.HistoricActivityInstance; import org.camunda.bpm.engine.history.HistoricDecisionInstance; import org.camunda.bpm.engine.history.HistoricIncident; +import org.camunda.bpm.engine.history.HistoricJobLog; import org.camunda.bpm.engine.history.HistoricProcessInstance; import org.camunda.bpm.engine.history.HistoricTaskInstance; import org.camunda.bpm.engine.history.HistoricVariableInstance; @@ -47,6 +48,10 @@ public EntitySkippedException(HistoricIncident c7Incident, String message) { this(C7Entity.of(c7Incident), message); } + public EntitySkippedException(HistoricJobLog c7JobLog, String message) { + this(C7Entity.of(c7JobLog), message); + } + public EntitySkippedException(HistoricProcessInstance c7ProcessInstance, String message) { this(C7Entity.of(c7ProcessInstance), message); } diff --git a/data-migrator/core/src/main/java/io/camunda/migration/data/impl/history/migrator/HistoryEntityMigrator.java b/data-migrator/core/src/main/java/io/camunda/migration/data/impl/history/migrator/HistoryEntityMigrator.java index ab784a5bd..6c6316f05 100644 --- a/data-migrator/core/src/main/java/io/camunda/migration/data/impl/history/migrator/HistoryEntityMigrator.java +++ b/data-migrator/core/src/main/java/io/camunda/migration/data/impl/history/migrator/HistoryEntityMigrator.java @@ -19,12 +19,14 @@ import static io.camunda.migration.data.impl.persistence.IdKeyMapper.TYPE.HISTORY_PROCESS_INSTANCE; import static io.camunda.migration.data.impl.util.ConverterUtil.convertDate; +import io.camunda.db.rdbms.read.domain.FlowNodeInstanceDbQuery; import io.camunda.db.rdbms.read.domain.ProcessDefinitionDbQuery; +import io.camunda.db.rdbms.write.domain.FlowNodeInstanceDbModel; import io.camunda.migration.data.MigratorMode; -import io.camunda.migration.data.impl.DataSourceRegistry; import io.camunda.migration.data.config.property.MigratorProperties; import io.camunda.migration.data.exception.EntityInterceptorException; import io.camunda.migration.data.exception.VariableInterceptorException; +import io.camunda.migration.data.impl.DataSourceRegistry; import io.camunda.migration.data.impl.EntityConversionService; import io.camunda.migration.data.impl.clients.C7Client; import io.camunda.migration.data.impl.clients.C8Client; @@ -35,12 +37,12 @@ import io.camunda.migration.data.interceptor.property.EntityConversionContext; import io.camunda.search.entities.ProcessDefinitionEntity; import io.camunda.search.entities.ProcessInstanceEntity; +import io.camunda.search.filter.FlowNodeInstanceFilter; import io.camunda.util.ObjectBuilder; import java.time.OffsetDateTime; import java.time.Period; import java.util.Date; import java.util.List; -import java.util.Set; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; @@ -314,6 +316,30 @@ protected Long findFlowNodeInstanceKey(String activityInstanceId) { return dbClient.findC8KeyByC7IdAndType(activityInstanceId, HISTORY_FLOW_NODE); } + /** + * Finds the C8 flow node instance key by C7 activity ID and process instance ID. + * + * @param activityId the C7 activity ID + * @param processInstanceId the C7 process instance ID + * @return the C8 flow node instance key, or {@code null} if not found + */ + protected Long findFlowNodeInstanceKey(String activityId, String processInstanceId) { + Long processInstanceKey = dbClient.findC8KeyByC7IdAndType(processInstanceId, HISTORY_PROCESS_INSTANCE); + if (processInstanceKey == null) { + return null; + } + + List flowNodes = c8Client.searchFlowNodeInstances(FlowNodeInstanceDbQuery.of( + builder -> builder.filter(FlowNodeInstanceFilter.of( + filter -> filter.flowNodeIds(activityId).processInstanceKeys(processInstanceKey))))); + + if (!flowNodes.isEmpty()) { + return flowNodes.getFirst().flowNodeInstanceKey(); + } else { + return null; + } + } + protected boolean isMigrated(String id, TYPE type) { return dbClient.checkHasC8KeyByC7IdAndType(id, type); } diff --git a/data-migrator/core/src/main/java/io/camunda/migration/data/impl/history/migrator/IncidentMigrator.java b/data-migrator/core/src/main/java/io/camunda/migration/data/impl/history/migrator/IncidentMigrator.java index 9b9b65538..2da79b881 100644 --- a/data-migrator/core/src/main/java/io/camunda/migration/data/impl/history/migrator/IncidentMigrator.java +++ b/data-migrator/core/src/main/java/io/camunda/migration/data/impl/history/migrator/IncidentMigrator.java @@ -8,25 +8,23 @@ package io.camunda.migration.data.impl.history.migrator; import static io.camunda.migration.data.impl.logging.HistoryMigratorLogs.SKIP_REASON_MISSING_FLOW_NODE; +import static io.camunda.migration.data.impl.logging.HistoryMigratorLogs.SKIP_REASON_MISSING_JOB_REFERENCE; import static io.camunda.migration.data.impl.logging.HistoryMigratorLogs.SKIP_REASON_MISSING_PROCESS_DEFINITION; import static io.camunda.migration.data.impl.logging.HistoryMigratorLogs.SKIP_REASON_MISSING_PROCESS_INSTANCE; import static io.camunda.migration.data.impl.logging.HistoryMigratorLogs.SKIP_REASON_MISSING_ROOT_PROCESS_INSTANCE; import static io.camunda.migration.data.impl.persistence.IdKeyMapper.TYPE.HISTORY_INCIDENT; +import static io.camunda.migration.data.impl.persistence.IdKeyMapper.TYPE.HISTORY_JOB; import static io.camunda.migration.data.impl.persistence.IdKeyMapper.TYPE.HISTORY_PROCESS_INSTANCE; +import static org.camunda.bpm.engine.runtime.Incident.FAILED_JOB_HANDLER_TYPE; -import io.camunda.db.rdbms.read.domain.FlowNodeInstanceDbQuery; -import io.camunda.db.rdbms.write.domain.FlowNodeInstanceDbModel; import io.camunda.db.rdbms.write.domain.IncidentDbModel; import io.camunda.db.rdbms.write.domain.IncidentDbModel.Builder; -import io.camunda.migration.data.exception.EntityInterceptorException; import io.camunda.migration.data.impl.history.C7Entity; import io.camunda.migration.data.impl.history.EntitySkippedException; import io.camunda.migration.data.impl.logging.HistoryMigratorLogs; import io.camunda.migration.data.impl.persistence.IdKeyMapper; import io.camunda.search.entities.ProcessInstanceEntity; -import io.camunda.search.filter.FlowNodeInstanceFilter; import java.util.Date; -import java.util.List; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Function; @@ -63,17 +61,16 @@ public IdKeyMapper.TYPE getType() { * *

Skip scenarios: *

    - *
  • Process instance key missing - skipped with {@code SKIP_REASON_MISSING_PROCESS_INSTANCE_KEY}
  • *
  • Process definition not yet migrated - skipped with {@code SKIP_REASON_MISSING_PROCESS_DEFINITION}
  • + *
  • Process instance not yet migrated - skipped with {@code SKIP_REASON_MISSING_PROCESS_INSTANCE}
  • *
  • Root process instance not yet migrated (when part of a process hierarchy) - skipped with {@code SKIP_REASON_MISSING_ROOT_PROCESS_INSTANCE}
  • - *
  • Interceptor error during conversion - skipped with the exception message
  • + *
  • Flow node instance not found and activity is not in async-before waiting state - skipped with {@code SKIP_REASON_MISSING_FLOW_NODE}
  • + *
  • Failed job incident whose referenced job was skipped (no C8 key) - skipped with {@code SKIP_REASON_MISSING_JOB_REFERENCE}
  • *
* - *

Note: Flow node instance and job reference validations are currently disabled - * pending resolution of known issues. See code comments for details. - * * @param c7Incident the historic incident from Camunda 7 to be migrated - * @throws EntityInterceptorException if an error occurs during entity conversion (handled internally, entity marked as skipped) + * @return the C8 incident key if the incident was migrated, or {@code null} if it was already migrated + * * @throws EntityInterceptorException if an error occurs during entity conversion (handled internally, entity marked as skipped) */ @Override public Long migrateTransactionally(HistoricIncident c7Incident) { @@ -81,8 +78,8 @@ public Long migrateTransactionally(HistoricIncident c7Incident) { if (shouldMigrate(c7IncidentId, HISTORY_INCIDENT)) { HistoryMigratorLogs.migratingHistoricIncident(c7IncidentId); var c7ProcessInstance = findProcessInstanceByC7Id(c7Incident.getProcessInstanceId()); - Long processInstanceKey = null; - Long flowNodeInstanceKey = null; + Long processInstanceKey; + Long flowNodeInstanceKey; var builder = new Builder(); var processDefinitionKey = findProcessDefinitionKey(c7Incident.getProcessDefinitionId()); @@ -93,7 +90,6 @@ public Long migrateTransactionally(HistoricIncident c7Incident) { if (processInstanceKey != null) { flowNodeInstanceKey = findFlowNodeInstanceKey(c7Incident.getActivityId(), c7Incident.getProcessInstanceId()); builder.flowNodeInstanceKey(flowNodeInstanceKey); - // .jobKey(jobDefinitionKey) // TODO when jobs are migrated String c7RootProcessInstanceId = c7Incident.getRootProcessInstanceId(); if (c7RootProcessInstanceId != null && isMigrated(c7RootProcessInstanceId, HISTORY_PROCESS_INSTANCE)) { @@ -106,6 +102,8 @@ public Long migrateTransactionally(HistoricIncident c7Incident) { } } + resolveJobKey(c7Incident, builder); + IncidentDbModel dbModel = convert(C7Entity.of(c7Incident), builder); if (dbModel.processDefinitionKey() == null) { @@ -127,9 +125,11 @@ public Long migrateTransactionally(HistoricIncident c7Incident) { } } - if (dbModel.jobKey() == null) { - // throw new EntitySkippedException(c7Incident, SKIP_REASON_MISSING_JOB_REFERENCE); // TODO when jobs are migrated + if (isFailedJobIncident(c7Incident) && dbModel.jobKey() == null) { + // Only skip the incident when the referenced C7 job is actually tracked as migrated/skipped. + throw new EntitySkippedException(c7Incident, SKIP_REASON_MISSING_JOB_REFERENCE); } + c8Client.insertIncident(dbModel); return dbModel.incidentKey(); @@ -138,23 +138,6 @@ public Long migrateTransactionally(HistoricIncident c7Incident) { return null; } - protected Long findFlowNodeInstanceKey(String activityId, String processInstanceId) { - Long processInstanceKey = dbClient.findC8KeyByC7IdAndType(processInstanceId, HISTORY_PROCESS_INSTANCE); - if (processInstanceKey == null) { - return null; - } - - List flowNodes = c8Client.searchFlowNodeInstances(FlowNodeInstanceDbQuery.of( - builder -> builder.filter(FlowNodeInstanceFilter.of( - filter -> filter.flowNodeIds(activityId).processInstanceKeys(processInstanceKey))))); - - if (!flowNodes.isEmpty()) { - return flowNodes.getFirst().flowNodeInstanceKey(); - } else { - return null; - } - } - /** * Generates a tree path for incidents in the format: PI_processInstanceKey/FNI_elementInstanceKey (if the * elementInstanceKey exists, otherwise PI_processInstanceKey) @@ -168,5 +151,45 @@ public static String generateTreePath(Long processInstanceKey, Long elementInsta "PI_" + processInstanceKey : "PI_" + processInstanceKey + "/FNI_" + elementInstanceKey; } + + /** + * Resolves and sets the job key on the incident builder for {@code failedJob} incidents. + *

+ * If the incident is a {@code failedJob} incident and the associated C7 job has been tracked + * in the migration table, the C8 job key is looked up and set on the builder. The key will be + * {@code null} if the job was skipped during migration, which will cause the incident to be + * skipped downstream via {@code SKIP_REASON_MISSING_JOB_REFERENCE}. + *

+ *

+ * If the job is not yet tracked (job migration may not have run or the job type is not + * tracked), the incident proceeds without a job key. + *

+ * + * @param c7Incident the Camunda 7 incident + * @param builder the incident builder to set the job key on + */ + protected void resolveJobKey(final HistoricIncident c7Incident, final Builder builder) { + if (!isFailedJobIncident(c7Incident)) { + return; + } + final String c7JobId = c7Incident.getConfiguration(); + if (c7JobId == null) { + return; + } + if (dbClient.checkExistsByC7IdAndType(c7JobId, HISTORY_JOB)) { + final Long jobKey = dbClient.findC8KeyByC7IdAndType(c7JobId, HISTORY_JOB); + builder.jobKey(jobKey); + } + } + + /** + * Returns true if this incident was caused by a job failure. + * + * @param c7Incident the Camunda 7 incident + * @return true for {@code failedJob} incident type + */ + protected boolean isFailedJobIncident(final HistoricIncident c7Incident) { + return FAILED_JOB_HANDLER_TYPE.equals(c7Incident.getIncidentType()); + } } diff --git a/data-migrator/core/src/main/java/io/camunda/migration/data/impl/history/migrator/JobMigrator.java b/data-migrator/core/src/main/java/io/camunda/migration/data/impl/history/migrator/JobMigrator.java new file mode 100644 index 000000000..b61ad379a --- /dev/null +++ b/data-migrator/core/src/main/java/io/camunda/migration/data/impl/history/migrator/JobMigrator.java @@ -0,0 +1,142 @@ +/* + * Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under + * one or more contributor license agreements. See the NOTICE file distributed + * with this work for additional information regarding copyright ownership. + * Licensed under the Camunda License 1.0. You may not use this file + * except in compliance with the Camunda License 1.0. + */ +package io.camunda.migration.data.impl.history.migrator; + +import static io.camunda.migration.data.impl.logging.HistoryMigratorLogs.SKIP_REASON_MISSING_PROCESS_DEFINITION; +import static io.camunda.migration.data.impl.logging.HistoryMigratorLogs.SKIP_REASON_MISSING_PROCESS_INSTANCE; +import static io.camunda.migration.data.impl.logging.HistoryMigratorLogs.SKIP_REASON_MISSING_ROOT_PROCESS_INSTANCE; +import static io.camunda.migration.data.impl.logging.HistoryMigratorLogs.SKIP_REASON_UNSUPPORTED_JOBS; +import static io.camunda.migration.data.impl.logging.HistoryMigratorLogs.logMigratingJob; +import static io.camunda.migration.data.impl.persistence.IdKeyMapper.TYPE.HISTORY_JOB; +import static io.camunda.migration.data.impl.persistence.IdKeyMapper.TYPE.HISTORY_PROCESS_INSTANCE; +import static io.camunda.migration.data.impl.util.ConverterUtil.getNextKey; +import static org.camunda.bpm.engine.impl.jobexecutor.MessageJobDeclaration.ASYNC_AFTER; +import static org.camunda.bpm.engine.impl.jobexecutor.MessageJobDeclaration.ASYNC_BEFORE; + +import io.camunda.db.rdbms.write.domain.JobDbModel; +import io.camunda.migration.data.impl.history.C7Entity; +import io.camunda.migration.data.impl.history.EntitySkippedException; +import io.camunda.migration.data.impl.persistence.IdKeyMapper; +import io.camunda.search.entities.ProcessInstanceEntity; +import java.util.Date; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.function.Function; +import org.camunda.bpm.engine.history.HistoricJobLog; +import org.springframework.stereotype.Service; + +/** + * Service class responsible for migrating historic job log entries from Camunda 7 to Camunda 8. + *

+ * Job logs in Camunda 7 record lifecycle events (creation, execution, failure, deletion) for each + * job. This migrator converts C7 job log entries to C8 {@link JobDbModel} records, tracking each + * C7 job by its job ID so that only one C8 record is created per C7 job. + *

+ *

+ * The C8 job key is stored in the migration tracking table keyed by the C7 job ID. This allows + * the {@link IncidentMigrator} to look up the C8 job key when migrating {@code failedJob} + * incidents. + *

+ */ +@Service +public class JobMigrator extends HistoryEntityMigrator { + + @Override + public BiConsumer, Date> fetchForMigrateHandler() { + return c7Client::fetchAndHandleHistoricJobLogs; + } + + @Override + public Function fetchForRetryHandler() { + return c7Client::getHistoricJobLog; + } + + @Override + public IdKeyMapper.TYPE getType() { + return HISTORY_JOB; + } + + /** + * Migrates a single historic job log entry from Camunda 7 to Camunda 8. + *

+ * Uses the C7 job ID as the tracking key, ensuring that only one C8 job record is created + * per C7 job across multiple log entries. + *

+ * + *

Skip scenarios: + *

    + *
  • Job already tracked in the migration table - silently skipped, returns {@code null}
  • + *
  • Unsupported job type (not async-before or async-after) - skipped with + * {@code SKIP_REASON_UNSUPPORTED_JOBS}
  • + *
  • Process definition not yet migrated - skipped with + * {@code SKIP_REASON_MISSING_PROCESS_DEFINITION}
  • + *
  • Process instance not yet migrated - skipped with + * {@code SKIP_REASON_MISSING_PROCESS_INSTANCE}
  • + *
  • Root process instance not yet migrated (when part of a process hierarchy) - skipped with + * {@code SKIP_REASON_MISSING_ROOT_PROCESS_INSTANCE}
  • + *
+ * + * @param c7JobLog the Camunda 7 historic job log entry to migrate + * @return the C8 job key, or {@code null} if already migrated + */ + @Override + public Long migrateTransactionally(final HistoricJobLog c7JobLog) { + final String c7JobId = c7JobLog.getJobId(); + if (shouldMigrate(c7JobId, HISTORY_JOB)) { + String jobDefinitionConfiguration = c7JobLog.getJobDefinitionConfiguration(); + logMigratingJob(c7JobId); + if (!ASYNC_BEFORE.equals(jobDefinitionConfiguration) && !ASYNC_AFTER.equals(jobDefinitionConfiguration)) { + throw new EntitySkippedException(c7JobLog, SKIP_REASON_UNSUPPORTED_JOBS); + } + + final var jobKey = getNextKey(); + final var builder = new JobDbModel.Builder().jobKey(jobKey); + + final var processDefinitionKey = findProcessDefinitionKey(c7JobLog.getProcessDefinitionId()); + builder.processDefinitionKey(processDefinitionKey); + final String c7ProcessInstanceId = c7JobLog.getProcessInstanceId(); + final ProcessInstanceEntity processInstance = findProcessInstanceByC7Id(c7ProcessInstanceId); + if (processInstance != null) { + builder.processInstanceKey(processInstance.processInstanceKey()); + + final String c7RootProcessInstanceId = c7JobLog.getRootProcessInstanceId(); + if (c7RootProcessInstanceId != null && isMigrated(c7RootProcessInstanceId, HISTORY_PROCESS_INSTANCE)) { + final ProcessInstanceEntity rootProcessInstance = findProcessInstanceByC7Id(c7RootProcessInstanceId); + if (rootProcessInstance != null && rootProcessInstance.processInstanceKey() != null) { + builder.rootProcessInstanceKey(rootProcessInstance.processInstanceKey()); + } + } + + final Long elementInstanceKey = findFlowNodeInstanceKey( + c7JobLog.getActivityId(), c7ProcessInstanceId); + builder.elementInstanceKey(elementInstanceKey); + } + + final JobDbModel dbModel = convert(C7Entity.of(c7JobLog), builder); + + if (dbModel.processDefinitionKey() == null) { + throw new EntitySkippedException(c7JobLog, SKIP_REASON_MISSING_PROCESS_DEFINITION); + } + + if (dbModel.processInstanceKey() == null) { + throw new EntitySkippedException(c7JobLog, SKIP_REASON_MISSING_PROCESS_INSTANCE); + } + + if (dbModel.rootProcessInstanceKey() == null) { + throw new EntitySkippedException(c7JobLog, SKIP_REASON_MISSING_ROOT_PROCESS_INSTANCE); + } + + c8Client.insertJob(dbModel); + + return jobKey; + } + + return null; + } + +} diff --git a/data-migrator/core/src/main/java/io/camunda/migration/data/impl/interceptor/history/entity/IncidentTransformer.java b/data-migrator/core/src/main/java/io/camunda/migration/data/impl/interceptor/history/entity/IncidentTransformer.java index 9b51b315b..98d5e04a7 100644 --- a/data-migrator/core/src/main/java/io/camunda/migration/data/impl/interceptor/history/entity/IncidentTransformer.java +++ b/data-migrator/core/src/main/java/io/camunda/migration/data/impl/interceptor/history/entity/IncidentTransformer.java @@ -49,7 +49,6 @@ public void execute(HistoricIncident entity, Builder builder) { .creationDate(convertDate(entity.getCreateTime())) .errorMessageHash(null) .partitionId(C7_HISTORY_EXPORTER_PARTITION_ID) - .jobKey(null) .state(IncidentEntity.IncidentState.RESOLVED) .tenantId(getTenantId(entity.getTenantId())); // Note: processDefinitionKey, processInstanceKey, jobKey, and flowNodeInstanceKey are set externally diff --git a/data-migrator/core/src/main/java/io/camunda/migration/data/impl/interceptor/history/entity/JobTransformer.java b/data-migrator/core/src/main/java/io/camunda/migration/data/impl/interceptor/history/entity/JobTransformer.java new file mode 100644 index 000000000..c05e4e9d8 --- /dev/null +++ b/data-migrator/core/src/main/java/io/camunda/migration/data/impl/interceptor/history/entity/JobTransformer.java @@ -0,0 +1,72 @@ +/* + * Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under + * one or more contributor license agreements. See the NOTICE file distributed + * with this work for additional information regarding copyright ownership. + * Licensed under the Camunda License 1.0. You may not use this file + * except in compliance with the Camunda License 1.0. + */ +package io.camunda.migration.data.impl.interceptor.history.entity; + +import static io.camunda.migration.data.constants.MigratorConstants.C7_HISTORY_PARTITION_ID; +import static io.camunda.migration.data.impl.util.ConverterUtil.convertDate; +import static io.camunda.migration.data.impl.util.ConverterUtil.getTenantId; +import static io.camunda.migration.data.impl.util.ConverterUtil.prefixDefinitionId; + +import io.camunda.db.rdbms.write.domain.JobDbModel; +import io.camunda.migration.data.interceptor.EntityInterceptor; +import io.camunda.search.entities.JobEntity.JobKind; +import io.camunda.search.entities.JobEntity.JobState; +import io.camunda.search.entities.JobEntity.ListenerEventType; +import java.util.Set; +import org.camunda.bpm.engine.history.HistoricJobLog; +import org.springframework.core.annotation.Order; +import org.springframework.stereotype.Component; + +/** + * Transformer for converting Camunda 7 HistoricJobLog to Camunda 8 JobDbModel. + *

+ * This transformer handles the conversion of historic job log entries from Camunda 7 + * to the Camunda 8 job format. It maps job state, retries, error details, timing, and + * contextual information. + *

+ *

+ * Note: Fields requiring database lookups (processDefinitionKey, processInstanceKey, + * rootProcessInstanceKey, elementInstanceKey, jobKey) are set externally by JobMigrator. + *

+ */ +@Order(13) +@Component +public class JobTransformer implements EntityInterceptor { + + @Override + public Set> getTypes() { + return Set.of(HistoricJobLog.class); + } + + /** + * Executes the transformation of a Camunda 7 HistoricJobLog to Camunda 8 JobDbModel. + * + * @param historicJobLog the Camunda 7 historic job log entry to transform + * @param builder the Camunda 8 job builder to populate with converted data + */ + @Override + public void execute(final HistoricJobLog historicJobLog, final JobDbModel.Builder builder) { + final var creationTime = convertDate(historicJobLog.getTimestamp()); + + builder + .type(historicJobLog.getJobDefinitionType()) + .worker(historicJobLog.getHostname()) + .state(JobState.COMPLETED) + .kind(JobKind.BPMN_ELEMENT) + .listenerEventType(ListenerEventType.UNSPECIFIED) + .retries(0) + .processDefinitionId(prefixDefinitionId(historicJobLog.getProcessDefinitionKey())) + .elementId(historicJobLog.getActivityId()) + .tenantId(getTenantId(historicJobLog.getTenantId())) + .partitionId(C7_HISTORY_PARTITION_ID) + .creationTime(creationTime); + // Note: jobKey, processDefinitionKey, processInstanceKey, rootProcessInstanceKey, + // and elementInstanceKey are set externally in JobMigrator. + } + +} diff --git a/data-migrator/core/src/main/java/io/camunda/migration/data/impl/logging/C8ClientLogs.java b/data-migrator/core/src/main/java/io/camunda/migration/data/impl/logging/C8ClientLogs.java index 387098b94..228199e67 100644 --- a/data-migrator/core/src/main/java/io/camunda/migration/data/impl/logging/C8ClientLogs.java +++ b/data-migrator/core/src/main/java/io/camunda/migration/data/impl/logging/C8ClientLogs.java @@ -43,6 +43,8 @@ public class C8ClientLogs { public static final String FAILED_TO_SEARCH_USER_TASKS = "Failed to search user tasks"; public static final String FAILED_TO_INSERT_AUDIT_LOG = "Failed to insert audit log"; public static final String FAILED_TO_MIGRATE_AUTHORIZATION = "Failed to migrate authorization with legacy ID: "; - public static final String FAILED_TO_CREATE_TENANT_USER_MEMBERSHIP = "Failed to create tenant membership for tenant [%s] and user [%s]"; - public static final String FAILED_TO_CREATE_TENANT_GROUP_MEMBERSHIP = "Failed to create tenant membership for tenant [%s] and group [%s]"; + public static final String FAILED_TO_CREATE_TENANT_USER_MEMBERSHIP = "Failed to create tenant membership for tenant [%s] and user [%s], check logs for details"; + public static final String FAILED_TO_CREATE_TENANT_GROUP_MEMBERSHIP = "Failed to create tenant membership for tenant [%s] and group [%s], check logs for details"; + public static final String FAILED_TO_INSERT_JOB = "Failed to insert job"; + } diff --git a/data-migrator/core/src/main/java/io/camunda/migration/data/impl/logging/HistoryMigratorLogs.java b/data-migrator/core/src/main/java/io/camunda/migration/data/impl/logging/HistoryMigratorLogs.java index a90d7a6ee..34bcea586 100644 --- a/data-migrator/core/src/main/java/io/camunda/migration/data/impl/logging/HistoryMigratorLogs.java +++ b/data-migrator/core/src/main/java/io/camunda/migration/data/impl/logging/HistoryMigratorLogs.java @@ -41,6 +41,8 @@ public class HistoryMigratorLogs { public static final String SKIP_REASON_UNSUPPORTED_SA_TASKS = "C7 standalone user tasks not supported in C8."; public static final String SKIP_REASON_UNSUPPORTED_CMMN_VARIABLES = "C7 CMMN variables not supported in C8."; public static final String SKIP_REASON_UNSUPPORTED_CMMN_TASKS = "C7 CMMN user tasks not supported in C8."; + public static final String SKIP_REASON_MISSING_JOB_REFERENCE = "Missing job reference"; + public static final String SKIP_REASON_UNSUPPORTED_JOBS = "Only async-before and async-after jobs are supported for migration."; // HistoryMigrator Messages public static final String MIGRATING = "Migrating {}s."; @@ -63,6 +65,8 @@ public class HistoryMigratorLogs { public static final String MIGRATING_AUDIT_LOGS = "Migrating audit logs with C7 ID: [{}]"; + public static final String MIGRATING_JOB = "Migrating historic job with C7 job ID: [{}]"; + public static final String SKIPPING = "Migration of {} with C7 ID [{}] skipped. {}"; public static final String MIGRATION_COMPLETED = "Migration of {} with C7 ID [{}] completed."; @@ -115,6 +119,10 @@ public static void logMigratingAuditLogs(String c7AuditLogId) { LOGGER.debug(MIGRATING_AUDIT_LOGS, c7AuditLogId); } + public static void logMigratingJob(String c7JobId) { + LOGGER.debug(MIGRATING_JOB, c7JobId); + } + public static void migratingHistoricFlowNode(String c7FlowNodeId) { LOGGER.debug(MIGRATING_FLOW_NODE, c7FlowNodeId); } diff --git a/data-migrator/core/src/main/java/io/camunda/migration/data/impl/persistence/IdKeyMapper.java b/data-migrator/core/src/main/java/io/camunda/migration/data/impl/persistence/IdKeyMapper.java index 9777a1bb3..4008deee7 100644 --- a/data-migrator/core/src/main/java/io/camunda/migration/data/impl/persistence/IdKeyMapper.java +++ b/data-migrator/core/src/main/java/io/camunda/migration/data/impl/persistence/IdKeyMapper.java @@ -24,6 +24,7 @@ import org.camunda.bpm.engine.impl.history.event.UserOperationLogEntryEventEntity; import org.camunda.bpm.engine.impl.persistence.entity.HistoricActivityInstanceEntity; import org.camunda.bpm.engine.impl.persistence.entity.HistoricIncidentEntity; +import org.camunda.bpm.engine.impl.persistence.entity.HistoricJobLogEventEntity; import org.camunda.bpm.engine.impl.persistence.entity.HistoricProcessInstanceEntity; import org.camunda.bpm.engine.impl.persistence.entity.HistoricTaskInstanceEntity; import org.camunda.bpm.engine.impl.persistence.entity.HistoricVariableInstanceEntity; @@ -42,7 +43,8 @@ public interface IdKeyMapper { entry(DecisionDefinitionEntity.class, TYPE.HISTORY_DECISION_DEFINITION), entry(DecisionRequirementsDefinitionEntity.class, TYPE.HISTORY_DECISION_REQUIREMENT), entry(UserOperationLogEntryEventEntity.class, TYPE.HISTORY_AUDIT_LOG), - entry(CamundaFormDefinitionEntity.class, TYPE.HISTORY_FORM_DEFINITION) + entry(CamundaFormDefinitionEntity.class, TYPE.HISTORY_FORM_DEFINITION), + entry(HistoricJobLogEventEntity.class, TYPE.HISTORY_JOB) ); enum TYPE { @@ -58,6 +60,7 @@ enum TYPE { HISTORY_DECISION_REQUIREMENT("Historic Decision Requirement"), HISTORY_AUDIT_LOG("Historic Audit Log"), HISTORY_FORM_DEFINITION("Historic Form Definition"), + HISTORY_JOB("Historic Job"), // runtime RUNTIME_PROCESS_INSTANCE("Process Instance"), diff --git a/data-migrator/qa/integration-tests/src/test/java/io/camunda/migration/data/qa/history/HistoryMigrationAbstractTest.java b/data-migrator/qa/integration-tests/src/test/java/io/camunda/migration/data/qa/history/HistoryMigrationAbstractTest.java index 2d4577df8..0a49ab952 100644 --- a/data-migrator/qa/integration-tests/src/test/java/io/camunda/migration/data/qa/history/HistoryMigrationAbstractTest.java +++ b/data-migrator/qa/integration-tests/src/test/java/io/camunda/migration/data/qa/history/HistoryMigrationAbstractTest.java @@ -24,7 +24,6 @@ import io.camunda.db.rdbms.write.domain.IncidentDbModel; import io.camunda.db.rdbms.write.service.RdbmsPurger; import io.camunda.migration.data.HistoryMigrator; -import io.camunda.migration.data.MigratorMode; import io.camunda.migration.data.config.C8DataSourceConfigured; import io.camunda.migration.data.config.MigratorAutoConfiguration; import io.camunda.migration.data.impl.clients.DbClient; @@ -39,19 +38,21 @@ import io.camunda.search.entities.FlowNodeInstanceEntity; import io.camunda.search.entities.FormEntity; import io.camunda.search.entities.IncidentEntity; +import io.camunda.search.entities.JobEntity; import io.camunda.search.entities.ProcessDefinitionEntity; import io.camunda.search.entities.ProcessInstanceEntity; import io.camunda.search.entities.UserTaskEntity; import io.camunda.search.entities.VariableEntity; -import io.camunda.search.query.AuditLogQuery; import io.camunda.search.filter.FilterBuilders; import io.camunda.search.page.SearchQueryPage; +import io.camunda.search.query.AuditLogQuery; import io.camunda.search.query.DecisionDefinitionQuery; import io.camunda.search.query.DecisionInstanceQuery; import io.camunda.search.query.DecisionRequirementsQuery; import io.camunda.search.query.FlowNodeInstanceQuery; import io.camunda.search.query.FormQuery; import io.camunda.search.query.IncidentQuery; +import io.camunda.search.query.JobQuery; import io.camunda.search.query.ProcessDefinitionQuery; import io.camunda.search.query.ProcessInstanceQuery; import io.camunda.search.query.UserTaskQuery; @@ -275,6 +276,20 @@ public List searchHistoricVariables() { .items(); } + public List searchJobs(long processInstanceKey) { + return rdbmsService.getJobReader() + .search(JobQuery.of(queryBuilder -> + queryBuilder.filter(filterBuilder -> + filterBuilder.processInstanceKeys(processInstanceKey)))) + .items(); + } + + public List searchJobs() { + return rdbmsService.getJobReader() + .search(new JobQuery(FilterBuilders.job().build(), SortOptionBuilders.job().build(), SearchQueryPage.of((b) -> b))) + .items(); + } + public List searchForms(String... formIds) { FormDbReader formReader = rdbmsService.getFormReader(); if (formIds.length != 0) { diff --git a/data-migrator/qa/integration-tests/src/test/java/io/camunda/migration/data/qa/history/HistoryMigrationListSkippedTest.java b/data-migrator/qa/integration-tests/src/test/java/io/camunda/migration/data/qa/history/HistoryMigrationListSkippedTest.java index c759c4e45..548954b75 100644 --- a/data-migrator/qa/integration-tests/src/test/java/io/camunda/migration/data/qa/history/HistoryMigrationListSkippedTest.java +++ b/data-migrator/qa/integration-tests/src/test/java/io/camunda/migration/data/qa/history/HistoryMigrationListSkippedTest.java @@ -7,13 +7,18 @@ */ package io.camunda.migration.data.qa.history; -import static io.camunda.migration.data.MigratorMode.LIST_SKIPPED; import static io.camunda.migration.data.impl.logging.HistoryMigratorLogs.SKIPPING; import static io.camunda.migration.data.impl.logging.HistoryMigratorLogs.SKIP_REASON_MISSING_PROCESS_DEFINITION; import static io.camunda.migration.data.impl.logging.HistoryMigratorLogs.SKIP_REASON_MISSING_PROCESS_INSTANCE; import static io.camunda.migration.data.impl.persistence.IdKeyMapper.TYPE; +import static io.camunda.migration.data.impl.persistence.IdKeyMapper.TYPE.HISTORY_AUDIT_LOG; +import static io.camunda.migration.data.impl.persistence.IdKeyMapper.TYPE.HISTORY_DECISION_DEFINITION; +import static io.camunda.migration.data.impl.persistence.IdKeyMapper.TYPE.HISTORY_DECISION_INSTANCE; +import static io.camunda.migration.data.impl.persistence.IdKeyMapper.TYPE.HISTORY_DECISION_REQUIREMENT; import static io.camunda.migration.data.impl.persistence.IdKeyMapper.TYPE.HISTORY_FLOW_NODE; +import static io.camunda.migration.data.impl.persistence.IdKeyMapper.TYPE.HISTORY_FORM_DEFINITION; import static io.camunda.migration.data.impl.persistence.IdKeyMapper.TYPE.HISTORY_INCIDENT; +import static io.camunda.migration.data.impl.persistence.IdKeyMapper.TYPE.HISTORY_JOB; import static io.camunda.migration.data.impl.persistence.IdKeyMapper.TYPE.HISTORY_PROCESS_INSTANCE; import static io.camunda.migration.data.impl.persistence.IdKeyMapper.TYPE.HISTORY_USER_TASK; import static io.camunda.migration.data.impl.persistence.IdKeyMapper.TYPE.HISTORY_VARIABLE; @@ -241,12 +246,13 @@ protected void verifySkippedEntitiesOutput(Map> skippedEnti HISTORY_FLOW_NODE.getDisplayName(), HISTORY_USER_TASK.getDisplayName(), HISTORY_VARIABLE.getDisplayName(), - TYPE.HISTORY_AUDIT_LOG.getDisplayName(), + HISTORY_AUDIT_LOG.getDisplayName(), + HISTORY_JOB.getDisplayName(), HISTORY_INCIDENT.getDisplayName(), - TYPE.HISTORY_DECISION_DEFINITION.getDisplayName(), - TYPE.HISTORY_DECISION_REQUIREMENT.getDisplayName(), - TYPE.HISTORY_DECISION_INSTANCE.getDisplayName(), - TYPE.HISTORY_FORM_DEFINITION.getDisplayName() + HISTORY_DECISION_DEFINITION.getDisplayName(), + HISTORY_DECISION_REQUIREMENT.getDisplayName(), + HISTORY_DECISION_INSTANCE.getDisplayName(), + HISTORY_FORM_DEFINITION.getDisplayName() }; assertThat(skippedEntitiesByType.keySet().toArray()).containsExactlyInAnyOrder(expectedEntityTypes); diff --git a/data-migrator/qa/integration-tests/src/test/java/io/camunda/migration/data/qa/history/HistoryMigrationSkippingTest.java b/data-migrator/qa/integration-tests/src/test/java/io/camunda/migration/data/qa/history/HistoryMigrationSkippingTest.java index f06e6d9bc..bc8e68954 100644 --- a/data-migrator/qa/integration-tests/src/test/java/io/camunda/migration/data/qa/history/HistoryMigrationSkippingTest.java +++ b/data-migrator/qa/integration-tests/src/test/java/io/camunda/migration/data/qa/history/HistoryMigrationSkippingTest.java @@ -20,6 +20,7 @@ import static io.camunda.migration.data.impl.persistence.IdKeyMapper.TYPE.HISTORY_FLOW_NODE; import static io.camunda.migration.data.impl.persistence.IdKeyMapper.TYPE.HISTORY_FORM_DEFINITION; import static io.camunda.migration.data.impl.persistence.IdKeyMapper.TYPE.HISTORY_INCIDENT; +import static io.camunda.migration.data.impl.persistence.IdKeyMapper.TYPE.HISTORY_JOB; import static io.camunda.migration.data.impl.persistence.IdKeyMapper.TYPE.HISTORY_PROCESS_DEFINITION; import static io.camunda.migration.data.impl.persistence.IdKeyMapper.TYPE.HISTORY_PROCESS_INSTANCE; import static io.camunda.migration.data.impl.persistence.IdKeyMapper.TYPE.HISTORY_USER_TASK; @@ -30,15 +31,12 @@ import static org.camunda.bpm.engine.variable.Variables.stringValue; import io.camunda.migration.data.HistoryMigrator; -import io.camunda.migration.data.MigratorMode; -import io.camunda.migration.data.qa.util.WhiteBox; import io.camunda.search.entities.ProcessInstanceEntity; import io.github.netmikey.logunit.api.LogCapturer; import java.util.Map; import org.camunda.bpm.engine.HistoryService; import org.camunda.bpm.engine.task.Task; import org.camunda.bpm.engine.variable.Variables; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.junit.jupiter.api.extension.RegisterExtension; @@ -245,8 +243,6 @@ public void shouldNotMigrateAlreadySkippedIncident() { ).hasSize(1); // Only the first skip from phase 1 } - @Disabled("TODO: https://github.com/camunda/camunda-bpm-platform/issues/5331") - @WhiteBox @Test public void shouldNotMigrateIncidentsWhenJobIsSkipped() { // given state in c7 with a failing service task @@ -258,26 +254,55 @@ public void shouldNotMigrateIncidentsWhenJobIsSkipped() { assertThat(jobs).hasSize(1); var job = jobs.getFirst(); - try { - managementService.executeJob(job.getId()); - } catch (Exception e) { - // expected - job will fail + for (int i = 0; i < 3; i++) { + try { + managementService.executeJob(job.getId()); + } catch (Exception e) { + // expected - job will fail + } } + assertThat(historyService.createHistoricIncidentQuery().count()) + .as("Expected one incident to be created").isEqualTo(1); - // and manually mark the job as skipped - dbClient.insert(job.getId(), null, HISTORY_FLOW_NODE); - - // when history is migrated - historyMigrator.migrate(); + // when history is migrated but jobs are skipped + historyMigrator.migrateByType(HISTORY_PROCESS_DEFINITION); + historyMigrator.migrateByType(HISTORY_PROCESS_INSTANCE); + historyMigrator.migrateByType(HISTORY_INCIDENT); // then process instance was migrated but incident was skipped due to skipped job var historicProcesses = searchHistoricProcessInstances("failingServiceTaskProcessId"); assertThat(historicProcesses).hasSize(1); ProcessInstanceEntity c8processInstance = historicProcesses.getFirst(); assertThat(searchHistoricIncidents(c8processInstance.processDefinitionId())).isEmpty(); + } - // verify the incident was skipped - assertThat(dbClient.countSkippedByType(HISTORY_INCIDENT)).isEqualTo(1); + @Test + public void shouldNotMigrateJobsWhenProcessInstanceIsSkipped() { + // given state in c7 with a failing service task + deployer.deployCamunda7Process("failingServiceTaskProcess.bpmn"); + var processInstance = runtimeService.startProcessInstanceByKey("failingServiceTaskProcessId"); + + // execute the job to trigger the incident + var jobs = managementService.createJobQuery().list(); + assertThat(jobs).hasSize(1); + var job = jobs.getFirst(); + + for (int i = 0; i < 3; i++) { + try { + managementService.executeJob(job.getId()); + } catch (Exception e) { + // expected - job will fail + } + } + + // when + historyMigrator.migrateByType(HISTORY_PROCESS_DEFINITION); + historyMigrator.migrateByType(HISTORY_JOB); + + // then + assertThat(searchJobs()).isEmpty(); + logs.assertContains( + formatMessage(SKIPPING, HISTORY_JOB.getDisplayName(), job.getId(), SKIP_REASON_MISSING_PROCESS_INSTANCE)); } @Test diff --git a/data-migrator/qa/integration-tests/src/test/java/io/camunda/migration/data/qa/history/TransactionRollbackTest.java b/data-migrator/qa/integration-tests/src/test/java/io/camunda/migration/data/qa/history/TransactionRollbackTest.java index d6936a1af..e1d028c6c 100644 --- a/data-migrator/qa/integration-tests/src/test/java/io/camunda/migration/data/qa/history/TransactionRollbackTest.java +++ b/data-migrator/qa/integration-tests/src/test/java/io/camunda/migration/data/qa/history/TransactionRollbackTest.java @@ -12,6 +12,7 @@ import static io.camunda.migration.data.impl.persistence.IdKeyMapper.TYPE.HISTORY_DECISION_REQUIREMENT; import static io.camunda.migration.data.impl.persistence.IdKeyMapper.TYPE.HISTORY_FLOW_NODE; import static io.camunda.migration.data.impl.persistence.IdKeyMapper.TYPE.HISTORY_INCIDENT; +import static io.camunda.migration.data.impl.persistence.IdKeyMapper.TYPE.HISTORY_JOB; import static io.camunda.migration.data.impl.persistence.IdKeyMapper.TYPE.HISTORY_PROCESS_DEFINITION; import static io.camunda.migration.data.impl.persistence.IdKeyMapper.TYPE.HISTORY_PROCESS_INSTANCE; import static io.camunda.migration.data.impl.persistence.IdKeyMapper.TYPE.HISTORY_USER_TASK; @@ -20,7 +21,6 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.camunda.bpm.engine.variable.Variables.createVariables; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.clearInvocations; import static org.mockito.Mockito.doAnswer; @@ -231,6 +231,7 @@ public void shouldRollbackIncidentInsertWhenMappingFails() { // Migrate prerequisites historyMigrator.migrateByType(HISTORY_PROCESS_DEFINITION); historyMigrator.migrateByType(HISTORY_PROCESS_INSTANCE); + historyMigrator.migrateByType(HISTORY_JOB); // when/then - test rollback behavior String c7Id = historyService.createHistoricIncidentQuery().singleResult().getId(); diff --git a/data-migrator/qa/integration-tests/src/test/java/io/camunda/migration/data/qa/history/entity/HistoryAuditLogUserTaskTest.java b/data-migrator/qa/integration-tests/src/test/java/io/camunda/migration/data/qa/history/entity/HistoryAuditLogUserTaskTest.java index 9aea5f8ee..6d5a604c5 100644 --- a/data-migrator/qa/integration-tests/src/test/java/io/camunda/migration/data/qa/history/entity/HistoryAuditLogUserTaskTest.java +++ b/data-migrator/qa/integration-tests/src/test/java/io/camunda/migration/data/qa/history/entity/HistoryAuditLogUserTaskTest.java @@ -22,9 +22,7 @@ import org.camunda.bpm.engine.IdentityService; import org.camunda.bpm.engine.history.UserOperationLogEntry; import org.camunda.bpm.engine.runtime.ProcessInstance; -import org.camunda.bpm.engine.task.Task; import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; @@ -241,32 +239,6 @@ public void shouldMigrateAuditLogsForDelegateTask() { assertAuditLogProperties(logs, AuditLogEntity.AuditLogOperationType.ASSIGN); } - @Test - @Disabled - public void shouldMigrateAuditLogsForDeleteTask() { - // given - Task task = taskService.newTask(); - taskService.saveTask(task); - - // Delete a user task to generate audit logs - identityService.setAuthenticatedUserId("demo"); - taskService.deleteTask(task.getId()); - - // Verify audit logs exist in C7 - long auditLogCount = historyService.createUserOperationLogQuery() - .operationType("Delete") - .count(); - assertThat(auditLogCount).isEqualTo(1); - - // when - historyMigrator.migrate(); - - // then - List logs = searchAuditLogsByCategory(AuditLogEntity.AuditLogOperationCategory.USER_TASKS.name()); - assertThat(logs).hasSize(1); - assertAuditLogProperties(logs, AuditLogEntity.AuditLogOperationType.DELETE); - } - @Test public void shouldMigrateAuditLogsForResolveTask() { // given diff --git a/data-migrator/qa/integration-tests/src/test/java/io/camunda/migration/data/qa/history/entity/HistoryIncidentTest.java b/data-migrator/qa/integration-tests/src/test/java/io/camunda/migration/data/qa/history/entity/HistoryIncidentTest.java index 90758c2cf..ad8d4937b 100644 --- a/data-migrator/qa/integration-tests/src/test/java/io/camunda/migration/data/qa/history/entity/HistoryIncidentTest.java +++ b/data-migrator/qa/integration-tests/src/test/java/io/camunda/migration/data/qa/history/entity/HistoryIncidentTest.java @@ -16,19 +16,14 @@ import static io.camunda.search.entities.IncidentEntity.ErrorType.RESOURCE_NOT_FOUND; import static io.camunda.search.entities.IncidentEntity.ErrorType.UNHANDLED_ERROR_EVENT; import static io.camunda.search.entities.IncidentEntity.ErrorType.UNKNOWN; -import static io.camunda.search.entities.FlowNodeInstanceEntity.FlowNodeType.END_EVENT; -import static io.camunda.search.entities.FlowNodeInstanceEntity.FlowNodeType.START_EVENT; -import static io.camunda.search.entities.FlowNodeInstanceEntity.FlowNodeType.USER_TASK; import static org.assertj.core.api.Assertions.assertThat; -import static org.camunda.bpm.engine.impl.incident.IncidentHandling.createIncident; -import static org.camunda.bpm.engine.impl.jobexecutor.ExecuteJobHelper.executeJob; import io.camunda.db.rdbms.write.domain.FlowNodeInstanceDbModel; import io.camunda.db.rdbms.write.domain.IncidentDbModel; import io.camunda.migration.data.qa.history.HistoryMigrationAbstractTest; import io.camunda.search.entities.IncidentEntity; -import java.util.Collections; import io.camunda.search.entities.ProcessInstanceEntity; +import java.util.Collections; import java.util.List; import org.camunda.bpm.engine.history.HistoricIncident; import org.camunda.bpm.engine.impl.cfg.ProcessEngineConfigurationImpl; @@ -143,7 +138,7 @@ public void shouldMigrateIncidentForNestedProcessInstance() { List childIncidents = searchHistoricIncidents(childProcess.getProcessDefinitionKey()); assertThat(childIncidents).hasSize(1); assertOnIncidentBasicFields(childIncidents.getFirst(), c7ChildIncident, childProcess, parentProcess, UNKNOWN, - false); + false, false); // parent incident is migrated List parentIncidents = searchHistoricIncidents(parentProcess.getProcessDefinitionKey()); @@ -268,7 +263,7 @@ public void shouldMigrateIncidentBasicFieldsForServiceTask() { List incidents = searchHistoricIncidents("incidentProcessId"); assertThat(incidents).hasSize(1); IncidentEntity c8Incident = incidents.getFirst(); - assertOnIncidentBasicFields(c8Incident, c7Incident, c7ProcessInstance, null, RESOURCE_NOT_FOUND, true); + assertOnIncidentBasicFields(c8Incident, c7Incident, c7ProcessInstance, null, RESOURCE_NOT_FOUND, true, true); } @Test @@ -290,7 +285,7 @@ public void shouldMigrateIncidentWithFormNotFoundErrorType() { List incidents = searchHistoricIncidents("formNotFoundProcessId"); assertThat(incidents).hasSize(1); IncidentEntity c8Incident = incidents.getFirst(); - assertOnIncidentBasicFields(c8Incident, c7Incident, c7ProcessInstance, null, FORM_NOT_FOUND, true); + assertOnIncidentBasicFields(c8Incident, c7Incident, c7ProcessInstance, null, FORM_NOT_FOUND, true, true); } @Test @@ -314,7 +309,7 @@ public void shouldMigrateIncidentWithDecisionEvaluationErrorType() { List incidents = searchHistoricIncidents("ruleTaskProcessId"); assertThat(incidents).hasSize(1); IncidentEntity c8Incident = incidents.getFirst(); - assertOnIncidentBasicFields(c8Incident, c7Incident, c7ProcessInstance, null, DECISION_EVALUATION_ERROR, true); + assertOnIncidentBasicFields(c8Incident, c7Incident, c7ProcessInstance, null, DECISION_EVALUATION_ERROR, true, true); } @Test @@ -336,7 +331,7 @@ public void shouldMigrateIncidentWithConditionalErrorType() { List incidents = searchHistoricIncidents("conditionErrorProcessId"); assertThat(incidents).hasSize(1); IncidentEntity c8Incident = incidents.getFirst(); - assertOnIncidentBasicFields(c8Incident, c7Incident, c7ProcessInstance, null, CONDITION_ERROR, true); + assertOnIncidentBasicFields(c8Incident, c7Incident, c7ProcessInstance, null, CONDITION_ERROR, true, true); } @Test @@ -359,7 +354,7 @@ public void shouldMigrateIncidentWithUnhandledErrorType() { List incidents = searchHistoricIncidents("unhandledErrorProcessId"); assertThat(incidents).hasSize(1); IncidentEntity c8Incident = incidents.getFirst(); - assertOnIncidentBasicFields(c8Incident, c7Incident, c7ProcessInstance, null, UNHANDLED_ERROR_EVENT, true); + assertOnIncidentBasicFields(c8Incident, c7Incident, c7ProcessInstance, null, UNHANDLED_ERROR_EVENT, true, true); } @Test @@ -382,7 +377,7 @@ public void shouldMigrateIncidentWithNoJobRetriesErrorType() { List incidents = searchHistoricIncidents("noJobRetriesProcessId"); assertThat(incidents).hasSize(1); IncidentEntity c8Incident = incidents.getFirst(); - assertOnIncidentBasicFields(c8Incident, c7Incident, c7ProcessInstance, null, JOB_NO_RETRIES, true); + assertOnIncidentBasicFields(c8Incident, c7Incident, c7ProcessInstance, null, JOB_NO_RETRIES, true, true); } @Test @@ -449,7 +444,7 @@ protected void assertOnIncidentBasicFields(IncidentEntity c8Incident, HistoricIncident c7Incident, ProcessInstance c7ChildInstance, ProcessInstance c7ParentInstance) { - assertOnIncidentBasicFields(c8Incident, c7Incident, c7ChildInstance, c7ParentInstance, UNKNOWN, false); + assertOnIncidentBasicFields(c8Incident, c7Incident, c7ChildInstance, c7ParentInstance, UNKNOWN, false, false); } protected void assertOnIncidentBasicFields(IncidentEntity c8Incident, @@ -457,7 +452,8 @@ protected void assertOnIncidentBasicFields(IncidentEntity c8Incident, ProcessInstance c7ChildInstance, ProcessInstance c7ParentInstance, IncidentEntity.ErrorType errorType, - boolean waitingExecution) { + boolean waitingExecution, + boolean hasMigratedJob) { // specific values assertThat(c8Incident.tenantId()).isEqualTo(C8_DEFAULT_TENANT); assertThat(c8Incident.processDefinitionId()).isEqualTo( @@ -477,8 +473,11 @@ protected void assertOnIncidentBasicFields(IncidentEntity c8Incident, assertThat(c8Incident.incidentKey()).isNotNull(); assertThat(c8Incident.creationTime()).isNotNull(); - // null values - assertThat(c8Incident.jobKey()).isNull(); + if (hasMigratedJob) { + assertThat(c8Incident.jobKey()).isNotNull(); + } else { + assertThat(c8Incident.jobKey()).isNull(); + } // conditional if (waitingExecution) { diff --git a/data-migrator/qa/integration-tests/src/test/java/io/camunda/migration/data/qa/history/entity/HistoryJobTest.java b/data-migrator/qa/integration-tests/src/test/java/io/camunda/migration/data/qa/history/entity/HistoryJobTest.java new file mode 100644 index 000000000..22508200d --- /dev/null +++ b/data-migrator/qa/integration-tests/src/test/java/io/camunda/migration/data/qa/history/entity/HistoryJobTest.java @@ -0,0 +1,301 @@ +/* + * Copyright Camunda Services GmbH and/or licensed to Camunda Services GmbH under + * one or more contributor license agreements. See the NOTICE file distributed + * with this work for additional information regarding copyright ownership. + * Licensed under the Camunda License 1.0. You may not use this file + * except in compliance with the Camunda License 1.0. + */ +package io.camunda.migration.data.qa.history.entity; + +import static io.camunda.migration.data.constants.MigratorConstants.C8_DEFAULT_TENANT; +import static io.camunda.migration.data.impl.persistence.IdKeyMapper.TYPE.HISTORY_JOB; +import static io.camunda.migration.data.impl.persistence.IdKeyMapper.TYPE.HISTORY_PROCESS_DEFINITION; +import static io.camunda.migration.data.impl.persistence.IdKeyMapper.TYPE.HISTORY_PROCESS_INSTANCE; +import static io.camunda.migration.data.impl.util.ConverterUtil.prefixDefinitionId; +import static io.camunda.migration.data.qa.extension.HistoryMigrationExtension.USER_TASK_ID; +import static io.camunda.migration.data.qa.history.element.HistoryAbstractElementMigrationTest.PROCESS; +import static org.assertj.core.api.Assertions.assertThat; + +import io.camunda.db.rdbms.write.domain.FlowNodeInstanceDbModel; +import io.camunda.migration.data.qa.history.HistoryMigrationAbstractTest; +import io.camunda.search.entities.JobEntity; +import io.camunda.search.entities.JobEntity.JobKind; +import io.camunda.search.entities.JobEntity.JobState; +import io.camunda.search.entities.JobEntity.ListenerEventType; +import io.camunda.search.entities.ProcessInstanceEntity; +import java.util.List; +import org.camunda.bpm.engine.runtime.ProcessInstance; +import org.camunda.bpm.model.bpmn.Bpmn; +import org.camunda.bpm.model.bpmn.BpmnModelInstance; +import org.junit.jupiter.api.Test; + +public class HistoryJobTest extends HistoryMigrationAbstractTest { + + @Test + public void shouldMigrateJobsForAsyncBeforeTask() { + // given: a process with an async-before user task + deployer.deployCamunda7Process("asyncBeforeUserTaskProcess.bpmn"); + runtimeService.startProcessInstanceByKey("asyncBeforeUserTaskProcessId"); + + // execute the async-before job to enter the user task + var jobs = managementService.createJobQuery().list(); + assertThat(jobs).hasSize(1); + String c7JobId = jobs.getFirst().getId(); + managementService.executeJob(c7JobId); + + // when: jobs and process instances are migrated + historyMigrator.migrateByType(HISTORY_PROCESS_DEFINITION); + historyMigrator.migrateByType(HISTORY_PROCESS_INSTANCE); + historyMigrator.migrateByType(HISTORY_JOB); + + // then: the process instance was migrated + List processInstances = searchHistoricProcessInstances("asyncBeforeUserTaskProcessId"); + assertThat(processInstances).hasSize(1); + long processInstanceKey = processInstances.getFirst().processInstanceKey(); + + // and: exactly one C8 job entry was created (deduplicated by job ID across multiple log entries) + List c8Jobs = searchJobs(processInstanceKey); + assertThat(c8Jobs).as("One C8 job entry per C7 job (deduplication by job ID)").hasSize(1); + + JobEntity job = c8Jobs.getFirst(); + assertJobProperties(job, processInstanceKey, "asyncBeforeUserTaskProcessId", "asyncUserTaskId", false, + processInstances.getFirst().processDefinitionKey()); + } + + @Test + public void shouldMigrateJobsForAsyncAfter() { + // given: a process with an async-after + deployModel(); + runtimeService.startProcessInstanceByKey(PROCESS); + + // execute job + executeAllJobsWithRetry(); + + // when + historyMigrator.migrate(); + + // then: the process instance was migrated + List processInstances = searchHistoricProcessInstances(PROCESS); + assertThat(processInstances).hasSize(1); + long processInstanceKey = processInstances.getFirst().processInstanceKey(); + + // and: exactly one C8 job entry was created (deduplicated by job ID across multiple log entries) + List c8Jobs = searchJobs(processInstanceKey); + assertThat(c8Jobs).as("One C8 job entry per C7 job (deduplication by job ID)").hasSize(1); + + JobEntity job = c8Jobs.getFirst(); + assertJobProperties(job, processInstanceKey, PROCESS, "startEvent", true, processInstances.getFirst().processDefinitionKey()); + + List startEvent = searchFlowNodeInstancesByName("startEvent"); + assertThat(startEvent).hasSize(1); + assertThat(job.elementInstanceKey()).isEqualTo(startEvent.getFirst().flowNodeInstanceKey()); + + // and: the incident was migrated with jobKey pointing to the C8 job + var incidents = searchHistoricIncidents(PROCESS); + assertThat(incidents).hasSize(1); + assertThat(incidents.getFirst().jobKey()) + .as("Incident's jobKey should reference the migrated job") + .isNotNull(); + assertThat(incidents.getFirst().jobKey()).isEqualTo(job.jobKey()); + } + + @Test + public void shouldMigrateFailedJobAndPopulateJobKeyOnIncident() { + // given: a failing service task that creates an incident + deployer.deployCamunda7Process("failingServiceTaskProcess.bpmn"); + runtimeService.startProcessInstanceByKey("failingServiceTaskProcessId"); + + executeAllJobsWithRetry(); + assertThat(historyService.createHistoricIncidentQuery().count()) + .as("Expected one incident to be created").isEqualTo(1); + + // when: full migration runs (jobs migrated before incidents) + historyMigrator.migrate(); + + // then: the process instance was migrated + List processInstances = searchHistoricProcessInstances("failingServiceTaskProcessId"); + assertThat(processInstances).hasSize(1); + long processInstanceKey = processInstances.getFirst().processInstanceKey(); + + // and: the failed job was migrated to C8 + List c8Jobs = searchJobs(processInstanceKey); + assertThat(c8Jobs).as("Exactly one C8 job entry (deduplication by job ID)").hasSize(1); + JobEntity job = c8Jobs.getFirst(); + + assertJobProperties(job, processInstanceKey, "failingServiceTaskProcessId", "serviceTaskId", false, processInstances.getFirst().processDefinitionKey()); + + // and: the incident was migrated with jobKey pointing to the C8 job + var incidents = searchHistoricIncidents("failingServiceTaskProcessId"); + assertThat(incidents).hasSize(1); + assertThat(incidents.getFirst().jobKey()) + .as("Incident's jobKey should reference the migrated job") + .isNotNull(); + assertThat(incidents.getFirst().jobKey()).isEqualTo(job.jobKey()); + } + + @Test + public void shouldMigrateJobWithTenant() { + // given: a failing service task that creates an incident + deployer.deployCamunda7Process("failingServiceTaskProcess.bpmn", "tenantId"); + runtimeService.startProcessInstanceByKey("failingServiceTaskProcessId"); + + executeAllJobsWithRetry(); + assertThat(historyService.createHistoricIncidentQuery().count()) + .as("Expected one incident to be created").isEqualTo(1); + + // when: full migration runs (jobs migrated before incidents) + historyMigrator.migrate(); + + // then: the process instance was migrated + List processInstances = searchHistoricProcessInstances("failingServiceTaskProcessId"); + assertThat(processInstances).hasSize(1); + long processInstanceKey = processInstances.getFirst().processInstanceKey(); + + // and: the failed job was migrated to C8 + List c8Jobs = searchJobs(processInstanceKey); + assertThat(c8Jobs).as("Exactly one C8 job entry (deduplication by job ID)").hasSize(1); + JobEntity job = c8Jobs.getFirst(); + assertThat(job.tenantId()).isEqualTo("tenantId"); + } + + @Test + public void shouldDeduplicateJobsByJobId() { + // given: a failing service task with multiple job log entries (creation + multiple failures) + deployer.deployCamunda7Process("failingServiceTaskProcess.bpmn"); + runtimeService.startProcessInstanceByKey("failingServiceTaskProcessId"); + + var jobs = managementService.createJobQuery().list(); + assertThat(jobs).hasSize(1); + var job = jobs.getFirst(); + + // Execute multiple times to create multiple historic job log entries + for (int i = 0; i < 2; i++) { + try { + managementService.executeJob(job.getId()); + } catch (Exception e) { + // expected + } + } + + // Verify multiple log entries exist in C7 for the same job + var jobLogCount = historyService.createHistoricJobLogQuery().jobId(job.getId()).count(); + assertThat(jobLogCount).as("Should have multiple job log entries").isGreaterThan(1); + + // when + historyMigrator.migrateByType(HISTORY_PROCESS_DEFINITION); + historyMigrator.migrateByType(HISTORY_PROCESS_INSTANCE); + historyMigrator.migrateByType(HISTORY_JOB); + + // then: only ONE C8 job entry created despite multiple log entries (tracked by job ID) + List processInstances = searchHistoricProcessInstances("failingServiceTaskProcessId"); + assertThat(processInstances).hasSize(1); + List c8Jobs = searchJobs(processInstances.getFirst().processInstanceKey()); + assertThat(c8Jobs).as("Exactly one C8 job per C7 job despite multiple log entries").hasSize(1); + } + + @Test + public void shouldMigrateJobsInNestedProcess() { + // given: a process with an async-after + deployModel(); + deployCallingModel(); + runtimeService.startProcessInstanceByKey("callingProcessId"); + + completeAllUserTasksWithDefaultUserTaskId(); + executeAllJobsWithRetry(); + + // when + historyMigrator.migrate(); + + // then + List root = searchHistoricProcessInstances("callingProcessId"); + assertThat(root).hasSize(1); + List processInstances = searchHistoricProcessInstances(PROCESS); + assertThat(processInstances).hasSize(1); + + // and: exactly one C8 job entry was created + List c8Jobs = searchJobs(processInstances.getFirst().processInstanceKey()); + assertThat(c8Jobs).as("One C8 job entry per C7 job ").hasSize(1); + + assertThat(c8Jobs.getFirst().rootProcessInstanceKey()).isEqualTo(root.getFirst().processInstanceKey()); + } + + @Test + public void shouldSkipTimerJob() { + // given + deployer.deployCamunda7Process("timerDurationBoundaryEventProcess.bpmn"); + ProcessInstance c7instance = runtimeService.startProcessInstanceByKey("timerDurationBoundaryEventProcessId"); + runtimeService.setVariable(c7instance.getId(), "leftoverDuration", "P0D"); + var jobs = managementService.createJobQuery().processInstanceId(c7instance.getId()).list(); + assertThat(jobs).hasSize(1); + + // when + historyMigrator.migrate(); + + // then + List processInstances = searchHistoricProcessInstances("timerDurationBoundaryEventProcessId"); + assertThat(processInstances).hasSize(1); + + List c8Jobs = searchJobs(processInstances.getFirst().processInstanceKey()); + assertThat(c8Jobs).isEmpty(); + } + + + protected void assertJobProperties(JobEntity job, long processInstanceKey, String c7ProcessDefinitionKey, + String taskId, boolean isAsyncAfter, Long processDefinitionKey) { + assertThat(job.jobKey()).isNotNull(); + assertThat(job.processInstanceKey()).isEqualTo(processInstanceKey); + assertThat(job.rootProcessInstanceKey()).isEqualTo(processInstanceKey); + assertThat(job.processDefinitionKey()).isEqualTo(processDefinitionKey); + if (isAsyncAfter) { + assertThat(job.elementInstanceKey()).isNotNull(); + } else { + // elementInstanceKey is null for async-before because the flow node instance does not yet + // exist at the time the job was created and executed + assertThat(job.elementInstanceKey()).isNull(); + } + + assertThat(job.type()).isEqualTo("async-continuation"); + assertThat(job.worker()).isNotNull(); + assertThat(job.state()).isEqualTo(JobState.COMPLETED); + assertThat(job.kind()).isEqualTo(JobKind.BPMN_ELEMENT); + assertThat(job.listenerEventType()).isEqualTo(ListenerEventType.UNSPECIFIED); + assertThat(job.retries()).isEqualTo(0); + assertThat(job.elementId()).isEqualTo(taskId); + assertThat(job.processDefinitionId()).isEqualTo(prefixDefinitionId(c7ProcessDefinitionKey)); + assertThat(job.tenantId()).isEqualTo(C8_DEFAULT_TENANT); + + assertThat(job.creationTime()).isNotNull(); + assertThat(job.deadline()).isNull(); + assertThat(job.deniedReason()).isNull(); + assertThat(job.endTime()).isNull(); + assertThat(job.errorCode()).isNull(); + assertThat(job.errorMessage()).isNull(); + assertThat(job.hasFailedWithRetriesLeft()).isFalse(); + assertThat(job.isDenied()).isFalse(); + assertThat(job.lastUpdateTime()).isNull(); + } + + protected void deployModel() { + String process = PROCESS; + var c7Model = org.camunda.bpm.model.bpmn.Bpmn.createExecutableProcess(process) + .startEvent("startEvent") + .camundaAsyncAfter() + .serviceTask("serviceTaskId") + .camundaClass("foo") + .endEvent() + .done(); + + deployer.deployC7ModelInstance(process, c7Model); + } + + protected void deployCallingModel() { + BpmnModelInstance c7BusinessRuleProcess = Bpmn.createExecutableProcess("callingProcessId") + .startEvent() + .userTask(USER_TASK_ID) + .callActivity() + .calledElement(PROCESS) + .endEvent() + .done(); + deployer.deployC7ModelInstance("callingProcessId", c7BusinessRuleProcess); + } +} diff --git a/data-migrator/qa/integration-tests/src/test/java/io/camunda/migration/data/qa/history/entity/HistoryProcessInstanceTest.java b/data-migrator/qa/integration-tests/src/test/java/io/camunda/migration/data/qa/history/entity/HistoryProcessInstanceTest.java index 6b13a8e4f..c628f2536 100644 --- a/data-migrator/qa/integration-tests/src/test/java/io/camunda/migration/data/qa/history/entity/HistoryProcessInstanceTest.java +++ b/data-migrator/qa/integration-tests/src/test/java/io/camunda/migration/data/qa/history/entity/HistoryProcessInstanceTest.java @@ -31,7 +31,6 @@ import io.github.netmikey.logunit.api.LogCapturer; import java.util.Date; import java.util.List; -import org.camunda.bpm.engine.history.HistoricActivityInstance; import org.camunda.bpm.engine.history.HistoricProcessInstance; import org.camunda.bpm.engine.impl.util.ClockUtil; import org.camunda.bpm.engine.runtime.ProcessInstance; @@ -64,13 +63,34 @@ public void shouldMigrateProcessInstance() { List processInstances = searchHistoricProcessInstances("userTaskProcessId"); assertThat(processInstances).hasSize(1); verifyProcessInstanceFields(processInstances.getFirst(), historicProcessInstance, "userTaskProcessId", - ProcessInstanceEntity.ProcessInstanceState.COMPLETED, "custom-version-tag", C8_DEFAULT_TENANT, false, false); + ProcessInstanceEntity.ProcessInstanceState.COMPLETED, "custom-version-tag", C8_DEFAULT_TENANT, false, false, + true); List variableEntities = searchHistoricVariables(LEGACY_ID_VAR_NAME); assertThat(variableEntities).hasSize(1); assertThat(variableEntities.getFirst().value()).isEqualTo("\"" + c7Process.getId() + "\""); assertThat(variableEntities.getFirst().processInstanceKey()).isEqualTo(processInstances.getFirst().processInstanceKey()); } + @Test + public void shouldMigrateProcessInstanceAsCancelled() { + // given + deployer.deployCamunda7Process("userTaskProcess.bpmn"); + ProcessInstance c7Process = runtimeService.startProcessInstanceByKey("userTaskProcessId"); + HistoricProcessInstance historicProcessInstance = historyService.createHistoricProcessInstanceQuery() + .processInstanceId(c7Process.getId()) + .singleResult(); + + // when + historyMigrator.migrate(); + + // then + List processInstances = searchHistoricProcessInstances("userTaskProcessId"); + assertThat(processInstances).hasSize(1); + verifyProcessInstanceFields(processInstances.getFirst(), historicProcessInstance, "userTaskProcessId", + ProcessInstanceEntity.ProcessInstanceState.CANCELED, "custom-version-tag", C8_DEFAULT_TENANT, false, false, + false); + } + @Test public void shouldMigrateProcessInstancesWithTenant() { // given @@ -88,7 +108,7 @@ public void shouldMigrateProcessInstancesWithTenant() { List processInstances = searchHistoricProcessInstances("userTaskProcessId"); assertThat(processInstances).hasSize(1); verifyProcessInstanceFields(processInstances.getFirst(), historicProcessInstance, "userTaskProcessId", - ProcessInstanceEntity.ProcessInstanceState.COMPLETED, "custom-version-tag", "my-tenant1", false, false); + ProcessInstanceEntity.ProcessInstanceState.COMPLETED, "custom-version-tag", "my-tenant1", false, false, true); } @Test @@ -119,11 +139,12 @@ public void shouldMigrateCallActivityAndSubprocess() { var parent = parentProcessInstance.getFirst(); verifyProcessInstanceFields(parent, historicProcessInstance, "callingProcessId", - ProcessInstanceEntity.ProcessInstanceState.COMPLETED, null, C8_DEFAULT_TENANT, false, false); + ProcessInstanceEntity.ProcessInstanceState.COMPLETED, null, C8_DEFAULT_TENANT, false, false, true); var sub = subProcessInstance.getFirst(); verifyProcessInstanceFields(sub, historicSubProcessInstance, "calledProcessInstanceId", - ProcessInstanceEntity.ProcessInstanceState.COMPLETED, null, C8_DEFAULT_TENANT, true, false); + ProcessInstanceEntity.ProcessInstanceState.COMPLETED, null, C8_DEFAULT_TENANT, true, false, true); + } @Test @@ -160,7 +181,7 @@ public void shouldMigrateCallActivityAndSubprocessLegacyIdVariable() { } @Test - @Disabled("https://github.com/camunda/camunda-bpm-platform/issues/5400") + @Disabled("https://github.com/camunda/camunda-7-to-8-migration-tooling/issues/428") public void shouldMigrateProcessInstanceWithIncident() { // given deployer.deployCamunda7Process("incidentProcess.bpmn"); @@ -179,32 +200,7 @@ public void shouldMigrateProcessInstanceWithIncident() { var processInstance = processInstances.getFirst(); verifyProcessInstanceFields(processInstance, historicProcessInstance, "incidentProcessId", - ProcessInstanceEntity.ProcessInstanceState.ACTIVE, null, C8_DEFAULT_TENANT, false, true); - } - - @Test - @Disabled("https://github.com/camunda/camunda-bpm-platform/issues/5359") - @WhiteBox - public void shouldCheckCalledProcessParentElementKey() { - // given - deployer.deployCamunda7Process("callActivityProcess.bpmn"); - deployer.deployCamunda7Process("calledActivitySubprocess.bpmn"); - ProcessInstance parentInstance = runtimeService.startProcessInstanceByKey("callingProcessId"); - ProcessInstance subInstance = runtimeService.createProcessInstanceQuery() - .superProcessInstanceId(parentInstance.getProcessInstanceId()) - .singleResult(); - completeAllUserTasksWithDefaultUserTaskId(); - HistoricActivityInstance callActivity = historyService.createHistoricActivityInstanceQuery() - .activityId("callActivityId") - .processInstanceId(subInstance.getId()) - .singleResult(); - dbClient.insert(callActivity.getId(), null, TYPE.HISTORY_PROCESS_DEFINITION); - - // when - historyMigrator.migrate(); - - // then - assertThat(searchHistoricProcessInstances("calledProcessInstanceId")).isEmpty(); + ProcessInstanceEntity.ProcessInstanceState.CANCELED, null, C8_DEFAULT_TENANT, false, true, false); } @Test @@ -346,7 +342,7 @@ protected void verifyProcessInstanceFields(ProcessInstanceEntity processInstance String versionTag, String tenantId, boolean hasParent, - boolean hasIncidents) { + boolean hasIncidents, boolean isCompleted) { // Verify migration completed successfully via logs logs.assertContains(formatMessage(MIGRATION_COMPLETED, TYPE.HISTORY_PROCESS_INSTANCE.getDisplayName(), historicProcessInstance.getId())); @@ -357,8 +353,11 @@ protected void verifyProcessInstanceFields(ProcessInstanceEntity processInstance assertThat(processInstance.tenantId()).isEqualTo(tenantId); assertThat(processInstance.startDate()) .isEqualTo(convertDate(historicProcessInstance.getStartTime())); - assertThat(processInstance.endDate()) - .isEqualTo(convertDate(historicProcessInstance.getEndTime())); + if (isCompleted) { + assertThat(processInstance.endDate()).isEqualTo(convertDate(historicProcessInstance.getEndTime())); + } else { + assertThat(processInstance.endDate()).isNotNull(); + } assertThat(processInstance.processDefinitionVersion()).isEqualTo(1); assertThat(processInstance.processDefinitionVersionTag()).isEqualTo(versionTag); diff --git a/data-migrator/qa/integration-tests/src/test/java/io/camunda/migration/data/qa/history/entity/HistoryUserTaskTest.java b/data-migrator/qa/integration-tests/src/test/java/io/camunda/migration/data/qa/history/entity/HistoryUserTaskTest.java index 54412e186..f7f5acec6 100644 --- a/data-migrator/qa/integration-tests/src/test/java/io/camunda/migration/data/qa/history/entity/HistoryUserTaskTest.java +++ b/data-migrator/qa/integration-tests/src/test/java/io/camunda/migration/data/qa/history/entity/HistoryUserTaskTest.java @@ -248,7 +248,7 @@ public void shouldMigrateMultipleTasks() { } @Test - public void shouldMigrateUserTaskWithNoAssignee() {// todo + public void shouldMigrateUserTaskWithNoAssignee() { // given deployer.deployCamunda7Process("userTaskProcess.bpmn"); ProcessInstance processInstance = runtimeService.startProcessInstanceByKey("userTaskProcessId"); @@ -278,7 +278,7 @@ public void shouldMigrateUserTaskWithNoAssignee() {// todo } @Test - public void shouldMigrateUserTaskWithNullDates() {// todo + public void shouldMigrateUserTaskWithNullDates() { // given deployer.deployCamunda7Process("userTaskProcess.bpmn"); ProcessInstance processInstance = runtimeService.startProcessInstanceByKey("userTaskProcessId"); diff --git a/data-migrator/qa/integration-tests/src/test/java/io/camunda/migration/data/qa/history/entity/interceptor/HistoryPresetParentPropertiesTest.java b/data-migrator/qa/integration-tests/src/test/java/io/camunda/migration/data/qa/history/entity/interceptor/HistoryPresetParentPropertiesTest.java index bd2fb074a..b88f61494 100644 --- a/data-migrator/qa/integration-tests/src/test/java/io/camunda/migration/data/qa/history/entity/interceptor/HistoryPresetParentPropertiesTest.java +++ b/data-migrator/qa/integration-tests/src/test/java/io/camunda/migration/data/qa/history/entity/interceptor/HistoryPresetParentPropertiesTest.java @@ -18,7 +18,6 @@ import static org.camunda.bpm.engine.variable.Variables.stringValue; import io.camunda.db.rdbms.write.domain.FlowNodeInstanceDbModel; -import io.camunda.migration.data.MigratorMode; import io.camunda.migration.data.interceptor.EntityInterceptor; import io.camunda.migration.data.qa.history.HistoryMigrationAbstractTest; import io.camunda.search.entities.DecisionInstanceEntity; @@ -34,7 +33,6 @@ import org.camunda.bpm.engine.task.Task; import org.camunda.bpm.engine.variable.VariableMap; import org.camunda.bpm.engine.variable.Variables; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.test.context.TestPropertySource; @@ -205,10 +203,10 @@ public void shouldMigrateVariableWithPresetProperties() { } @Test - @Disabled("https://github.com/camunda/camunda-7-to-8-migration-tooling/issues/364") public void shouldMigrateIncidentWithPresetProperties() { // given: Process with failing service task in C7 deployer.deployCamunda7Process("failingServiceTaskProcess.bpmn"); + runtimeService.startProcessInstanceByKey("failingServiceTaskProcessId"); // execute the job to trigger the incident var jobs = managementService.createJobQuery().list(); diff --git a/data-migrator/qa/integration-tests/src/test/resources/io/camunda/migration/data/bpmn/c7/asyncBeforeUserTaskProcess.bpmn b/data-migrator/qa/integration-tests/src/test/resources/io/camunda/migration/data/bpmn/c7/asyncBeforeUserTaskProcess.bpmn new file mode 100644 index 000000000..00c61e639 --- /dev/null +++ b/data-migrator/qa/integration-tests/src/test/resources/io/camunda/migration/data/bpmn/c7/asyncBeforeUserTaskProcess.bpmn @@ -0,0 +1,17 @@ + + + + + Flow_1 + + + + Flow_2 + + + + Flow_1 + Flow_2 + + +