Skip to content

Commit be4a755

Browse files
authored
Merge pull request #113 from zalando-nakadi/feature/batching-sb1
Batch event submission (#48)
2 parents a78e62d + 528d683 commit be4a755

File tree

12 files changed

+448
-37
lines changed

12 files changed

+448
-37
lines changed

nakadi-producer-spring-boot-starter/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
<parent>
1111
<groupId>org.zalando</groupId>
1212
<artifactId>nakadi-producer-reactor</artifactId>
13-
<version>4.2.0</version>
13+
<version>4.3.0</version>
1414
</parent>
1515

1616
<artifactId>nakadi-producer-spring-boot-starter</artifactId>

nakadi-producer-spring-boot-starter/src/test/java/org/zalando/nakadiproducer/LockingIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,8 @@ public void eventsShouldNotBeSentTwiceWhenLockExpiresDuringTransmission() {
6060
Collection<EventLog> eventLogsLockedSecond = eventTransmissionService.lockSomeEvents();
6161

6262
// when both job instances try to send their locked events
63-
eventLogsLockedFirst.forEach(eventTransmissionService::sendEvent);
64-
eventLogsLockedSecond.forEach(eventTransmissionService::sendEvent);
63+
eventTransmissionService.sendEvents(eventLogsLockedFirst);
64+
eventTransmissionService.sendEvents(eventLogsLockedSecond);
6565

6666
// Then the event should have been sent only once.
6767
List<String> value = nakadiClient.getSentEvents(MY_EVENT_TYPE);

nakadi-producer/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
<parent>
1111
<groupId>org.zalando</groupId>
1212
<artifactId>nakadi-producer-reactor</artifactId>
13-
<version>4.2.0</version>
13+
<version>4.3.0</version>
1414
</parent>
1515

1616
<artifactId>nakadi-producer</artifactId>

nakadi-producer/src/main/java/org/zalando/nakadiproducer/transmission/MockNakadiPublishingClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ public MockNakadiPublishingClient(ObjectMapper objectMapper) {
2828
}
2929

3030
@Override
31-
public synchronized void publish(String eventType, List<?> nakadiEvents) {
31+
public synchronized void publish(String eventType, List<?> nakadiEvents) throws Exception {
3232
nakadiEvents.stream().map(this::transformToJson).forEach(e -> sentEvents.add(eventType, e));
3333
}
3434

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
package org.zalando.nakadiproducer.transmission.impl;
2+
3+
import com.fasterxml.jackson.databind.ObjectMapper;
4+
import lombok.AllArgsConstructor;
5+
import lombok.EqualsAndHashCode;
6+
import lombok.Getter;
7+
import lombok.ToString;
8+
import lombok.extern.slf4j.Slf4j;
9+
import org.zalando.nakadiproducer.eventlog.impl.EventLog;
10+
11+
import java.util.ArrayList;
12+
import java.util.List;
13+
import java.util.function.Consumer;
14+
15+
@Slf4j
16+
public class EventBatcher {
17+
18+
private static final long NAKADI_BATCH_SIZE_LIMIT_IN_BYTES = 50000000;
19+
private final ObjectMapper objectMapper;
20+
private final Consumer<List<BatchItem>> publisher;
21+
22+
private List<BatchItem> batch;
23+
private long aggregatedBatchSize;
24+
25+
public EventBatcher(ObjectMapper objectMapper, Consumer<List<BatchItem>> publisher) {
26+
this.objectMapper = objectMapper;
27+
this.publisher = publisher;
28+
29+
this.batch = new ArrayList<>();
30+
this.aggregatedBatchSize = 0;
31+
}
32+
33+
/**
34+
* Pushes one event to be published. It will be either published right now, or with some other events,
35+
* latest when calling {@link #finish()}.
36+
* @param eventLogEntry The event log entry for this event.
37+
* @param nakadiEvent The Nakadi form of the event.
38+
*/
39+
public void pushEvent(EventLog eventLogEntry, NakadiEvent nakadiEvent) {
40+
long eventSize;
41+
42+
try {
43+
eventSize = objectMapper.writeValueAsBytes(nakadiEvent).length;
44+
} catch (Exception e) {
45+
log.error("Could not serialize event {} of type {}, skipping it.", eventLogEntry.getId(), eventLogEntry.getEventType(), e);
46+
return;
47+
}
48+
49+
50+
if (!batch.isEmpty() &&
51+
(hasAnotherEventType(batch, eventLogEntry) || batchWouldBecomeTooBig(aggregatedBatchSize, eventSize))) {
52+
this.publisher.accept(batch);
53+
54+
batch = new ArrayList<>();
55+
aggregatedBatchSize = 0;
56+
}
57+
58+
batch.add(new BatchItem(eventLogEntry, nakadiEvent));
59+
aggregatedBatchSize += eventSize;
60+
}
61+
62+
/**
63+
* Publishes all events which were pushed and not yet published.
64+
*/
65+
public void finish() {
66+
if (!batch.isEmpty()) {
67+
this.publisher.accept(batch);
68+
}
69+
}
70+
71+
private boolean hasAnotherEventType(List<BatchItem> batch, EventLog event) {
72+
return !event.getEventType().equals(batch.get(0).getEventLogEntry().getEventType());
73+
}
74+
75+
private boolean batchWouldBecomeTooBig(long aggregatedBatchSize, long eventSize) {
76+
return aggregatedBatchSize + eventSize > 0.8 * NAKADI_BATCH_SIZE_LIMIT_IN_BYTES;
77+
}
78+
79+
@AllArgsConstructor
80+
@Getter
81+
@EqualsAndHashCode
82+
@ToString
83+
protected static class BatchItem {
84+
EventLog eventLogEntry;
85+
NakadiEvent nakadiEvent;
86+
}
87+
88+
}

nakadi-producer/src/main/java/org/zalando/nakadiproducer/transmission/impl/EventTransmissionService.java

Lines changed: 70 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -3,22 +3,27 @@
33
import com.fasterxml.jackson.core.type.TypeReference;
44
import com.fasterxml.jackson.databind.ObjectMapper;
55
import lombok.extern.slf4j.Slf4j;
6+
import org.zalando.fahrschein.EventPublishingException;
7+
import org.zalando.fahrschein.domain.BatchItemResponse;
68
import org.zalando.nakadiproducer.eventlog.impl.EventLog;
79
import org.zalando.nakadiproducer.eventlog.impl.EventLogRepository;
810
import org.zalando.nakadiproducer.transmission.NakadiPublishingClient;
11+
import org.zalando.nakadiproducer.transmission.impl.EventBatcher.BatchItem;
912

1013
import javax.transaction.Transactional;
14+
1115
import java.io.IOException;
12-
import java.io.UncheckedIOException;
1316
import java.time.Clock;
1417
import java.time.Instant;
18+
import java.util.Arrays;
1519
import java.util.Collection;
16-
import java.util.HashMap;
1720
import java.util.LinkedHashMap;
21+
import java.util.List;
1822
import java.util.UUID;
23+
import java.util.stream.Collectors;
24+
import java.util.stream.Stream;
1925

2026
import static java.time.temporal.ChronoUnit.MINUTES;
21-
import static java.util.Collections.singletonList;
2227

2328
@Slf4j
2429
public class EventTransmissionService {
@@ -44,20 +49,72 @@ public Collection<EventLog> lockSomeEvents() {
4449
}
4550

4651
@Transactional
47-
public void sendEvent(EventLog eventLog) {
48-
if (lockNearlyExpired(eventLog)) {
49-
// to avoid that two instances process this event, we skip it
50-
return;
52+
public void sendEvents(Collection<EventLog> events) {
53+
EventBatcher batcher = new EventBatcher(objectMapper, this::publishBatch);
54+
55+
for (EventLog event : events) {
56+
if (lockNearlyExpired(event)) {
57+
// to avoid that two instances process this event, we skip it
58+
continue;
59+
}
60+
61+
NakadiEvent nakadiEvent;
62+
63+
try {
64+
nakadiEvent = mapToNakadiEvent(event);
65+
} catch (Exception e) {
66+
log.error("Could not serialize event {} of type {}, skipping it.", event.getId(), event.getEventType(), e);
67+
continue;
68+
}
69+
70+
batcher.pushEvent(event, nakadiEvent);
5171
}
5272

73+
batcher.finish();
74+
}
75+
76+
/**
77+
* Publishes a list of events.
78+
* All of the events in this list need to be destined for the same event type.
79+
*/
80+
private void publishBatch(List<BatchItem> batch) {
5381
try {
54-
nakadiPublishingClient.publish(eventLog.getEventType(), singletonList(mapToNakadiEvent(eventLog)));
55-
log.info("Event {} locked by {} was successfully transmitted to nakadi", eventLog.getId(), eventLog.getLockedBy());
56-
eventLogRepository.delete(eventLog);
82+
this.tryToPublishBatch(batch);
5783
} catch (Exception e) {
58-
log.error("Event {} locked by {} could not be transmitted to nakadi: {}", eventLog.getId(), eventLog.getLockedBy(), e.getMessage());
84+
log.error("Could not send {} events of type {}, skipping them.", batch.size(), batch.get(0).getEventLogEntry().getEventType(), e);
85+
}
86+
}
87+
88+
/**
89+
* Tries to publish a set of events (all of which need to belong to the same event type).
90+
* The successful ones will be deleted from the database.
91+
*/
92+
private void tryToPublishBatch(List<BatchItem> batch) throws Exception {
93+
Stream<EventLog> successfulEvents;
94+
String eventType = batch.get(0).getEventLogEntry().getEventType();
95+
try {
96+
nakadiPublishingClient.publish(
97+
eventType,
98+
batch.stream()
99+
.map(BatchItem::getNakadiEvent)
100+
.collect(Collectors.toList())
101+
);
102+
successfulEvents = batch.stream().map(BatchItem::getEventLogEntry);
103+
log.info("Sent {} events of type {}.", batch.size(), eventType);
104+
} catch (EventPublishingException e) {
105+
log.error("{} out of {} events of type {} failed to be sent.", e.getResponses().length, batch.size(), eventType);
106+
List<String> failedEids = collectEids(e);
107+
successfulEvents =
108+
batch.stream()
109+
.map(BatchItem::getEventLogEntry)
110+
.filter(rawEvent -> !failedEids.contains(convertToUUID(rawEvent.getId())));
59111
}
60112

113+
successfulEvents.forEach(eventLogRepository::delete);
114+
}
115+
116+
private List<String> collectEids(EventPublishingException e) {
117+
return Arrays.stream(e.getResponses()).map(BatchItemResponse::getEid).collect(Collectors.toList());
61118
}
62119

63120
private boolean lockNearlyExpired(EventLog eventLog) {
@@ -67,7 +124,7 @@ private boolean lockNearlyExpired(EventLog eventLog) {
67124
return now().isAfter(eventLog.getLockedUntil().minus(1, MINUTES));
68125
}
69126

70-
public NakadiEvent mapToNakadiEvent(final EventLog event) {
127+
private NakadiEvent mapToNakadiEvent(final EventLog event) throws IOException {
71128
final NakadiEvent nakadiEvent = new NakadiEvent();
72129

73130
final NakadiMetadata metadata = new NakadiMetadata();
@@ -76,13 +133,7 @@ public NakadiEvent mapToNakadiEvent(final EventLog event) {
76133
metadata.setFlowId(event.getFlowId());
77134
nakadiEvent.setMetadata(metadata);
78135

79-
HashMap<String, Object> payloadDTO;
80-
try {
81-
payloadDTO = objectMapper.readValue(event.getEventBodyData(), new TypeReference<LinkedHashMap<String, Object>>() { });
82-
} catch (IOException e) {
83-
log.error("An error occurred at JSON deserialization", e);
84-
throw new UncheckedIOException(e);
85-
}
136+
LinkedHashMap<String, Object> payloadDTO = objectMapper.readValue(event.getEventBodyData(), new TypeReference<LinkedHashMap<String, Object>>() { });
86137

87138
nakadiEvent.setData(payloadDTO);
88139

@@ -106,5 +157,4 @@ private String convertToUUID(final int number) {
106157
return new UUID(0, number).toString();
107158
}
108159

109-
110160
}

nakadi-producer/src/main/java/org/zalando/nakadiproducer/transmission/impl/EventTransmitter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,6 @@ public EventTransmitter(EventTransmissionService eventTransmissionService) {
88
}
99

1010
public void sendEvents() {
11-
eventTransmissionService.lockSomeEvents().forEach(eventTransmissionService::sendEvent);
11+
eventTransmissionService.sendEvents(eventTransmissionService.lockSomeEvents());
1212
}
1313
}

nakadi-producer/src/test/java/org/zalando/nakadiproducer/eventlog/impl/EventLogWriterTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
import org.mockito.ArgumentCaptor;
1414
import org.mockito.Captor;
1515
import org.mockito.Mock;
16-
import org.mockito.runners.MockitoJUnitRunner;
16+
import org.mockito.junit.MockitoJUnitRunner;
1717
import org.zalando.nakadiproducer.flowid.FlowIdComponent;
1818
import org.zalando.nakadiproducer.util.Fixture;
1919
import org.zalando.nakadiproducer.util.MockPayload;

nakadi-producer/src/test/java/org/zalando/nakadiproducer/transmission/MockNakadiPublishingClientTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,15 @@ public void returnsEmptyResultIfNoEventsHaveBeenSent() {
2727
}
2828

2929
@Test
30-
public void returnsOnlyThoseEventsOfTheGivenType() {
30+
public void returnsOnlyThoseEventsOfTheGivenType() throws Exception {
3131
mockNakadiPublishingClient.publish(MY_EVENT_TYPE, singletonList(new Event("anEvent")));
3232
mockNakadiPublishingClient.publish(OTHER_EVENT_TYPE, singletonList(new Event("anotherEvent")));
3333

3434
assertThat(mockNakadiPublishingClient.getSentEvents(MY_EVENT_TYPE), contains("{\"attribute\":\"anEvent\"}"));
3535
}
3636

3737
@Test
38-
public void concatenatesSubsequentlyPublishedEventLists() {
38+
public void concatenatesSubsequentlyPublishedEventLists() throws Exception {
3939
mockNakadiPublishingClient.publish(MY_EVENT_TYPE,
4040
asList(new Event("event1"), new Event("event2"))
4141
);
@@ -55,7 +55,7 @@ public void concatenatesSubsequentlyPublishedEventLists() {
5555
}
5656

5757
@Test
58-
public void deletesAllEventsOnClear() {
58+
public void deletesAllEventsOnClear() throws Exception {
5959
mockNakadiPublishingClient.publish(MY_EVENT_TYPE, singletonList(new Event("event1")));
6060
mockNakadiPublishingClient.clearSentEvents();
6161

0 commit comments

Comments
 (0)