Skip to content

Commit 6388137

Browse files
committed
Revert "fix pulsar-proxy unit test case failure"
This reverts commit 4efcf70.
1 parent ca1cbf6 commit 6388137

File tree

2 files changed

+202
-31
lines changed

2 files changed

+202
-31
lines changed

pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/URLRegexLookupProxyHandler.java

Lines changed: 22 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -84,41 +84,32 @@ public void handleLookup(CommandLookupTopic lookup) {
8484
performLookup(clientRequestId, lookup.getTopic(), serviceUrl, false, 10)
8585
.whenComplete(
8686
(brokerUrl, ex) -> {
87-
if (ex != null) {
88-
ServerError serverError = ex instanceof LookupException
89-
? ((LookupException) ex).getServerError()
90-
: getServerError(ex);
91-
proxyConnection.ctx().writeAndFlush(
92-
Commands.newLookupErrorResponse(serverError, ex.getMessage(),
93-
clientRequestId));
94-
} else {
95-
try {
96-
if (pattern.matcher(brokerUrl).matches()) {
97-
if (log.isDebugEnabled()) {
98-
log.debug("Broker URL {} matches regex {}", brokerUrl, pattern);
99-
}
100-
String proxyUrl = pattern.matcher(brokerUrl).replaceAll(replacement);
101-
if (log.isDebugEnabled()) {
102-
log.debug("Redirect to proxy URL {}", proxyUrl);
103-
}
104-
proxyConnection.ctx().writeAndFlush(
105-
Commands.newLookupResponse(proxyUrl, proxyUrl, true,
106-
CommandLookupTopicResponse.LookupType.Redirect, clientRequestId,
107-
false));
108-
} else {
109-
if (log.isDebugEnabled()) {
110-
log.debug("Broker URL {} doesn't match regex {}", brokerUrl, pattern);
111-
}
112-
proxyConnection.ctx().writeAndFlush(
113-
Commands.newLookupErrorResponse(ServerError.ServiceNotReady,
114-
"Broker URL does not match the lookup handler regex",
115-
clientRequestId));
87+
try {
88+
if (pattern.matcher(brokerUrl).matches()) {
89+
if (log.isDebugEnabled()) {
90+
log.debug("Broker URL {} matches regex {}", brokerUrl, pattern);
91+
}
92+
String proxyUrl = pattern.matcher(brokerUrl).replaceAll(replacement);
93+
if (log.isDebugEnabled()) {
94+
log.debug("Redirect to proxy URL {}", proxyUrl);
95+
}
96+
proxyConnection.ctx().writeAndFlush(
97+
Commands.newLookupResponse(proxyUrl, proxyUrl, true,
98+
CommandLookupTopicResponse.LookupType.Redirect, clientRequestId,
99+
false));
100+
} else {
101+
if (log.isDebugEnabled()) {
102+
log.debug("Broker URL {} doesn't match regex {}", brokerUrl, pattern);
116103
}
117-
} catch (IllegalArgumentException iae) {
118104
proxyConnection.ctx().writeAndFlush(
119105
Commands.newLookupErrorResponse(ServerError.ServiceNotReady,
120-
iae.getMessage(), clientRequestId));
106+
"Broker URL does not match the lookup handler regex",
107+
clientRequestId));
121108
}
109+
} catch (IllegalArgumentException iae) {
110+
proxyConnection.ctx().writeAndFlush(
111+
Commands.newLookupErrorResponse(ServerError.ServiceNotReady,
112+
iae.getMessage(), clientRequestId));
122113
}
123114
});
124115

Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.proxy.server;
20+
21+
import static org.mockito.Mockito.doReturn;
22+
import static org.testng.Assert.assertEquals;
23+
import static org.testng.Assert.assertNotNull;
24+
import static org.testng.Assert.assertNull;
25+
import static org.testng.Assert.assertThrows;
26+
import static org.testng.Assert.assertTrue;
27+
import java.util.Optional;
28+
import java.util.concurrent.TimeUnit;
29+
import lombok.Cleanup;
30+
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
31+
import org.apache.pulsar.broker.authentication.AuthenticationService;
32+
import org.apache.pulsar.client.api.Authentication;
33+
import org.apache.pulsar.client.api.AuthenticationFactory;
34+
import org.apache.pulsar.client.api.Consumer;
35+
import org.apache.pulsar.client.api.Message;
36+
import org.apache.pulsar.client.api.MessageRoutingMode;
37+
import org.apache.pulsar.client.api.Producer;
38+
import org.apache.pulsar.client.api.PulsarClient;
39+
import org.apache.pulsar.client.api.PulsarClientException;
40+
import org.apache.pulsar.client.api.Schema;
41+
import org.apache.pulsar.common.configuration.PulsarConfigurationLoader;
42+
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
43+
import org.mockito.Mockito;
44+
import org.testng.annotations.BeforeClass;
45+
import org.testng.annotations.Test;
46+
47+
public class URLRegexLookupProxyHandlerTest extends MockedPulsarServiceBaseTest {
48+
49+
protected ProxyService proxyService;
50+
protected ProxyConfiguration proxyConfig = new ProxyConfiguration();
51+
protected Authentication proxyClientAuthentication;
52+
53+
@Override
54+
@BeforeClass
55+
protected void setup() throws Exception {
56+
internalSetup();
57+
58+
proxyConfig.setServicePort(Optional.of(0));
59+
proxyConfig.setBrokerProxyAllowedTargetPorts("*");
60+
proxyConfig.setMetadataStoreUrl(DUMMY_VALUE);
61+
proxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
62+
proxyConfig.setClusterName(configClusterName);
63+
64+
proxyClientAuthentication = AuthenticationFactory.create(proxyConfig.getBrokerClientAuthenticationPlugin(),
65+
proxyConfig.getBrokerClientAuthenticationParameters());
66+
proxyClientAuthentication.start();
67+
68+
proxyService = Mockito.spy(new ProxyService(proxyConfig, new AuthenticationService(
69+
PulsarConfigurationLoader.convertFrom(proxyConfig)), proxyClientAuthentication));
70+
doReturn(new ZKMetadataStore(mockZooKeeper)).when(proxyService).createLocalMetadataStore();
71+
doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(proxyService).createConfigurationMetadataStore();
72+
73+
proxyService.start();
74+
}
75+
76+
@Override
77+
protected void cleanup() throws Exception {
78+
internalCleanup();
79+
80+
proxyService.close();
81+
}
82+
83+
@Test
84+
void testMatchingRegex() throws Exception {
85+
ProxyConfiguration redirectProxyConfig = new ProxyConfiguration();
86+
redirectProxyConfig.setServicePort(Optional.of(0));
87+
redirectProxyConfig.setBrokerProxyAllowedTargetPorts("*");
88+
redirectProxyConfig.setMetadataStoreUrl(DUMMY_VALUE);
89+
redirectProxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
90+
redirectProxyConfig.setLookupHandler("org.apache.pulsar.proxy.server.URLRegexLookupProxyHandler");
91+
redirectProxyConfig.getProperties().setProperty("urlRegexLookupProxyHandlerRegex", "pulsar:\\/\\/(\\w+):\\d+");
92+
redirectProxyConfig.getProperties()
93+
.setProperty("urlRegexLookupProxyHandlerReplacement", proxyService.getServiceUrl());
94+
redirectProxyConfig.setClusterName(configClusterName);
95+
96+
@Cleanup
97+
ProxyService redirectProxyService = Mockito.spy(new ProxyService(redirectProxyConfig, new AuthenticationService(
98+
PulsarConfigurationLoader.convertFrom(redirectProxyConfig)), proxyClientAuthentication));
99+
doReturn(new ZKMetadataStore(mockZooKeeper)).when(redirectProxyService).createLocalMetadataStore();
100+
doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(redirectProxyService)
101+
.createConfigurationMetadataStore();
102+
103+
redirectProxyService.start();
104+
105+
// Check that the target proxy is not connected to any broker at the moment
106+
assertEquals(proxyService.getClientCnxs().size(), 0);
107+
108+
@Cleanup
109+
PulsarClient client = PulsarClient.builder().serviceUrl(redirectProxyService.getServiceUrl())
110+
.lookupTimeout(5, TimeUnit.SECONDS)
111+
.build();
112+
113+
@Cleanup
114+
Producer<byte[]> producer = client.newProducer(Schema.BYTES)
115+
.topic("persistent://sample/test/local/producer-consumer-topic")
116+
.enableBatching(false)
117+
.messageRoutingMode(MessageRoutingMode.SinglePartition)
118+
.create();
119+
120+
// Create a consumer directly attached to broker
121+
@Cleanup
122+
Consumer<byte[]> consumer = pulsarClient.newConsumer()
123+
.topic("persistent://sample/test/local/producer-consumer-topic").subscriptionName("my-sub")
124+
.subscribe();
125+
126+
for (int i = 0; i < 10; i++) {
127+
producer.send("test".getBytes());
128+
}
129+
130+
for (int i = 0; i < 10; i++) {
131+
Message<byte[]> msg = consumer.receive(1, TimeUnit.SECONDS);
132+
assertNotNull(msg);
133+
consumer.acknowledge(msg);
134+
}
135+
136+
Message<byte[]> msg = consumer.receive(0, TimeUnit.SECONDS);
137+
assertNull(msg);
138+
139+
// Check that the target proxy now has connections to the broker
140+
assertTrue(proxyService.getClientCnxs().size() > 0);
141+
}
142+
143+
@Test
144+
void testNotMatchingRegex() throws Exception {
145+
ProxyConfiguration redirectProxyConfig = new ProxyConfiguration();
146+
redirectProxyConfig.setServicePort(Optional.of(0));
147+
redirectProxyConfig.setBrokerProxyAllowedTargetPorts("*");
148+
redirectProxyConfig.setMetadataStoreUrl(DUMMY_VALUE);
149+
redirectProxyConfig.setConfigurationMetadataStoreUrl(GLOBAL_DUMMY_VALUE);
150+
redirectProxyConfig.setLookupHandler("org.apache.pulsar.proxy.server.URLRegexLookupProxyHandler");
151+
redirectProxyConfig.getProperties().setProperty("urlRegexLookupProxyHandlerRegex", "invalid");
152+
redirectProxyConfig.getProperties().setProperty("urlRegexLookupProxyHandlerReplacement", proxyService
153+
.getServiceUrl());
154+
redirectProxyConfig.setClusterName(configClusterName);
155+
156+
@Cleanup
157+
ProxyService redirectProxyService = Mockito.spy(new ProxyService(redirectProxyConfig, new AuthenticationService(
158+
PulsarConfigurationLoader.convertFrom(redirectProxyConfig)), proxyClientAuthentication));
159+
doReturn(new ZKMetadataStore(mockZooKeeper)).when(redirectProxyService).createLocalMetadataStore();
160+
doReturn(new ZKMetadataStore(mockZooKeeperGlobal)).when(redirectProxyService)
161+
.createConfigurationMetadataStore();
162+
163+
redirectProxyService.start();
164+
165+
// Check that the target proxy is not connected to any broker at the moment
166+
assertEquals(proxyService.getClientCnxs().size(), 0);
167+
168+
@Cleanup
169+
PulsarClient client = PulsarClient.builder().serviceUrl(redirectProxyService.getServiceUrl())
170+
.lookupTimeout(5, TimeUnit.SECONDS)
171+
.build();
172+
173+
assertThrows(PulsarClientException.LookupException.class, () -> client.newProducer(Schema.BYTES)
174+
.topic("persistent://sample/test/local/producer-consumer-topic")
175+
.enableBatching(false)
176+
.messageRoutingMode(MessageRoutingMode.SinglePartition)
177+
.create());
178+
}
179+
180+
}

0 commit comments

Comments
 (0)