Skip to content

Commit 792f2dd

Browse files
committed
provides handling of requestChannel with complete flag
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent 94c8d57 commit 792f2dd

File tree

2 files changed

+20
-2
lines changed

2 files changed

+20
-2
lines changed

Diff for: rsocket-core/src/main/java/io/rsocket/core/RSocketResponder.java

+9-2
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,11 @@ private void handleFrame(ByteBuf frame) {
221221
break;
222222
case REQUEST_CHANNEL:
223223
long channelInitialRequestN = RequestChannelFrameCodec.initialRequestN(frame);
224-
handleChannel(streamId, frame, channelInitialRequestN);
224+
handleChannel(streamId, frame, channelInitialRequestN, false);
225+
break;
226+
case REQUEST_CHANNEL_COMPLETE:
227+
long completeChannelInitialRequestN = RequestChannelFrameCodec.initialRequestN(frame);
228+
handleChannel(streamId, frame, completeChannelInitialRequestN, true);
225229
break;
226230
case METADATA_PUSH:
227231
handleMetadataPush(metadataPush(super.getPayloadDecoder().apply(frame)));
@@ -345,7 +349,7 @@ private void handleStream(int streamId, ByteBuf frame, long initialRequestN) {
345349
}
346350
}
347351

348-
private void handleChannel(int streamId, ByteBuf frame, long initialRequestN) {
352+
private void handleChannel(int streamId, ByteBuf frame, long initialRequestN, boolean complete) {
349353
if (FrameHeaderCodec.hasFollows(frame)) {
350354
RequestChannelResponderSubscriber subscriber =
351355
new RequestChannelResponderSubscriber(streamId, initialRequestN, frame, this, this);
@@ -358,6 +362,9 @@ private void handleChannel(int streamId, ByteBuf frame, long initialRequestN) {
358362

359363
if (this.add(streamId, subscriber)) {
360364
this.requestChannel(firstPayload, subscriber).subscribe(subscriber);
365+
if (complete) {
366+
subscriber.handleComplete();
367+
}
361368
}
362369
}
363370
}

Diff for: rsocket-core/src/main/java/io/rsocket/frame/FrameType.java

+11
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,17 @@ public enum FrameType {
195195
/** A {@link #PAYLOAD} frame with {@code NEXT} and {@code COMPLETE} flags set. */
196196
NEXT_COMPLETE(0xC0, Flags.CAN_HAVE_DATA | Flags.CAN_HAVE_METADATA | Flags.IS_FRAGMENTABLE),
197197

198+
// SYNTHETIC REQUEST_CHANNEL WITH COMPLETION
199+
200+
/** A {@link #REQUEST_CHANNEL} and {@code COMPLETE} flags set. */
201+
REQUEST_CHANNEL_COMPLETE(
202+
0xD7,
203+
Flags.CAN_HAVE_METADATA
204+
| Flags.CAN_HAVE_DATA
205+
| Flags.HAS_INITIAL_REQUEST_N
206+
| Flags.IS_FRAGMENTABLE
207+
| Flags.IS_REQUEST_TYPE),
208+
198209
/**
199210
* Used To Extend more frame types as well as extensions.
200211
*

0 commit comments

Comments
 (0)