Skip to content

Commit 38734c0

Browse files
authored
feat: nack support for udsource (#204)
1 parent 4adc44b commit 38734c0

File tree

20 files changed

+359
-113
lines changed

20 files changed

+359
-113
lines changed
Lines changed: 50 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,23 @@
11
package io.numaproj.numaflow.examples.source.simple;
22

3-
import com.google.common.primitives.Longs;
43
import io.numaproj.numaflow.sourcer.AckRequest;
54
import io.numaproj.numaflow.sourcer.Message;
5+
import io.numaproj.numaflow.sourcer.NackRequest;
66
import io.numaproj.numaflow.sourcer.Offset;
77
import io.numaproj.numaflow.sourcer.OutputObserver;
88
import io.numaproj.numaflow.sourcer.ReadRequest;
99
import io.numaproj.numaflow.sourcer.Server;
1010
import io.numaproj.numaflow.sourcer.Sourcer;
1111
import lombok.extern.slf4j.Slf4j;
1212

13+
import java.nio.ByteBuffer;
1314
import java.time.Instant;
1415
import java.util.HashMap;
1516
import java.util.List;
1617
import java.util.Map;
1718
import java.util.UUID;
1819
import java.util.concurrent.ConcurrentHashMap;
20+
import java.util.concurrent.atomic.AtomicInteger;
1921

2022
/**
2123
* SimpleSource is a simple implementation of Sourcer.
@@ -26,8 +28,9 @@
2628

2729
@Slf4j
2830
public class SimpleSource extends Sourcer {
29-
private final Map<Long, Boolean> messages = new ConcurrentHashMap<>();
30-
private long readIndex = 0;
31+
private final Map<Integer, Boolean> yetToBeAcked = new ConcurrentHashMap<>();
32+
Map<Integer, Boolean> nacked = new ConcurrentHashMap<>();
33+
private final AtomicInteger readIndex = new AtomicInteger(0);
3134

3235
public static void main(String[] args) throws Exception {
3336
Server server = new Server(new SimpleSource());
@@ -42,7 +45,18 @@ public static void main(String[] args) throws Exception {
4245
@Override
4346
public void read(ReadRequest request, OutputObserver observer) {
4447
long startTime = System.currentTimeMillis();
45-
if (messages.entrySet().size() > 0) {
48+
49+
// if there are messages which got nacked, we should read them first.
50+
if (!nacked.isEmpty()) {
51+
for (int i = 0; i < nacked.size(); i++) {
52+
Integer index = readIndex.incrementAndGet();
53+
yetToBeAcked.put(index, true);
54+
observer.send(constructMessage(index));
55+
}
56+
nacked.clear();
57+
}
58+
59+
if (!yetToBeAcked.isEmpty()) {
4660
// if there are messages not acknowledged, return
4761
return;
4862
}
@@ -52,41 +66,57 @@ public void read(ReadRequest request, OutputObserver observer) {
5266
return;
5367
}
5468

55-
Map<String, String> headers = new HashMap<>();
56-
headers.put("x-txn-id", UUID.randomUUID().toString());
57-
58-
// create a message with increasing offset
59-
Offset offset = new Offset(Longs.toByteArray(readIndex));
60-
Message message = new Message(
61-
Long.toString(readIndex).getBytes(),
62-
offset,
63-
Instant.now(),
64-
headers);
69+
Integer index = readIndex.incrementAndGet();
6570
// send the message to the observer
66-
observer.send(message);
71+
observer.send(constructMessage(index));
6772
// keep track of the messages read and not acknowledged
68-
messages.put(readIndex, true);
69-
readIndex += 1;
73+
yetToBeAcked.put(index, true);
7074
}
7175
}
7276

7377
@Override
7478
public void ack(AckRequest request) {
7579
for (Offset offset : request.getOffsets()) {
76-
Long decoded_offset = Longs.fromByteArray(offset.getValue());
80+
Integer decoded_offset = ByteBuffer.wrap(offset.getValue()).getInt();
7781
// remove the acknowledged messages from the map
78-
messages.remove(decoded_offset);
82+
yetToBeAcked.remove(decoded_offset);
83+
}
84+
}
85+
86+
@Override
87+
public void nack(NackRequest request) {
88+
// put them to nacked offsets so that they will be retried immediately.
89+
for (Offset offset : request.getOffsets()) {
90+
Integer decoded_offset = ByteBuffer.wrap(offset.getValue()).getInt();
91+
yetToBeAcked.remove(decoded_offset);
92+
nacked.put(decoded_offset, true);
93+
readIndex.decrementAndGet();
7994
}
8095
}
8196

8297
@Override
8398
public long getPending() {
8499
// number of messages not acknowledged yet
85-
return messages.size();
100+
return yetToBeAcked.size();
86101
}
87102

88103
@Override
89104
public List<Integer> getPartitions() {
90105
return Sourcer.defaultPartitions();
91106
}
107+
108+
private Message constructMessage(Integer readIndex) {
109+
Map<String, String> headers = new HashMap<>();
110+
headers.put("x-txn-id", UUID.randomUUID().toString());
111+
112+
// create a message with increasing offset
113+
ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES);
114+
buffer.putInt(readIndex);
115+
Offset offset = new Offset(buffer.array());
116+
return new Message(
117+
Integer.toString(readIndex).getBytes(),
118+
offset,
119+
Instant.now(),
120+
headers);
121+
}
92122
}

src/main/java/io/numaproj/numaflow/accumulator/Service.java

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -38,16 +38,9 @@ static void handleFailure(
3838
try {
3939
failureFuture.get();
4040
} catch (Exception e) {
41-
e.printStackTrace();
41+
log.error("Exception occurred while performing accumulator operation", e);
4242
// Build gRPC Status
43-
com.google.rpc.Status status = com.google.rpc.Status.newBuilder()
44-
.setCode(Code.INTERNAL.getNumber())
45-
.setMessage(
46-
ExceptionUtils.getExceptionErrorString() + ": " + (e.getMessage() != null ? e.getMessage() : ""))
47-
.addDetails(Any.pack(DebugInfo.newBuilder()
48-
.setDetail(ExceptionUtils.getStackTrace(e))
49-
.build()))
50-
.build();
43+
com.google.rpc.Status status = ExceptionUtils.buildStatusFromUserException(e);
5144
responseObserver.onError(StatusProto.toStatusRuntimeException(status));
5245
}
5346
}).start();

src/main/java/io/numaproj/numaflow/batchmapper/Service.java

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -103,16 +103,9 @@ public void onNext(MapOuterClass.MapRequest mapRequest) {
103103
}
104104
} catch (Exception e) {
105105
log.error("Encountered an error in batch map onNext", e);
106-
shutdownSignal.completeExceptionally(e);
107-
// Build gRPC Status
108-
com.google.rpc.Status status = com.google.rpc.Status.newBuilder()
109-
.setCode(Code.INTERNAL.getNumber())
110-
.setMessage(ExceptionUtils.getExceptionErrorString() + ": " + (e.getMessage() != null ? e.getMessage() : ""))
111-
.addDetails(Any.pack(DebugInfo.newBuilder()
112-
.setDetail(ExceptionUtils.getStackTrace(e))
113-
.build()))
114-
.build();
106+
com.google.rpc.Status status = ExceptionUtils.buildStatusFromUserException(e);
115107
responseObserver.onError(StatusProto.toStatusRuntimeException(status));
108+
shutdownSignal.completeExceptionally(e);
116109
}
117110
}
118111

src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -112,13 +112,7 @@ private void handleFailure(Exception e) {
112112
// only send the very first exception to the client
113113
// one exception should trigger a container restart
114114
// Build gRPC Status
115-
com.google.rpc.Status status = com.google.rpc.Status.newBuilder()
116-
.setCode(Code.INTERNAL.getNumber())
117-
.setMessage(ExceptionUtils.getExceptionErrorString() + ": " + (e.getMessage() != null ? e.getMessage() : ""))
118-
.addDetails(Any.pack(DebugInfo.newBuilder()
119-
.setDetail(ExceptionUtils.getStackTrace(e))
120-
.build()))
121-
.build();
115+
com.google.rpc.Status status = ExceptionUtils.buildStatusFromUserException(e);
122116
responseObserver.onError(StatusProto.toStatusRuntimeException(status));
123117
}
124118
activeMapperCount--;

src/main/java/io/numaproj/numaflow/mapstreamer/MapStreamSupervisorActor.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,10 @@
88
import akka.actor.SupervisorStrategy;
99
import akka.japi.pf.DeciderBuilder;
1010
import io.grpc.Status;
11+
import io.grpc.protobuf.StatusProto;
1112
import io.grpc.stub.StreamObserver;
1213
import io.numaproj.numaflow.map.v1.MapOuterClass;
14+
import io.numaproj.numaflow.shared.ExceptionUtils;
1315
import lombok.extern.slf4j.Slf4j;
1416

1517
import java.util.Optional;
@@ -110,13 +112,11 @@ public Receive createReceive() {
110112
}
111113

112114
private void handleFailure(Exception e) {
113-
getContext().getSystem().log().error("Encountered error in mapStreamFn", e);
115+
getContext().getSystem().log().error("Encountered error in mapStreamFn {}", e);
114116
if (userException == null) {
115117
userException = e;
116-
responseObserver.onError(Status.INTERNAL
117-
.withDescription(e.getMessage())
118-
.withCause(e)
119-
.asException());
118+
com.google.rpc.Status status = ExceptionUtils.buildStatusFromUserException(e);
119+
responseObserver.onError(StatusProto.toStatusRuntimeException(status));
120120
}
121121
activeMapStreamersCount--;
122122
}

src/main/java/io/numaproj/numaflow/reducer/Service.java

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,9 @@ static void handleFailure(
4141
try {
4242
failureFuture.get();
4343
} catch (Exception e) {
44-
e.printStackTrace();
45-
com.google.rpc.Status status = com.google.rpc.Status.newBuilder()
46-
.setCode(Code.INTERNAL.getNumber())
47-
.setMessage(ExceptionUtils.getExceptionErrorString() + ": " + (e.getMessage() != null ? e.getMessage() : ""))
48-
.addDetails(Any.pack(DebugInfo.newBuilder()
49-
.setDetail(ExceptionUtils.getStackTrace(e))
50-
.build()))
51-
.build();
44+
log.error("Exception occurred while performing reduce operation", e);
45+
// Build gRPC Status
46+
com.google.rpc.Status status = ExceptionUtils.buildStatusFromUserException(e);
5247
responseObserver.onError(StatusProto.toStatusRuntimeException(status));
5348
}
5449
}).start();

src/main/java/io/numaproj/numaflow/reducestreamer/Service.java

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -42,15 +42,9 @@ static void handleFailure(
4242
try {
4343
failureFuture.get();
4444
} catch (Exception e) {
45-
e.printStackTrace();
46-
com.google.rpc.Status status = com.google.rpc.Status.newBuilder()
47-
.setCode(Code.INTERNAL.getNumber())
48-
.setMessage(
49-
ExceptionUtils.getExceptionErrorString() + ": " + (e.getMessage() != null ? e.getMessage() : ""))
50-
.addDetails(Any.pack(DebugInfo.newBuilder()
51-
.setDetail(ExceptionUtils.getStackTrace(e))
52-
.build()))
53-
.build();
45+
log.error("Exception occurred while performing reduce streaming operation", e);
46+
// Build gRPC Status
47+
com.google.rpc.Status status = ExceptionUtils.buildStatusFromUserException(e);
5448
responseObserver.onError(StatusProto.toStatusRuntimeException(status));
5549
}
5650
}).start();

src/main/java/io/numaproj/numaflow/sessionreducer/Service.java

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -42,15 +42,9 @@ static void handleFailure(
4242
try {
4343
failureFuture.get();
4444
} catch (Exception e) {
45-
e.printStackTrace();
46-
com.google.rpc.Status status = com.google.rpc.Status.newBuilder()
47-
.setCode(Code.INTERNAL.getNumber())
48-
.setMessage(
49-
ExceptionUtils.getExceptionErrorString() + ": " + (e.getMessage() != null ? e.getMessage() : ""))
50-
.addDetails(Any.pack(DebugInfo.newBuilder()
51-
.setDetail(ExceptionUtils.getStackTrace(e))
52-
.build()))
53-
.build();
45+
log.error("Exception occurred while performing session reduce operation", e);
46+
// Build gRPC Status
47+
com.google.rpc.Status status = ExceptionUtils.buildStatusFromUserException(e);
5448
responseObserver.onError(StatusProto.toStatusRuntimeException(status));
5549
}
5650
}).start();

src/main/java/io/numaproj/numaflow/shared/ExceptionUtils.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
package io.numaproj.numaflow.shared;
22

3+
import com.google.protobuf.Any;
4+
import com.google.rpc.Code;
5+
import com.google.rpc.DebugInfo;
6+
import io.grpc.Status;
7+
38
import java.io.PrintWriter;
49
import java.io.StringWriter;
510
import java.util.Objects;
@@ -41,4 +46,20 @@ public static String getExceptionErrorString() {
4146
+ Objects.requireNonNullElse(CONTAINER_NAME, "unknown-container")
4247
+ ")";
4348
}
49+
50+
/**
51+
* Builds rpc status from the user's exception.
52+
*
53+
* @param exception encountered in user's code.
54+
* @return the status constructed using the exception.
55+
*/
56+
public static com.google.rpc.Status buildStatusFromUserException(Exception exception) {
57+
return com.google.rpc.Status.newBuilder()
58+
.setCode(Code.INTERNAL.getNumber())
59+
.setMessage(ExceptionUtils.getExceptionErrorString() + ": " + (exception.getMessage() != null ? exception.getMessage() : ""))
60+
.addDetails(Any.pack(DebugInfo.newBuilder()
61+
.setDetail(ExceptionUtils.getStackTrace(exception))
62+
.build()))
63+
.build();
64+
}
4465
}

src/main/java/io/numaproj/numaflow/sinker/Service.java

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -107,14 +107,7 @@ public void onNext(SinkOuterClass.SinkRequest request) {
107107
} catch (Exception e) {
108108
log.error("Encountered error in sinkFn onNext", e);
109109
// Build gRPC Status
110-
com.google.rpc.Status status = com.google.rpc.Status.newBuilder()
111-
.setCode(Code.INTERNAL.getNumber())
112-
.setMessage(ExceptionUtils.getExceptionErrorString() + ": "
113-
+ (e.getMessage() != null ? e.getMessage() : ""))
114-
.addDetails(Any.pack(DebugInfo.newBuilder()
115-
.setDetail(ExceptionUtils.getStackTrace(e))
116-
.build()))
117-
.build();
110+
com.google.rpc.Status status = ExceptionUtils.buildStatusFromUserException(e);
118111
responseObserver.onError(StatusProto.toStatusRuntimeException(status));
119112
shutdownSignal.completeExceptionally(e);
120113
}

0 commit comments

Comments
 (0)