Describe the bug
Error logs
2025-10-30T14:34:52,611+0000 [pulsar-ph-amqp-123-2] ERROR io.streamnative.pulsar.handlers.amqp.AmqpConsumer - [test-aop.ctag-/10.255.134.236:55732] Failed to read data from exchange topic sample-exchange.
java.util.concurrent.CompletionException: java.lang.NullPointerException: Cannot invoke "io.streamnative.pulsar.handlers.amqp.AmqpMessageRouter.getExchange()" because the return value of "io.streamnative.pulsar.handlers.amqp.impl.PersistentQueue.getRouter(String)" is null
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source) ~[?:?]
at java.base/java.util.concurrent.CompletableFuture.uniComposeStage(Unknown Source) ~[?:?]
at java.base/java.util.concurrent.CompletableFuture.thenCompose(Unknown Source) ~[?:?]
at io.streamnative.pulsar.handlers.amqp.AmqpConsumer.sendMessage(AmqpConsumer.java:155) ~[?:?]
at io.streamnative.pulsar.handlers.amqp.AmqpConsumer.lambda$sendMessages$1(AmqpConsumer.java:130) ~[?:?]
at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173) ~[io.netty-netty-common-4.1.127.Final.jar:4.1.127.Final]
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166) ~[io.netty-netty-common-4.1.127.Final.jar:4.1.127.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472) ~[io.netty-netty-common-4.1.127.Final.jar:4.1.127.Final]
at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:405) ~[io.netty-netty-transport-classes-epoll-4.1.127.Final.jar:4.1.127.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:998) ~[io.netty-netty-common-4.1.127.Final.jar:4.1.127.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[io.netty-netty-common-4.1.127.Final.jar:4.1.127.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.netty-netty-common-4.1.127.Final.jar:4.1.127.Final]
at java.base/java.lang.Thread.run(Unknown Source) [?:?]
Caused by: java.lang.NullPointerException: Cannot invoke "io.streamnative.pulsar.handlers.amqp.AmqpMessageRouter.getExchange()" because the return value of "io.streamnative.pulsar.handlers.amqp.impl.PersistentQueue.getRouter(String)" is null
at io.streamnative.pulsar.handlers.amqp.impl.PersistentQueue.readEntryAsync(PersistentQueue.java:90) ~[?:?]
at io.streamnative.pulsar.handlers.amqp.AmqpConsumer.lambda$sendMessage$3(AmqpConsumer.java:155) ~[?:?]
... 12 more
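PersistentQueue.getRouter(exchangeName) returns null for the exchange named in the log (sample-exchange), so readEntryAsync throws the NullPointerException above. This may be related to PersistentQueue recovery.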
public class PersistentQueue extends AbstractAmqpQueue {
    @Override
    public CompletableFuture<Entry> readEntryAsync(String exchangeName, long ledgerId, long entryId) {
        // getRouter(exchangeName) returns null here, which causes the NPE
        return getRouter(exchangeName).getExchange().readEntryAsync(getName(), ledgerId, entryId);
    }
}
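One possible mitigation, independent of the root cause, would be to check the lookup result and return a failed future with a descriptive message instead of dereferencing null. The sketch below only illustrates that pattern. `RouterLookupSketch`, `Queue`, `Exchange`, and `bind` are hypothetical stand-ins written for this example, not the real AoP classes, and the entry payload is simplified to a `String`.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-ins for illustration only; these are NOT the real AoP classes.
class RouterLookupSketch {

    /** Stand-in for an exchange that can read an entry asynchronously. */
    interface Exchange {
        CompletableFuture<String> readEntryAsync(String queueName, long ledgerId, long entryId);
    }

    /** Stand-in for a queue that keeps one exchange reference per exchange name. */
    static class Queue {
        private final String name;
        private final Map<String, Exchange> routers = new ConcurrentHashMap<>();

        Queue(String name) {
            this.name = name;
        }

        /** Registers an exchange under the given name (assumed to happen on bind or recovery). */
        void bind(String exchangeName, Exchange exchange) {
            routers.put(exchangeName, exchange);
        }

        CompletableFuture<String> readEntryAsync(String exchangeName, long ledgerId, long entryId) {
            Exchange exchange = routers.get(exchangeName);
            if (exchange == null) {
                // Fail the future explicitly so the caller sees which binding is missing,
                // instead of a NullPointerException wrapped in a CompletionException.
                return CompletableFuture.failedFuture(new IllegalStateException(
                        "No router for exchange " + exchangeName + " on queue " + name));
            }
            return exchange.readEntryAsync(name, ledgerId, entryId);
        }
    }
}
```

With this shape, a read against an unbound exchange completes exceptionally with an `IllegalStateException` naming the exchange and queue, which should make the log easier to interpret. It does not explain why the router is missing after recovery; that still needs to be investigated separately.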