Skip to content

Commit aff88fb

Browse files
committed
Refinements (#154)
* README improvements * removal of unneeded empty lines * EventLogWriter.fireBusinessEvents now takes `Collection<?>` instead of `Collection<Object>`, same as the other plural fire methods. * rename variable to be more clear * add javadoc * extract some code out of a loop * add tests for transmission service
1 parent 1126252 commit aff88fb

File tree

6 files changed

+45
-16
lines changed

6 files changed

+45
-16
lines changed

README.md

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -241,8 +241,8 @@ for sending events to a compacted event type.
241241

242242
In some cases, like when there usually are large time gaps between producing events for the same compaction key,
243243
the risk of getting events for the same key out-of-order is small.
244-
For these cases, you just can define a bean of type [`CompactionKeyExtractor`](nakadi-producer/src/main/java/org/zalando/nakadiproducer/eventlog/CompactionKeyExtractor.java) , and then all events of that event
245-
type will get sent with a compaction key.
244+
For these cases, you just can define a bean of type [`CompactionKeyExtractor`](nakadi-producer/src/main/java/org/zalando/nakadiproducer/eventlog/CompactionKeyExtractor.java),
245+
and then all events of that event type will be sent with a compaction key.
246246

247247
```java
248248
@Configuration
@@ -261,7 +261,6 @@ matches the payload object (in undefined order) will be used.
261261
There are also some more factory methods with different signatures for more special cases, and you can also write
262262
your own implementation (but for the usual cases, the one shown here should be enough).
263263

264-
265264
### Event snapshots (optional)
266265

267266
A Snapshot event is a special type of data change event (data operation) defined by Nakadi.
@@ -300,7 +299,7 @@ your `application.properties` includes
300299
management.endpoints.web.exposure.include=snapshot-event-creation,your-other-endpoints,...`
301300
```
302301
and if one or more Spring Beans implement the `org.zalando.nakadiproducer.snapshots.SnapshotEventGenerator` interface.
303-
(Note that this will automatically together with the compaction key feature mentioned above,
302+
(Note that this will automatically work together with the compaction key feature mentioned above,
304303
if you have registered a compaction key extractor matching the type of the data objects in your snapshots.)
305304
306305
The optional filter specifier of the trigger request will be passed as a string parameter to the

nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/EndToEndTestIT.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,6 @@ public class EndToEndTestIT extends BaseMockedExternalCommunicationIT {
3939
@Autowired
4040
private MockNakadiPublishingClient nakadiClient;
4141

42-
43-
4442
@BeforeEach
4543
@AfterEach
4644
public void clearNakadiEvents() {
@@ -92,7 +90,6 @@ public void compactionKeyIsNotInvented() throws IOException {
9290
is(empty()));
9391
}
9492

95-
9693
@Test
9794
public void businessEventsShouldBeSubmittedToNakadi() throws IOException {
9895
MockPayload payload = Fixture.mockPayload(1, CODE);

nakadi-producer/src/main/java/org/zalando/nakadiproducer/eventlog/EventLogWriter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,5 +231,5 @@ public interface EventLogWriter {
231231
* parameter)
232232
*/
233233
@Transactional
234-
void fireBusinessEvents(String eventType, Collection<Object> payloads);
234+
void fireBusinessEvents(String eventType, Collection<?> payloads);
235235
}

nakadi-producer/src/main/java/org/zalando/nakadiproducer/eventlog/impl/EventLogWriterImpl.java

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ public class EventLogWriterImpl implements EventLogWriter {
3030
private final ObjectMapper objectMapper;
3131
private final FlowIdComponent flowIdComponent;
3232

33-
private final Map<String, CompactionKeyExtractor> extractors;
33+
private final Map<String, CompactionKeyExtractor> extractorsByEventType;
3434

3535
public EventLogWriterImpl(EventLogRepository eventLogRepository,
3636
ObjectMapper objectMapper,
@@ -39,12 +39,21 @@ public EventLogWriterImpl(EventLogRepository eventLogRepository,
3939
this.eventLogRepository = eventLogRepository;
4040
this.objectMapper = objectMapper;
4141
this.flowIdComponent = flowIdComponent;
42-
this.extractors = keyExtractors.stream()
42+
this.extractorsByEventType = keyExtractors.stream()
4343
.collect(groupingBy(
4444
CompactionKeyExtractor::getEventType,
4545
collectingAndThen(toList(), EventLogWriterImpl::joinCompactors)));
4646
}
4747

48+
/**
49+
* Helper function (used in the constructor) to join a (non-empty) list of compaction key extractors
50+
* (for the same event type) into a single one. If that list has just one element, it is returned.
51+
* Otherwise, a new extractor is created whose retrieval method will just iterate through all the
52+
* extractors, ask them for the key and returns any that is non-empty.
53+
*
54+
* @param list a list of extractors, non-empty.
55+
* @return a single extractor based on the list which will return a key when any of the extractors returns one.
56+
*/
4857
private static CompactionKeyExtractor joinCompactors(List<CompactionKeyExtractor> list) {
4958
Preconditions.checkArgument(!list.isEmpty());
5059
if(list.size() == 1) {
@@ -59,7 +68,6 @@ private static CompactionKeyExtractor joinCompactors(List<CompactionKeyExtractor
5968
}
6069
}
6170

62-
6371
@Override
6472
@Transactional
6573
public void fireCreateEvent(final String eventType, final String dataType, final Object data) {
@@ -121,13 +129,13 @@ public void fireBusinessEvent(final String eventType, Object payload) {
121129

122130
@Override
123131
@Transactional
124-
public void fireBusinessEvents(final String eventType, final Collection<Object> payload) {
132+
public void fireBusinessEvents(final String eventType, final Collection<?> payload) {
125133
final Collection<EventLog> eventLogs = createBusinessEventLogs(eventType, payload);
126134
eventLogRepository.persist(eventLogs);
127135
}
128136

129137
private Collection<EventLog> createBusinessEventLogs(final String eventType,
130-
final Collection<Object> eventPayloads) {
138+
final Collection<?> eventPayloads) {
131139
CompactionKeyExtractor extractor = getKeyExtractorFor(eventType);
132140
return eventPayloads.stream()
133141
.map(payload -> createEventLog(eventType, payload,
@@ -142,10 +150,11 @@ private Collection<EventLog> createDataEventLogs(
142150
final Collection<?> data
143151
) {
144152
CompactionKeyExtractor extractor = getKeyExtractorFor(eventType);
153+
String dataOp = eventDataOperation.toString();
145154
return data.stream()
146155
.map(payload -> createEventLog(
147156
eventType,
148-
new DataChangeEventEnvelope(eventDataOperation.toString(), dataType, payload),
157+
new DataChangeEventEnvelope(dataOp, dataType, payload),
149158
extractor.getKeyOrNull(payload)))
150159
.collect(toList());
151160
}
@@ -161,7 +170,7 @@ private String getCompactionKeyFor(String eventType, Object payload) {
161170
}
162171

163172
private CompactionKeyExtractor getKeyExtractorFor(String eventType) {
164-
return extractors.getOrDefault(eventType, NOOP_EXTRACTOR);
173+
return extractorsByEventType.getOrDefault(eventType, NOOP_EXTRACTOR);
165174
}
166175

167176
private EventLog createEventLog(final String eventType, final Object eventPayload, String compactionKey) {

nakadi-producer/src/test/java/org/zalando/nakadiproducer/eventlog/impl/EventLogWriterMultipleTypesTest.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,6 @@ public void threeCompactionExtractors() {
110110
assertThat(compactionKeys, contains(equalTo("Hello"), equalTo("World"), equalTo("List?")));
111111
}
112112

113-
114113
private List<String> getPersistedCompactionKeys() {
115114
verify(eventLogRepository).persist(eventLogsCapture.capture());
116115
Collection<EventLog> eventLogs = eventLogsCapture.getValue();

nakadi-producer/src/test/java/org/zalando/nakadiproducer/transmission/impl/EventTransmissionServiceTest.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,31 @@ public void testWithoutFlowId() throws JsonProcessingException {
8181
assertThat(read(events.get(0), "$.metadata"), not(hasKey("flow_id")));
8282
}
8383

84+
@Test
85+
public void testWithCompactionKey() throws JsonProcessingException {
86+
String compactionKey = "XYZ";
87+
String payloadString = mapper.writeValueAsString(Fixture.mockPayload(42, "bla"));
88+
EventLog ev = new EventLog(27, "type", payloadString, null, now(), now(), null, now().plus(5, MINUTES), compactionKey);
89+
90+
service.sendEvents(singletonList(ev));
91+
92+
List<String> events = publishingClient.getSentEvents("type");
93+
assertThat(events, hasSize(1));
94+
assertThat(read(events.get(0), "$.metadata.partition_compaction_key"), is(compactionKey));
95+
}
96+
97+
@Test
98+
public void testWithoutCompactionKey() throws JsonProcessingException {
99+
String payloadString = mapper.writeValueAsString(Fixture.mockPayload(42, "bla"));
100+
EventLog ev = new EventLog(27, "type", payloadString, null, now(), now(), null, now().plus(5, MINUTES), null);
101+
102+
service.sendEvents(singletonList(ev));
103+
104+
List<String> events = publishingClient.getSentEvents("type");
105+
assertThat(events, hasSize(1));
106+
assertThat(read(events.get(0), "$.metadata"), not(hasKey("partition_compaction_key")));
107+
}
108+
84109
@Test
85110
public void testErrorInPayloadDeserializationIsHandledGracefully() throws IOException {
86111
String payloadString = mapper.writeValueAsString(Fixture.mockPayload(42, "bla"));

0 commit comments

Comments
 (0)