Skip to content

Commit 9a504a8

Browse files
authored
fixes block() in MetadataPushRequesterMono/FnfRequesterMono (#1044)
1 parent 80a05f8 commit 9a504a8

File tree

3 files changed

+75
-5
lines changed

3 files changed

+75
-5
lines changed

Diff for: rsocket-core/src/main/java/io/rsocket/core/FireAndForgetRequesterMono.java

+5
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,11 @@ public Void block(Duration m) {
185185
return block();
186186
}
187187

188+
/**
189+
* This method is deliberately non-blocking regardless it is named as `.block`. The main intent to
190+
* keep this method along with the {@link #subscribe()} is to eliminate redundancy which comes
191+
* with a default block method implementation.
192+
*/
188193
@Override
189194
@Nullable
190195
public Void block() {

Diff for: rsocket-core/src/main/java/io/rsocket/core/MetadataPushRequesterMono.java

+9-3
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,11 @@ public Void block(Duration m) {
120120
return block();
121121
}
122122

123+
/**
124+
* This method is deliberately non-blocking regardless it is named as `.block`. The main intent to
125+
* keep this method along with the {@link #subscribe()} is to eliminate redundancy which comes
126+
* with a default block method implementation.
127+
*/
123128
@Override
124129
@Nullable
125130
public Void block() {
@@ -133,15 +138,16 @@ public Void block() {
133138
try {
134139
final boolean hasMetadata = p.hasMetadata();
135140
metadata = p.metadata();
136-
if (hasMetadata) {
141+
if (!hasMetadata) {
137142
lazyTerminate(STATE, this);
138143
p.release();
139-
throw new IllegalArgumentException("Metadata push does not support metadata field");
144+
throw new IllegalArgumentException("Metadata push should have metadata field present");
140145
}
141146
if (!isValidMetadata(this.maxFrameLength, metadata)) {
142147
lazyTerminate(STATE, this);
143148
p.release();
144-
throw new IllegalArgumentException("Too Big Payload size");
149+
throw new IllegalArgumentException(
150+
String.format(INVALID_PAYLOAD_ERROR_MESSAGE, this.maxFrameLength));
145151
}
146152
} catch (IllegalReferenceCountException e) {
147153
lazyTerminate(STATE, this);

Diff for: rsocket-core/src/test/java/io/rsocket/core/RSocketRequesterTest.java

+61-2
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import static io.rsocket.frame.FrameType.REQUEST_RESPONSE;
3434
import static io.rsocket.frame.FrameType.REQUEST_STREAM;
3535
import static org.assertj.core.api.Assertions.assertThat;
36+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
3637
import static org.mockito.ArgumentMatchers.any;
3738
import static org.mockito.Mockito.verify;
3839

@@ -81,7 +82,6 @@
8182
import java.util.function.BiFunction;
8283
import java.util.function.Function;
8384
import java.util.stream.Stream;
84-
import org.assertj.core.api.Assertions;
8585
import org.assertj.core.api.Assumptions;
8686
import org.junit.jupiter.api.AfterEach;
8787
import org.junit.jupiter.api.BeforeEach;
@@ -169,7 +169,7 @@ protected void hookOnSubscribe(Subscription subscription) {
169169
public void testHandleSetupException() {
170170
rule.connection.addToReceivedBuffer(
171171
ErrorFrameCodec.encode(rule.alloc(), 0, new RejectedSetupException("boom")));
172-
Assertions.assertThatThrownBy(() -> rule.socket.onClose().block())
172+
assertThatThrownBy(() -> rule.socket.onClose().block())
173173
.isInstanceOf(RejectedSetupException.class);
174174
rule.assertHasNoLeaks();
175175
}
@@ -373,6 +373,65 @@ public void shouldThrownExceptionIfGivenPayloadIsExitsSizeAllowanceWithNoFragmen
373373
});
374374
}
375375

376+
@ParameterizedTest
377+
@ValueSource(ints = {128, 256, FRAME_LENGTH_MASK})
378+
public void shouldThrownExceptionIfGivenPayloadIsExitsSizeAllowanceWithNoFragmentation1(
379+
int maxFrameLength) {
380+
rule.setMaxFrameLength(maxFrameLength);
381+
prepareCalls()
382+
.forEach(
383+
generator -> {
384+
byte[] metadata = new byte[maxFrameLength];
385+
byte[] data = new byte[maxFrameLength];
386+
ThreadLocalRandom.current().nextBytes(metadata);
387+
ThreadLocalRandom.current().nextBytes(data);
388+
389+
assertThatThrownBy(
390+
() -> {
391+
final Publisher<?> source =
392+
generator.apply(rule.socket, DefaultPayload.create(data, metadata));
393+
394+
if (source instanceof Mono) {
395+
((Mono<?>) source).block();
396+
} else {
397+
((Flux) source).blockLast();
398+
}
399+
})
400+
.isInstanceOf(IllegalArgumentException.class)
401+
.hasMessage(String.format(INVALID_PAYLOAD_ERROR_MESSAGE, maxFrameLength));
402+
403+
rule.assertHasNoLeaks();
404+
});
405+
}
406+
407+
@Test
408+
public void shouldRejectCallOfNoMetadataPayload() {
409+
final ByteBuf data = rule.allocator.buffer(10);
410+
final Payload payload = ByteBufPayload.create(data);
411+
StepVerifier.create(rule.socket.metadataPush(payload))
412+
.expectSubscription()
413+
.expectErrorSatisfies(
414+
t ->
415+
assertThat(t)
416+
.isInstanceOf(IllegalArgumentException.class)
417+
.hasMessage("Metadata push should have metadata field present"))
418+
.verify();
419+
PayloadAssert.assertThat(payload).isReleased();
420+
rule.assertHasNoLeaks();
421+
}
422+
423+
@Test
424+
public void shouldRejectCallOfNoMetadataPayloadBlocking() {
425+
final ByteBuf data = rule.allocator.buffer(10);
426+
final Payload payload = ByteBufPayload.create(data);
427+
428+
assertThatThrownBy(() -> rule.socket.metadataPush(payload).block())
429+
.isInstanceOf(IllegalArgumentException.class)
430+
.hasMessage("Metadata push should have metadata field present");
431+
PayloadAssert.assertThat(payload).isReleased();
432+
rule.assertHasNoLeaks();
433+
}
434+
376435
static Stream<BiFunction<RSocket, Payload, Publisher<?>>> prepareCalls() {
377436
return Stream.of(
378437
RSocket::fireAndForget,

0 commit comments

Comments
 (0)