From bc966575b1f422c0bea91dc3d2ece033556cba76 Mon Sep 17 00:00:00 2001 From: Francis Reynders Date: Wed, 12 Apr 2023 11:10:46 +0200 Subject: [PATCH 1/2] support websocket text payload --- .../stomp/StompServerOptionsConverter.java | 8 +++++ .../java/io/vertx/ext/stomp/Destination.java | 19 +++++++++++ .../java/io/vertx/ext/stomp/PayloadMode.java | 6 ++++ .../ext/stomp/StompServerConnection.java | 14 ++++++-- .../vertx/ext/stomp/StompServerOptions.java | 24 +++++++++++++ .../io/vertx/ext/stomp/impl/FrameParser.java | 6 +++- .../java/io/vertx/ext/stomp/impl/Queue.java | 34 ++++++++++++++----- .../impl/StompServerTCPConnectionImpl.java | 7 +++- .../StompServerWebSocketConnectionImpl.java | 27 +++++++++++++-- .../java/io/vertx/ext/stomp/impl/Topic.java | 34 ++++++++++++++----- .../impl/QueueManagingAcknowledgments.java | 34 ++++++++++++++----- 11 files changed, 180 insertions(+), 33 deletions(-) create mode 100644 src/main/java/io/vertx/ext/stomp/PayloadMode.java diff --git a/src/main/generated/io/vertx/ext/stomp/StompServerOptionsConverter.java b/src/main/generated/io/vertx/ext/stomp/StompServerOptionsConverter.java index 0a022df..354c6b4 100644 --- a/src/main/generated/io/vertx/ext/stomp/StompServerOptionsConverter.java +++ b/src/main/generated/io/vertx/ext/stomp/StompServerOptionsConverter.java @@ -50,6 +50,11 @@ public static void fromJson(Iterable> json, obj.setMaxSubscriptionsByClient(((Number)member.getValue()).intValue()); } break; + case "payloadMode": + if (member.getValue() instanceof String) { + obj.setPayloadMode(io.vertx.ext.stomp.PayloadMode.valueOf((String)member.getValue())); + } + break; case "secured": if (member.getValue() instanceof Boolean) { obj.setSecured((Boolean)member.getValue()); @@ -112,6 +117,9 @@ public static void toJson(StompServerOptions obj, java.util.Map json.put("maxHeaderLength", obj.getMaxHeaderLength()); json.put("maxHeaders", obj.getMaxHeaders()); json.put("maxSubscriptionsByClient", obj.getMaxSubscriptionsByClient()); + if (obj.getPayloadMode() != null) { + json.put("payloadMode", obj.getPayloadMode().name()); + } json.put("secured", obj.isSecured()); json.put("sendErrorOnNoSubscriptions", obj.isSendErrorOnNoSubscriptions()); if (obj.getSupportedVersions() != null) { diff --git a/src/main/java/io/vertx/ext/stomp/Destination.java b/src/main/java/io/vertx/ext/stomp/Destination.java index 2610581..8930157 100644 --- a/src/main/java/io/vertx/ext/stomp/Destination.java +++ b/src/main/java/io/vertx/ext/stomp/Destination.java @@ -65,6 +65,25 @@ static Destination bridge(Vertx vertx, BridgeOptions options) { @Fluent Destination dispatch(StompServerConnection connection, Frame frame); + /** + * Dispatches the given frame as a text frame + * + * @param connection the connection + * @param frame the frame + * @return the current instance of {@link Destination} + */ + @Fluent + Destination dispatchText(StompServerConnection connection, Frame frame); + + /** + * Dispatches the given frame as a binary frame + * + * @param connection the connection + * @param frame the frame + * @return the current instance of {@link Destination} + */ + Destination dispatchBinary(StompServerConnection connection, Frame frame); + /** * Handles a subscription request to the current {@link Destination}. * diff --git a/src/main/java/io/vertx/ext/stomp/PayloadMode.java b/src/main/java/io/vertx/ext/stomp/PayloadMode.java new file mode 100644 index 0000000..d43ecd4 --- /dev/null +++ b/src/main/java/io/vertx/ext/stomp/PayloadMode.java @@ -0,0 +1,6 @@ +package io.vertx.ext.stomp; + +public enum PayloadMode { + TEXT, + BINARY +} diff --git a/src/main/java/io/vertx/ext/stomp/StompServerConnection.java b/src/main/java/io/vertx/ext/stomp/StompServerConnection.java index 3aadcab..9aa3709 100644 --- a/src/main/java/io/vertx/ext/stomp/StompServerConnection.java +++ b/src/main/java/io/vertx/ext/stomp/StompServerConnection.java @@ -34,14 +34,24 @@ public interface StompServerConnection { /** - * Writes the given frame to the socket. + * Writes the given frame to the socket using default payload type * - * @param frame the frame, must not be {@code null}. + * @param frame the frame, must not be {@code null}. * @return the current {@link StompServerConnection} */ @Fluent StompServerConnection write(Frame frame); + /** + * Writes the given frame to the socket. + * + * @param frame the frame, must not be {@code null}. + * @param payloadMode explicitely specify the payload type for the underlying socket to use (e.g. websockets) + * @return the current {@link StompServerConnection} + */ + @Fluent + StompServerConnection write(Frame frame, PayloadMode payloadMode); + /** * Writes the given buffer to the socket. This is a low level API that should be used carefully. * diff --git a/src/main/java/io/vertx/ext/stomp/StompServerOptions.java b/src/main/java/io/vertx/ext/stomp/StompServerOptions.java index 85786a1..2623ba2 100644 --- a/src/main/java/io/vertx/ext/stomp/StompServerOptions.java +++ b/src/main/java/io/vertx/ext/stomp/StompServerOptions.java @@ -43,6 +43,7 @@ public class StompServerOptions extends NetServerOptions implements StompOptions public static final String DEFAULT_WEBSOCKET_PATH = "/stomp"; + public static final PayloadMode DEFAULT_PAYLOAD_MODE = PayloadMode.BINARY; private int maxHeaderLength = DEFAULT_MAX_HEADER_LENGTH; private int maxHeaders = DEFAULT_MAX_HEADERS; @@ -68,6 +69,7 @@ public class StompServerOptions extends NetServerOptions implements StompOptions private boolean disableTCPServer; private boolean trailingLine = DEFAULT_TRAILING_LINE; + private PayloadMode payloadMode = DEFAULT_PAYLOAD_MODE; /** * Default constructor. */ @@ -102,6 +104,8 @@ public StompServerOptions(StompServerOptions other) { this.disableTCPServer = other.disableTCPServer; this.trailingLine = other.trailingLine; + + this.payloadMode = other.payloadMode; } /** @@ -471,4 +475,24 @@ public StompServerOptions setTrailingLine(boolean trailingLine) { this.trailingLine = trailingLine; return this; } + + /** + * Specify the default payload type to be used by the underlying socket. Useful for websocket transports. + * + * @return the default payload mode + */ + public PayloadMode getPayloadMode() { + return payloadMode; + } + + /** + * Specify the default payload type to be used by the underlying socket. Useful for websocket transports. + * + * @param payloadMode the default payload mode to use + * @return the current {@link StompServerOptions} + */ + public StompServerOptions setPayloadMode(PayloadMode payloadMode) { + this.payloadMode = payloadMode; + return this; + } } diff --git a/src/main/java/io/vertx/ext/stomp/impl/FrameParser.java b/src/main/java/io/vertx/ext/stomp/impl/FrameParser.java index 86a9f19..3c73920 100644 --- a/src/main/java/io/vertx/ext/stomp/impl/FrameParser.java +++ b/src/main/java/io/vertx/ext/stomp/impl/FrameParser.java @@ -110,7 +110,11 @@ private void handleLine(Buffer buffer) { String length = headers.get(Frame.CONTENT_LENGTH); if (length != null) { int contentLength = Integer.parseInt(length); - frameParser.fixedSizeMode(contentLength); + if (contentLength != 0) { + frameParser.fixedSizeMode(contentLength); + } else { + frameParser.delimitedMode(NULL); + } } else { frameParser.delimitedMode(NULL); } diff --git a/src/main/java/io/vertx/ext/stomp/impl/Queue.java b/src/main/java/io/vertx/ext/stomp/impl/Queue.java index d7601df..bba2def 100644 --- a/src/main/java/io/vertx/ext/stomp/impl/Queue.java +++ b/src/main/java/io/vertx/ext/stomp/impl/Queue.java @@ -17,10 +17,7 @@ package io.vertx.ext.stomp.impl; import io.vertx.core.Vertx; -import io.vertx.ext.stomp.Command; -import io.vertx.ext.stomp.Destination; -import io.vertx.ext.stomp.Frame; -import io.vertx.ext.stomp.StompServerConnection; +import io.vertx.ext.stomp.*; import io.vertx.ext.stomp.utils.Headers; import java.util.ArrayList; @@ -58,12 +55,12 @@ public String destination() { /** * Dispatches the given frame. * - * @param connection the connection - * @param frame the frame ({@code SEND} frame). + * @param connection the connection + * @param frame the frame + * @param payloadMode only for websocket bridge, explicitely specify payload type or null * @return the current instance of {@link Destination} */ - @Override - public synchronized Destination dispatch(StompServerConnection connection, Frame frame) { + private synchronized Destination dispatch(StompServerConnection connection, Frame frame, PayloadMode payloadMode) { if (subscriptions.isEmpty()) { lastUsedSubscriptions = -1; return this; @@ -71,10 +68,29 @@ public synchronized Destination dispatch(StompServerConnection connection, Frame Subscription subscription = getNextSubscription(); String messageId = UUID.randomUUID().toString(); Frame message = transform(frame, subscription, messageId); - subscription.connection.write(message); + if(payloadMode == null) { + subscription.connection.write(message); // Uses server defaults + } else { + subscription.connection.write(message, payloadMode); // Explicit + } return this; } + @Override + public Destination dispatch(StompServerConnection connection, Frame frame) { + return dispatch(connection, frame, null); + } + + @Override + public Destination dispatchText(StompServerConnection connection, Frame frame) { + return dispatch(connection, frame, PayloadMode.TEXT); + } + + @Override + public Destination dispatchBinary(StompServerConnection connection, Frame frame) { + return dispatch(connection, frame, PayloadMode.BINARY); + } + private Subscription getNextSubscription() { lastUsedSubscriptions = lastUsedSubscriptions + 1; if (lastUsedSubscriptions >= subscriptions.size()) { diff --git a/src/main/java/io/vertx/ext/stomp/impl/StompServerTCPConnectionImpl.java b/src/main/java/io/vertx/ext/stomp/impl/StompServerTCPConnectionImpl.java index 6d90a66..9eed62f 100644 --- a/src/main/java/io/vertx/ext/stomp/impl/StompServerTCPConnectionImpl.java +++ b/src/main/java/io/vertx/ext/stomp/impl/StompServerTCPConnectionImpl.java @@ -38,7 +38,7 @@ public class StompServerTCPConnectionImpl implements StompServerConnection { private static final Logger log = LoggerFactory.getLogger(StompServerTCPConnectionImpl.class); - private final StompServer server; + protected final StompServer server; private final NetSocket socket; private final String sessionId; protected final Handler handler; @@ -72,6 +72,11 @@ public StompServerConnection write(Frame frame) { return write(frame.toBuffer(server.options().isTrailingLine())); } + @Override + public StompServerConnection write(Frame frame, PayloadMode payloadMode) { + return write(frame); + } + @Override public StompServerConnection write(Buffer buffer) { socket.write(buffer); diff --git a/src/main/java/io/vertx/ext/stomp/impl/StompServerWebSocketConnectionImpl.java b/src/main/java/io/vertx/ext/stomp/impl/StompServerWebSocketConnectionImpl.java index 0012acb..2bd7e86 100644 --- a/src/main/java/io/vertx/ext/stomp/impl/StompServerWebSocketConnectionImpl.java +++ b/src/main/java/io/vertx/ext/stomp/impl/StompServerWebSocketConnectionImpl.java @@ -41,8 +41,26 @@ public StompServerWebSocketConnectionImpl(ServerWebSocket socket, StompServer se } @Override - public SSLSession sslSession() { - return this.socket.sslSession(); + public SSLSession sslSession() { return this.socket.sslSession(); } + + @Override + public StompServerConnection write(Frame frame) { + return write(frame, server.options().getPayloadMode()); + } + + @Override + public StompServerConnection write(Frame frame, PayloadMode payloadMode) { + if (handler != null) { + handler.handle(new ServerFrameImpl(frame, this)); + } + Buffer stompPayload = frame.toBuffer(server.options().isTrailingLine()); + if (payloadMode == PayloadMode.BINARY) { + return write(stompPayload); + } else if (payloadMode == PayloadMode.TEXT) { + return writeText(stompPayload.toString()); + } else { + return write(stompPayload); // Default + } } @Override @@ -51,6 +69,11 @@ public StompServerConnection write(Buffer buffer) { return this; } + public StompServerConnection writeText(String message) { + socket.writeTextMessage(message); + return this; + } + @Override public void ping() { if (handler != null) { diff --git a/src/main/java/io/vertx/ext/stomp/impl/Topic.java b/src/main/java/io/vertx/ext/stomp/impl/Topic.java index 02877d4..73fd193 100644 --- a/src/main/java/io/vertx/ext/stomp/impl/Topic.java +++ b/src/main/java/io/vertx/ext/stomp/impl/Topic.java @@ -17,10 +17,7 @@ package io.vertx.ext.stomp.impl; import io.vertx.core.Vertx; -import io.vertx.ext.stomp.Command; -import io.vertx.ext.stomp.Destination; -import io.vertx.ext.stomp.Frame; -import io.vertx.ext.stomp.StompServerConnection; +import io.vertx.ext.stomp.*; import io.vertx.ext.stomp.utils.Headers; import java.util.ArrayList; @@ -56,20 +53,39 @@ public String destination() { /** * Dispatches the given frame. * - * @param connection the connection - * @param frame the frame ({@code SEND} frame). + * @param connection the connection + * @param frame the frame + * @param payloadMode only for websocket bridge, explicitely specify payload type or null * @return the current instance of {@link Destination} */ - @Override - public synchronized Destination dispatch(StompServerConnection connection, Frame frame) { + private synchronized Destination dispatch(StompServerConnection connection, Frame frame, PayloadMode payloadMode) { for (Subscription subscription : subscriptions) { String messageId = UUID.randomUUID().toString(); Frame message = transform(frame, subscription, messageId); - subscription.connection.write(message); + if(payloadMode != null) { + subscription.connection.write(message, payloadMode); + } else { + subscription.connection.write(message); + } } return this; } + @Override + public Destination dispatch(StompServerConnection connection, Frame frame) { + return dispatch(connection, frame, null); + } + + @Override + public Destination dispatchText(StompServerConnection connection, Frame frame) { + return dispatch(connection, frame, PayloadMode.TEXT); + } + + @Override + public Destination dispatchBinary(StompServerConnection connection, Frame frame) { + return dispatch(connection, frame, PayloadMode.BINARY); + } + public static Frame transform(Frame frame, Subscription subscription, String messageId) { final Headers headers = Headers.create(frame.getHeaders()) // Destination already set in the input headers. diff --git a/src/test/java/io/vertx/ext/stomp/impl/QueueManagingAcknowledgments.java b/src/test/java/io/vertx/ext/stomp/impl/QueueManagingAcknowledgments.java index 386a5df..7710115 100644 --- a/src/test/java/io/vertx/ext/stomp/impl/QueueManagingAcknowledgments.java +++ b/src/test/java/io/vertx/ext/stomp/impl/QueueManagingAcknowledgments.java @@ -17,10 +17,7 @@ package io.vertx.ext.stomp.impl; import io.vertx.core.Vertx; -import io.vertx.ext.stomp.Command; -import io.vertx.ext.stomp.Destination; -import io.vertx.ext.stomp.Frame; -import io.vertx.ext.stomp.StompServerConnection; +import io.vertx.ext.stomp.*; import io.vertx.ext.stomp.utils.Headers; import java.util.ArrayList; @@ -58,12 +55,12 @@ public String destination() { /** * Dispatches the given frame. * - * @param connection the connection - * @param frame the frame ({@code SEND} frame). + * @param connection the connection + * @param frame the frame + * @param payloadMode only for websocket bridge, explicitely specify payload type or null * @return the current instance of {@link Destination} */ - @Override - public synchronized Destination dispatch(StompServerConnection connection, Frame frame) { + private synchronized Destination dispatch(StompServerConnection connection, Frame frame, PayloadMode payloadMode) { if (subscriptions.isEmpty()) { lastUsedSubscriptions = -1; return this; @@ -72,10 +69,29 @@ public synchronized Destination dispatch(StompServerConnection connection, Frame String messageId = UUID.randomUUID().toString(); Frame message = transform(frame, subscription, messageId); subscription.enqueue(message); - subscription.connection().write(message); + if(payloadMode != null) { + subscription.connection.write(message, payloadMode); + } else { + subscription.connection.write(message); + } return this; } + @Override + public Destination dispatch(StompServerConnection connection, Frame frame) { + return dispatch(connection, frame, null); + } + + @Override + public Destination dispatchText(StompServerConnection connection, Frame frame) { + return dispatch(connection, frame, PayloadMode.TEXT); + } + + @Override + public Destination dispatchBinary(StompServerConnection connection, Frame frame) { + return dispatch(connection, frame, PayloadMode.BINARY); + } + private Subscription getNextSubscription() { lastUsedSubscriptions = lastUsedSubscriptions + 1; if (lastUsedSubscriptions >= subscriptions.size()) { From 96338cd7eab9d4b5aa5a5162941530ec8a0d6b99 Mon Sep 17 00:00:00 2001 From: Francis Reynders Date: Wed, 17 May 2023 08:44:15 +0200 Subject: [PATCH 2/2] bump to 4.4.3-SNAPSHOT --- pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index 072b3ea..5ecb31b 100644 --- a/pom.xml +++ b/pom.xml @@ -9,13 +9,13 @@ vertx-stomp - 4.4.2-SNAPSHOT + 4.4.3-SNAPSHOT Vert.x Stomp Stomp support for Vert.x 3 - 4.4.2-SNAPSHOT + 4.4.3-SNAPSHOT ${project.basedir}/src/main/resources/META-INF/MANIFEST.MF