|
23 | 23 | import com.hivemq.client.internal.mqtt.handler.publish.incoming.MqttSubscribedPublishFlowable; |
24 | 24 | import com.hivemq.client.internal.mqtt.handler.publish.outgoing.MqttAckFlowable; |
25 | 25 | import com.hivemq.client.internal.mqtt.handler.publish.outgoing.MqttAckSingle; |
26 | | -import com.hivemq.client.internal.mqtt.handler.publish.outgoing.MqttAckSingleFlowable; |
27 | 26 | import com.hivemq.client.internal.mqtt.handler.subscribe.MqttSubAckSingle; |
28 | 27 | import com.hivemq.client.internal.mqtt.handler.subscribe.MqttUnsubAckSingle; |
29 | 28 | import com.hivemq.client.internal.mqtt.message.connect.MqttConnect; |
|
49 | 48 | import com.hivemq.client.mqtt.mqtt5.message.unsubscribe.Mqtt5Unsubscribe; |
50 | 49 | import com.hivemq.client.mqtt.mqtt5.message.unsubscribe.unsuback.Mqtt5UnsubAck; |
51 | 50 | import com.hivemq.client.rx.FlowableWithSingle; |
52 | | -import io.reactivex.Completable; |
53 | | -import io.reactivex.Flowable; |
54 | | -import io.reactivex.Scheduler; |
55 | | -import io.reactivex.Single; |
56 | | -import io.reactivex.functions.Function; |
57 | | -import io.reactivex.internal.fuseable.ScalarCallable; |
| 51 | +import io.reactivex.rxjava3.core.Completable; |
| 52 | +import io.reactivex.rxjava3.core.Flowable; |
| 53 | +import io.reactivex.rxjava3.core.Scheduler; |
| 54 | +import io.reactivex.rxjava3.core.Single; |
| 55 | +import io.reactivex.rxjava3.functions.Function; |
58 | 56 | import org.jetbrains.annotations.NotNull; |
59 | 57 | import org.jetbrains.annotations.Nullable; |
60 | 58 |
|
@@ -214,20 +212,6 @@ public MqttRxClient(final @NotNull MqttClientConfig clientConfig) { |
214 | 212 | final @NotNull Flowable<P> publishFlowable, final @NotNull Function<P, MqttPublish> publishMapper) { |
215 | 213 |
|
216 | 214 | final Scheduler applicationScheduler = clientConfig.getExecutorConfig().getApplicationScheduler(); |
217 | | - if (publishFlowable instanceof ScalarCallable) { |
218 | | - //noinspection unchecked |
219 | | - final P publish = ((ScalarCallable<P>) publishFlowable).call(); |
220 | | - if (publish == null) { |
221 | | - return Flowable.empty(); |
222 | | - } |
223 | | - final MqttPublish mqttPublish; |
224 | | - try { |
225 | | - mqttPublish = publishMapper.apply(publish); |
226 | | - } catch (final Throwable t) { |
227 | | - return Flowable.error(t); |
228 | | - } |
229 | | - return new MqttAckSingleFlowable(clientConfig, mqttPublish).observeOn(applicationScheduler, true); |
230 | | - } |
231 | 215 | return new MqttAckFlowable( |
232 | 216 | clientConfig, publishFlowable.subscribeOn(applicationScheduler).map(publishMapper)).observeOn( |
233 | 217 | applicationScheduler, true); |
|
0 commit comments