Description
Search before asking
- I searched in the issues and found nothing similar.
Read release policy
- I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.
Version
Linux, JDK 17
Broker: 3.0.5
Client: streamnative/pulsar-io-jdbc-clickhouse:4.0.1.3 (container image)
Minimal reproduce step
No reliable way to reproduce this is known yet.
What did you expect to see?
All messages from the non-persistent partitioned topics are written to a ClickHouse database by the sink.
What did you see instead?
The sink works for some time (hours to days). Then, presumably after a network error, the consumer reconnects to the broker, but no further messages are processed.
Broker:
2025-02-24T13:07:54,380+0100 [pulsar-io-4-1] WARN org.apache.pulsar.broker.service.ServerCnx - [/86.119.37.116:37398] Got exception io.netty.channel.unix.Errors$NativeIoException: recvAddress(..) failed: Connection reset by peer
2025-02-24T13:07:54,380+0100 [pulsar-io-4-8] INFO org.apache.pulsar.broker.service.ServerCnx - Closed connection from /86.119.x.x:37390
2025-02-24T13:07:54,380+0100 [pulsar-io-4-1] INFO org.apache.pulsar.broker.service.ServerCnx - Closed connection from /86.119.x.x:37398
2025-02-24T13:07:54,380+0100 [pulsar-io-4-1] INFO org.apache.pulsar.broker.service.nonpersistent.NonPersistentDispatcherMultipleConsumers - Removed consumer Consumer{subscription=NonPersistentSubscription{topic=non-persistent://net/flow/goflow.v2-partition-5, name=c/c/sink}, consumerId=5, consumerName=zXnBW, address=[id: 0x29446605, L:/10.0.0.27:6651 ! R:/86.119.x.x:37398] [SR:86.119.x.x, state:Failed]}
2025-02-24T13:08:00,539+0100 [pulsar-io-4-6] INFO org.apache.pulsar.broker.service.ServerCnx - [/86.119.x.x:45206] connected with role=c using authMethod=token, clientVersion=Pulsar-Java-v4.0.1.3, clientProtocolVersion=21, proxyVersion=null
2025-02-24T13:08:00,566+0100 [pulsar-io-4-6] INFO org.apache.pulsar.broker.service.ServerCnx - [[id: 0x11556fae, L:/10.0.0.27:6651 - R:/86.119.x.x:45206] [SR:86.119.x.x, state:Connected]] Subscribing on topic non-persistent://net/flow/goflow.v2-partition-5 / c/c/sink. consumerId: 5
2025-02-24T13:08:00,580+0100 [configuration-metadata-store-13-1] INFO org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic - [non-persistent://net/flow/goflow.v2-partition-5][c/c/sink] Created new subscription for 5
2025-02-24T13:08:00,580+0100 [configuration-metadata-store-13-1] INFO org.apache.pulsar.broker.service.ServerCnx - [/86.119.x.x:45206] Created subscription on topic non-persistent://net/flow/goflow.v2-partition-5 / c/c/sink
Client:
2025-02-24T12:07:26,285+0000 [pulsar-timer-16-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [non-persistent://net/flow/goflow.v2-partition-5] [c/c/sink] [sxk2e] Prefetched messages: 537 --- Consume throughput received: 7623.44 msgs/s --- 8.42 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
2025-02-24T12:07:42,843+0000 [pulsar-timer-16-1] INFO org.apache.pulsar.client.impl.UnAckedMessageTracker - [ConsumerBase{subscription='c/c/sink', consumerName='sxk2e', topic='non-persistent://net/flow/goflow.v2'}] 21 messages will be re-delivered
2025-02-24T12:08:26,287+0000 [pulsar-timer-16-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [non-persistent://net/flow/goflow.v2-partition-0] [c/c/sink] [sxk2e] Prefetched messages: 0 --- Consume throughput received: 3513.81 msgs/s --- 3.88 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
2025-02-24T12:08:26,288+0000 [pulsar-timer-16-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [non-persistent://net/flow/goflow.v2-partition-1] [c/c/sink] [sxk2e] Prefetched messages: 0 --- Consume throughput received: 3436.86 msgs/s --- 3.81 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
2025-02-24T12:08:26,288+0000 [pulsar-timer-16-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [non-persistent://net/flow/goflow.v2-partition-2] [c/c/sink] [sxk2e] Prefetched messages: 0 --- Consume throughput received: 3489.75 msgs/s --- 3.85 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
2025-02-24T12:08:26,288+0000 [pulsar-timer-16-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [non-persistent://net/flow/goflow.v2-partition-3] [c/c/sink] [sxk2e] Prefetched messages: 0 --- Consume throughput received: 3475.43 msgs/s --- 3.84 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
2025-02-24T12:08:26,288+0000 [pulsar-timer-16-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [non-persistent://net/flow/goflow.v2-partition-4] [c/c/sink] [sxk2e] Prefetched messages: 0 --- Consume throughput received: 3459.66 msgs/s --- 3.84 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
2025-02-24T12:08:26,288+0000 [pulsar-timer-16-1] INFO org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl - [non-persistent://net/flow/goflow.v2-partition-5] [c/c/sink] [sxk2e] Prefetched messages: 0 --- Consume throughput received: 3562.72 msgs/s --- 3.95 Mbit/s --- Ack sent rate: 0.00 ack/s --- Failed messages: 0 --- batch messages: 0 ---Failed acks: 0
(no more stats are logged)
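Note that the broker log is stamped +0100 while the client log is stamped +0000, so the two excerpts have to be normalized before they can be correlated. A minimal sketch for doing that with java.time only; the class name LogTimeAlign is made up for illustration, and the timestamp pattern is inferred from the excerpts above:

```java
import java.time.Duration;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper, not part of Pulsar: normalizes log timestamps to UTC instants.
final class LogTimeAlign {
    // Layout seen in both excerpts, e.g. 2025-02-24T13:07:54,380+0100
    private static final DateTimeFormatter LOG_TS =
            DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss,SSSZ");

    // Parses one log timestamp and converts it to an offset-independent Instant.
    static Instant toInstant(String logTimestamp) {
        return OffsetDateTime.parse(logTimestamp, LOG_TS).toInstant();
    }

    public static void main(String[] args) {
        // Broker: "Connection reset by peer"
        Instant brokerReset = toInstant("2025-02-24T13:07:54,380+0100");
        // Broker: "Created subscription" after the reconnect
        Instant resubscribed = toInstant("2025-02-24T13:08:00,580+0100");
        // Client: last ConsumerStatsRecorderImpl line
        Instant lastClientStats = toInstant("2025-02-24T12:08:26,288+0000");

        System.out.println("broker reset (UTC):      " + brokerReset);
        System.out.println("re-subscribed (UTC):     " + resubscribed);
        System.out.println("last client stats (UTC): " + lastClientStats);
        System.out.println("reset -> last stats:     "
                + Duration.between(brokerReset, lastClientStats));
    }
}
```

Read this way, the connection reset on the broker happened at 12:07:54 UTC, the re-subscribe at 12:08:00 UTC, and the last client stats line was logged about 32 seconds after the reset.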
The sink instance thread is then parked in a WAITING state, blocked in the consumer's receive():
"c/c/p-0" #23 prio=5 os_prio=0 cpu=4923326.68ms elapsed=8309.03s tid=0x000074de9d75c060 nid=0x28 waiting on condition [0x000074de9b52f000]
java.lang.Thread.State: WAITING (parking)
at jdk.internal.misc.Unsafe.park([email protected]/Native Method)
- parking to wait for <0x0000000742843e98> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
at java.util.concurrent.locks.LockSupport.park([email protected]/LockSupport.java:341)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionNode.block([email protected]/AbstractQueuedSynchronizer.java:506)
at java.util.concurrent.ForkJoinPool.unmanagedBlock([email protected]/ForkJoinPool.java:3465)
at java.util.concurrent.ForkJoinPool.managedBlock([email protected]/ForkJoinPool.java:3436)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await([email protected]/AbstractQueuedSynchronizer.java:1630)
at org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue.take(GrowableArrayBlockingQueue.java:189)
at org.apache.pulsar.client.impl.MultiTopicsConsumerImpl.internalReceive(MultiTopicsConsumerImpl.java:380)
at org.apache.pulsar.client.impl.ConsumerBase.receive(ConsumerBase.java:263)
at org.apache.pulsar.functions.source.SingleConsumerPulsarSource.read(SingleConsumerPulsarSource.java:84)
at org.apache.pulsar.functions.instance.JavaInstanceRunnable.readInput(JavaInstanceRunnable.java:529)
at org.apache.pulsar.functions.instance.JavaInstanceRunnable.run(JavaInstanceRunnable.java:320)
at java.lang.Thread.run([email protected]/Thread.java:840)
At the same time, the metrics for messages written by the sink drop off a cliff.
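For context on the thread dump: the instance thread is parked inside a blocking take() on the consumer's incoming queue, which only returns once something is enqueued. The sketch below reproduces the same thread state with a plain java.util.concurrent.LinkedBlockingQueue as a stand-in. It is not Pulsar's GrowableArrayBlockingQueue and says nothing about why the queue stays empty after the reconnect; the class and method names are made up for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo, not Pulsar code: a reader blocked in take() on an empty queue.
final class ParkedReceiveDemo {

    // Starts a reader that blocks in take(), samples its state, then unblocks it.
    static Thread.State stateWhileQueueEmpty() {
        BlockingQueue<String> incoming = new LinkedBlockingQueue<>();
        Thread reader = new Thread(() -> {
            try {
                incoming.take(); // blocks until a message is enqueued
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "reader");
        reader.start();
        try {
            // Give the reader up to 5 seconds to reach the parked state.
            long deadline = System.nanoTime() + 5_000_000_000L;
            while (reader.getState() != Thread.State.WAITING
                    && System.nanoTime() < deadline) {
                Thread.sleep(10);
            }
            Thread.State observed = reader.getState();
            incoming.put("msg"); // enqueue one message so the reader can finish
            reader.join(5_000);
            return observed;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while observing reader", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("reader state with empty queue: " + stateWhileQueueEmpty());
    }
}
```

As long as nothing is put on the queue, the reader stays in WAITING (parking) without logging an error, which matches the symptom here: no exception on the sink side, just no progress.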

Anything else?
No response
Are you willing to submit a PR?
- I'm willing to submit a PR!