Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,21 +1,23 @@
package io.numaproj.numaflow.examples.source.simple;

import com.google.common.primitives.Longs;
import io.numaproj.numaflow.sourcer.AckRequest;
import io.numaproj.numaflow.sourcer.Message;
import io.numaproj.numaflow.sourcer.NackRequest;
import io.numaproj.numaflow.sourcer.Offset;
import io.numaproj.numaflow.sourcer.OutputObserver;
import io.numaproj.numaflow.sourcer.ReadRequest;
import io.numaproj.numaflow.sourcer.Server;
import io.numaproj.numaflow.sourcer.Sourcer;
import lombok.extern.slf4j.Slf4j;

import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
* SimpleSource is a simple implementation of Sourcer.
Expand All @@ -26,8 +28,9 @@

@Slf4j
public class SimpleSource extends Sourcer {
private final Map<Long, Boolean> messages = new ConcurrentHashMap<>();
private long readIndex = 0;
private final Map<Integer, Boolean> yetToBeAcked = new ConcurrentHashMap<>();
Map<Integer, Boolean> nacked = new ConcurrentHashMap<>();
private final AtomicInteger readIndex = new AtomicInteger(0);

public static void main(String[] args) throws Exception {
Server server = new Server(new SimpleSource());
Expand All @@ -42,7 +45,18 @@ public static void main(String[] args) throws Exception {
@Override
public void read(ReadRequest request, OutputObserver observer) {
long startTime = System.currentTimeMillis();
if (messages.entrySet().size() > 0) {

// if there are messages which got nacked, we should read them first.
if (!nacked.isEmpty()) {
for (int i = 0; i < nacked.size(); i++) {
Integer index = readIndex.incrementAndGet();
yetToBeAcked.put(index, true);
observer.send(constructMessage(index));
}
nacked.clear();
}

if (!yetToBeAcked.isEmpty()) {
// if there are messages not acknowledged, return
return;
}
Expand All @@ -52,41 +66,57 @@ public void read(ReadRequest request, OutputObserver observer) {
return;
}

Map<String, String> headers = new HashMap<>();
headers.put("x-txn-id", UUID.randomUUID().toString());

// create a message with increasing offset
Offset offset = new Offset(Longs.toByteArray(readIndex));
Message message = new Message(
Long.toString(readIndex).getBytes(),
offset,
Instant.now(),
headers);
Integer index = readIndex.incrementAndGet();
// send the message to the observer
observer.send(message);
observer.send(constructMessage(index));
// keep track of the messages read and not acknowledged
messages.put(readIndex, true);
readIndex += 1;
yetToBeAcked.put(index, true);
}
}

@Override
public void ack(AckRequest request) {
for (Offset offset : request.getOffsets()) {
Long decoded_offset = Longs.fromByteArray(offset.getValue());
Integer decoded_offset = ByteBuffer.wrap(offset.getValue()).getInt();
// remove the acknowledged messages from the map
messages.remove(decoded_offset);
yetToBeAcked.remove(decoded_offset);
}
}

@Override
public void nack(NackRequest request) {
// put them to nacked offsets so that they will be retried immediately.
for (Offset offset : request.getOffsets()) {
Integer decoded_offset = ByteBuffer.wrap(offset.getValue()).getInt();
yetToBeAcked.remove(decoded_offset);
nacked.put(decoded_offset, true);
readIndex.decrementAndGet();
}
}

@Override
public long getPending() {
// number of messages not acknowledged yet
return messages.size();
return yetToBeAcked.size();
}

@Override
public List<Integer> getPartitions() {
return Sourcer.defaultPartitions();
}

private Message constructMessage(Integer readIndex) {
Map<String, String> headers = new HashMap<>();
headers.put("x-txn-id", UUID.randomUUID().toString());

// create a message with increasing offset
ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES);
buffer.putInt(readIndex);
Offset offset = new Offset(buffer.array());
return new Message(
Integer.toString(readIndex).getBytes(),
offset,
Instant.now(),
headers);
}
}
11 changes: 2 additions & 9 deletions src/main/java/io/numaproj/numaflow/accumulator/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,16 +38,9 @@ static void handleFailure(
try {
failureFuture.get();
} catch (Exception e) {
e.printStackTrace();
log.error("Exception occurred while performing accumulator operation", e);
// Build gRPC Status
com.google.rpc.Status status = com.google.rpc.Status.newBuilder()
.setCode(Code.INTERNAL.getNumber())
.setMessage(
ExceptionUtils.getExceptionErrorString() + ": " + (e.getMessage() != null ? e.getMessage() : ""))
.addDetails(Any.pack(DebugInfo.newBuilder()
.setDetail(ExceptionUtils.getStackTrace(e))
.build()))
.build();
com.google.rpc.Status status = ExceptionUtils.buildStatusFromUserException(e);
responseObserver.onError(StatusProto.toStatusRuntimeException(status));
}
}).start();
Expand Down
11 changes: 2 additions & 9 deletions src/main/java/io/numaproj/numaflow/batchmapper/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,16 +103,9 @@ public void onNext(MapOuterClass.MapRequest mapRequest) {
}
} catch (Exception e) {
log.error("Encountered an error in batch map onNext", e);
shutdownSignal.completeExceptionally(e);
// Build gRPC Status
com.google.rpc.Status status = com.google.rpc.Status.newBuilder()
.setCode(Code.INTERNAL.getNumber())
.setMessage(ExceptionUtils.getExceptionErrorString() + ": " + (e.getMessage() != null ? e.getMessage() : ""))
.addDetails(Any.pack(DebugInfo.newBuilder()
.setDetail(ExceptionUtils.getStackTrace(e))
.build()))
.build();
com.google.rpc.Status status = ExceptionUtils.buildStatusFromUserException(e);
responseObserver.onError(StatusProto.toStatusRuntimeException(status));
shutdownSignal.completeExceptionally(e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,7 @@ private void handleFailure(Exception e) {
// only send the very first exception to the client
// one exception should trigger a container restart
// Build gRPC Status
com.google.rpc.Status status = com.google.rpc.Status.newBuilder()
.setCode(Code.INTERNAL.getNumber())
.setMessage(ExceptionUtils.getExceptionErrorString() + ": " + (e.getMessage() != null ? e.getMessage() : ""))
.addDetails(Any.pack(DebugInfo.newBuilder()
.setDetail(ExceptionUtils.getStackTrace(e))
.build()))
.build();
com.google.rpc.Status status = ExceptionUtils.buildStatusFromUserException(e);
responseObserver.onError(StatusProto.toStatusRuntimeException(status));
}
activeMapperCount--;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@
import akka.actor.SupervisorStrategy;
import akka.japi.pf.DeciderBuilder;
import io.grpc.Status;
import io.grpc.protobuf.StatusProto;
import io.grpc.stub.StreamObserver;
import io.numaproj.numaflow.map.v1.MapOuterClass;
import io.numaproj.numaflow.shared.ExceptionUtils;
import lombok.extern.slf4j.Slf4j;

import java.util.Optional;
Expand Down Expand Up @@ -110,13 +112,11 @@ public Receive createReceive() {
}

private void handleFailure(Exception e) {
getContext().getSystem().log().error("Encountered error in mapStreamFn", e);
getContext().getSystem().log().error("Encountered error in mapStreamFn {}", e);
if (userException == null) {
userException = e;
responseObserver.onError(Status.INTERNAL
.withDescription(e.getMessage())
.withCause(e)
.asException());
com.google.rpc.Status status = ExceptionUtils.buildStatusFromUserException(e);
responseObserver.onError(StatusProto.toStatusRuntimeException(status));
}
activeMapStreamersCount--;
}
Expand Down
11 changes: 3 additions & 8 deletions src/main/java/io/numaproj/numaflow/reducer/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,9 @@ static void handleFailure(
try {
failureFuture.get();
} catch (Exception e) {
e.printStackTrace();
com.google.rpc.Status status = com.google.rpc.Status.newBuilder()
.setCode(Code.INTERNAL.getNumber())
.setMessage(ExceptionUtils.getExceptionErrorString() + ": " + (e.getMessage() != null ? e.getMessage() : ""))
.addDetails(Any.pack(DebugInfo.newBuilder()
.setDetail(ExceptionUtils.getStackTrace(e))
.build()))
.build();
log.error("Exception occurred while performing reduce operation", e);
// Build gRPC Status
com.google.rpc.Status status = ExceptionUtils.buildStatusFromUserException(e);
responseObserver.onError(StatusProto.toStatusRuntimeException(status));
}
}).start();
Expand Down
12 changes: 3 additions & 9 deletions src/main/java/io/numaproj/numaflow/reducestreamer/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,9 @@ static void handleFailure(
try {
failureFuture.get();
} catch (Exception e) {
e.printStackTrace();
com.google.rpc.Status status = com.google.rpc.Status.newBuilder()
.setCode(Code.INTERNAL.getNumber())
.setMessage(
ExceptionUtils.getExceptionErrorString() + ": " + (e.getMessage() != null ? e.getMessage() : ""))
.addDetails(Any.pack(DebugInfo.newBuilder()
.setDetail(ExceptionUtils.getStackTrace(e))
.build()))
.build();
log.error("Exception occurred while performing reduce streaming operation", e);
// Build gRPC Status
com.google.rpc.Status status = ExceptionUtils.buildStatusFromUserException(e);
responseObserver.onError(StatusProto.toStatusRuntimeException(status));
}
}).start();
Expand Down
12 changes: 3 additions & 9 deletions src/main/java/io/numaproj/numaflow/sessionreducer/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,15 +42,9 @@ static void handleFailure(
try {
failureFuture.get();
} catch (Exception e) {
e.printStackTrace();
com.google.rpc.Status status = com.google.rpc.Status.newBuilder()
.setCode(Code.INTERNAL.getNumber())
.setMessage(
ExceptionUtils.getExceptionErrorString() + ": " + (e.getMessage() != null ? e.getMessage() : ""))
.addDetails(Any.pack(DebugInfo.newBuilder()
.setDetail(ExceptionUtils.getStackTrace(e))
.build()))
.build();
log.error("Exception occurred while performing session reduce operation", e);
// Build gRPC Status
com.google.rpc.Status status = ExceptionUtils.buildStatusFromUserException(e);
responseObserver.onError(StatusProto.toStatusRuntimeException(status));
}
}).start();
Expand Down
21 changes: 21 additions & 0 deletions src/main/java/io/numaproj/numaflow/shared/ExceptionUtils.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
package io.numaproj.numaflow.shared;

import com.google.protobuf.Any;
import com.google.rpc.Code;
import com.google.rpc.DebugInfo;
import io.grpc.Status;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Objects;
Expand Down Expand Up @@ -41,4 +46,20 @@ public static String getExceptionErrorString() {
+ Objects.requireNonNullElse(CONTAINER_NAME, "unknown-container")
+ ")";
}

/**
* Builds rpc status from the user's exception.
*
* @param exception encountered in user's code.
* @return the status constructed using the exception.
*/
public static com.google.rpc.Status buildStatusFromUserException(Exception exception) {
return com.google.rpc.Status.newBuilder()
.setCode(Code.INTERNAL.getNumber())
.setMessage(ExceptionUtils.getExceptionErrorString() + ": " + (exception.getMessage() != null ? exception.getMessage() : ""))
.addDetails(Any.pack(DebugInfo.newBuilder()
.setDetail(ExceptionUtils.getStackTrace(exception))
.build()))
.build();
}
}
9 changes: 1 addition & 8 deletions src/main/java/io/numaproj/numaflow/sinker/Service.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,7 @@ public void onNext(SinkOuterClass.SinkRequest request) {
} catch (Exception e) {
log.error("Encountered error in sinkFn onNext", e);
// Build gRPC Status
com.google.rpc.Status status = com.google.rpc.Status.newBuilder()
.setCode(Code.INTERNAL.getNumber())
.setMessage(ExceptionUtils.getExceptionErrorString() + ": "
+ (e.getMessage() != null ? e.getMessage() : ""))
.addDetails(Any.pack(DebugInfo.newBuilder()
.setDetail(ExceptionUtils.getStackTrace(e))
.build()))
.build();
com.google.rpc.Status status = ExceptionUtils.buildStatusFromUserException(e);
responseObserver.onError(StatusProto.toStatusRuntimeException(status));
shutdownSignal.completeExceptionally(e);
}
Expand Down
14 changes: 14 additions & 0 deletions src/main/java/io/numaproj/numaflow/sourcer/NackRequest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package io.numaproj.numaflow.sourcer;


import java.util.List;

/**
* NackRequest request for negatively acknowledging messages.
*/
public interface NackRequest {
/**
* @return the list of offsets to be negatively acknowledged.
*/
List<Offset> getOffsets();
}
18 changes: 18 additions & 0 deletions src/main/java/io/numaproj/numaflow/sourcer/NackRequestImpl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package io.numaproj.numaflow.sourcer;

import lombok.AllArgsConstructor;

import java.util.List;

/**
* NackRequestImpl is the implementation of NackRequest.
*/
@AllArgsConstructor
class NackRequestImpl implements NackRequest {
private final List<Offset> offsets;

@Override
public List<Offset> getOffsets() {
return this.offsets;
}
}
Loading
Loading