Skip to content
This repository was archived by the owner on Jun 21, 2023. It is now read-only.

Commit 2bf2b9b

Browse files
authored
Merge pull request #142 from scalecube/set-original-qualifier
Set original qualifier in cancel/complete cases and simplify their preparing
2 parents 0ae9cef + 36beea8 commit 2bf2b9b

File tree

2 files changed

+96
-49
lines changed

2 files changed

+96
-49
lines changed
Lines changed: 93 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,9 @@
11
package io.scalecube.services.gateway.ws;
22

33
import io.scalecube.services.api.ServiceMessage;
4-
import io.scalecube.services.api.ServiceMessage.Builder;
54
import io.scalecube.services.exceptions.DefaultErrorMapper;
6-
import java.util.Optional;
75

8-
final class GatewayMessages {
6+
public final class GatewayMessages {
97

108
static final String QUALIFIER_FIELD = "q";
119
static final String STREAM_ID_FIELD = "sid";
@@ -18,49 +16,95 @@ private GatewayMessages() {
1816
// Do not instantiate
1917
}
2018

21-
static ServiceMessage newCancelMessage(long sid) {
22-
Builder builder = ServiceMessage.builder();
23-
setSid(builder, sid);
24-
setSignal(builder, Signal.CANCEL);
25-
return builder.build();
19+
/**
20+
* Returns cancel message by given arguments.
21+
*
22+
* @param sid sid
23+
* @param qualifier qualifier
24+
* @return {@link ServiceMessage} instance as the cancel signal
25+
*/
26+
public static ServiceMessage newCancelMessage(long sid, String qualifier) {
27+
return ServiceMessage.builder()
28+
.qualifier(qualifier)
29+
.header(STREAM_ID_FIELD, sid)
30+
.header(SIGNAL_FIELD, Signal.CANCEL.code())
31+
.build();
2632
}
2733

28-
static ServiceMessage newErrorMessage(ServiceMessage message, Throwable th) {
34+
/**
35+
* Returns error message by given arguments.
36+
*
37+
* @param message request
38+
* @param th cause
39+
* @return {@link ServiceMessage} instance as the error signal
40+
*/
41+
public static ServiceMessage newErrorMessage(ServiceMessage message, Throwable th) {
2942
ServiceMessage.Builder builder =
30-
ServiceMessage.from(DefaultErrorMapper.INSTANCE.toMessage(message.qualifier(), th));
31-
Optional.ofNullable(GatewayMessages.getSidHeader(message))
32-
.ifPresent(s -> GatewayMessages.setSid(builder, s));
33-
GatewayMessages.setSignal(builder, Signal.ERROR);
43+
ServiceMessage.from(DefaultErrorMapper.INSTANCE.toMessage(message.qualifier(), th))
44+
.header(SIGNAL_FIELD, Signal.ERROR.code());
45+
String sid = message.header(STREAM_ID_FIELD);
46+
if (sid != null) {
47+
builder.header(STREAM_ID_FIELD, sid);
48+
}
3449
return builder.build();
3550
}
3651

37-
static ServiceMessage newCompleteMessage(ServiceMessage message) {
38-
ServiceMessage.Builder builder = ServiceMessage.builder();
39-
Optional.ofNullable(GatewayMessages.getSidHeader(message))
40-
.ifPresent(s -> GatewayMessages.setSid(builder, s));
41-
GatewayMessages.setSignal(builder, Signal.COMPLETE);
42-
return builder.build();
52+
/**
53+
* Returns complete message by given arguments.
54+
*
55+
* @param sid sid
56+
* @param qualifier qualifier
57+
* @return {@link ServiceMessage} instance as the complete signal
58+
*/
59+
public static ServiceMessage newCompleteMessage(long sid, String qualifier) {
60+
return ServiceMessage.builder()
61+
.qualifier(qualifier)
62+
.header(STREAM_ID_FIELD, sid)
63+
.header(SIGNAL_FIELD, Signal.COMPLETE.code())
64+
.build();
4365
}
4466

45-
static ServiceMessage newResponseMessage(
67+
/**
68+
* Returns response message by given arguments.
69+
*
70+
* @param sid sid
71+
* @param message request
72+
* @param isErrorResponse should the message be marked as an error?
73+
* @return {@link ServiceMessage} instance as the response
74+
*/
75+
public static ServiceMessage newResponseMessage(
4676
long sid, ServiceMessage message, boolean isErrorResponse) {
47-
ServiceMessage.Builder builder = ServiceMessage.from(message);
48-
GatewayMessages.setSid(builder, sid);
4977
if (isErrorResponse) {
50-
GatewayMessages.setSignal(builder, Signal.ERROR);
78+
return ServiceMessage.from(message)
79+
.header(STREAM_ID_FIELD, sid)
80+
.header(SIGNAL_FIELD, Signal.ERROR.code())
81+
.build();
5182
}
52-
return builder.build();
83+
return ServiceMessage.from(message).header(STREAM_ID_FIELD, sid).build();
5384
}
5485

55-
static ServiceMessage validateSid(ServiceMessage message) {
56-
if (getSidHeader(message) == null) {
86+
/**
87+
* Verifies the sid existence in a given message.
88+
*
89+
* @param message message
90+
* @return incoming message
91+
*/
92+
public static ServiceMessage validateSid(ServiceMessage message) {
93+
if (message.header(STREAM_ID_FIELD) == null) {
5794
throw WebsocketContextException.badRequest("sid is missing", message);
5895
} else {
5996
return message;
6097
}
6198
}
6299

63-
static ServiceMessage validateSidOnSession(
100+
/**
101+
* Verifies the sid is not used in a given session.
102+
*
103+
* @param session session
104+
* @param message message
105+
* @return incoming message
106+
*/
107+
public static ServiceMessage validateSidOnSession(
64108
WebsocketGatewaySession session, ServiceMessage message) {
65109
long sid = getSid(message);
66110
if (session.containsSid(sid)) {
@@ -70,35 +114,37 @@ static ServiceMessage validateSidOnSession(
70114
}
71115
}
72116

73-
static ServiceMessage validateQualifier(ServiceMessage message) {
117+
/**
118+
* Verifies the qualifier existence in a given message.
119+
*
120+
* @param message message
121+
* @return incoming message
122+
*/
123+
public static ServiceMessage validateQualifier(ServiceMessage message) {
74124
if (message.qualifier() == null) {
75125
throw WebsocketContextException.badRequest("qualifier is missing", message);
76126
}
77127
return message;
78128
}
79129

80-
static long getSid(ServiceMessage message) {
81-
return Long.parseLong(getSidHeader(message));
82-
}
83-
84-
static String getSidHeader(ServiceMessage message) {
85-
return message.header(STREAM_ID_FIELD);
130+
/**
131+
* Returns sid from a given message.
132+
*
133+
* @param message message
134+
* @return sid
135+
*/
136+
public static long getSid(ServiceMessage message) {
137+
return Long.parseLong(message.header(STREAM_ID_FIELD));
86138
}
87139

88-
static ServiceMessage.Builder setSid(ServiceMessage.Builder builder, long sid) {
89-
return builder.header(STREAM_ID_FIELD, sid);
90-
}
91-
92-
static ServiceMessage.Builder setSid(ServiceMessage.Builder builder, String sid) {
93-
return builder.header(STREAM_ID_FIELD, sid);
94-
}
95-
96-
static Signal getSignal(ServiceMessage message) {
140+
/**
141+
* Returns signal from a given message.
142+
*
143+
* @param message message
144+
* @return signal
145+
*/
146+
public static Signal getSignal(ServiceMessage message) {
97147
String header = message.header(SIGNAL_FIELD);
98148
return header != null ? Signal.from(Integer.parseInt(header)) : null;
99149
}
100-
101-
static ServiceMessage.Builder setSignal(ServiceMessage.Builder builder, Signal signal) {
102-
return builder.header(SIGNAL_FIELD, signal.code());
103-
}
104150
}

services-gateway-netty/src/main/java/io/scalecube/services/gateway/ws/WebsocketGatewayAcceptor.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ private void onMessage(WebsocketGatewaySession session, ServiceMessage message,
184184
() -> {
185185
if (!receivedError.get()) {
186186
session
187-
.send(newCompleteMessage(message))
187+
.send(newCompleteMessage(sid, message.qualifier()))
188188
.subscriberContext(context)
189189
.subscribe();
190190
}
@@ -205,6 +205,7 @@ private Mono<?> onCancel(WebsocketGatewaySession session, ServiceMessage message
205205
long sid = getSid(message);
206206
// dispose by sid (if anything to dispose)
207207
session.dispose(sid);
208-
return session.send(newCancelMessage(sid)); // no need to subscribe here since flatMap will do
208+
// no need to subscribe here since flatMap will do
209+
return session.send(newCancelMessage(sid, message.qualifier()));
209210
}
210211
}

0 commit comments

Comments
 (0)