
Publish is NOT Non-Blocking #801

Description

@lazyBisa

🐛 Bug Report

Publishing sometimes blocks.

🔬 How To Reproduce

Publishing a significant volume of messages blocks the calling thread when publish is called on the netty thread that processes the publishes.

This is a problem for any scenario where calls to publish happen on the netty thread (e.g. high-throughput request-response processing that wants to avoid thread-hopping).

Code sample

void createClient() {
    var executor = Executors.newSingleThreadExecutor();

    // The application scheduler runs tasks inline, so callbacks execute on the netty thread.
    var client = Mqtt5Client.builder()
            ....
            .executorConfig()
            .nettyExecutor(executor)
            .applicationScheduler(Schedulers.from(r -> r.run()))
            .applyExecutorConfig()
            .buildAsync();
    client.connect();

    client.subscribe(Mqtt5Subscribe.builder().topicFilter("request").build(), msg -> {
        // Publish 100 responses for each received request.
        for (int i = 0; i < 100; i++) {
            log.info("publishing " + i);
            client.publish(Mqtt5Publish.builder().topic("response").build());
        }
        log.info("done publishing");
    }).whenComplete((subAck, throwable) -> {
        // Once subscribed, send a single request to trigger the callback above.
        client.publish(Mqtt5Publish.builder().topic("request").build());
    });
}

Expected output

publishing 0
publishing 1
publishing 2
publishing 3
...
publishing 99
done publishing

Observed Output:

publishing 0
publishing 1
....
publishing 63       <-- here it just hangs

Environment

What version of this client are you using?
1.3.13

JVM version?
22: GraalVM CE 22.0.1 - VM 24.0.1

Operating System?
Windows/Linux/Docker....

Which MQTT protocol version is being used?
5

Which MQTT broker (name and version)?
hivemq/hivemq-ce:latest

📈 Expected behavior

Publishing does not block the thread.

📎 Additional context

Behaviour has also been observed on a high-throughput request-response service that was simply publishing one response per received request.

The blocking call is a this.wait() in MqttPublishFlowables:

    public void add(final @NotNull Flowable<MqttPublishWithFlow> publishFlowable) {
        synchronized (this) {
            while (requested == 0) {
                try {
                    this.wait();
                } catch (final InterruptedException e) {
                    LOGGER.error("thread interrupted while waiting to publish.", e);
                    return;
                }
            }
            assert subscriber != null;
            subscriber.onNext(publishFlowable);
            requested--;
        }
    }
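
To make the failure mode concrete, here is a minimal, self-contained model of that wait loop using only the JDK. CreditGate, tryAdd and run are hypothetical names and not part of the client. The sketch assumes that new demand is granted on the same single thread that is currently blocked, and it uses a timed wait only so that the demo terminates (the real add() waits indefinitely).

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

final class CreditGateDemo {

    /** Simplified stand-in for the wait loop in add(); not the library's class. */
    static final class CreditGate {
        private long requested;

        CreditGate(final long initialCredits) {
            this.requested = initialCredits;
        }

        /** Consumes one credit; gives up after timeoutMillis instead of waiting forever. */
        synchronized boolean tryAdd(final long timeoutMillis) throws InterruptedException {
            final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
            while (requested == 0) {
                final long remainingMillis = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
                if (remainingMillis <= 0) {
                    return false; // still no credit: this is where the real code hangs
                }
                wait(remainingMillis);
            }
            requested--;
            return true;
        }

        /** Grants n more credits and wakes up any waiting thread. */
        synchronized void request(final long n) {
            requested += n;
            notifyAll();
        }
    }

    /** Returns how many of the attempted adds succeeded on a single "event loop" thread. */
    static int run(final int credits, final int attempts) {
        final ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        final CreditGate gate = new CreditGate(credits);
        try {
            final Future<Integer> result = eventLoop.submit(() -> {
                // The replenishment is queued on the same thread,
                // so it cannot run while this task is waiting for credits.
                eventLoop.execute(() -> gate.request(credits));
                int succeeded = 0;
                for (int i = 0; i < attempts; i++) {
                    if (!gate.tryAdd(200)) {
                        break;
                    }
                    succeeded++;
                }
                return succeeded;
            });
            return result.get();
        } catch (final Exception e) {
            throw new IllegalStateException(e);
        } finally {
            eventLoop.shutdown();
        }
    }

    public static void main(final String[] args) {
        System.out.println("adds before stall: " + run(64, 100)); // prints "adds before stall: 64"
    }
}
```

With 64 initial credits and 100 attempts, the loop stalls once the credits are used up, because the only task that could grant more is queued behind the waiting one.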

The requested limit seems to be set in MqttOutgoingQosHandler#203 (the constant even carries a "TODO configurable" note):

    publishFlowables.flatMap(
            f -> f, true, MAX_CONCURRENT_PUBLISH_FLOWABLES, Math.min(newSendMaximum, Flowable.bufferSize()))
            .subscribe(this);
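
Until that limit is configurable or the wait is removed, one possible workaround is to move the publish calls off the netty thread, at the cost of the thread hop this report wanted to avoid. The following is a rough, self-contained model of that idea using only java.util.concurrent, not the HiveMQ client API: the Semaphore stands in for the requested counter, and eventLoop, publisher and run are hypothetical names. It assumes that credits are granted back from the event loop thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class OffloadPublishDemo {

    /** Returns how many simulated publishes completed when they run off the event loop. */
    static int run(final int credits, final int publishes) {
        final ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        final ExecutorService publisher = Executors.newSingleThreadExecutor();
        final Semaphore requested = new Semaphore(credits); // stand-in for the demand counter
        final AtomicInteger published = new AtomicInteger();
        final CountDownLatch done = new CountDownLatch(1);

        final Runnable publishAll = () -> {
            try {
                for (int i = 0; i < publishes; i++) {
                    requested.acquire(); // may block, but only the publisher thread
                    published.incrementAndGet();
                    // Assumption: completing a publish grants one credit back,
                    // and that happens on the event loop thread.
                    eventLoop.execute(requested::release);
                }
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                done.countDown();
            }
        };

        try {
            // The "message callback" runs on the event loop and only hands the work off,
            // so the event loop stays free to grant new credits.
            eventLoop.execute(() -> publisher.execute(publishAll));
            done.await(5, TimeUnit.SECONDS);
            return published.get();
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            publisher.shutdownNow();
            eventLoop.shutdownNow();
        }
    }

    public static void main(final String[] args) {
        System.out.println("published: " + run(64, 100)); // prints "published: 100"
    }
}
```

In this model the publisher thread may still wait for a credit, but the event loop is never the thread that waits, so all 100 publishes complete.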

Stack Trace:

at java.lang.Object.wait0(Object.java:-1)
at java.lang.Object.wait(Object.java:375)
at java.lang.Object.wait(Object.java:348)
at com.hivemq.client.internal.mqtt.handler.publish.outgoing.MqttPublishFlowables.add(MqttPublishFlowables.java:53)
at com.hivemq.client.internal.mqtt.handler.publish.outgoing.MqttAckSingle.subscribeActual(MqttAckSingle.java:56)
at io.reactivex.Single.subscribe(Single.java:3666)
at io.reactivex.internal.operators.single.SingleObserveOn.subscribeActual(SingleObserveOn.java:35)
at io.reactivex.Single.subscribe(Single.java:3666)
at com.hivemq.client.internal.rx.RxFutureConverter$RxSingleFuture.<init>(RxFutureConverter.java:113)
at com.hivemq.client.internal.rx.RxFutureConverter.toFuture(RxFutureConverter.java:43)
at com.hivemq.client.internal.mqtt.MqttAsyncClient.publish(MqttAsyncClient.java:243)
at com.bmw.ispi.core.dmsmock.control.DmsMockMqttClientProvider.lambda$dmsMockMqttClient$0(DmsMockMqttClientProvider.java:61)
at com.bmw.ispi.core.dmsmock.control.DmsMockMqttClientProvider$$Lambda/0x000001dd02052000.accept(Unknown Source:-1)
at com.hivemq.client.internal.mqtt.MqttAsyncClient$CallbackSubscriber.onNext(MqttAsyncClient.java:303)
at com.hivemq.client.internal.mqtt.MqttAsyncClient$CallbackSubscriber.onNext(MqttAsyncClient.java:288)
at com.hivemq.client.rx.FlowableWithSingle$SingleFutureSubscriber.onNext(FlowableWithSingle.java:406)
at com.hivemq.client.internal.rx.operators.FlowableWithSingleCombine$SplitSubscriber$Default.tryOnNextActual(FlowableWithSingleCombine.java:235)
at com.hivemq.client.internal.rx.operators.FlowableWithSingleCombine$SplitSubscriber.tryOnNext(FlowableWithSingleCombine.java:200)
at io.reactivex.internal.operators.flowable.FlowableObserveOn$ObserveOnConditionalSubscriber.runAsync(FlowableObserveOn.java:649)
at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.run(FlowableObserveOn.java:176)
at io.reactivex.internal.schedulers.ExecutorScheduler$ExecutorWorker$BooleanRunnable.run(ExecutorScheduler.java:288)
at io.reactivex.internal.schedulers.ExecutorScheduler$ExecutorWorker.run(ExecutorScheduler.java:253)
at io.vertx.core.impl.ContextInternal.dispatch(ContextInternal.java:240)
at com.bmw.ispi.core.messaging.mqtt.MqttClientProvider.lambda$createExecutorConfig$2(MqttClientProvider.java:145)
at com.bmw.ispi.core.messaging.mqtt.MqttClientProvider$$Lambda/0x000001dd01facca0.execute(Unknown Source:-1)
at io.reactivex.internal.schedulers.ExecutorScheduler$ExecutorWorker.schedule(ExecutorScheduler.java:171)
at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.trySchedule(FlowableObserveOn.java:166)
at io.reactivex.internal.operators.flowable.FlowableObserveOn$BaseObserveOnSubscriber.onNext(FlowableObserveOn.java:117)
at com.hivemq.client.internal.rx.operators.FlowableWithSingleCombine$CombineSubscriber.next(FlowableWithSingleCombine.java:90)
at com.hivemq.client.internal.rx.operators.FlowableWithSingleCombine$CombineSubscriber.onNext(FlowableWithSingleCombine.java:77)
at com.hivemq.client.internal.mqtt.handler.publish.incoming.MqttIncomingPublishFlow.onNext(MqttIncomingPublishFlow.java:75)
at com.hivemq.client.internal.mqtt.handler.publish.incoming.MqttSubscribedPublishFlow.onNext(MqttSubscribedPublishFlow.java:33)
at com.hivemq.client.internal.mqtt.handler.publish.incoming.MqttIncomingPublishService.emit(MqttIncomingPublishService.java:166)
at com.hivemq.client.internal.mqtt.handler.publish.incoming.MqttIncomingPublishService.onPublish(MqttIncomingPublishService.java:112)
at com.hivemq.client.internal.mqtt.handler.publish.incoming.MqttIncomingPublishService.onPublishQos0(MqttIncomingPublishService.java:79)
at com.hivemq.client.internal.mqtt.handler.publish.incoming.MqttIncomingQosHandler.readPublishQos0(MqttIncomingQosHandler.java:121)
at com.hivemq.client.internal.mqtt.handler.publish.incoming.MqttIncomingQosHandler.readPublish(MqttIncomingQosHandler.java:109)
at com.hivemq.client.internal.mqtt.handler.publish.incoming.MqttIncomingQosHandler.channelRead(MqttIncomingQosHandler.java:98)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at com.hivemq.client.internal.mqtt.handler.publish.outgoing.MqttOutgoingQosHandler.channelRead(MqttOutgoingQosHandler.java:319)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at com.hivemq.client.internal.mqtt.handler.ping.MqttPingHandler.channelRead(MqttPingHandler.java:84)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:361)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:325)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at com.hivemq.client.internal.mqtt.codec.encoder.MqttEncoder.channelRead(MqttEncoder.java:88)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1357)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:868)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:796)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:732)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:658)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:998)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.lang.Thread.runWith(Thread.java:1583)
at java.lang.Thread.run(Thread.java:1570)
