Skip to content

Commit c30bb98

Browse files
lee0cbinzywu
authored andcommitted
Proxy support via websockets (#279)
* Mirrored Event Hub changes to ConnectionHandler classes & MessagingFactory. Not buildable * Proxy support, builds but not tested. Design issues around how proxy settings are stored and accessed. * getClientSettings() added to IAmpqConnection * Changes to TestUtils to allow proxy tests, misc fixes * Added instructions for running proxy tests. * Fix NullRef in Unittests. * Removed a typo, slight edit to proxy test setup * Fix for failing test - change ClientSettings constructor. * Fix in response to request for RUN_WITH_PROXY to be an environment variable. * Remove getHostname() and getClientSettings() from IAmqpConnection, replace by casting messagingFactory in ProxyConnectionHandler * Fixed compilation error * Reordered import statements * Updated qpid-proton-j-extensions to full version, not preview
1 parent 7f642ed commit c30bb98

File tree

12 files changed

+237
-36
lines changed

12 files changed

+237
-36
lines changed

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,8 @@ Only prerequisite to running tests is setting an environment variable named 'AZU
7777
And test classes also have methods to specify whether to create entities per test or once for all tests in a suite. Creating entities per test is better
7878
as it keeps test independent of each other.
7979

80+
To use a proxy for unit tests, set an environment variable `RUN_WITH_PROXY` to `true`. Then set the environment variables `PROXY_HOSTNAME` and `PROXY_PORT` to your values.
81+
8082
#### Please see a sample using Eclipse below
8183

8284
1. First clone the repository to your local machine: git clone https://github.com/Azure/azure-service-bus-java.git

azure-servicebus/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@
5454
<dependency>
5555
<groupId>com.microsoft.azure</groupId>
5656
<artifactId>qpid-proton-j-extensions</artifactId>
57-
<version>${qpid-proton-j-extensions-version}</version>
57+
<version>1.1.0</version>
5858
</dependency>
5959
<dependency>
6060
<groupId>org.bouncycastle</groupId>

azure-servicebus/src/main/java/com/microsoft/azure/servicebus/ClientSettings.java

Lines changed: 75 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,12 @@ public class ClientSettings {
1818
private RetryPolicy retryPolicy;
1919
private Duration operationTimeout;
2020
private TransportType transportType;
21-
21+
22+
private String proxyHostName = null;
23+
private int proxyHostPort = 0;
24+
private String proxyUserName = null;
25+
private String proxyPassword = null;
26+
2227
/**
2328
* Creates a new instance with the given token provider, default retry policy and default operation timeout.
2429
* @param tokenProvider {@link TokenProvider} instance
@@ -29,12 +34,24 @@ public ClientSettings(TokenProvider tokenProvider)
2934
{
3035
this(tokenProvider, RetryPolicy.getDefault(), Duration.ofSeconds(ClientConstants.DEFAULT_OPERATION_TIMEOUT_IN_SECONDS), TransportType.AMQP);
3136
}
32-
37+
38+
/**
39+
* Creates a new instance with the given token provider, retry policy and operation timeout.
40+
* @param tokenProvider {@link TokenProvider} instance
41+
* @param retryPolicy {@link RetryPolicy} instance
42+
* @param operationTimeout default operation timeout to be used for all client operations. Client can override this value by explicitly specifying a timeout in the operation.
43+
*/
44+
public ClientSettings(TokenProvider tokenProvider, RetryPolicy retryPolicy, Duration operationTimeout)
45+
{
46+
this(tokenProvider, retryPolicy, operationTimeout, TransportType.AMQP);
47+
}
48+
3349
/**
3450
* Creates a new instance with the given token provider, retry policy and operation timeout.
3551
* @param tokenProvider {@link TokenProvider} instance
3652
* @param retryPolicy {@link RetryPolicy} instance
3753
* @param operationTimeout default operation timeout to be used for all client operations. Client can override this value by explicitly specifying a timeout in the operation.
54+
* @param transportType {@link TransportType} instance
3855
*/
3956
public ClientSettings(TokenProvider tokenProvider, RetryPolicy retryPolicy, Duration operationTimeout, TransportType transportType)
4057
{
@@ -71,5 +88,61 @@ public Duration getOperationTimeout()
7188
return operationTimeout;
7289
}
7390

91+
/**
92+
* Gets the transport type for this instance
93+
* @return transport type for the instance
94+
*/
7495
public TransportType getTransportType() { return transportType; }
96+
97+
/**
98+
* Sets the proxy hostname. Required for proxy connection
99+
* Proxy settings are only valid with transport type AMQP_WEB_SOCKETS
100+
* @param proxyHostName
101+
*/
102+
public void setProxyHostName(String proxyHostName) { this.proxyHostName = proxyHostName; }
103+
104+
/**
105+
* Sets the proxy host port. Required for proxy connection
106+
* Proxy settings are only valid with transport type AMQP_WEB_SOCKETS
107+
* @param proxyHostPort
108+
*/
109+
public void setProxyHostPort(int proxyHostPort) { this.proxyHostPort = proxyHostPort; }
110+
111+
/**
112+
* Sets the proxy username
113+
* Proxy settings are only valid with transport type AMQP_WEB_SOCKETS
114+
* @param proxyUserName
115+
*/
116+
public void setProxyUserName(String proxyUserName) { this.proxyUserName = proxyUserName; }
117+
118+
/**
119+
* Sets the proxy password
120+
* Proxy settings are only valid with transport type AMQP_WEB_SOCKETS
121+
* @param proxyPassword
122+
*/
123+
public void setProxyPassword(String proxyPassword) { this.proxyPassword = proxyPassword; }
124+
125+
/**
126+
* Gets the proxy host name
127+
* @return proxy host name
128+
*/
129+
public String getProxyHostName() { return proxyHostName; }
130+
131+
/**
132+
* Gets the proxy port
133+
* @return proxy port
134+
*/
135+
public int getProxyHostPort() { return proxyHostPort; }
136+
137+
/**
138+
* Gets the proxy username
139+
* @return proxy username
140+
*/
141+
public String getProxyUserName() { return proxyUserName; }
142+
143+
/**
144+
* Gets the proxy password
145+
* @return proxy password
146+
*/
147+
public String getProxyPassword() { return proxyPassword; }
75148
}

azure-servicebus/src/main/java/com/microsoft/azure/servicebus/MessageSender.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -201,14 +201,14 @@ public CompletableFuture<Long> scheduleMessageAsync(IMessage message, Instant sc
201201
return this.internalSender.scheduleMessageAsync(
202202
new org.apache.qpid.proton.message.Message[]{amqpMessage},
203203
transaction,
204-
this.messagingFactory.getClientSetttings().getOperationTimeout()).thenApply(sequenceNumbers -> sequenceNumbers[0]);
204+
this.messagingFactory.getClientSettings().getOperationTimeout()).thenApply(sequenceNumbers -> sequenceNumbers[0]);
205205
}
206206

207207
@Override
208208
public CompletableFuture<Void> cancelScheduledMessageAsync(long sequenceNumber) {
209209
return this.internalSender.cancelScheduledMessageAsync(
210210
new Long[]{sequenceNumber},
211-
this.messagingFactory.getClientSetttings().getOperationTimeout());
211+
this.messagingFactory.getClientSettings().getOperationTimeout());
212212
}
213213

214214
@Override

azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/ConnectionHandler.java

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
import java.util.HashMap;
88
import java.util.Map;
99

10+
import com.microsoft.azure.servicebus.primitives.MessagingFactory;
11+
import com.microsoft.azure.servicebus.primitives.TransportType;
1012
import org.apache.qpid.proton.Proton;
1113
import org.apache.qpid.proton.amqp.Symbol;
1214
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
@@ -30,19 +32,37 @@
3032
public class ConnectionHandler extends BaseHandler
3133
{
3234
private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(ConnectionHandler.class);
33-
private final IAmqpConnection messagingFactory;
35+
protected final IAmqpConnection messagingFactory;
3436

35-
public ConnectionHandler(final IAmqpConnection messagingFactory)
37+
protected ConnectionHandler(final IAmqpConnection messagingFactory)
3638
{
3739
add(new Handshaker());
3840
this.messagingFactory = messagingFactory;
3941
}
42+
43+
public static ConnectionHandler create(TransportType transportType, IAmqpConnection messagingFactory)
44+
{
45+
switch(transportType) {
46+
case AMQP_WEB_SOCKETS:
47+
if (ProxyConnectionHandler.shouldUseProxy(messagingFactory)) {
48+
return new ProxyConnectionHandler(messagingFactory);
49+
} else {
50+
return new WebSocketConnectionHandler(messagingFactory);
51+
}
52+
case AMQP:
53+
default:
54+
return new ConnectionHandler(messagingFactory);
55+
}
56+
}
4057

4158
@Override
4259
public void onConnectionInit(Event event)
4360
{
4461
final Connection connection = event.getConnection();
45-
final String hostName = event.getReactor().getConnectionAddress(connection);
62+
final String hostName = new StringBuilder(((MessagingFactory)messagingFactory).getHostName())
63+
.append(":")
64+
.append(String.valueOf(this.getProtocolPort()))
65+
.toString();
4666
TRACE_LOGGER.debug("onConnectionInit: hostname:{}", hostName);
4767
connection.setHostname(hostName);
4868
connection.setContainer(StringUtil.getShortRandomString());
@@ -58,14 +78,21 @@ public void onConnectionInit(Event event)
5878

5979
public void addTransportLayers(final Event event, final TransportInternal transport)
6080
{
81+
final SslDomain domain = makeDomain(SslDomain.Mode.CLIENT);
82+
transport.ssl(domain);
6183
}
62-
public int getPort()
84+
85+
public String getOutboundSocketHostName() { return ((MessagingFactory)messagingFactory).getHostName(); }
86+
87+
public int getOutboundSocketPort() { return this.getProtocolPort(); }
88+
89+
public int getProtocolPort()
6390
{
6491
return ClientConstants.AMQPS_PORT;
6592
}
93+
6694
public int getMaxFrameSize()
6795
{
68-
6996
return AmqpConstants.MAX_FRAME_SIZE;
7097
}
7198

@@ -77,9 +104,6 @@ public void onConnectionBound(Event event)
77104

78105
this.addTransportLayers(event, (TransportInternal) transport);
79106

80-
SslDomain domain = makeDomain(SslDomain.Mode.CLIENT);
81-
transport.ssl(domain);
82-
83107
Sasl sasl = transport.sasl();
84108
sasl.setMechanisms("ANONYMOUS");
85109
}

azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/IAmqpConnection.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
*/
55
package com.microsoft.azure.servicebus.amqp;
66

7+
import com.microsoft.azure.servicebus.ClientSettings;
78
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
89
import org.apache.qpid.proton.engine.Link;
910

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package com.microsoft.azure.servicebus.amqp;
2+
3+
import com.microsoft.azure.proton.transport.proxy.ProxyHandler;
4+
import com.microsoft.azure.proton.transport.proxy.impl.ProxyHandlerImpl;
5+
import com.microsoft.azure.proton.transport.proxy.impl.ProxyImpl;
6+
7+
import com.microsoft.azure.servicebus.primitives.StringUtil;
8+
import com.microsoft.azure.servicebus.primitives.MessagingFactory;
9+
import org.apache.qpid.proton.engine.Event;
10+
import org.apache.qpid.proton.engine.impl.TransportInternal;
11+
import org.slf4j.Logger;
12+
import org.slf4j.LoggerFactory;
13+
14+
import java.util.Base64;
15+
import java.util.HashMap;
16+
import java.util.Map;
17+
18+
public class ProxyConnectionHandler extends WebSocketConnectionHandler {
19+
private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(ProxyConnectionHandler.class);
20+
21+
public static boolean shouldUseProxy(IAmqpConnection messagingFactory) {
22+
return !StringUtil.isNullOrEmpty(((MessagingFactory)messagingFactory).getClientSettings().getProxyHostName());
23+
}
24+
25+
public ProxyConnectionHandler(IAmqpConnection messagingFactory) { super(messagingFactory); }
26+
27+
@Override
28+
public void addTransportLayers(final Event event, final TransportInternal transport) {
29+
super.addTransportLayers(event, transport);
30+
31+
final ProxyImpl proxy = new ProxyImpl();
32+
33+
final String hostName = event.getConnection().getHostname();
34+
final ProxyHandler proxyHandler = new ProxyHandlerImpl();
35+
final Map<String, String> proxyHeader = getAuthorizationHeader();
36+
proxy.configure(hostName, proxyHeader, proxyHandler, transport);
37+
38+
transport.addTransportLayer(proxy);
39+
40+
if (TRACE_LOGGER.isInfoEnabled()) {
41+
TRACE_LOGGER.info("addProxyHandshake: hostname[" + hostName + "]");
42+
}
43+
}
44+
45+
private Map<String, String> getAuthorizationHeader() {
46+
final String proxyUserName = ((MessagingFactory)messagingFactory).getClientSettings().getProxyUserName();
47+
final String proxyPassword = ((MessagingFactory)messagingFactory).getClientSettings().getProxyPassword();
48+
if (StringUtil.isNullOrEmpty(proxyUserName) ||
49+
StringUtil.isNullOrEmpty(proxyPassword)) {
50+
return null;
51+
}
52+
53+
final HashMap<String, String> proxyAuthorizationHeader = new HashMap<>();
54+
final String usernamePasswordPair = proxyUserName + ":" + proxyPassword;
55+
proxyAuthorizationHeader.put(
56+
"Proxy-Authorization",
57+
"Basic" + Base64.getEncoder().encodeToString(usernamePasswordPair.getBytes()));
58+
return proxyAuthorizationHeader;
59+
}
60+
61+
@Override
62+
public String getOutboundSocketHostName() {
63+
return ((MessagingFactory)messagingFactory).getClientSettings().getProxyHostName();
64+
}
65+
66+
@Override
67+
public int getOutboundSocketPort() {
68+
return ((MessagingFactory)messagingFactory).getClientSettings().getProxyHostPort();
69+
}
70+
}

azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/WebSocketConnectionHandler.java

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
public class WebSocketConnectionHandler extends ConnectionHandler {
1111

12-
private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(ConnectionHandler.class);
12+
private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(WebSocketConnectionHandler.class);
1313

1414
public WebSocketConnectionHandler(IAmqpConnection messagingFactory)
1515
{
@@ -19,26 +19,29 @@ public WebSocketConnectionHandler(IAmqpConnection messagingFactory)
1919
@Override
2020
public void addTransportLayers(final Event event, final TransportInternal transport)
2121
{
22+
final String hostName = event.getConnection().getHostname();
23+
2224
final WebSocketImpl webSocket = new WebSocketImpl();
2325
webSocket.configure(
24-
event.getConnection().getHostname(),
26+
hostName,
2527
"/$servicebus/websocket",
26-
null,
28+
"",
2729
0,
2830
"AMQPWSB10",
2931
null,
3032
null);
3133

3234
transport.addTransportLayer(webSocket);
3335

34-
if (TRACE_LOGGER.isInfoEnabled())
35-
{
36-
TRACE_LOGGER.info("addWebsocketHandshake: hostname[" + event.getConnection().getHostname() +"]");
36+
if (TRACE_LOGGER.isInfoEnabled()) {
37+
TRACE_LOGGER.info("addWebsocketHandshake: hostname[" + hostName +"]");
3738
}
39+
40+
super.addTransportLayers(event, transport);
3841
}
3942

4043
@Override
41-
public int getPort()
44+
public int getProtocolPort()
4245
{
4346
return ClientConstants.HTTPS_PORT;
4447
}

0 commit comments

Comments
 (0)