Skip to content

Commit bd04657

Browse files
Filter unknown process definitions received via kafka #74
1 parent f8949ee commit bd04657

File tree

1 file changed

+28
-1
lines changed
  • adapters/camunda8/src/main/java/io/vanillabp/cockpit/adapter/camunda8/receiver/kafka

1 file changed

+28
-1
lines changed

adapters/camunda8/src/main/java/io/vanillabp/cockpit/adapter/camunda8/receiver/kafka/KafkaController.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package io.vanillabp.cockpit.adapter.camunda8.receiver.kafka;
22

3-
43
import at.phactum.zeebe.exporters.kafka.serde.RecordId;
54
import io.camunda.zeebe.protocol.record.Record;
65
import io.camunda.zeebe.protocol.record.RecordType;
@@ -85,6 +84,15 @@ private void handleProcessInstanceCreationRecord(
8584
}
8685
// empty (none) start event
8786
final var processInstanceCreationRecordValue = (ProcessInstanceCreationRecordValue) value.getValue();
87+
final var processDefinitionKey = processInstanceCreationRecordValue.getProcessDefinitionKey();
88+
if (hasNoProcessInformation(processDefinitionKey)) {
89+
final var tenantId = processInstanceCreationRecordValue.getTenantId();
90+
if (camunda8WorkflowEventHandler.isTenantKnown(tenantId)) {
91+
logUnknownProcessDefinitionWarning(tenantId, processDefinitionKey);
92+
}
93+
return;
94+
}
95+
8896
final var workflowCreatedEvent = WorkflowEventZeebeRecordMapper.map(processInstanceCreationRecordValue);
8997
WorkflowEventZeebeRecordMapper.addMetaData(workflowCreatedEvent, value, idNames);
9098
camunda8WorkflowEventHandler.saveBusinessKeyForRootProcessInstance(workflowCreatedEvent);
@@ -171,6 +179,16 @@ private void handleJobRecord(Record<?> value) {
171179
if (!jobRecordValue.getType().equals("io.camunda.zeebe:userTask")){
172180
return;
173181
}
182+
183+
long processDefinitionKey = jobRecordValue.getProcessDefinitionKey();
184+
if (hasNoProcessInformation(processDefinitionKey)) {
185+
String tenantId = jobRecordValue.getTenantId();
186+
if (camunda8WorkflowEventHandler.isTenantKnown(tenantId)) {
187+
logUnknownProcessDefinitionWarning(tenantId, processDefinitionKey);
188+
}
189+
return;
190+
}
191+
174192
// job lifecycle
175193
String intentName = value.getIntent().name();
176194
if (intentName.equals("CREATED")) {
@@ -187,4 +205,13 @@ private void handleJobRecord(Record<?> value) {
187205

188206
}
189207

208+
private boolean hasNoProcessInformation(long definitionKey) {
209+
return deploymentService.getProcessInformationByDefinitionKey(definitionKey).isEmpty();
210+
}
211+
212+
private void logUnknownProcessDefinitionWarning(String tenantId, long processDefinitionKey) {
213+
logger.warn("Tenant {} has no process information for definition key '{}'!",
214+
tenantId,
215+
processDefinitionKey);
216+
}
190217
}

0 commit comments

Comments
 (0)