Skip to content

Commit 5311434

Browse files
authored
feat(kafka): add interface design for listeners (datahub-project#13637)
1 parent 7d5519f commit 5311434

23 files changed

Lines changed: 817 additions & 315 deletions

File tree

metadata-jobs/common/build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ dependencies {
1010
}
1111
implementation externalDependency.springActuator
1212
implementation externalDependency.springKafka
13+
implementation externalDependency.opentelemetrySdkTrace
14+
implementation externalDependency.opentelemetrySdkMetrics
1315
implementation externalDependency.slf4jApi
1416

1517
implementation(externalDependency.springBootStarterJetty) {
Lines changed: 209 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,209 @@
1+
package com.linkedin.metadata.kafka.listener;
2+
3+
import com.codahale.metrics.Histogram;
4+
import com.codahale.metrics.MetricRegistry;
5+
import com.linkedin.metadata.utils.metrics.MetricUtils;
6+
import com.linkedin.mxe.SystemMetadata;
7+
import io.datahubproject.metadata.context.OperationContext;
8+
import io.opentelemetry.api.trace.Span;
9+
import io.opentelemetry.api.trace.StatusCode;
10+
import java.util.List;
11+
import java.util.Map;
12+
import java.util.Set;
13+
import java.util.stream.Collectors;
14+
import java.util.stream.Stream;
15+
import javax.annotation.Nonnull;
16+
import lombok.Getter;
17+
import lombok.extern.slf4j.Slf4j;
18+
import org.apache.kafka.clients.consumer.ConsumerRecord;
19+
import org.slf4j.MDC;
20+
21+
@Slf4j
22+
public abstract class AbstractKafkaListener<E, H extends EventHook<E>, R>
23+
implements GenericKafkaListener<E, H, R> {
24+
25+
protected OperationContext systemOperationContext;
26+
27+
@Getter protected String consumerGroupId;
28+
29+
@Getter protected List<H> hooks;
30+
31+
protected boolean fineGrainedLoggingEnabled;
32+
protected Map<String, Set<String>> aspectsToDrop;
33+
34+
private final Histogram kafkaLagStats =
35+
MetricUtils.get().histogram(MetricRegistry.name(this.getClass(), "kafkaLag"));
36+
37+
@Override
38+
public GenericKafkaListener<E, H, R> init(
39+
@Nonnull OperationContext systemOperationContext,
40+
@Nonnull String consumerGroup,
41+
@Nonnull List<H> hooks,
42+
boolean fineGrainedLoggingEnabled,
43+
@Nonnull Map<String, Set<String>> aspectsToDrop) {
44+
45+
this.systemOperationContext = systemOperationContext;
46+
this.consumerGroupId = consumerGroup;
47+
this.hooks = hooks;
48+
this.hooks.forEach(hook -> hook.init(systemOperationContext));
49+
this.fineGrainedLoggingEnabled = fineGrainedLoggingEnabled;
50+
this.aspectsToDrop = aspectsToDrop;
51+
52+
log.info(
53+
"Enabled Hooks - Group: {} Hooks: {}",
54+
consumerGroup,
55+
hooks.stream().map(hook -> hook.getClass().getSimpleName()).collect(Collectors.toList()));
56+
57+
return this;
58+
}
59+
60+
@Override
61+
public void consume(@Nonnull final ConsumerRecord<String, R> consumerRecord) {
62+
try {
63+
kafkaLagStats.update(System.currentTimeMillis() - consumerRecord.timestamp());
64+
final R record = consumerRecord.value();
65+
log.debug(
66+
"Got event consumer: {} key: {}, topic: {}, partition: {}, offset: {}, value size: {}, timestamp: {}",
67+
consumerGroupId,
68+
consumerRecord.key(),
69+
consumerRecord.topic(),
70+
consumerRecord.partition(),
71+
consumerRecord.offset(),
72+
consumerRecord.serializedValueSize(),
73+
consumerRecord.timestamp());
74+
75+
MetricUtils.counter(this.getClass(), consumerGroupId + "_received_event_count").inc();
76+
77+
E event;
78+
try {
79+
event = convertRecord(record);
80+
} catch (Exception e) {
81+
MetricUtils.counter(this.getClass(), consumerGroupId + "_conversion_failure").inc();
82+
log.error("Error deserializing message due to: ", e);
83+
log.error("Message: {}", record.toString());
84+
return;
85+
}
86+
87+
// Initialize MDC context with event metadata
88+
setMDCContext(event);
89+
90+
// Check if should skip processing
91+
if (shouldSkipProcessing(event)) {
92+
log.info("Skipping event: {}", event);
93+
return;
94+
}
95+
96+
List<String> loggingAttributes = getFineGrainedLoggingAttributes(event);
97+
98+
processWithHooks(event, loggingAttributes, consumerRecord.topic());
99+
100+
} finally {
101+
MDC.clear();
102+
}
103+
}
104+
105+
/**
106+
* Process the event with all registered hooks.
107+
*
108+
* @param event The event to process
109+
* @param loggingAttributes Attributes for logging
110+
*/
111+
protected void processWithHooks(E event, List<String> loggingAttributes, String topic) {
112+
systemOperationContext.withQueueSpan(
113+
"consume",
114+
getSystemMetadata(event),
115+
topic,
116+
() -> {
117+
log.info(
118+
"Invoking hooks for consumer: {} event: {}",
119+
consumerGroupId,
120+
getEventDisplayString(event));
121+
122+
// Process with each hook
123+
for (H hook : this.hooks) {
124+
systemOperationContext.withSpan(
125+
hook.getClass().getSimpleName(),
126+
() -> {
127+
log.debug(
128+
"Invoking hook {} for event: {}",
129+
hook.getClass().getSimpleName(),
130+
getEventDisplayString(event));
131+
try {
132+
hook.invoke(event);
133+
} catch (Exception e) {
134+
// Just skip this hook and continue - "at most once" processing
135+
MetricUtils.counter(
136+
this.getClass(), hook.getClass().getSimpleName() + "_failure")
137+
.inc();
138+
log.error(
139+
"Failed to execute hook with name {}",
140+
hook.getClass().getCanonicalName(),
141+
e);
142+
143+
Span currentSpan = Span.current();
144+
currentSpan.recordException(e);
145+
currentSpan.setStatus(StatusCode.ERROR, e.getMessage());
146+
currentSpan.setAttribute(MetricUtils.ERROR_TYPE, e.getClass().getName());
147+
}
148+
},
149+
Stream.concat(
150+
Stream.of(
151+
MetricUtils.DROPWIZARD_NAME,
152+
MetricUtils.name(
153+
this.getClass(), hook.getClass().getSimpleName() + "_latency")),
154+
loggingAttributes.stream())
155+
.toArray(String[]::new));
156+
}
157+
158+
MetricUtils.counter(this.getClass(), consumerGroupId + "_consumed_event_count").inc();
159+
log.info(
160+
"Successfully completed hooks for consumer: {} event: {}",
161+
consumerGroupId,
162+
getEventDisplayString(event));
163+
},
164+
Stream.concat(
165+
Stream.of(
166+
MetricUtils.DROPWIZARD_NAME, MetricUtils.name(this.getClass(), "consume")),
167+
loggingAttributes.stream())
168+
.toArray(String[]::new));
169+
}
170+
171+
/**
172+
* Sets MDC context based on event metadata.
173+
*
174+
* @param event The event to extract metadata from
175+
*/
176+
protected abstract void setMDCContext(E event);
177+
178+
/**
179+
* Determines if this event should be skipped based on filtering rules.
180+
*
181+
* @param event The event to check
182+
* @return true if event should be skipped, false otherwise
183+
*/
184+
protected abstract boolean shouldSkipProcessing(E event);
185+
186+
/**
187+
* Gets attributes for fine-grained logging.
188+
*
189+
* @param event The event to extract attributes from
190+
* @return List of attribute name-value pairs
191+
*/
192+
protected abstract List<String> getFineGrainedLoggingAttributes(E event);
193+
194+
/**
195+
* Gets system metadata from the event for tracing.
196+
*
197+
* @param event The event
198+
* @return System metadata object
199+
*/
200+
protected abstract SystemMetadata getSystemMetadata(E event);
201+
202+
/**
203+
* Gets a display string for the event for logging.
204+
*
205+
* @param event The event
206+
* @return Display string
207+
*/
208+
protected abstract String getEventDisplayString(E event);
209+
}
Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
package com.linkedin.metadata.kafka.listener;
2+
3+
import com.fasterxml.jackson.databind.JavaType;
4+
import com.fasterxml.jackson.databind.ObjectMapper;
5+
import java.lang.reflect.Method;
6+
import java.util.Collections;
7+
import java.util.Comparator;
8+
import java.util.List;
9+
import java.util.Map;
10+
import java.util.Set;
11+
import java.util.stream.Collectors;
12+
import javax.annotation.Nonnull;
13+
import lombok.extern.slf4j.Slf4j;
14+
import org.apache.commons.lang.StringUtils;
15+
import org.apache.kafka.clients.consumer.ConsumerRecord;
16+
import org.springframework.beans.factory.InitializingBean;
17+
import org.springframework.kafka.config.KafkaListenerContainerFactory;
18+
import org.springframework.kafka.config.KafkaListenerEndpoint;
19+
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
20+
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
21+
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
22+
23+
@Slf4j
24+
public abstract class AbstractKafkaListenerRegistrar<E, H extends EventHook<E>, R>
25+
implements GenericKafkaListenerRegistrar<E, H, R>, InitializingBean {
26+
27+
protected final KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
28+
protected final KafkaListenerContainerFactory<?> kafkaListenerContainerFactory;
29+
protected final String consumerGroupBase;
30+
protected final List<H> hooks;
31+
protected final ObjectMapper objectMapper;
32+
33+
protected AbstractKafkaListenerRegistrar(
34+
KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry,
35+
KafkaListenerContainerFactory<?> kafkaListenerContainerFactory,
36+
String consumerGroupBase,
37+
List<H> hooks,
38+
ObjectMapper objectMapper) {
39+
this.kafkaListenerEndpointRegistry = kafkaListenerEndpointRegistry;
40+
this.kafkaListenerContainerFactory = kafkaListenerContainerFactory;
41+
this.consumerGroupBase = consumerGroupBase;
42+
this.hooks = hooks;
43+
this.objectMapper = objectMapper;
44+
}
45+
46+
@Override
47+
public void afterPropertiesSet() {
48+
Map<String, List<H>> hookGroups =
49+
getEnabledHooks().stream().collect(Collectors.groupingBy(H::getConsumerGroupSuffix));
50+
51+
log.info(
52+
"{} Consumer Groups: {}",
53+
getProcessorType(),
54+
hookGroups.keySet().stream().map(this::buildConsumerGroupName).collect(Collectors.toSet()));
55+
56+
hookGroups.forEach(
57+
(key, groupHooks) -> {
58+
KafkaListenerEndpoint kafkaListenerEndpoint =
59+
createListenerEndpoint(buildConsumerGroupName(key), getTopicNames(), groupHooks);
60+
registerKafkaListener(kafkaListenerEndpoint, false);
61+
});
62+
}
63+
64+
@Override
65+
@Nonnull
66+
public List<H> getEnabledHooks() {
67+
return hooks.stream()
68+
.filter(EventHook::isEnabled)
69+
.sorted(Comparator.comparing(EventHook::executionOrder))
70+
.toList();
71+
}
72+
73+
@Override
74+
public void registerKafkaListener(
75+
@Nonnull KafkaListenerEndpoint kafkaListenerEndpoint, boolean startImmediately) {
76+
kafkaListenerEndpointRegistry.registerListenerContainer(
77+
kafkaListenerEndpoint, kafkaListenerContainerFactory, startImmediately);
78+
}
79+
80+
@Override
81+
@Nonnull
82+
public KafkaListenerEndpoint createListenerEndpoint(
83+
@Nonnull String consumerGroupId, @Nonnull List<String> topics, @Nonnull List<H> groupHooks) {
84+
MethodKafkaListenerEndpoint<String, R> kafkaListenerEndpoint =
85+
new MethodKafkaListenerEndpoint<>();
86+
kafkaListenerEndpoint.setId(consumerGroupId);
87+
kafkaListenerEndpoint.setGroupId(consumerGroupId);
88+
kafkaListenerEndpoint.setAutoStartup(false);
89+
kafkaListenerEndpoint.setTopics(topics.toArray(new String[0]));
90+
kafkaListenerEndpoint.setMessageHandlerMethodFactory(new DefaultMessageHandlerMethodFactory());
91+
92+
Map<String, Set<String>> aspectsToDrop = parseAspectsToDrop();
93+
94+
GenericKafkaListener<E, H, R> listener =
95+
createListener(consumerGroupId, groupHooks, isFineGrainedLoggingEnabled(), aspectsToDrop);
96+
97+
kafkaListenerEndpoint.setBean(listener);
98+
99+
try {
100+
Method consumeMethod = GenericKafkaListener.class.getMethod("consume", ConsumerRecord.class);
101+
kafkaListenerEndpoint.setMethod(consumeMethod);
102+
} catch (NoSuchMethodException e) {
103+
throw new RuntimeException(e);
104+
}
105+
106+
return kafkaListenerEndpoint;
107+
}
108+
109+
@Override
110+
@Nonnull
111+
public String buildConsumerGroupName(@Nonnull String suffix) {
112+
if (suffix.isEmpty()) {
113+
return consumerGroupBase;
114+
} else {
115+
return String.join("-", consumerGroupBase, suffix);
116+
}
117+
}
118+
119+
/**
120+
* Gets the processor type name for logging.
121+
*
122+
* @return The processor type name
123+
*/
124+
protected abstract String getProcessorType();
125+
126+
/**
127+
* Gets the list of topic names to listen to.
128+
*
129+
* @return List of topic names
130+
*/
131+
protected abstract List<String> getTopicNames();
132+
133+
/**
134+
* Checks if fine-grained logging is enabled.
135+
*
136+
* @return true if fine-grained logging is enabled, false otherwise
137+
*/
138+
protected abstract boolean isFineGrainedLoggingEnabled();
139+
140+
/**
141+
* Parses the aspects to drop from configuration.
142+
*
143+
* @return Map of entity types to sets of aspect names to drop
144+
*/
145+
protected Map<String, Set<String>> parseAspectsToDrop() {
146+
String aspectsToDropConfig = getAspectsToDropConfig();
147+
if (StringUtils.isBlank(aspectsToDropConfig)) {
148+
return Collections.emptyMap();
149+
} else {
150+
JavaType type =
151+
objectMapper.getTypeFactory().constructMapType(Map.class, String.class, Set.class);
152+
try {
153+
return objectMapper.readValue(aspectsToDropConfig, type);
154+
} catch (Exception e) {
155+
log.error("Unable to parse aspects to drop configuration: {}", aspectsToDropConfig, e);
156+
return Collections.emptyMap();
157+
}
158+
}
159+
}
160+
161+
/**
162+
* Gets the configuration string for aspects to drop.
163+
*
164+
* @return Configuration string
165+
*/
166+
protected abstract String getAspectsToDropConfig();
167+
}

0 commit comments

Comments
 (0)