Skip to content

Commit 134414a

Browse files
authored
chore: Batch ack requests and sink responses for better performance (#149)
Signed-off-by: Yashash H L <[email protected]>
1 parent 0a803d7 commit 134414a

File tree

12 files changed

+72
-59
lines changed

12 files changed

+72
-59
lines changed

examples/src/main/java/io/numaproj/numaflow/examples/source/simple/SimpleSource.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -72,9 +72,11 @@ public void read(ReadRequest request, OutputObserver observer) {
7272

7373
@Override
7474
public void ack(AckRequest request) {
75-
Long offset = Longs.fromByteArray(request.getOffset().getValue());
76-
// remove the acknowledged messages from the map
77-
messages.remove(offset);
75+
for (Offset offset : request.getOffsets()) {
76+
Long decoded_offset = Longs.fromByteArray(offset.getValue());
77+
// remove the acknowledged messages from the map
78+
messages.remove(decoded_offset);
79+
}
7880
}
7981

8082
@Override

examples/src/test/java/io/numaproj/numaflow/examples/source/simple/SimpleSourceTest.java

+3-5
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,9 @@ public void test_ReadAndAck() {
3939
offsets.add(message.getOffset());
4040
}
4141

42-
for (Offset offset : offsets) {
43-
SourcerTestKit.TestAckRequest ackRequest = SourcerTestKit.TestAckRequest.builder()
44-
.offset(offset).build();
45-
simpleSource.ack(ackRequest);
46-
}
42+
SourcerTestKit.TestAckRequest ackRequest = SourcerTestKit.TestAckRequest.builder()
43+
.offsets(offsets).build();
44+
simpleSource.ack(ackRequest);
4745

4846
// Try reading 6 more messages
4947
// Since the previous batch got acked, the data source should allow us to read more messages

src/main/java/io/numaproj/numaflow/sinker/Service.java

+10-11
Original file line numberDiff line numberDiff line change
@@ -73,10 +73,11 @@ public void onNext(SinkOuterClass.SinkRequest request) {
7373
datumStream.writeMessage(HandlerDatum.EOF_DATUM);
7474

7575
ResponseList responses = result.join();
76-
responses.getResponses().forEach(response -> {
77-
SinkOuterClass.SinkResponse sinkResponse = buildResponse(response);
78-
responseObserver.onNext(sinkResponse);
79-
});
76+
SinkOuterClass.SinkResponse.Builder responseBuilder = SinkOuterClass.SinkResponse.newBuilder();
77+
for (Response response : responses.getResponses()) {
78+
responseBuilder.addResults(buildResult(response));
79+
}
80+
responseObserver.onNext(responseBuilder.build());
8081

8182
// send eot response to indicate end of transmission for the batch
8283
SinkOuterClass.SinkResponse eotResponse = SinkOuterClass.SinkResponse
@@ -113,15 +114,13 @@ public void onCompleted() {
113114
};
114115
}
115116

116-
private SinkOuterClass.SinkResponse buildResponse(Response response) {
117+
private SinkOuterClass.SinkResponse.Result buildResult(Response response) {
117118
SinkOuterClass.Status status = response.getFallback() ? SinkOuterClass.Status.FALLBACK :
118119
response.getSuccess() ? SinkOuterClass.Status.SUCCESS : SinkOuterClass.Status.FAILURE;
119-
return SinkOuterClass.SinkResponse.newBuilder()
120-
.setResult(SinkOuterClass.SinkResponse.Result.newBuilder()
121-
.setId(response.getId() == null ? "" : response.getId())
122-
.setErrMsg(response.getErr() == null ? "" : response.getErr())
123-
.setStatus(status)
124-
.build())
120+
return SinkOuterClass.SinkResponse.Result.newBuilder()
121+
.setId(response.getId() == null ? "" : response.getId())
122+
.setErrMsg(response.getErr() == null ? "" : response.getErr())
123+
.setStatus(status)
125124
.build();
126125
}
127126

src/main/java/io/numaproj/numaflow/sinker/SinkerTestKit.java

+13-10
Original file line numberDiff line numberDiff line change
@@ -197,19 +197,22 @@ public void onCompleted() {
197197
if (result.getHandshake().getSot()) {
198198
continue;
199199
}
200+
200201
if (result.hasStatus() && result.getStatus().getEot()) {
201202
continue;
202203
}
203-
if (result.getResult().getStatus() == SinkOuterClass.Status.SUCCESS) {
204-
responseListBuilder.addResponse(Response.responseOK(result
205-
.getResult()
206-
.getId()));
207-
} else if (result.getResult().getStatus() == SinkOuterClass.Status.FALLBACK) {
208-
responseListBuilder.addResponse(Response.responseFallback(
209-
result.getResult().getId()));
210-
} else {
211-
responseListBuilder.addResponse(Response.responseFailure(
212-
result.getResult().getId(), result.getResult().getErrMsg()));
204+
205+
for (SinkOuterClass.SinkResponse.Result response : result.getResultsList()) {
206+
if (response.getStatus() == SinkOuterClass.Status.SUCCESS) {
207+
responseListBuilder.addResponse(Response.responseOK(response
208+
.getId()));
209+
} else if (response.getStatus() == SinkOuterClass.Status.FALLBACK) {
210+
responseListBuilder.addResponse(Response.responseFallback(
211+
response.getId()));
212+
} else {
213+
responseListBuilder.addResponse(Response.responseFailure(
214+
response.getId(), response.getErrMsg()));
215+
}
213216
}
214217
}
215218

Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
package io.numaproj.numaflow.sourcer;
22

33

4+
import java.util.List;
5+
46
/**
57
* AckRequest request for acknowledging messages.
68
*/
79
public interface AckRequest {
810
/**
9-
* @return the offset to be acknowledged
11+
* @return the list of offsets to be acknowledged
1012
*/
11-
Offset getOffset();
13+
List<Offset> getOffsets();
1214
}

src/main/java/io/numaproj/numaflow/sourcer/AckRequestImpl.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,17 @@
22

33
import lombok.AllArgsConstructor;
44

5+
import java.util.List;
6+
57
/**
68
* AckRequestImpl is the implementation of AckRequest.
79
*/
810
@AllArgsConstructor
911
class AckRequestImpl implements AckRequest {
10-
private final Offset offset;
12+
private final List<Offset> offsets;
1113

1214
@Override
13-
public Offset getOffset() {
14-
return this.offset;
15+
public List<Offset> getOffsets() {
16+
return this.offsets;
1517
}
1618
}

src/main/java/io/numaproj/numaflow/sourcer/Service.java

+8-5
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import io.numaproj.numaflow.source.v1.SourceOuterClass;
88

99
import java.time.Duration;
10+
import java.util.ArrayList;
1011
import java.util.List;
1112

1213
import static io.numaproj.numaflow.source.v1.SourceGrpc.getPendingFnMethod;
@@ -115,14 +116,16 @@ public void onNext(SourceOuterClass.AckRequest request) {
115116
return;
116117
}
117118

118-
SourceOuterClass.Offset offset = request.getRequest().getOffset();
119+
List<Offset> offsets = new ArrayList<>(request.getRequest().getOffsetsCount());
120+
for (SourceOuterClass.Offset offset : request.getRequest().getOffsetsList()) {
121+
offsets.add(new Offset(
122+
offset.getOffset().toByteArray(),
123+
offset.getPartitionId()));
124+
}
119125

120-
AckRequestImpl ackRequest = new AckRequestImpl(new Offset(
121-
offset.getOffset().toByteArray(),
122-
offset.getPartitionId()));
126+
AckRequestImpl ackRequest = new AckRequestImpl(offsets);
123127

124128
// invoke the sourcer's ack method
125-
126129
sourcer.ack(ackRequest);
127130

128131
// send an ack response to the client after acking the message

src/main/java/io/numaproj/numaflow/sourcer/SourcerTestKit.java

+9-7
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import java.util.List;
1717
import java.util.concurrent.CompletableFuture;
1818
import java.util.concurrent.TimeUnit;
19+
import java.util.stream.Collectors;
1920

2021
/**
2122
* SourcerTestKit is a test kit for testing Sourcer implementations.
@@ -165,14 +166,15 @@ public void onCompleted() {
165166
*/
166167
public void sendAckRequest(AckRequest request) throws Exception {
167168
CompletableFuture<SourceOuterClass.AckResponse> future = new CompletableFuture<>();
169+
168170
SourceOuterClass.AckRequest.Request.Builder builder = SourceOuterClass.AckRequest.Request
169171
.newBuilder()
170-
.setOffset(SourceOuterClass.Offset.newBuilder()
171-
.setOffset(com.google.protobuf.ByteString.copyFrom(request
172-
.getOffset()
173-
.getValue()))
174-
.setPartitionId(request.getOffset().getPartitionId())
175-
.build());
172+
.addAllOffsets(request.getOffsets().stream().map(
173+
offset -> SourceOuterClass.Offset.newBuilder()
174+
.setOffset(com.google.protobuf.ByteString.copyFrom(offset.getValue()))
175+
.setPartitionId(offset.getPartitionId())
176+
.build()
177+
).collect(Collectors.toList()));
176178

177179
SourceOuterClass.AckRequest grpcRequest = SourceOuterClass.AckRequest.newBuilder()
178180
.setRequest(builder.build())
@@ -288,7 +290,7 @@ public static class TestReadRequest implements ReadRequest {
288290
@Setter
289291
@Builder
290292
public static class TestAckRequest implements AckRequest {
291-
Offset offset;
293+
List<Offset> offsets;
292294
}
293295

294296
/**

src/main/proto/sink/v1/sink.proto

+1-1
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ message SinkResponse {
7979
// err_msg is the error message, set it if success is set to false.
8080
string err_msg = 3;
8181
}
82-
Result result = 1;
82+
repeated Result results = 1;
8383
optional Handshake handshake = 2;
8484
optional TransmissionStatus status = 3;
8585
}

src/main/proto/source/v1/source.proto

+2-2
Original file line numberDiff line numberDiff line change
@@ -112,8 +112,8 @@ message ReadResponse {
112112
*/
113113
message AckRequest {
114114
message Request {
115-
// Required field holding the offset to be acked
116-
Offset offset = 1;
115+
// Required field holding the offsets to be acked
116+
repeated Offset offsets = 1;
117117
}
118118
// Required field holding the request. The list will be ordered and will have the same order as the original Read response.
119119
Request request = 1;

src/test/java/io/numaproj/numaflow/sinker/ServerTest.java

+7-7
Original file line numberDiff line numberDiff line change
@@ -114,19 +114,19 @@ public void sinkerSuccess() {
114114

115115
while (!outputStreamObserver.completed.get()) ;
116116
List<SinkOuterClass.SinkResponse> responseList = outputStreamObserver.getSinkResponse();
117-
assertEquals(111, responseList.size());
117+
assertEquals(21, responseList.size());
118118
// first response is the handshake response
119119
assertTrue(responseList.get(0).getHandshake().getSot());
120120

121121
responseList = responseList.subList(1, responseList.size());
122-
responseList.forEach(response -> {
123-
if (response.hasStatus() && response.getStatus().getEot()) {
122+
var response = responseList.get(0);
123+
response.getResultsList().forEach(result -> {
124+
if (result.getStatus() == SinkOuterClass.Status.FAILURE) {
125+
assertEquals(result.getErrMsg(), "error message");
124126
return;
125127
}
126-
assertEquals(response.getResult().getId(), expectedId);
127-
if (response.getResult().getStatus() == SinkOuterClass.Status.FAILURE) {
128-
assertEquals(response.getResult().getErrMsg(), "error message");
129-
}
128+
assertEquals(result.getId(), expectedId);
129+
assertEquals(result.getStatus(), SinkOuterClass.Status.SUCCESS);
130130
});
131131
}
132132

src/test/java/io/numaproj/numaflow/sourcer/ServerTest.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ public void onNext(SourceOuterClass.ReadResponse readResponse) {
104104
.newBuilder()
105105
.getRequest()
106106
.toBuilder()
107-
.setOffset(offset)
107+
.addOffsets(offset)
108108
.build();
109109
ackRequests.add(SourceOuterClass.AckRequest
110110
.newBuilder()
@@ -243,8 +243,10 @@ public List<Integer> getPartitions() {
243243

244244
@Override
245245
public void ack(AckRequest request) {
246-
Integer offset = ByteBuffer.wrap(request.getOffset().getValue()).getInt();
247-
yetToBeAcked.remove(offset);
246+
for (Offset offset : request.getOffsets()) {
247+
Integer decoded_offset = ByteBuffer.wrap(offset.getValue()).getInt();
248+
yetToBeAcked.remove(decoded_offset);
249+
}
248250
}
249251

250252
@Override

0 commit comments

Comments
 (0)