Skip to content

Commit ea910b3

Browse files
authored
chore: gracefully shutdown when error encountered (#152)
Signed-off-by: Keran Yang <[email protected]>
1 parent 134414a commit ea910b3

File tree

27 files changed

+634
-338
lines changed

27 files changed

+634
-338
lines changed

examples/pom.xml

+18
Original file line number | Diff line number | Diff line change
@@ -55,6 +55,9 @@
5555
<goal>dockerBuild</goal>
5656
</goals>
5757
<configuration>
58+
<from>
59+
<image>amazoncorretto:11</image>
60+
</from>
5861
<container>
5962
<mainClass>
6063
io.numaproj.numaflow.examples.batchmap.flatmap.BatchFlatMap
@@ -74,6 +77,9 @@
7477
<goal>dockerBuild</goal>
7578
</goals>
7679
<configuration>
80+
<from>
81+
<image>amazoncorretto:11</image>
82+
</from>
7783
<container>
7884
<mainClass>
7985
io.numaproj.numaflow.examples.sourcetransformer.eventtimefilter.EventTimeFilterFunction
@@ -93,6 +99,9 @@
9399
<goal>dockerBuild</goal>
94100
</goals>
95101
<configuration>
102+
<from>
103+
<image>amazoncorretto:11</image>
104+
</from>
96105
<container>
97106
<mainClass>
98107
io.numaproj.numaflow.examples.mapstream.flatmapstream.FlatMapStreamFunction
@@ -110,6 +119,9 @@
110119
<goal>dockerBuild</goal>
111120
</goals>
112121
<configuration>
122+
<from>
123+
<image>amazoncorretto:11</image>
124+
</from>
113125
<container>
114126
<mainClass>
115127
io.numaproj.numaflow.examples.map.flatmap.FlatMapFunction
@@ -195,6 +207,9 @@
195207
<goal>dockerBuild</goal>
196208
</goals>
197209
<configuration>
210+
<from>
211+
<image>amazoncorretto:11</image>
212+
</from>
198213
<container>
199214
<mainClass>
200215
io.numaproj.numaflow.examples.map.forward.ForwardFunction
@@ -266,6 +281,9 @@
266281
<goal>dockerBuild</goal>
267282
</goals>
268283
<configuration>
284+
<from>
285+
<image>amazoncorretto:11</image>
286+
</from>
269287
<container>
270288
<mainClass>
271289
io.numaproj.numaflow.examples.source.simple.SimpleSource

src/main/java/io/numaproj/numaflow/batchmapper/Server.java

+39-11
Original file line number | Diff line number | Diff line change
@@ -6,10 +6,12 @@
66
import io.numaproj.numaflow.info.ContainerType;
77
import io.numaproj.numaflow.info.ServerInfoAccessor;
88
import io.numaproj.numaflow.info.ServerInfoAccessorImpl;
9+
import io.numaproj.numaflow.shared.GrpcServerHelper;
910
import io.numaproj.numaflow.shared.GrpcServerUtils;
1011
import lombok.extern.slf4j.Slf4j;
1112

1213
import java.util.Collections;
14+
import java.util.concurrent.CompletableFuture;
1315
import java.util.concurrent.TimeUnit;
1416

1517
/**
@@ -20,8 +22,10 @@ public class Server {
2022

2123
private final GRPCConfig grpcConfig;
2224
private final Service service;
25+
private final CompletableFuture<Void> shutdownSignal;
2326
private final ServerInfoAccessor serverInfoAccessor = new ServerInfoAccessorImpl(new ObjectMapper());
2427
private io.grpc.Server server;
28+
private final GrpcServerHelper grpcServerHelper;
2529

2630
/**
2731
* constructor to create batch map gRPC server.
@@ -39,8 +43,10 @@ public Server(BatchMapper batchMapper) {
3943
* @param batchMapper to process the message
4044
*/
4145
public Server(BatchMapper batchMapper, GRPCConfig grpcConfig) {
42-
this.service = new Service(batchMapper);
46+
this.shutdownSignal = new CompletableFuture<>();
47+
this.service = new Service(batchMapper, this.shutdownSignal);
4348
this.grpcConfig = grpcConfig;
49+
this.grpcServerHelper = new GrpcServerHelper();
4450
}
4551

4652
/**
@@ -57,35 +63,55 @@ public void start() throws Exception {
5763
Collections.singletonMap(Constants.MAP_MODE_KEY, Constants.MAP_MODE));
5864

5965
if (this.server == null) {
60-
// create server builder
61-
ServerBuilder<?> serverBuilder = GrpcServerUtils.createServerBuilder(
66+
this.server = grpcServerHelper.createServer(
6267
grpcConfig.getSocketPath(),
6368
grpcConfig.getMaxMessageSize(),
6469
grpcConfig.isLocal(),
65-
grpcConfig.getPort());
66-
// build server
67-
this.server = serverBuilder
68-
.addService(this.service)
69-
.build();
70+
grpcConfig.getPort(),
71+
this.service);
7072
}
7173

72-
// start server
7374
server.start();
7475

7576
log.info(
76-
"Server started, listening on socket path: " + grpcConfig.getSocketPath());
77+
"server started, listening on socket path: " + grpcConfig.getSocketPath());
7778

78-
// register shutdown hook
79+
// register shutdown hook to gracefully shut down the server
7980
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
8081
// Use stderr here since the logger may have been reset by its JVM shutdown hook.
8182
System.err.println("*** shutting down gRPC server since JVM is shutting down");
83+
if (server != null && server.isTerminated()) {
84+
return;
85+
}
8286
try {
8387
Server.this.stop();
88+
log.info("gracefully shutting down event loop groups");
89+
this.grpcServerHelper.gracefullyShutdownEventLoopGroups();
8490
} catch (InterruptedException e) {
8591
Thread.interrupted();
8692
e.printStackTrace(System.err);
8793
}
8894
}));
95+
96+
// if there are any exceptions, shut down the server gracefully.
97+
shutdownSignal.whenCompleteAsync((v, e) -> {
98+
if (server != null && server.isTerminated()) {
99+
return;
100+
}
101+
102+
if (e != null) {
103+
System.err.println("*** shutting down batch map gRPC server because of an exception - " + e.getMessage());
104+
try {
105+
log.info("stopping server");
106+
Server.this.stop();
107+
log.info("gracefully shutting down event loop groups");
108+
this.grpcServerHelper.gracefullyShutdownEventLoopGroups();
109+
} catch (InterruptedException ex) {
110+
Thread.interrupted();
111+
ex.printStackTrace(System.err);
112+
}
113+
}
114+
});
89115
}
90116

91117
/**
@@ -96,7 +122,9 @@ public void start() throws Exception {
96122
* @throws InterruptedException if the current thread is interrupted while waiting
97123
*/
98124
public void awaitTermination() throws InterruptedException {
125+
log.info("batch map server is waiting for termination");
99126
server.awaitTermination();
127+
log.info("batch map server has terminated");
100128
}
101129

102130
/**

src/main/java/io/numaproj/numaflow/batchmapper/Service.java

+11-6
Original file line number | Diff line number | Diff line change
@@ -35,6 +35,9 @@ class Service extends MapGrpc.MapImplBase {
3535
// BatchMapper instance to process the messages
3636
private final BatchMapper batchMapper;
3737

38+
// Signal to shut down the gRPC server
39+
private final CompletableFuture<Void> shutdownSignal;
40+
3841
// Applies a map function to each datum element in the stream.
3942
@Override
4043
public StreamObserver<MapOuterClass.MapRequest> mapFn(StreamObserver<MapOuterClass.MapResponse> responseObserver) {
@@ -93,8 +96,9 @@ public void onNext(MapOuterClass.MapRequest mapRequest) {
9396
datumStream.writeMessage(constructHandlerDatum(mapRequest));
9497
}
9598
} catch (Exception e) {
96-
log.error("Encountered an error in batch map", e);
97-
responseObserver.onError(Status.UNKNOWN
99+
log.error("Encountered an error in batch map onNext - {}", e.getMessage());
100+
shutdownSignal.completeExceptionally(e);
101+
responseObserver.onError(Status.INTERNAL
98102
.withDescription(e.getMessage())
99103
.withCause(e)
100104
.asException());
@@ -104,11 +108,12 @@ public void onNext(MapOuterClass.MapRequest mapRequest) {
104108
// Called when an error occurs
105109
@Override
106110
public void onError(Throwable throwable) {
107-
log.error("Error Encountered in batchMap Stream", throwable);
108-
var status = Status.UNKNOWN
111+
log.error("Error Encountered in batchMap Stream - {}", throwable.getMessage());
112+
shutdownSignal.completeExceptionally(throwable);
113+
responseObserver.onError(Status.INTERNAL
109114
.withDescription(throwable.getMessage())
110-
.withCause(throwable);
111-
responseObserver.onError(status.asException());
115+
.withCause(throwable)
116+
.asException());
112117
}
113118

114119
// Called when the client has finished sending requests

src/main/java/io/numaproj/numaflow/mapper/MapSupervisorActor.java

+40-16
Original file line number | Diff line number | Diff line change
@@ -46,33 +46,38 @@
4646
class MapSupervisorActor extends AbstractActor {
4747
private final Mapper mapper;
4848
private final StreamObserver<MapOuterClass.MapResponse> responseObserver;
49-
private final CompletableFuture<Void> failureFuture;
49+
private final CompletableFuture<Void> shutdownSignal;
50+
private int activeMapperCount;
51+
private Exception userException;
5052

5153
public MapSupervisorActor(
5254
Mapper mapper,
5355
StreamObserver<MapOuterClass.MapResponse> responseObserver,
5456
CompletableFuture<Void> failureFuture) {
5557
this.mapper = mapper;
5658
this.responseObserver = responseObserver;
57-
this.failureFuture = failureFuture;
59+
this.shutdownSignal = failureFuture;
60+
this.userException = null;
61+
this.activeMapperCount = 0;
5862
}
5963

6064
public static Props props(
6165
Mapper mapper,
6266
StreamObserver<MapOuterClass.MapResponse> responseObserver,
63-
CompletableFuture<Void> failureFuture) {
64-
return Props.create(MapSupervisorActor.class, mapper, responseObserver, failureFuture);
67+
CompletableFuture<Void> shutdownSignal) {
68+
return Props.create(MapSupervisorActor.class, mapper, responseObserver, shutdownSignal);
6569
}
6670

6771
@Override
6872
public void preRestart(Throwable reason, Optional<Object> message) {
69-
log.debug("supervisor pre restart was executed");
70-
failureFuture.completeExceptionally(reason);
71-
responseObserver.onError(Status.UNKNOWN
73+
getContext().getSystem().log().warning("supervisor pre restart was executed due to: {}", reason.getMessage());
74+
shutdownSignal.completeExceptionally(reason);
75+
responseObserver.onError(Status.INTERNAL
7276
.withDescription(reason.getMessage())
7377
.withCause(reason)
7478
.asException());
7579
Service.mapperActorSystem.stop(getSelf());
80+
shutdownSignal.completeExceptionally(reason);
7681
}
7782

7883
@Override
@@ -93,34 +98,53 @@ public Receive createReceive() {
9398
}
9499

95100
private void handleFailure(Exception e) {
96-
responseObserver.onError(Status.UNKNOWN
97-
.withDescription(e.getMessage())
98-
.withCause(e)
99-
.asException());
100-
failureFuture.completeExceptionally(e);
101+
log.error("Encountered error in mapFn - {}", e.getMessage());
102+
if (userException == null) {
103+
userException = e;
104+
// only send the very first exception to the client
105+
// one exception should trigger a container restart
106+
responseObserver.onError(Status.INTERNAL
107+
.withDescription(e.getMessage())
108+
.withCause(e)
109+
.asException());
110+
}
111+
activeMapperCount--;
101112
}
102113

103114
private void sendResponse(MapOuterClass.MapResponse mapResponse) {
104115
responseObserver.onNext(mapResponse);
116+
activeMapperCount--;
105117
}
106118

107119
private void processRequest(MapOuterClass.MapRequest mapRequest) {
120+
if (userException != null) {
121+
log.info("a previous mapper actor failed, not processing any more requests");
122+
if (activeMapperCount == 0) {
123+
log.info("there is no more active mapper AKKA actors - stopping the system");
124+
getContext().getSystem().stop(getSelf());
125+
log.info("AKKA system stopped");
126+
shutdownSignal.completeExceptionally(userException);
127+
}
128+
return;
129+
}
130+
108131
// Create a MapperActor for each incoming request.
109132
ActorRef mapperActor = getContext()
110133
.actorOf(MapperActor.props(
111134
mapper));
112135

113136
// Send the message to the MapperActor.
114137
mapperActor.tell(mapRequest, getSelf());
138+
activeMapperCount++;
115139
}
116140

117141
// if we see dead letters, we need to stop the execution and exit
118142
// to make sure no messages are lost
119143
private void handleDeadLetters(AllDeadLetters deadLetter) {
120144
log.debug("got a dead letter, stopping the execution");
121-
responseObserver.onError(Status.UNKNOWN.withDescription("dead letters").asException());
122-
failureFuture.completeExceptionally(new Throwable("dead letters"));
145+
responseObserver.onError(Status.INTERNAL.withDescription("dead letters").asException());
123146
getContext().getSystem().stop(getSelf());
147+
shutdownSignal.completeExceptionally(new Throwable("dead letters"));
124148
}
125149

126150
@Override
@@ -129,8 +153,8 @@ public SupervisorStrategy supervisorStrategy() {
129153
return new AllForOneStrategy(
130154
DeciderBuilder
131155
.match(Exception.class, e -> {
132-
failureFuture.completeExceptionally(e);
133-
responseObserver.onError(Status.UNKNOWN
156+
shutdownSignal.completeExceptionally(e);
157+
responseObserver.onError(Status.INTERNAL
134158
.withDescription(e.getMessage())
135159
.withCause(e)
136160
.asException());

0 commit comments

Comments
 (0)