Skip to content

Commit 9c5b9e2

Browse files
authored
Merge pull request #2643 from opencb/TASK-8158
TASK-8158 - GRPC doesn't stop streaming on cancelled requests.
2 parents db17ea0 + 6463530 commit 9c5b9e2

2 files changed

Lines changed: 24 additions & 12 deletions

File tree

opencga-server/src/main/java/org/opencb/opencga/server/grpc/GenericGrpcService.java

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.fasterxml.jackson.databind.ObjectMapper;
2121
import com.fasterxml.jackson.databind.ObjectWriter;
2222
import io.grpc.MethodDescriptor;
23+
import io.grpc.stub.ServerCallStreamObserver;
2324
import io.grpc.stub.StreamObserver;
2425
import org.apache.commons.lang3.time.StopWatch;
2526
import org.opencb.commons.datastore.core.Query;
@@ -183,39 +184,50 @@ public void run(MethodDescriptor<Request, ?> method, Request request, StreamObse
183184

184185
Exception e = null;
185186
int numResults = -1;
187+
boolean cancelled = false;
186188
try {
187189
numResults = requestRunner.run(query, queryOptions);
190+
cancelled = isCancelled(streamObserver);
191+
if (cancelled) {
192+
logger.warn("Request cancelled by client");
193+
}
188194
streamObserver.onCompleted();
189195
} catch (Exception ex) {
190196
e = ex;
191197
logger.error("Catch error: " + e.getMessage(), e);
192198
streamObserver.onError(ex);
193199
} finally {
194200
stopWatch.stop();
195-
logEnd(e, stopWatch, numResults, requestDescription);
201+
logEnd(e, cancelled, stopWatch, numResults, requestDescription);
196202
}
197203

198204
}
199205

200-
private void logEnd(Exception e, StopWatch stopWatch, int numResults, String requestDescription) {
206+
protected boolean isCancelled(StreamObserver<?> streamObserver) {
207+
return streamObserver instanceof ServerCallStreamObserver
208+
&& ((ServerCallStreamObserver<?>) streamObserver).isCancelled();
209+
}
210+
211+
private void logEnd(Exception e, boolean cancelled, StopWatch stopWatch, int numResults, String requestDescription) {
201212
StringBuilder sb = new StringBuilder();
202-
boolean ok;
203-
if (e == null) {
204-
sb.append("OK");
205-
ok = true;
206-
} else {
213+
if (e != null) {
207214
sb.append("ERROR");
208-
ok = false;
215+
} else if (cancelled) {
216+
sb.append("CANCELLED");
217+
} else {
218+
sb.append("OK");
209219
}
210220
sb.append(", ").append(stopWatch.getTime(TimeUnit.MILLISECONDS)).append("ms");
211221
if (numResults >= 0) {
212222
sb.append(", ").append(numResults).append(" results");
213223
}
214224
sb.append(", ").append(requestDescription);
215-
if (ok) {
216-
logger.info(sb.toString());
217-
} else {
225+
if (e != null) {
218226
logger.error(sb.toString());
227+
} else if (cancelled) {
228+
logger.warn(sb.toString());
229+
} else {
230+
logger.info(sb.toString());
219231
}
220232
}
221233
}

opencga-server/src/main/java/org/opencb/opencga/server/grpc/VariantGrpcService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ public void query(Request request, StreamObserver<VariantResponse> responseObser
9191
events.add(eventB.build());
9292
}
9393
}
94-
while (iterator.hasNext()) {
94+
while (iterator.hasNext() && !genericGrpcService.isCancelled(responseObserver)) {
9595
Variant variant = iterator.next();
9696
VariantProto.Variant variantProto = converter.convert(variant);
9797
VariantResponse.Builder responseBuilder = VariantResponse.newBuilder();

0 commit comments

Comments
 (0)