Skip to content

Commit ca1cbf6

Browse files
committed
Revert "fix safe delete URLRegexLookupProxyHandler which is not used"
This reverts commit 158fc14.
1 parent 158fc14 commit ca1cbf6

File tree

1 file changed

+140
-0
lines changed

1 file changed

+140
-0
lines changed
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.proxy.server;
20+
21+
import com.google.common.base.Strings;
22+
import java.util.Properties;
23+
import java.util.regex.Pattern;
24+
import org.apache.pulsar.common.api.proto.CommandLookupTopic;
25+
import org.apache.pulsar.common.api.proto.CommandLookupTopicResponse;
26+
import org.apache.pulsar.common.api.proto.ServerError;
27+
import org.apache.pulsar.common.protocol.Commands;
28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
30+
31+
public class URLRegexLookupProxyHandler extends DefaultLookupProxyHandler {
32+
33+
private static final Logger log = LoggerFactory.getLogger(URLRegexLookupProxyHandler.class);
34+
35+
private Pattern pattern;
36+
37+
private String replacement;
38+
39+
@Override
40+
public void initialize(ProxyService proxy, ProxyConnection proxyConnection) {
41+
super.initialize(proxy, proxyConnection);
42+
Properties properties = proxy.getConfiguration().getProperties();
43+
String regex = properties.getProperty("urlRegexLookupProxyHandlerRegex");
44+
if (Strings.isNullOrEmpty(regex)) {
45+
throw new IllegalArgumentException("urlRegexLookupProxyHandlerRegex is not set");
46+
}
47+
this.pattern = Pattern.compile(regex);
48+
this.replacement = properties.getProperty("urlRegexLookupProxyHandlerReplacement");
49+
if (Strings.isNullOrEmpty(this.replacement)) {
50+
throw new IllegalArgumentException("urlRegexLookupProxyHandlerReplacement is not set");
51+
}
52+
}
53+
54+
@Override
55+
public void handleLookup(CommandLookupTopic lookup) {
56+
if (log.isDebugEnabled()) {
57+
log.debug("Received Lookup from {}", clientAddress);
58+
}
59+
long clientRequestId = lookup.getRequestId();
60+
if (lookupRequestSemaphore.tryAcquire()) {
61+
try {
62+
LOOKUP_REQUESTS.inc();
63+
String serviceUrl = getBrokerServiceUrl(clientRequestId);
64+
if (serviceUrl != null) {
65+
if (lookup.isAuthoritative()) {
66+
performLookup(clientRequestId, lookup.getTopic(), serviceUrl, false, 10)
67+
.whenComplete(
68+
(brokerUrl, ex) -> {
69+
if (ex != null) {
70+
ServerError serverError = ex instanceof LookupException
71+
? ((LookupException) ex).getServerError()
72+
: getServerError(ex);
73+
proxyConnection.ctx().writeAndFlush(
74+
Commands.newLookupErrorResponse(serverError, ex.getMessage(),
75+
clientRequestId));
76+
} else {
77+
proxyConnection.ctx().writeAndFlush(
78+
Commands.newLookupResponse(brokerUrl, brokerUrl, true,
79+
CommandLookupTopicResponse.LookupType.Connect, clientRequestId,
80+
true /* this is coming from proxy */));
81+
}
82+
});
83+
} else {
84+
performLookup(clientRequestId, lookup.getTopic(), serviceUrl, false, 10)
85+
.whenComplete(
86+
(brokerUrl, ex) -> {
87+
if (ex != null) {
88+
ServerError serverError = ex instanceof LookupException
89+
? ((LookupException) ex).getServerError()
90+
: getServerError(ex);
91+
proxyConnection.ctx().writeAndFlush(
92+
Commands.newLookupErrorResponse(serverError, ex.getMessage(),
93+
clientRequestId));
94+
} else {
95+
try {
96+
if (pattern.matcher(brokerUrl).matches()) {
97+
if (log.isDebugEnabled()) {
98+
log.debug("Broker URL {} matches regex {}", brokerUrl, pattern);
99+
}
100+
String proxyUrl = pattern.matcher(brokerUrl).replaceAll(replacement);
101+
if (log.isDebugEnabled()) {
102+
log.debug("Redirect to proxy URL {}", proxyUrl);
103+
}
104+
proxyConnection.ctx().writeAndFlush(
105+
Commands.newLookupResponse(proxyUrl, proxyUrl, true,
106+
CommandLookupTopicResponse.LookupType.Redirect, clientRequestId,
107+
false));
108+
} else {
109+
if (log.isDebugEnabled()) {
110+
log.debug("Broker URL {} doesn't match regex {}", brokerUrl, pattern);
111+
}
112+
proxyConnection.ctx().writeAndFlush(
113+
Commands.newLookupErrorResponse(ServerError.ServiceNotReady,
114+
"Broker URL does not match the lookup handler regex",
115+
clientRequestId));
116+
}
117+
} catch (IllegalArgumentException iae) {
118+
proxyConnection.ctx().writeAndFlush(
119+
Commands.newLookupErrorResponse(ServerError.ServiceNotReady,
120+
iae.getMessage(), clientRequestId));
121+
}
122+
}
123+
});
124+
125+
}
126+
}
127+
} finally {
128+
lookupRequestSemaphore.release();
129+
}
130+
} else {
131+
REJECTED_LOOKUP_REQUESTS.inc();
132+
if (log.isDebugEnabled()) {
133+
log.debug("Lookup Request ID {} from {} rejected - {}.", clientRequestId, clientAddress,
134+
throttlingErrorMessage);
135+
}
136+
proxyConnection.ctx().writeAndFlush(Commands.newLookupErrorResponse(ServerError.ServiceNotReady,
137+
throttlingErrorMessage, clientRequestId));
138+
}
139+
}
140+
}

0 commit comments

Comments
 (0)