Skip to content

Commit 0c1f536

Browse files
authored
Fix error propagation in AbstractReactiveElasticsearchTemplate:save()
Previously, errors occurring during the saveAll operation within the reactive save method were swallowed because the inner subscriber did not have an error handler. This caused the Flux to hang indefinitely instead of terminating with an error. This commit adds an error handler to the inner subscriber that: 1. Cancels the upstream subscription to prevent further processing. 2. Propagates the error to the sink, allowing the caller to receive the error signal. 3. Updates the map operation to return the entity for better debugging capability. Signed-off-by: Noel F <noel@Noels-MacBook-Pro.local> * Add test for error propagation in reactive Flux save operations This test verifies that errors occurring during saveAll operations with a Flux are properly propagated to the subscriber instead of being swallowed. The test creates a Flux that emits valid entities followed by an error, and confirms the error reaches the caller. Signed-off-by: Noel F <noel@Noels-MacBook-Pro.local> * undo format fixes Signed-off-by: Noel F <noel@Noels-MacBook-Pro.local> * Update error propagation test: expect 0 entities before error due to race condition The manual subscriber's onError fires before in-flight saveAll can push results through tryEmitNext, so the caller sees 0 entities before the error. Updated test expectation and added clarifying comment. Signed-off-by: Noel F <noel@Noels-MacBook-Pro.local> --------- Signed-off-by: Noel F <noel@Noels-MacBook-Pro.local> Co-authored-by: xylos19 <noel@Noels-MacBook-Pro.local> Closes #3233
1 parent 0d688ac commit 0c1f536

File tree

2 files changed

+34
-2
lines changed

2 files changed

+34
-2
lines changed

src/main/java/org/springframework/data/elasticsearch/core/AbstractReactiveElasticsearchTemplate.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -245,7 +245,10 @@ public void onSubscribe(Subscription subscription) {
245245
public void onNext(List<T> entityList) {
246246
onNextHasBeenCalled.set(true);
247247
saveAll(entityList, index)
248-
.map(sink::tryEmitNext)
248+
.map(entity -> {
249+
sink.tryEmitNext(entity);
250+
return entity;
251+
})
249252
.doOnComplete(() -> {
250253
if (!upstreamComplete.get()) {
251254
if (subscription == null) {
@@ -255,7 +258,14 @@ public void onNext(List<T> entityList) {
255258
} else {
256259
sink.tryEmitComplete();
257260
}
258-
}).subscribe();
261+
})
262+
.subscribe(v -> {
263+
}, error -> {
264+
if (subscription != null) {
265+
subscription.cancel();
266+
}
267+
sink.tryEmitError(error);
268+
});
259269
}
260270

261271
@Override

src/test/java/org/springframework/data/elasticsearch/core/ReactiveElasticsearchIntegrationTests.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1220,6 +1220,28 @@ void shouldFailWithConflictOnAttemptToSaveWithSameVersion() {
12201220
.allMatch(failureStatus -> failureStatus.status().equals(409));
12211221
}
12221222

1223+
@Test // Error propagation in reactive Flux save
1224+
@DisplayName("should propagate errors during Flux save operations")
1225+
void shouldPropagateErrorsDuringFluxSaveOperations() {
1226+
// Create a Flux that will produce an error after emitting some valid entities
1227+
Flux<SampleEntity> entitiesWithError = Flux.concat(
1228+
Flux.just(
1229+
randomEntity("valid entity 1"),
1230+
randomEntity("valid entity 2")),
1231+
Flux.error(new RuntimeException("Simulated error during entity creation")));
1232+
1233+
// The save operation should propagate the error to the subscriber.
1234+
// With the manual subscriber approach, the error propagates eagerly —
1235+
// sink.tryEmitError is called before in-flight saveAll results can be emitted,
1236+
// so the caller sees 0 entities before the error.
1237+
operations.save(entitiesWithError, SampleEntity.class, 10)
1238+
.as(StepVerifier::create)
1239+
.expectNextCount(0)
1240+
.expectErrorMatches(throwable -> throwable instanceof RuntimeException &&
1241+
throwable.getMessage().equals("Simulated error during entity creation"))
1242+
.verify();
1243+
}
1244+
12231245
// endregion
12241246

12251247
// region Helper functions

0 commit comments

Comments
 (0)