Skip to content

Commit fcb232c

Browse files
Filter unknown process definitions received via kafka #74
1 parent f8949ee commit fcb232c

File tree

1 file changed

+32
-8
lines changed
  • adapters/camunda8/src/main/java/io/vanillabp/cockpit/adapter/camunda8/receiver/kafka

1 file changed

+32
-8
lines changed

adapters/camunda8/src/main/java/io/vanillabp/cockpit/adapter/camunda8/receiver/kafka/KafkaController.java

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,31 +1,27 @@
11
package io.vanillabp.cockpit.adapter.camunda8.receiver.kafka;
22

3-
43
import at.phactum.zeebe.exporters.kafka.serde.RecordId;
54
import io.camunda.zeebe.protocol.record.Record;
65
import io.camunda.zeebe.protocol.record.RecordType;
76
import io.camunda.zeebe.protocol.record.ValueType;
87
import io.camunda.zeebe.protocol.record.intent.ProcessEventIntent;
98
import io.camunda.zeebe.protocol.record.intent.ProcessInstanceIntent;
10-
import io.camunda.zeebe.protocol.record.value.BpmnElementType;
11-
import io.camunda.zeebe.protocol.record.value.JobRecordValue;
12-
import io.camunda.zeebe.protocol.record.value.ProcessEventRecordValue;
13-
import io.camunda.zeebe.protocol.record.value.ProcessInstanceCreationRecordValue;
14-
import io.camunda.zeebe.protocol.record.value.ProcessInstanceRecordValue;
9+
import io.camunda.zeebe.protocol.record.value.*;
1510
import io.vanillabp.cockpit.adapter.camunda8.deployments.DeploymentService;
1611
import io.vanillabp.cockpit.adapter.camunda8.receiver.events.Camunda8UserTaskCreatedEvent;
1712
import io.vanillabp.cockpit.adapter.camunda8.receiver.events.Camunda8UserTaskLifecycleEvent;
1813
import io.vanillabp.cockpit.adapter.camunda8.receiver.events.Camunda8WorkflowLifeCycleEvent;
1914
import io.vanillabp.cockpit.adapter.camunda8.usertask.Camunda8UserTaskEventHandler;
2015
import io.vanillabp.cockpit.adapter.camunda8.workflow.Camunda8WorkflowEventHandler;
2116
import io.vanillabp.springboot.adapter.VanillaBpProperties;
22-
import java.util.Set;
23-
import java.util.function.Supplier;
2417
import org.apache.kafka.clients.consumer.ConsumerRecord;
2518
import org.slf4j.Logger;
2619
import org.slf4j.LoggerFactory;
2720
import org.springframework.kafka.annotation.KafkaListener;
2821

22+
import java.util.Set;
23+
import java.util.function.Supplier;
24+
2925
import static io.vanillabp.cockpit.adapter.camunda8.receiver.kafka.KafkaConfiguration.KAFKA_CONSUMER_PREFIX;
3026

3127
public class KafkaController {
@@ -85,6 +81,15 @@ private void handleProcessInstanceCreationRecord(
8581
}
8682
// empty (none) start event
8783
final var processInstanceCreationRecordValue = (ProcessInstanceCreationRecordValue) value.getValue();
84+
final var processDefinitionKey = processInstanceCreationRecordValue.getProcessDefinitionKey();
85+
if (hasNoProcessInformation(processDefinitionKey)) {
86+
final var tenantId = processInstanceCreationRecordValue.getTenantId();
87+
if (camunda8WorkflowEventHandler.isTenantKnown(tenantId)) {
88+
logUnknownProcessDefinitionWarning(tenantId, processDefinitionKey);
89+
}
90+
return;
91+
}
92+
8893
final var workflowCreatedEvent = WorkflowEventZeebeRecordMapper.map(processInstanceCreationRecordValue);
8994
WorkflowEventZeebeRecordMapper.addMetaData(workflowCreatedEvent, value, idNames);
9095
camunda8WorkflowEventHandler.saveBusinessKeyForRootProcessInstance(workflowCreatedEvent);
@@ -171,6 +176,16 @@ private void handleJobRecord(Record<?> value) {
171176
if (!jobRecordValue.getType().equals("io.camunda.zeebe:userTask")){
172177
return;
173178
}
179+
180+
long processDefinitionKey = jobRecordValue.getProcessDefinitionKey();
181+
if (hasNoProcessInformation(processDefinitionKey)) {
182+
String tenantId = jobRecordValue.getTenantId();
183+
if (camunda8WorkflowEventHandler.isTenantKnown(tenantId)) {
184+
logUnknownProcessDefinitionWarning(tenantId, processDefinitionKey);
185+
}
186+
return;
187+
}
188+
174189
// job lifecycle
175190
String intentName = value.getIntent().name();
176191
if (intentName.equals("CREATED")) {
@@ -187,4 +202,13 @@ private void handleJobRecord(Record<?> value) {
187202

188203
}
189204

205+
private boolean hasNoProcessInformation(long definitionKey) {
206+
return deploymentService.getProcessInformationByDefinitionKey(definitionKey).isEmpty();
207+
}
208+
209+
private void logUnknownProcessDefinitionWarning(String tenantId, long processDefinitionKey) {
210+
logger.warn("Tenant {} has no process information for definition key '{}'!",
211+
tenantId,
212+
processDefinitionKey);
213+
}
190214
}

0 commit comments

Comments
 (0)