Skip to content

Commit 27c6327

Browse files
committed
fixes possible racing on direct processor in tests
Signed-off-by: Robert Roeser <[email protected]> Signed-off-by: Oleh Dokuka <[email protected]>
1 parent e2d159b commit 27c6327

File tree

2 files changed

+12
-6
lines changed

2 files changed

+12
-6
lines changed

Diff for: rsocket-core/src/test/java/io/rsocket/test/util/LocalDuplexConnection.java

+5-4
Original file line numberDiff line numberDiff line change
@@ -21,19 +21,20 @@
2121
import org.reactivestreams.Publisher;
2222
import reactor.core.publisher.DirectProcessor;
2323
import reactor.core.publisher.Flux;
24+
import reactor.core.publisher.FluxSink;
2425
import reactor.core.publisher.Mono;
2526
import reactor.core.publisher.MonoProcessor;
2627

2728
public class LocalDuplexConnection implements DuplexConnection {
28-
private final DirectProcessor<Frame> send;
29+
private final FluxSink<Frame> sendFluxSink;
2930
private final DirectProcessor<Frame> receive;
3031
private final MonoProcessor<Void> onClose;
3132
private final String name;
3233

3334
public LocalDuplexConnection(
3435
String name, DirectProcessor<Frame> send, DirectProcessor<Frame> receive) {
3536
this.name = name;
36-
this.send = send;
37+
this.sendFluxSink = send.serialize().sink();
3738
this.receive = receive;
3839
onClose = MonoProcessor.create();
3940
}
@@ -42,8 +43,8 @@ public LocalDuplexConnection(
4243
public Mono<Void> send(Publisher<Frame> frame) {
4344
return Flux.from(frame)
4445
.doOnNext(f -> System.out.println(name + " - " + f.toString()))
45-
.doOnNext(send::onNext)
46-
.doOnError(send::onError)
46+
.doOnNext(sendFluxSink::next)
47+
.doOnError(sendFluxSink::error)
4748
.then();
4849
}
4950

Diff for: rsocket-core/src/test/java/io/rsocket/test/util/TestDuplexConnection.java

+7-2
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import org.slf4j.LoggerFactory;
2828
import reactor.core.publisher.DirectProcessor;
2929
import reactor.core.publisher.Flux;
30+
import reactor.core.publisher.FluxSink;
3031
import reactor.core.publisher.Mono;
3132
import reactor.core.publisher.MonoProcessor;
3233

@@ -40,7 +41,9 @@ public class TestDuplexConnection implements DuplexConnection {
4041

4142
private final LinkedBlockingQueue<Frame> sent;
4243
private final DirectProcessor<Frame> sentPublisher;
44+
private final FluxSink<Frame> sentFluxSink;
4345
private final DirectProcessor<Frame> received;
46+
private final FluxSink<Frame> receivedFluxSink;
4447
private final MonoProcessor<Void> onClose;
4548
private final ConcurrentLinkedQueue<Subscriber<Frame>> sendSubscribers;
4649
private volatile double availability = 1;
@@ -49,7 +52,9 @@ public class TestDuplexConnection implements DuplexConnection {
4952
public TestDuplexConnection() {
5053
sent = new LinkedBlockingQueue<>();
5154
received = DirectProcessor.create();
55+
receivedFluxSink = received.serialize().sink();
5256
sentPublisher = DirectProcessor.create();
57+
sentFluxSink = sentPublisher.serialize().sink();
5358
sendSubscribers = new ConcurrentLinkedQueue<>();
5459
onClose = MonoProcessor.create();
5560
}
@@ -65,7 +70,7 @@ public Mono<Void> send(Publisher<Frame> frames) {
6570
.doOnNext(
6671
frame -> {
6772
sent.offer(frame);
68-
sentPublisher.onNext(frame);
73+
sentFluxSink.next(frame);
6974
})
7075
.doOnError(throwable -> logger.error("Error in send stream on test connection.", throwable))
7176
.subscribe(subscriber);
@@ -116,7 +121,7 @@ public Publisher<Frame> getSentAsPublisher() {
116121

117122
public void addToReceivedBuffer(Frame... received) {
118123
for (Frame frame : received) {
119-
this.received.onNext(frame);
124+
this.receivedFluxSink.next(frame);
120125
}
121126
}
122127

0 commit comments

Comments
 (0)