Skip to content

Commit baf59db

Browse files
committed
fix(sqs): wait for receive message completion(#569)
fix: #569
1 parent c779792 commit baf59db

File tree

1 file changed

+18
-18
lines changed

1 file changed

+18
-18
lines changed

src/main/java/io/kestra/plugin/aws/sqs/RealtimeTrigger.java

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,16 @@
1717
import lombok.experimental.SuperBuilder;
1818
import org.reactivestreams.Publisher;
1919
import reactor.core.publisher.Flux;
20+
import reactor.core.publisher.Mono;
2021
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
2122
import software.amazon.awssdk.services.sqs.model.DeleteMessageRequest;
2223
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
24+
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;
2325

2426
import java.time.Duration;
27+
import java.util.concurrent.CompletableFuture;
2528
import java.util.concurrent.CountDownLatch;
29+
import java.util.concurrent.ExecutionException;
2630
import java.util.concurrent.atomic.AtomicBoolean;
2731

2832
@SuperBuilder
@@ -148,30 +152,26 @@ public Flux<Message> publisher(final Consume task,
148152
.maxNumberOfMessages(runContext.render(maxNumberOfMessage).as(Integer.class).orElseThrow())
149153
.build();
150154

151-
sqsClient.receiveMessage(receiveRequest)
152-
.whenComplete((messageResponse, throwable) -> {
153-
if (throwable != null) {
154-
fluxSink.error(throwable);
155-
} else {
156-
messageResponse.messages().forEach(message -> {
157-
fluxSink.next(Message.builder().data(message.body()).build());
158-
});
159-
messageResponse.messages().forEach(message ->
160-
sqsClient.deleteMessage(DeleteMessageRequest.builder()
161-
.queueUrl(renderedQueueUrl)
162-
.receiptHandle(message.receiptHandle())
163-
.build()
164-
)
165-
);
166-
}
167-
});
155+
final CompletableFuture<ReceiveMessageResponse> future = sqsClient.receiveMessage(receiveRequest);
156+
168157
try {
169-
Thread.sleep(100);
158+
ReceiveMessageResponse response = future.get();
159+
response.messages().forEach(message -> fluxSink.next(Message.builder().data(message.body()).build()));
160+
161+
response.messages().forEach(message ->
162+
sqsClient.deleteMessage(DeleteMessageRequest.builder()
163+
.queueUrl(renderedQueueUrl)
164+
.receiptHandle(message.receiptHandle())
165+
.build()
166+
)
167+
);
170168
} catch (InterruptedException e) {
171169
Thread.currentThread().interrupt();
172170
isActive.set(false); // proactively stop polling
173171
}
174172
}
173+
} catch (ExecutionException e) {
174+
fluxSink.error(e.getCause() != null ? e.getCause() : e);
175175
} catch (Throwable e) {
176176
fluxSink.error(e);
177177
} finally {

0 commit comments

Comments
 (0)