Skip to content

Commit bc96657

Browse files
committed
support websocket text payload
1 parent bbc4164 commit bc96657

11 files changed

+180
-33
lines changed

src/main/generated/io/vertx/ext/stomp/StompServerOptionsConverter.java

+8
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,11 @@ public static void fromJson(Iterable<java.util.Map.Entry<String, Object>> json,
5050
obj.setMaxSubscriptionsByClient(((Number)member.getValue()).intValue());
5151
}
5252
break;
53+
case "payloadMode":
54+
if (member.getValue() instanceof String) {
55+
obj.setPayloadMode(io.vertx.ext.stomp.PayloadMode.valueOf((String)member.getValue()));
56+
}
57+
break;
5358
case "secured":
5459
if (member.getValue() instanceof Boolean) {
5560
obj.setSecured((Boolean)member.getValue());
@@ -112,6 +117,9 @@ public static void toJson(StompServerOptions obj, java.util.Map<String, Object>
112117
json.put("maxHeaderLength", obj.getMaxHeaderLength());
113118
json.put("maxHeaders", obj.getMaxHeaders());
114119
json.put("maxSubscriptionsByClient", obj.getMaxSubscriptionsByClient());
120+
if (obj.getPayloadMode() != null) {
121+
json.put("payloadMode", obj.getPayloadMode().name());
122+
}
115123
json.put("secured", obj.isSecured());
116124
json.put("sendErrorOnNoSubscriptions", obj.isSendErrorOnNoSubscriptions());
117125
if (obj.getSupportedVersions() != null) {

src/main/java/io/vertx/ext/stomp/Destination.java

+19
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,25 @@ static Destination bridge(Vertx vertx, BridgeOptions options) {
6565
@Fluent
6666
Destination dispatch(StompServerConnection connection, Frame frame);
6767

68+
/**
69+
* Dispatches the given frame as a text frame
70+
*
71+
* @param connection the connection
72+
* @param frame the frame
73+
* @return the current instance of {@link Destination}
74+
*/
75+
@Fluent
76+
Destination dispatchText(StompServerConnection connection, Frame frame);
77+
78+
/**
79+
* Dispatches the given frame as a binary frame
80+
*
81+
* @param connection the connection
82+
* @param frame the frame
83+
* @return the current instance of {@link Destination}
84+
*/
85+
Destination dispatchBinary(StompServerConnection connection, Frame frame);
86+
6887
/**
6988
* Handles a subscription request to the current {@link Destination}.
7089
*
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package io.vertx.ext.stomp;
2+
3+
public enum PayloadMode {
4+
TEXT,
5+
BINARY
6+
}

src/main/java/io/vertx/ext/stomp/StompServerConnection.java

+12-2
Original file line numberDiff line numberDiff line change
@@ -34,14 +34,24 @@
3434
public interface StompServerConnection {
3535

3636
/**
37-
* Writes the given frame to the socket.
37+
* Writes the given frame to the socket using default payload type
3838
*
39-
* @param frame the frame, must not be {@code null}.
39+
* @param frame the frame, must not be {@code null}.
4040
* @return the current {@link StompServerConnection}
4141
*/
4242
@Fluent
4343
StompServerConnection write(Frame frame);
4444

45+
/**
46+
* Writes the given frame to the socket.
47+
*
48+
* @param frame the frame, must not be {@code null}.
49+
* @param payloadMode explicitely specify the payload type for the underlying socket to use (e.g. websockets)
50+
* @return the current {@link StompServerConnection}
51+
*/
52+
@Fluent
53+
StompServerConnection write(Frame frame, PayloadMode payloadMode);
54+
4555
/**
4656
* Writes the given buffer to the socket. This is a low level API that should be used carefully.
4757
*

src/main/java/io/vertx/ext/stomp/StompServerOptions.java

+24
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@ public class StompServerOptions extends NetServerOptions implements StompOptions
4343

4444
public static final String DEFAULT_WEBSOCKET_PATH = "/stomp";
4545

46+
public static final PayloadMode DEFAULT_PAYLOAD_MODE = PayloadMode.BINARY;
4647

4748
private int maxHeaderLength = DEFAULT_MAX_HEADER_LENGTH;
4849
private int maxHeaders = DEFAULT_MAX_HEADERS;
@@ -68,6 +69,7 @@ public class StompServerOptions extends NetServerOptions implements StompOptions
6869
private boolean disableTCPServer;
6970
private boolean trailingLine = DEFAULT_TRAILING_LINE;
7071

72+
private PayloadMode payloadMode = DEFAULT_PAYLOAD_MODE;
7173
/**
7274
* Default constructor.
7375
*/
@@ -102,6 +104,8 @@ public StompServerOptions(StompServerOptions other) {
102104

103105
this.disableTCPServer = other.disableTCPServer;
104106
this.trailingLine = other.trailingLine;
107+
108+
this.payloadMode = other.payloadMode;
105109
}
106110

107111
/**
@@ -471,4 +475,24 @@ public StompServerOptions setTrailingLine(boolean trailingLine) {
471475
this.trailingLine = trailingLine;
472476
return this;
473477
}
478+
479+
/**
480+
* Specify the default payload type to be used by the underlying socket. Useful for websocket transports.
481+
*
482+
* @return the default payload mode
483+
*/
484+
public PayloadMode getPayloadMode() {
485+
return payloadMode;
486+
}
487+
488+
/**
489+
* Specify the default payload type to be used by the underlying socket. Useful for websocket transports.
490+
*
491+
* @param payloadMode the default payload mode to use
492+
* @return the current {@link StompServerOptions}
493+
*/
494+
public StompServerOptions setPayloadMode(PayloadMode payloadMode) {
495+
this.payloadMode = payloadMode;
496+
return this;
497+
}
474498
}

src/main/java/io/vertx/ext/stomp/impl/FrameParser.java

+5-1
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,11 @@ private void handleLine(Buffer buffer) {
110110
String length = headers.get(Frame.CONTENT_LENGTH);
111111
if (length != null) {
112112
int contentLength = Integer.parseInt(length);
113-
frameParser.fixedSizeMode(contentLength);
113+
if (contentLength != 0) {
114+
frameParser.fixedSizeMode(contentLength);
115+
} else {
116+
frameParser.delimitedMode(NULL);
117+
}
114118
} else {
115119
frameParser.delimitedMode(NULL);
116120
}

src/main/java/io/vertx/ext/stomp/impl/Queue.java

+25-9
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,7 @@
1717
package io.vertx.ext.stomp.impl;
1818

1919
import io.vertx.core.Vertx;
20-
import io.vertx.ext.stomp.Command;
21-
import io.vertx.ext.stomp.Destination;
22-
import io.vertx.ext.stomp.Frame;
23-
import io.vertx.ext.stomp.StompServerConnection;
20+
import io.vertx.ext.stomp.*;
2421
import io.vertx.ext.stomp.utils.Headers;
2522

2623
import java.util.ArrayList;
@@ -58,23 +55,42 @@ public String destination() {
5855
/**
5956
* Dispatches the given frame.
6057
*
61-
* @param connection the connection
62-
* @param frame the frame ({@code SEND} frame).
58+
* @param connection the connection
59+
* @param frame the frame
60+
* @param payloadMode only for websocket bridge, explicitely specify payload type or null
6361
* @return the current instance of {@link Destination}
6462
*/
65-
@Override
66-
public synchronized Destination dispatch(StompServerConnection connection, Frame frame) {
63+
private synchronized Destination dispatch(StompServerConnection connection, Frame frame, PayloadMode payloadMode) {
6764
if (subscriptions.isEmpty()) {
6865
lastUsedSubscriptions = -1;
6966
return this;
7067
}
7168
Subscription subscription = getNextSubscription();
7269
String messageId = UUID.randomUUID().toString();
7370
Frame message = transform(frame, subscription, messageId);
74-
subscription.connection.write(message);
71+
if(payloadMode == null) {
72+
subscription.connection.write(message); // Uses server defaults
73+
} else {
74+
subscription.connection.write(message, payloadMode); // Explicit
75+
}
7576
return this;
7677
}
7778

79+
@Override
80+
public Destination dispatch(StompServerConnection connection, Frame frame) {
81+
return dispatch(connection, frame, null);
82+
}
83+
84+
@Override
85+
public Destination dispatchText(StompServerConnection connection, Frame frame) {
86+
return dispatch(connection, frame, PayloadMode.TEXT);
87+
}
88+
89+
@Override
90+
public Destination dispatchBinary(StompServerConnection connection, Frame frame) {
91+
return dispatch(connection, frame, PayloadMode.BINARY);
92+
}
93+
7894
private Subscription getNextSubscription() {
7995
lastUsedSubscriptions = lastUsedSubscriptions + 1;
8096
if (lastUsedSubscriptions >= subscriptions.size()) {

src/main/java/io/vertx/ext/stomp/impl/StompServerTCPConnectionImpl.java

+6-1
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ public class StompServerTCPConnectionImpl implements StompServerConnection {
3838

3939
private static final Logger log = LoggerFactory.getLogger(StompServerTCPConnectionImpl.class);
4040

41-
private final StompServer server;
41+
protected final StompServer server;
4242
private final NetSocket socket;
4343
private final String sessionId;
4444
protected final Handler<ServerFrame> handler;
@@ -72,6 +72,11 @@ public StompServerConnection write(Frame frame) {
7272
return write(frame.toBuffer(server.options().isTrailingLine()));
7373
}
7474

75+
@Override
76+
public StompServerConnection write(Frame frame, PayloadMode payloadMode) {
77+
return write(frame);
78+
}
79+
7580
@Override
7681
public StompServerConnection write(Buffer buffer) {
7782
socket.write(buffer);

src/main/java/io/vertx/ext/stomp/impl/StompServerWebSocketConnectionImpl.java

+25-2
Original file line numberDiff line numberDiff line change
@@ -41,8 +41,26 @@ public StompServerWebSocketConnectionImpl(ServerWebSocket socket, StompServer se
4141
}
4242

4343
@Override
44-
public SSLSession sslSession() {
45-
return this.socket.sslSession();
44+
public SSLSession sslSession() { return this.socket.sslSession(); }
45+
46+
@Override
47+
public StompServerConnection write(Frame frame) {
48+
return write(frame, server.options().getPayloadMode());
49+
}
50+
51+
@Override
52+
public StompServerConnection write(Frame frame, PayloadMode payloadMode) {
53+
if (handler != null) {
54+
handler.handle(new ServerFrameImpl(frame, this));
55+
}
56+
Buffer stompPayload = frame.toBuffer(server.options().isTrailingLine());
57+
if (payloadMode == PayloadMode.BINARY) {
58+
return write(stompPayload);
59+
} else if (payloadMode == PayloadMode.TEXT) {
60+
return writeText(stompPayload.toString());
61+
} else {
62+
return write(stompPayload); // Default
63+
}
4664
}
4765

4866
@Override
@@ -51,6 +69,11 @@ public StompServerConnection write(Buffer buffer) {
5169
return this;
5270
}
5371

72+
public StompServerConnection writeText(String message) {
73+
socket.writeTextMessage(message);
74+
return this;
75+
}
76+
5477
@Override
5578
public void ping() {
5679
if (handler != null) {

src/main/java/io/vertx/ext/stomp/impl/Topic.java

+25-9
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,7 @@
1717
package io.vertx.ext.stomp.impl;
1818

1919
import io.vertx.core.Vertx;
20-
import io.vertx.ext.stomp.Command;
21-
import io.vertx.ext.stomp.Destination;
22-
import io.vertx.ext.stomp.Frame;
23-
import io.vertx.ext.stomp.StompServerConnection;
20+
import io.vertx.ext.stomp.*;
2421
import io.vertx.ext.stomp.utils.Headers;
2522

2623
import java.util.ArrayList;
@@ -56,20 +53,39 @@ public String destination() {
5653
/**
5754
* Dispatches the given frame.
5855
*
59-
* @param connection the connection
60-
* @param frame the frame ({@code SEND} frame).
56+
* @param connection the connection
57+
* @param frame the frame
58+
* @param payloadMode only for websocket bridge, explicitely specify payload type or null
6159
* @return the current instance of {@link Destination}
6260
*/
63-
@Override
64-
public synchronized Destination dispatch(StompServerConnection connection, Frame frame) {
61+
private synchronized Destination dispatch(StompServerConnection connection, Frame frame, PayloadMode payloadMode) {
6562
for (Subscription subscription : subscriptions) {
6663
String messageId = UUID.randomUUID().toString();
6764
Frame message = transform(frame, subscription, messageId);
68-
subscription.connection.write(message);
65+
if(payloadMode != null) {
66+
subscription.connection.write(message, payloadMode);
67+
} else {
68+
subscription.connection.write(message);
69+
}
6970
}
7071
return this;
7172
}
7273

74+
@Override
75+
public Destination dispatch(StompServerConnection connection, Frame frame) {
76+
return dispatch(connection, frame, null);
77+
}
78+
79+
@Override
80+
public Destination dispatchText(StompServerConnection connection, Frame frame) {
81+
return dispatch(connection, frame, PayloadMode.TEXT);
82+
}
83+
84+
@Override
85+
public Destination dispatchBinary(StompServerConnection connection, Frame frame) {
86+
return dispatch(connection, frame, PayloadMode.BINARY);
87+
}
88+
7389
public static Frame transform(Frame frame, Subscription subscription, String messageId) {
7490
final Headers headers = Headers.create(frame.getHeaders())
7591
// Destination already set in the input headers.

src/test/java/io/vertx/ext/stomp/impl/QueueManagingAcknowledgments.java

+25-9
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,7 @@
1717
package io.vertx.ext.stomp.impl;
1818

1919
import io.vertx.core.Vertx;
20-
import io.vertx.ext.stomp.Command;
21-
import io.vertx.ext.stomp.Destination;
22-
import io.vertx.ext.stomp.Frame;
23-
import io.vertx.ext.stomp.StompServerConnection;
20+
import io.vertx.ext.stomp.*;
2421
import io.vertx.ext.stomp.utils.Headers;
2522

2623
import java.util.ArrayList;
@@ -58,12 +55,12 @@ public String destination() {
5855
/**
5956
* Dispatches the given frame.
6057
*
61-
* @param connection the connection
62-
* @param frame the frame ({@code SEND} frame).
58+
* @param connection the connection
59+
* @param frame the frame
60+
* @param payloadMode only for websocket bridge, explicitely specify payload type or null
6361
* @return the current instance of {@link Destination}
6462
*/
65-
@Override
66-
public synchronized Destination dispatch(StompServerConnection connection, Frame frame) {
63+
private synchronized Destination dispatch(StompServerConnection connection, Frame frame, PayloadMode payloadMode) {
6764
if (subscriptions.isEmpty()) {
6865
lastUsedSubscriptions = -1;
6966
return this;
@@ -72,10 +69,29 @@ public synchronized Destination dispatch(StompServerConnection connection, Frame
7269
String messageId = UUID.randomUUID().toString();
7370
Frame message = transform(frame, subscription, messageId);
7471
subscription.enqueue(message);
75-
subscription.connection().write(message);
72+
if(payloadMode != null) {
73+
subscription.connection.write(message, payloadMode);
74+
} else {
75+
subscription.connection.write(message);
76+
}
7677
return this;
7778
}
7879

80+
@Override
81+
public Destination dispatch(StompServerConnection connection, Frame frame) {
82+
return dispatch(connection, frame, null);
83+
}
84+
85+
@Override
86+
public Destination dispatchText(StompServerConnection connection, Frame frame) {
87+
return dispatch(connection, frame, PayloadMode.TEXT);
88+
}
89+
90+
@Override
91+
public Destination dispatchBinary(StompServerConnection connection, Frame frame) {
92+
return dispatch(connection, frame, PayloadMode.BINARY);
93+
}
94+
7995
private Subscription getNextSubscription() {
8096
lastUsedSubscriptions = lastUsedSubscriptions + 1;
8197
if (lastUsedSubscriptions >= subscriptions.size()) {

0 commit comments

Comments
 (0)