Skip to content

Commit 46b96f5

Browse files
committed
Replace RxJava 2 with RxJava 3.
1 parent 9cca723 commit 46b96f5

50 files changed

Lines changed: 128 additions & 134 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

gradle/libs.versions.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ pmd = "5.8.1"
1616
reactiveStreams = "1.0.4"
1717
reactor = "3.8.5"
1818
reactor-adapter = "3.6.0"
19-
rxjava = "2.2.21"
19+
rxjava = "3.1.12"
2020
slf4j = "1.7.36"
2121
testcontainers = "2.0.5"
2222

@@ -49,7 +49,7 @@ reactiveStreams = { module = "org.reactivestreams:reactive-streams", version.ref
4949
reactor = { module = "io.projectreactor:reactor-core", version.ref = "reactor" }
5050
reactor-adapter = { module = "io.projectreactor.addons:reactor-adapter", version.ref = "reactor-adapter" }
5151
reactor-test = { module = "io.projectreactor:reactor-test", version.ref = "reactor" }
52-
rxjava = { module = "io.reactivex.rxjava2:rxjava", version.ref = "rxjava" }
52+
rxjava = { module = "io.reactivex.rxjava3:rxjava", version.ref = "rxjava" }
5353
slf4j-api = { module = "org.slf4j:slf4j-api", version.ref = "slf4j" }
5454
slf4j-simple = { module = "org.slf4j:slf4j-simple", version.ref = "slf4j" }
5555
testcontainers = { module = "org.testcontainers:testcontainers", version.ref = "testcontainers" }

reactor/src/main/java/com/hivemq/client/internal/mqtt/mqtt3/reactor/Mqtt3ReactorClientView.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
import com.hivemq.client.mqtt.mqtt3.message.unsubscribe.Mqtt3Unsubscribe;
3535
import com.hivemq.client.mqtt.mqtt3.reactor.Mqtt3ReactorClient;
3636
import com.hivemq.client.rx.reactor.FluxWithSingle;
37-
import io.reactivex.Flowable;
37+
import io.reactivex.rxjava3.core.Flowable;
3838
import org.jetbrains.annotations.NotNull;
3939
import org.reactivestreams.Publisher;
4040
import reactor.adapter.rxjava.RxJava2Adapter;

reactor/src/main/java/com/hivemq/client/internal/mqtt/reactor/MqttReactorClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
import com.hivemq.client.mqtt.mqtt5.message.unsubscribe.unsuback.Mqtt5UnsubAck;
3939
import com.hivemq.client.mqtt.mqtt5.reactor.Mqtt5ReactorClient;
4040
import com.hivemq.client.rx.reactor.FluxWithSingle;
41-
import io.reactivex.Flowable;
41+
import io.reactivex.rxjava3.core.Flowable;
4242
import org.jetbrains.annotations.NotNull;
4343
import org.reactivestreams.Publisher;
4444
import reactor.adapter.rxjava.RxJava2Adapter;

src/main/java/com/hivemq/client/internal/mqtt/MqttAsyncClient.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,8 @@
4040
import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAck;
4141
import com.hivemq.client.mqtt.mqtt5.message.unsubscribe.Mqtt5Unsubscribe;
4242
import com.hivemq.client.mqtt.mqtt5.message.unsubscribe.unsuback.Mqtt5UnsubAck;
43-
import io.reactivex.FlowableSubscriber;
44-
import io.reactivex.schedulers.Schedulers;
43+
import io.reactivex.rxjava3.core.FlowableSubscriber;
44+
import io.reactivex.rxjava3.schedulers.Schedulers;
4545
import org.jetbrains.annotations.NotNull;
4646
import org.jetbrains.annotations.Nullable;
4747
import org.reactivestreams.Subscription;

src/main/java/com/hivemq/client/internal/mqtt/MqttBlockingClient.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,9 @@
4545
import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAck;
4646
import com.hivemq.client.mqtt.mqtt5.message.unsubscribe.Mqtt5Unsubscribe;
4747
import com.hivemq.client.mqtt.mqtt5.message.unsubscribe.unsuback.Mqtt5UnsubAck;
48-
import io.reactivex.Flowable;
49-
import io.reactivex.FlowableSubscriber;
50-
import io.reactivex.internal.subscriptions.SubscriptionHelper;
48+
import io.reactivex.rxjava3.core.Flowable;
49+
import io.reactivex.rxjava3.core.FlowableSubscriber;
50+
import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper;
5151
import org.jetbrains.annotations.NotNull;
5252
import org.jetbrains.annotations.Nullable;
5353
import org.reactivestreams.Subscription;

src/main/java/com/hivemq/client/internal/mqtt/MqttClientExecutorConfigImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
package com.hivemq.client.internal.mqtt;
1818

1919
import com.hivemq.client.mqtt.MqttClientExecutorConfig;
20-
import io.reactivex.Scheduler;
20+
import io.reactivex.rxjava3.core.Scheduler;
2121
import org.jetbrains.annotations.NotNull;
2222
import org.jetbrains.annotations.Nullable;
2323

src/main/java/com/hivemq/client/internal/mqtt/MqttClientExecutorConfigImplBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
import com.hivemq.client.internal.util.Checks;
2020
import com.hivemq.client.mqtt.MqttClientExecutorConfigBuilder;
21-
import io.reactivex.Scheduler;
21+
import io.reactivex.rxjava3.core.Scheduler;
2222
import org.jetbrains.annotations.NotNull;
2323
import org.jetbrains.annotations.Nullable;
2424

src/main/java/com/hivemq/client/internal/mqtt/MqttRxClient.java

Lines changed: 5 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import com.hivemq.client.internal.mqtt.handler.publish.incoming.MqttSubscribedPublishFlowable;
2424
import com.hivemq.client.internal.mqtt.handler.publish.outgoing.MqttAckFlowable;
2525
import com.hivemq.client.internal.mqtt.handler.publish.outgoing.MqttAckSingle;
26-
import com.hivemq.client.internal.mqtt.handler.publish.outgoing.MqttAckSingleFlowable;
2726
import com.hivemq.client.internal.mqtt.handler.subscribe.MqttSubAckSingle;
2827
import com.hivemq.client.internal.mqtt.handler.subscribe.MqttUnsubAckSingle;
2928
import com.hivemq.client.internal.mqtt.message.connect.MqttConnect;
@@ -49,12 +48,11 @@
4948
import com.hivemq.client.mqtt.mqtt5.message.unsubscribe.Mqtt5Unsubscribe;
5049
import com.hivemq.client.mqtt.mqtt5.message.unsubscribe.unsuback.Mqtt5UnsubAck;
5150
import com.hivemq.client.rx.FlowableWithSingle;
52-
import io.reactivex.Completable;
53-
import io.reactivex.Flowable;
54-
import io.reactivex.Scheduler;
55-
import io.reactivex.Single;
56-
import io.reactivex.functions.Function;
57-
import io.reactivex.internal.fuseable.ScalarCallable;
51+
import io.reactivex.rxjava3.core.Completable;
52+
import io.reactivex.rxjava3.core.Flowable;
53+
import io.reactivex.rxjava3.core.Scheduler;
54+
import io.reactivex.rxjava3.core.Single;
55+
import io.reactivex.rxjava3.functions.Function;
5856
import org.jetbrains.annotations.NotNull;
5957
import org.jetbrains.annotations.Nullable;
6058

@@ -214,20 +212,6 @@ public MqttRxClient(final @NotNull MqttClientConfig clientConfig) {
214212
final @NotNull Flowable<P> publishFlowable, final @NotNull Function<P, MqttPublish> publishMapper) {
215213

216214
final Scheduler applicationScheduler = clientConfig.getExecutorConfig().getApplicationScheduler();
217-
if (publishFlowable instanceof ScalarCallable) {
218-
//noinspection unchecked
219-
final P publish = ((ScalarCallable<P>) publishFlowable).call();
220-
if (publish == null) {
221-
return Flowable.empty();
222-
}
223-
final MqttPublish mqttPublish;
224-
try {
225-
mqttPublish = publishMapper.apply(publish);
226-
} catch (final Throwable t) {
227-
return Flowable.error(t);
228-
}
229-
return new MqttAckSingleFlowable(clientConfig, mqttPublish).observeOn(applicationScheduler, true);
230-
}
231215
return new MqttAckFlowable(
232216
clientConfig, publishFlowable.subscribeOn(applicationScheduler).map(publishMapper)).observeOn(
233217
applicationScheduler, true);

src/main/java/com/hivemq/client/internal/mqtt/exceptions/mqtt3/Mqtt3ExceptionFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import com.hivemq.client.mqtt.mqtt3.exceptions.*;
2525
import com.hivemq.client.mqtt.mqtt5.exceptions.Mqtt5MessageException;
2626
import com.hivemq.client.mqtt.mqtt5.message.Mqtt5Message;
27-
import io.reactivex.functions.Function;
27+
import io.reactivex.rxjava3.functions.Function;
2828
import org.jetbrains.annotations.NotNull;
2929

3030
/**

src/main/java/com/hivemq/client/internal/mqtt/handler/auth/MqttReAuthCompletable.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,9 @@
2222
import com.hivemq.client.internal.rx.CompletableFlow;
2323
import io.netty.channel.Channel;
2424
import io.netty.channel.ChannelHandler;
25-
import io.reactivex.Completable;
26-
import io.reactivex.CompletableObserver;
27-
import io.reactivex.internal.disposables.EmptyDisposable;
25+
import io.reactivex.rxjava3.core.Completable;
26+
import io.reactivex.rxjava3.core.CompletableObserver;
27+
import io.reactivex.rxjava3.internal.disposables.EmptyDisposable;
2828
import org.jetbrains.annotations.NotNull;
2929

3030
/**

0 commit comments

Comments
 (0)