Skip to content

Commit 38fc28b

Browse files
lee0cnemakam
authored andcommitted
Proxy update (#285)
Update to proxy support to align with updates to Event Hubs proxy support.
1 parent 482f95f commit 38fc28b

File tree

7 files changed

+233
-83
lines changed

7 files changed

+233
-83
lines changed

azure-servicebus/src/main/java/com/microsoft/azure/servicebus/ClientSettings.java

Lines changed: 0 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,6 @@ public class ClientSettings {
1919
private Duration operationTimeout;
2020
private TransportType transportType;
2121

22-
private String proxyHostName = null;
23-
private int proxyHostPort = 0;
24-
private String proxyUserName = null;
25-
private String proxyPassword = null;
26-
2722
/**
2823
* Creates a new instance with the given token provider, default retry policy and default operation timeout.
2924
* @param tokenProvider {@link TokenProvider} instance
@@ -93,56 +88,4 @@ public Duration getOperationTimeout()
9388
* @return transport type for the instance
9489
*/
9590
public TransportType getTransportType() { return transportType; }
96-
97-
/**
98-
* Sets the proxy hostname. Required for proxy connection
99-
* Proxy settings are only valid with transport type AMQP_WEB_SOCKETS
100-
* @param proxyHostName
101-
*/
102-
public void setProxyHostName(String proxyHostName) { this.proxyHostName = proxyHostName; }
103-
104-
/**
105-
* Sets the proxy host port. Required for proxy connection
106-
* Proxy settings are only valid with transport type AMQP_WEB_SOCKETS
107-
* @param proxyHostPort
108-
*/
109-
public void setProxyHostPort(int proxyHostPort) { this.proxyHostPort = proxyHostPort; }
110-
111-
/**
112-
* Sets the proxy username
113-
* Proxy settings are only valid with transport type AMQP_WEB_SOCKETS
114-
* @param proxyUserName
115-
*/
116-
public void setProxyUserName(String proxyUserName) { this.proxyUserName = proxyUserName; }
117-
118-
/**
119-
* Sets the proxy password
120-
* Proxy settings are only valid with transport type AMQP_WEB_SOCKETS
121-
* @param proxyPassword
122-
*/
123-
public void setProxyPassword(String proxyPassword) { this.proxyPassword = proxyPassword; }
124-
125-
/**
126-
* Gets the proxy host name
127-
* @return proxy host name
128-
*/
129-
public String getProxyHostName() { return proxyHostName; }
130-
131-
/**
132-
* Gets the proxy port
133-
* @return proxy port
134-
*/
135-
public int getProxyHostPort() { return proxyHostPort; }
136-
137-
/**
138-
* Gets the proxy username
139-
* @return proxy username
140-
*/
141-
public String getProxyUserName() { return proxyUserName; }
142-
143-
/**
144-
* Gets the proxy password
145-
* @return proxy password
146-
*/
147-
public String getProxyPassword() { return proxyPassword; }
14891
}

azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/AmqpErrorCode.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,4 +25,7 @@ public final class AmqpErrorCode
2525

2626
// connection errors
2727
public static final Symbol ConnectionForced = Symbol.getSymbol("amqp:connection:forced");
28+
29+
// proton library IOExceptions while performing operations on SocketChannel (in IOHandler.java)
30+
public static final Symbol ProtonIOError = Symbol.getSymbol("proton:io");
2831
}

azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/ConnectionHandler.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public static ConnectionHandler create(TransportType transportType, IAmqpConnect
4444
{
4545
switch(transportType) {
4646
case AMQP_WEB_SOCKETS:
47-
if (ProxyConnectionHandler.shouldUseProxy(messagingFactory)) {
47+
if (ProxyConnectionHandler.shouldUseProxy( ((MessagingFactory)messagingFactory).getHostName() )) {
4848
return new ProxyConnectionHandler(messagingFactory);
4949
} else {
5050
return new WebSocketConnectionHandler(messagingFactory);
@@ -76,12 +76,19 @@ public void onConnectionInit(Event event)
7676
connection.open();
7777
}
7878

79+
protected IAmqpConnection getMessagingFactory()
80+
{
81+
return this.messagingFactory;
82+
}
83+
7984
public void addTransportLayers(final Event event, final TransportInternal transport)
8085
{
8186
final SslDomain domain = makeDomain(SslDomain.Mode.CLIENT);
8287
transport.ssl(domain);
8388
}
8489

90+
protected void notifyTransportErrors(final Event event) { /* no-op */ }
91+
8592
public String getOutboundSocketHostName() { return ((MessagingFactory)messagingFactory).getHostName(); }
8693

8794
public int getOutboundSocketPort() { return this.getProtocolPort(); }
@@ -127,6 +134,8 @@ public void onTransportError(Event event)
127134
{
128135
connection.free();
129136
}
137+
138+
this.notifyTransportErrors(event);
130139
}
131140

132141
@Override

azure-servicebus/src/main/java/com/microsoft/azure/servicebus/amqp/ProxyConnectionHandler.java

Lines changed: 121 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,22 +4,38 @@
44
import com.microsoft.azure.proton.transport.proxy.impl.ProxyHandlerImpl;
55
import com.microsoft.azure.proton.transport.proxy.impl.ProxyImpl;
66

7+
import com.microsoft.azure.servicebus.primitives.ClientConstants;
78
import com.microsoft.azure.servicebus.primitives.StringUtil;
89
import com.microsoft.azure.servicebus.primitives.MessagingFactory;
10+
import org.apache.qpid.proton.amqp.transport.ConnectionError;
11+
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
12+
import org.apache.qpid.proton.engine.Connection;
913
import org.apache.qpid.proton.engine.Event;
14+
import org.apache.qpid.proton.engine.Transport;
1015
import org.apache.qpid.proton.engine.impl.TransportInternal;
1116
import org.slf4j.Logger;
1217
import org.slf4j.LoggerFactory;
1318

19+
import java.io.IOException;
20+
import java.net.*;
1421
import java.util.Base64;
1522
import java.util.HashMap;
23+
import java.util.List;
1624
import java.util.Map;
1725

1826
public class ProxyConnectionHandler extends WebSocketConnectionHandler {
1927
private static final Logger TRACE_LOGGER = LoggerFactory.getLogger(ProxyConnectionHandler.class);
28+
private final String proxySelectorModifiedError = "Proxy Selector has been modified.";
2029

21-
public static boolean shouldUseProxy(IAmqpConnection messagingFactory) {
22-
return !StringUtil.isNullOrEmpty(((MessagingFactory)messagingFactory).getClientSettings().getProxyHostName());
30+
public static boolean shouldUseProxy(final String hostName) {
31+
final URI uri = createURIFromHostNamePort(hostName, ClientConstants.HTTPS_PORT);
32+
final ProxySelector proxySelector = ProxySelector.getDefault();
33+
if (proxySelector == null) {
34+
return false;
35+
}
36+
37+
final List<Proxy> proxies = proxySelector.select(uri);
38+
return isProxyAddressLegal(proxies);
2339
}
2440

2541
public ProxyConnectionHandler(IAmqpConnection messagingFactory) { super(messagingFactory); }
@@ -42,11 +58,67 @@ public void addTransportLayers(final Event event, final TransportInternal transp
4258
}
4359
}
4460

61+
@Override
62+
protected void notifyTransportErrors(final Event event) {
63+
final Transport transport = event.getTransport();
64+
final Connection connection = event.getConnection();
65+
if (connection == null || transport == null) {
66+
return;
67+
}
68+
69+
final ErrorCondition errorCondition = transport.getCondition();
70+
final String hostName = event.getReactor().getConnectionAddress(connection);
71+
final ProxySelector proxySelector = ProxySelector.getDefault();
72+
if (errorCondition == null
73+
|| !(errorCondition.getCondition().equals(ConnectionError.FRAMING_ERROR)
74+
|| errorCondition.getCondition().equals(AmqpErrorCode.ProtonIOError))
75+
|| proxySelector == null
76+
|| StringUtil.isNullOrEmpty(hostName)) {
77+
return;
78+
}
79+
80+
final String[] hostNameParts = hostName.split(":");
81+
if (hostNameParts.length != 2) {
82+
return;
83+
}
84+
85+
int port;
86+
try {
87+
port = Integer.parseInt(hostNameParts[1]);
88+
} catch (NumberFormatException ignore) {
89+
return;
90+
}
91+
92+
final IOException ioException = reconstructIOException(errorCondition);
93+
proxySelector.connectFailed(
94+
createURIFromHostNamePort(((MessagingFactory)this.getMessagingFactory()).getHostName(), this.getProtocolPort()),
95+
new InetSocketAddress(hostNameParts[0], port),
96+
ioException
97+
);
98+
99+
}
100+
45101
private Map<String, String> getAuthorizationHeader() {
46-
final String proxyUserName = ((MessagingFactory)messagingFactory).getClientSettings().getProxyUserName();
47-
final String proxyPassword = ((MessagingFactory)messagingFactory).getClientSettings().getProxyPassword();
48-
if (StringUtil.isNullOrEmpty(proxyUserName) ||
49-
StringUtil.isNullOrEmpty(proxyPassword)) {
102+
final PasswordAuthentication authentication = Authenticator.requestPasswordAuthentication(
103+
getOutboundSocketHostName(),
104+
null,
105+
getOutboundSocketPort(),
106+
null,
107+
null,
108+
"http",
109+
null,
110+
Authenticator.RequestorType.PROXY
111+
);
112+
if (authentication == null) {
113+
return null;
114+
}
115+
116+
final String proxyUserName = authentication.getUserName();
117+
final String proxyPassword = authentication.getPassword() != null
118+
? new String(authentication.getPassword())
119+
: null;
120+
if (StringUtil.isNullOrEmpty(proxyUserName)
121+
|| StringUtil.isNullOrEmpty(proxyPassword)) {
50122
return null;
51123
}
52124

@@ -60,11 +132,52 @@ private Map<String, String> getAuthorizationHeader() {
60132

61133
@Override
62134
public String getOutboundSocketHostName() {
63-
return ((MessagingFactory)messagingFactory).getClientSettings().getProxyHostName();
135+
final InetSocketAddress socketAddress = getProxyAddress();
136+
return socketAddress.getHostString();
64137
}
65138

66139
@Override
67140
public int getOutboundSocketPort() {
68-
return ((MessagingFactory)messagingFactory).getClientSettings().getProxyHostPort();
141+
final InetSocketAddress socketAddress = getProxyAddress();
142+
return socketAddress.getPort();
143+
}
144+
145+
private InetSocketAddress getProxyAddress() {
146+
final URI serviceUri = createURIFromHostNamePort(
147+
((MessagingFactory)this.getMessagingFactory()).getHostName(),
148+
this.getProtocolPort());
149+
final ProxySelector proxySelector = ProxySelector.getDefault();
150+
if (proxySelector == null) {
151+
throw new IllegalStateException(proxySelectorModifiedError);
152+
}
153+
154+
final List<Proxy> proxies = proxySelector.select(serviceUri);
155+
if (!isProxyAddressLegal(proxies)) {
156+
throw new IllegalStateException(proxySelectorModifiedError);
157+
}
158+
159+
final Proxy proxy = proxies.get(0);
160+
return (InetSocketAddress) proxy.address();
161+
}
162+
163+
private static URI createURIFromHostNamePort(final String hostName, final int port) {
164+
return URI.create(String.format(ClientConstants.HTTPS_URI_FORMAT, hostName, port));
165+
}
166+
167+
private static boolean isProxyAddressLegal(final List<Proxy> proxies) {
168+
// only checks the first proxy in the list
169+
// returns true if it is an InetSocketAddress, which is required for qpid-proton-j library
170+
return proxies != null
171+
&& !proxies.isEmpty()
172+
&& proxies.get(0).type() == Proxy.Type.HTTP
173+
&& proxies.get(0).address() != null
174+
&& proxies.get(0).address() instanceof InetSocketAddress;
175+
}
176+
177+
private static IOException reconstructIOException(ErrorCondition errorCondition) {
178+
// since proton library communicates all errors based on amqp-error-condition
179+
// it swallows the IOException and translates it to proton-io error code
180+
// we reconstruct the IOException here, but callstack is lost
181+
return new IOException(errorCondition.getDescription());
69182
}
70183
}

azure-servicebus/src/main/java/com/microsoft/azure/servicebus/primitives/ClientConstants.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,7 +179,9 @@ private ClientConstants() { }
179179
public static final UnsignedLong TRUE_FILTER_DESCRIPTOR = new UnsignedLong(0x000001370000007L);
180180
public static final UnsignedLong FALSE_FILTER_DESCRIPTOR = new UnsignedLong(0x000001370000008L);
181181
public static final UnsignedLong CORRELATION_FILTER_DESCRIPTOR = new UnsignedLong(0x000001370000009L);
182-
182+
183+
public static final String HTTPS_URI_FORMAT = "https://%s:%s";
184+
183185
static final int DEFAULT_SAS_TOKEN_SEND_RETRY_INTERVAL_IN_SECONDS = 5;
184186
static final String SAS_TOKEN_AUDIENCE_FORMAT = "amqp://%s/%s";
185187

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package com.microsoft.azure.servicebus;
2+
3+
import com.microsoft.azure.servicebus.primitives.ConnectionStringBuilder;
4+
import com.microsoft.azure.servicebus.primitives.ServiceBusException;
5+
import com.microsoft.azure.servicebus.primitives.TransportType;
6+
import org.junit.Assert;
7+
import org.junit.Test;
8+
9+
import java.io.IOException;
10+
import java.net.*;
11+
import java.time.Duration;
12+
import java.util.LinkedList;
13+
import java.util.List;
14+
import java.util.concurrent.CompletableFuture;
15+
import java.util.concurrent.TimeUnit;
16+
17+
public class ProxySelectorTests {
18+
19+
20+
@Test
21+
public void proxySelectorConnectFailedInvokeTest() throws Exception
22+
{
23+
// set up proxy selector with a bad address in order to check that the connectFailed() method is invoked
24+
int noProxyPort = 8888;
25+
final CompletableFuture<Void> connectFailedTask = new CompletableFuture<>();
26+
27+
ProxySelector.setDefault(new ProxySelector() {
28+
@Override
29+
public List<Proxy> select(URI uri) {
30+
List<Proxy> proxies = new LinkedList<>();
31+
proxies.add(new Proxy(Proxy.Type.HTTP, new InetSocketAddress("localhost", noProxyPort)));
32+
return proxies;
33+
}
34+
35+
@Override
36+
public void connectFailed(URI uri, SocketAddress sa, IOException ioe) {
37+
connectFailedTask.complete(null);
38+
}
39+
});
40+
41+
ConnectionStringBuilder connectionStringBuilder = new ConnectionStringBuilder(TestUtils.getNamespaceConnectionString());
42+
connectionStringBuilder.setTransportType(TransportType.AMQP_WEB_SOCKETS);
43+
connectionStringBuilder.setOperationTimeout(Duration.ofSeconds(10));
44+
45+
try {
46+
QueueClient sendClient = new QueueClient(connectionStringBuilder, ReceiveMode.PEEKLOCK);
47+
} catch (ServiceBusException ex) {
48+
// todo? Better assert statement?
49+
Assert.assertEquals(
50+
"Error{condition=amqp:connection:framing-error, description='connection aborted', info=null}",
51+
ex.getLocalizedMessage());
52+
}
53+
54+
connectFailedTask.get(2, TimeUnit.SECONDS);
55+
}
56+
}

0 commit comments

Comments
 (0)