diff --git a/quarkus-solace-jcsmp-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/incoming/SolaceAckHandler.java b/quarkus-solace-jcsmp-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/incoming/SolaceAckHandler.java index 4ac5bb8..559ad12 100644 --- a/quarkus-solace-jcsmp-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/incoming/SolaceAckHandler.java +++ b/quarkus-solace-jcsmp-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/incoming/SolaceAckHandler.java @@ -3,13 +3,15 @@ import java.util.concurrent.CompletionStage; import io.smallrye.mutiny.Uni; +import io.smallrye.mutiny.infrastructure.Infrastructure; class SolaceAckHandler { public CompletionStage handle(SolaceInboundMessage msg) { return Uni.createFrom().voidItem() .invoke(() -> msg.getMessage().ackMessage()) - .runSubscriptionOn(msg::runOnMessageContext) + .runSubscriptionOn(Infrastructure.getDefaultWorkerPool()) + .emitOn(msg::runOnMessageContext) .subscribeAsCompletionStage(); } } diff --git a/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/incoming/SolaceAckHandler.java b/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/incoming/SolaceAckHandler.java index 78b564e..ada191c 100644 --- a/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/incoming/SolaceAckHandler.java +++ b/quarkus-solace-messaging-connector/runtime/src/main/java/com/solace/quarkus/messaging/incoming/SolaceAckHandler.java @@ -5,6 +5,7 @@ import com.solace.messaging.receiver.AcknowledgementSupport; import io.smallrye.mutiny.Uni; +import io.smallrye.mutiny.infrastructure.Infrastructure; class SolaceAckHandler { @@ -17,7 +18,8 @@ public SolaceAckHandler(AcknowledgementSupport ackSupport) { public CompletionStage handle(SolaceInboundMessage msg) { return Uni.createFrom().voidItem() .invoke(() -> ackSupport.ack(msg.getMessage())) - .runSubscriptionOn(msg::runOnMessageContext) + .runSubscriptionOn(Infrastructure.getDefaultWorkerPool()) + .emitOn(msg::runOnMessageContext) .subscribeAsCompletionStage(); } }