Skip to content

Commit d0eb797

Browse files
committed
Merge branch '154-key-extractor-bean-experiment' into release-candidate-21.0.0-RC3b
2 parents 939fbbc + 1126252 commit d0eb797

File tree

16 files changed

+556
-156
lines changed

16 files changed

+556
-156
lines changed

README.md

Lines changed: 50 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
[Nakadi](https://github.com/zalando/nakadi) is a distributed event bus that implements a RESTful API abstraction instead of Kafka-like queues.
99

10-
The goal of this Spring Boot starter is to simplify the reliable integration between event producer and Nakadi. When we send events from a transactional application, a few recurring challenges appear:
10+
The goal of **this** Spring Boot starter is to simplify the reliable integration between event producer and Nakadi. When we send events from a transactional application, a few recurring challenges appear:
1111
- we have to make sure that events from a transaction get sent, when the transaction has been committed,
1212
- we have to make sure that events from a transaction do not get sent, when the transaction has been rolled back,
1313
- we have to make sure that events get sent, even if an error occurred while sending the event,
@@ -24,7 +24,8 @@ This project is mature, used in production in some services at Zalando, and in a
2424

2525
Be aware that this library **does neither guarantee that events are sent exactly once, nor that they are sent in the order they have been persisted**. This is not a bug but a design decision that allows us to skip and retry sending events later in case of temporary failures. So make sure that your events are designed to be processed out of order (See [Rule 203 in Zalando's API guidelines](https://opensource.zalando.com/restful-api-guidelines/#203)). To help you in this matter, the library generates a *strictly monotonically increasing event id* (field `metadata/eid` in Nakadi's event object) that can be used to reconstruct the message order.
2626

27-
Unfortunately this approach is not compatible with Nakadi's compacted event types – it can happen that the last event submitted (and thus the one which will stay after compaction) is not the last event which was actually been fired. For this reason, the library currently also doesn't provide any access to Nakadi's [`partition_compaction_key`](https://nakadi.io/manual.html#definition_EventMetadata*partition_compaction_key) feature.
27+
Unfortunately this approach is fundamentally incompatible with Nakadi's compacted event types – it can happen that the last event submitted (and thus the one which will stay after compaction) is not the last event which was actually been fired.
28+
We still provide means to set the compaction key, see [compacted event types](#compacted-event-types) below.
2829

2930
## Versioning
3031

@@ -107,7 +108,7 @@ token. The easiest way to do so is to include the [Zalando Tokens library](https
107108
</dependency>
108109
```
109110

110-
This starter will detect and auto configure it.
111+
This starter will detect and autoconfigure it.
111112

112113
If your application is running in Zalando's Kubernetes environment, you have to configure the credential rotation:
113114
```yaml
@@ -158,16 +159,18 @@ nakadi-producer:
158159
```
159160

160161
#### Implement Nakadi authentication yourself
161-
If you do not use the STUPS Tokens library, you can implement token retrieval yourself by defining a Spring bean of type `org.zalando.nakadiproducer.AccessTokenProvider`. The starter will detect it and call it once for each request to retrieve the token.
162+
If you do not use the STUPS Tokens library, you can implement token retrieval yourself by defining a Spring bean of
163+
type [`AccessTokenProvider`](nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/AccessTokenProvider.java).
164+
The starter will detect it and call it once for each request to retrieve the token.
162165

163166
### Creating events
164167

165168
The typical use case for this library is to publish events like creating or updating of some objects.
166169

167-
In order to store events you can autowire the [`EventLogWriter`](src/main/java/org/zalando/nakadiproducer/eventlog/EventLogWriter.java)
170+
In order to store events you can autowire the [`EventLogWriter`](nakadi-producer/src/main/java/org/zalando/nakadiproducer/eventlog/EventLogWriter.java)
168171
service and use its methods: `fireCreateEvent`, `fireUpdateEvent`, `fireDeleteEvent`, `fireSnapshotEvent` or `fireBusinessEvent`.
169172

170-
To store events in bulk the methods `fireCreateEvents`, `fireUpdateEvents`, `fireDeleteEvents`, `fireSnapshotEvents` or `fireBusinessEvents` can be used.
173+
To store several events of the same type in bulk, the methods `fireCreateEvents`, `fireUpdateEvents`, `fireDeleteEvents`, `fireSnapshotEvents` or `fireBusinessEvents` can be used.
171174

172175
You normally don't need to call `fireSnapshotEvent` directly, see below for [snapshot creation](#event-snapshots-optional).
173176

@@ -222,14 +225,51 @@ For business events, you have just two parameters, the **eventType** and the eve
222225
You usually should fire those also in the same transaction as you are storing the results of the
223226
process step the event is reporting.
224227

228+
#### Compacted event types
229+
230+
Nakadi offers a "log-compaction" feature, where each event (on an event type) has a
231+
[`partition_compaction_key`](https://nakadi.io/manual.html#definition_EventMetadata*partition_compaction_key), and
232+
Nakadi will (after delivering to live subscribers) clean up events, but leave the latest event for each
233+
compaction key available long-term.
234+
235+
This library (by design) doesn't guarantee the submission order of events – especially when there are problems
236+
on Nakadi side and some events fail (and are retried later), earlier produced events (for the same entity)
237+
can be submitted after later events. For log-compacted event types this means that an outdated event will remain
238+
in the topic for future subscribers to read.
239+
It is therefore generally **not recommended** to use this library (or any solution which doesn't guarantee the order)
240+
for sending events to a compacted event type.
241+
242+
In some cases, like when there usually are large time gaps between producing events for the same compaction key,
243+
the risk of getting events for the same key out-of-order is small.
244+
For these cases, you just can define a bean of type [`CompactionKeyExtractor`](nakadi-producer/src/main/java/org/zalando/nakadiproducer/eventlog/CompactionKeyExtractor.java) , and then all events of that event
245+
type will get sent with a compaction key.
246+
247+
```java
248+
@Configuration
249+
public class NakadiProducerConfiguration {
250+
@Bean
251+
public CompactionKeyExtractor extractorForWarehouseEvents() {
252+
return CompactionKeyExtractor.of("wholesale.warehouse-change-event",
253+
Warehouse.class, Warehouse::getCode);
254+
}
255+
}
256+
```
257+
The service class sending the event looks exactly the same as above.
258+
259+
For corner cases: You can have multiple such extractors for the same event type, any one where the class object
260+
matches the payload object (in undefined order) will be used.
261+
There are also some more factory methods with different signatures for more special cases, and you can also write
262+
your own implementation (but for the usual cases, the one shown here should be enough).
263+
264+
225265
### Event snapshots (optional)
226266

227267
A Snapshot event is a special type of data change event (data operation) defined by Nakadi.
228268
It does not represent a change of the state of a resource, but a current snapshot of its state. It can be useful to
229269
bootstrap a new consumer or to recover from inconsistencies between sender and consumer after an incident.
230270

231271
You can create snapshot events programmatically (using EventLogWriter.fireSnapshotEvent), but usually snapshot event
232-
creation is a irregular, manually triggered maintenance task.
272+
creation is an irregular, manually triggered maintenance task.
233273

234274
This library provides a Spring Boot Actuator endpoint named `snapshot_event_creation` that can be used to trigger a Snapshot for a given event type. Assuming your management port is set to `7979`,
235275

@@ -260,6 +300,9 @@ your `application.properties` includes
260300
management.endpoints.web.exposure.include=snapshot-event-creation,your-other-endpoints,...`
261301
```
262302
and if one or more Spring Beans implement the `org.zalando.nakadiproducer.snapshots.SnapshotEventGenerator` interface.
303+
(Note that this will automatically together with the compaction key feature mentioned above,
304+
if you have registered a compaction key extractor matching the type of the data objects in your snapshots.)
305+
263306
The optional filter specifier of the trigger request will be passed as a string parameter to the
264307
SnapshotEventGenerator's `generateSnapshots` method and may be null, if none is given.
265308

nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/NakadiProducerAutoConfiguration.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,10 @@
2323
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
2424
import org.springframework.scheduling.annotation.EnableScheduling;
2525
import org.zalando.fahrschein.NakadiClient;
26-
import org.zalando.fahrschein.NakadiClientBuilder;
2726
import org.zalando.fahrschein.http.api.ContentEncoding;
2827
import org.zalando.fahrschein.http.api.RequestFactory;
2928
import org.zalando.fahrschein.http.simple.SimpleRequestFactory;
29+
import org.zalando.nakadiproducer.eventlog.CompactionKeyExtractor;
3030
import org.zalando.nakadiproducer.eventlog.EventLogWriter;
3131
import org.zalando.nakadiproducer.eventlog.impl.EventLogRepository;
3232
import org.zalando.nakadiproducer.eventlog.impl.EventLogRepositoryImpl;
@@ -139,8 +139,8 @@ public SnapshotCreationService snapshotCreationService(
139139

140140
@Bean
141141
public EventLogWriter eventLogWriter(EventLogRepository eventLogRepository, ObjectMapper objectMapper,
142-
FlowIdComponent flowIdComponent) {
143-
return new EventLogWriterImpl(eventLogRepository, objectMapper, flowIdComponent);
142+
FlowIdComponent flowIdComponent, List<CompactionKeyExtractor> extractorList) {
143+
return new EventLogWriterImpl(eventLogRepository, objectMapper, flowIdComponent, extractorList);
144144
}
145145

146146
@Bean

nakadi-producer-spring-boot-starter/src/main/java/org/zalando/nakadiproducer/eventlog/impl/EventLogRepositoryImpl.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,16 +87,17 @@ public void persist(Collection<EventLog> eventLogs) {
8787
namedParameterMap.addValue("lastModified", now);
8888
namedParameterMap.addValue("lockedBy", eventLog.getLockedBy());
8989
namedParameterMap.addValue("lockedUntil", eventLog.getLockedUntil());
90+
namedParameterMap.addValue("compactionKey", eventLog.getCompactionKey());
9091
return namedParameterMap;
9192
})
9293
.toArray(MapSqlParameterSource[]::new);
9394

9495
jdbcTemplate.batchUpdate(
9596
"INSERT INTO " +
9697
" nakadi_events.event_log " +
97-
" (event_type, event_body_data, flow_id, created, last_modified, locked_by, locked_until) " +
98+
" (event_type, event_body_data, flow_id, created, last_modified, locked_by, locked_until, compaction_key)" +
9899
"VALUES " +
99-
" (:eventType, :eventBodyData, :flowId, :created, :lastModified, :lockedBy, :lockedUntil)",
100+
" (:eventType, :eventBodyData, :flowId, :created, :lastModified, :lockedBy, :lockedUntil, :compactionKey)",
100101
namedParameterMaps
101102
);
102103
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
ALTER TABLE nakadi_events.event_log
2+
ADD COLUMN compaction_key TEXT NULL
3+
;

nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/EndToEndTestIT.java

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,7 @@
33
import static com.jayway.jsonpath.Criteria.where;
44
import static com.jayway.jsonpath.JsonPath.read;
55
import static org.hamcrest.MatcherAssert.assertThat;
6-
import static org.hamcrest.Matchers.empty;
7-
import static org.hamcrest.Matchers.is;
6+
import static org.hamcrest.Matchers.*;
87

98
import java.io.IOException;
109
import java.util.List;
@@ -13,17 +12,23 @@
1312
import org.junit.jupiter.api.BeforeEach;
1413
import org.junit.jupiter.api.Test;
1514
import org.springframework.beans.factory.annotation.Autowired;
15+
import org.springframework.context.annotation.Bean;
16+
import org.springframework.test.context.ContextConfiguration;
17+
import org.zalando.nakadiproducer.eventlog.CompactionKeyExtractor;
1618
import org.zalando.nakadiproducer.eventlog.EventLogWriter;
1719
import org.zalando.nakadiproducer.transmission.MockNakadiPublishingClient;
1820
import org.zalando.nakadiproducer.transmission.impl.EventTransmitter;
1921
import org.zalando.nakadiproducer.util.Fixture;
2022
import org.zalando.nakadiproducer.util.MockPayload;
2123

24+
@ContextConfiguration(classes = EndToEndTestIT.Config.class)
2225
public class EndToEndTestIT extends BaseMockedExternalCommunicationIT {
2326
private static final String MY_DATA_CHANGE_EVENT_TYPE = "myDataChangeEventType";
27+
private static final String SECOND_DATA_CHANGE_EVENT_TYPE = "secondDataChangeEventType";
2428
private static final String MY_BUSINESS_EVENT_TYPE = "myBusinessEventType";
2529
public static final String PUBLISHER_DATA_TYPE = "nakadi:some-publisher";
2630
private static final String CODE = "code123";
31+
public static final String COMPACTION_KEY = "Hello World";
2732

2833
@Autowired
2934
private EventLogWriter eventLogWriter;
@@ -34,6 +39,8 @@ public class EndToEndTestIT extends BaseMockedExternalCommunicationIT {
3439
@Autowired
3540
private MockNakadiPublishingClient nakadiClient;
3641

42+
43+
3744
@BeforeEach
3845
@AfterEach
3946
public void clearNakadiEvents() {
@@ -55,6 +62,37 @@ public void dataEventsShouldBeSubmittedToNakadi() throws IOException {
5562
assertThat(read(value.get(0), "$.data.code"), is(CODE));
5663
}
5764

65+
@Test
66+
public void compactionKeyIsPreserved() throws IOException {
67+
MockPayload payload = Fixture.mockPayload(1, CODE);
68+
eventLogWriter.fireDeleteEvent(SECOND_DATA_CHANGE_EVENT_TYPE, PUBLISHER_DATA_TYPE, payload);
69+
eventLogWriter.fireBusinessEvent(MY_BUSINESS_EVENT_TYPE, payload);
70+
71+
eventTransmitter.sendEvents();
72+
73+
List<String> dataEvents = nakadiClient.getSentEvents(SECOND_DATA_CHANGE_EVENT_TYPE);
74+
assertThat(dataEvents.size(), is(1));
75+
assertThat(read(dataEvents.get(0), "$.metadata.partition_compaction_key"), is(COMPACTION_KEY));
76+
77+
List<String> businessEvents = nakadiClient.getSentEvents(MY_BUSINESS_EVENT_TYPE);
78+
assertThat(businessEvents.size(), is(1));
79+
assertThat(read(businessEvents.get(0), "$.metadata.partition_compaction_key"), is(CODE));
80+
}
81+
82+
@Test
83+
public void compactionKeyIsNotInvented() throws IOException {
84+
MockPayload payload = Fixture.mockPayload(1, CODE);
85+
eventLogWriter.fireDeleteEvent(MY_DATA_CHANGE_EVENT_TYPE, PUBLISHER_DATA_TYPE, payload);
86+
87+
eventTransmitter.sendEvents();
88+
List<String> value = nakadiClient.getSentEvents(MY_DATA_CHANGE_EVENT_TYPE);
89+
90+
assertThat(value.size(), is(1));
91+
assertThat(read(value.get(0), "$.metadata[?]", where("partition_compaction_key").exists(true)),
92+
is(empty()));
93+
}
94+
95+
5896
@Test
5997
public void businessEventsShouldBeSubmittedToNakadi() throws IOException {
6098
MockPayload payload = Fixture.mockPayload(1, CODE);
@@ -75,4 +113,16 @@ public void businessEventsShouldBeSubmittedToNakadi() throws IOException {
75113
assertThat(read(value.get(0), "$[?]", where("data_type").exists(true)), is(empty()));
76114
assertThat(read(value.get(0), "$[?]", where("data").exists(true)), is(empty()));
77115
}
116+
117+
public static class Config {
118+
@Bean
119+
public CompactionKeyExtractor compactionKeyExtractorForSecondDataEventType() {
120+
return CompactionKeyExtractor.of(SECOND_DATA_CHANGE_EVENT_TYPE, MockPayload.class, m -> COMPACTION_KEY);
121+
}
122+
123+
@Bean
124+
public CompactionKeyExtractor keyExtractorForBusinessEventType() {
125+
return CompactionKeyExtractor.of(MY_BUSINESS_EVENT_TYPE, MockPayload.class, MockPayload::getCode);
126+
}
127+
}
78128
}

nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/eventlog/impl/EventLogRepositoryIT.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,19 +38,22 @@ public class EventLogRepositoryIT extends BaseMockedExternalCommunicationIT {
3838

3939
private final String WAREHOUSE_EVENT_TYPE = "wholesale.warehouse-change-event";
4040

41+
public static final String COMPACTION_KEY = "COMPACTED";
42+
4143
@BeforeEach
4244
public void setUp() {
4345
eventLogRepository.deleteAll();
4446

4547
final EventLog eventLog = EventLog.builder()
4648
.eventBodyData(WAREHOUSE_EVENT_BODY_DATA)
4749
.eventType(WAREHOUSE_EVENT_TYPE)
50+
.compactionKey(COMPACTION_KEY)
4851
.flowId("FLOW_ID").build();
4952
eventLogRepository.persist(eventLog);
5053
}
5154

5255
@Test
53-
public void findEventRepositoryId() {
56+
public void testFindEventInRepositoryById() {
5457
Integer id = jdbcTemplate.queryForObject(
5558
"SELECT id FROM nakadi_events.event_log WHERE flow_id = 'FLOW_ID'",
5659
Integer.class);
@@ -61,6 +64,7 @@ public void findEventRepositoryId() {
6164
private void compareWithPersistedEvent(final EventLog eventLog) {
6265
assertThat(eventLog.getEventBodyData(), is(WAREHOUSE_EVENT_BODY_DATA));
6366
assertThat(eventLog.getEventType(), is(WAREHOUSE_EVENT_TYPE));
67+
assertThat(eventLog.getCompactionKey(), is(COMPACTION_KEY));
6468
}
6569

6670
}

0 commit comments

Comments
 (0)