Skip to content

Commit 0373d4b

Browse files
[b456377401] Implement Spark History discovery
Add SparkHistoryDiscoveryService and supporting DTOs to automatically resolve Spark History Server URLs via Knox by probing Spark 2 and Spark 3 endpoints.
1 parent 2d929d1 commit 0373d4b

File tree

10 files changed

+761
-0
lines changed

10 files changed

+761
-0
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,211 @@
1+
/*
2+
* Copyright 2022-2025 Google LLC
3+
* Copyright 2013-2021 CompilerWorks
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package com.google.edwmigration.dumper.application.dumper.connector.cloudera.manager;
18+
19+
import com.fasterxml.jackson.databind.ObjectMapper;
20+
import com.google.common.collect.ImmutableList;
21+
import com.google.edwmigration.dumper.application.dumper.connector.cloudera.manager.dto.ApiConfigDTO;
22+
import com.google.edwmigration.dumper.application.dumper.connector.cloudera.manager.dto.ApiConfigListDTO;
23+
import com.google.edwmigration.dumper.application.dumper.connector.cloudera.manager.dto.ApiRoleDTO;
24+
import com.google.edwmigration.dumper.application.dumper.connector.cloudera.manager.dto.ApiRoleListDTO;
25+
import com.google.edwmigration.dumper.application.dumper.connector.cloudera.manager.dto.ApiServiceDTO;
26+
import com.google.edwmigration.dumper.application.dumper.connector.cloudera.manager.dto.ApiServiceListDTO;
27+
import java.io.IOException;
28+
import java.net.URI;
29+
import java.util.List;
30+
import java.util.Optional;
31+
import org.apache.http.HttpEntity;
32+
import org.apache.http.client.methods.CloseableHttpResponse;
33+
import org.apache.http.client.methods.HttpGet;
34+
import org.apache.http.impl.client.CloseableHttpClient;
35+
import org.slf4j.Logger;
36+
import org.slf4j.LoggerFactory;
37+
38+
public class SparkHistoryDiscoveryService {
39+
40+
private static final Logger logger = LoggerFactory.getLogger(SparkHistoryDiscoveryService.class);
41+
42+
private static final List<String> CANDIDATE_PATHS =
43+
ImmutableList.of("spark3history", "sparkhistory");
44+
45+
private final ObjectMapper objectMapper;
46+
private final CloseableHttpClient clouderaManagerHttpClient;
47+
private final URI apiURI;
48+
49+
public SparkHistoryDiscoveryService(
50+
ObjectMapper objectMapper, CloseableHttpClient clouderaManagerHttpClient, URI apiURI) {
51+
this.objectMapper = objectMapper;
52+
this.clouderaManagerHttpClient = clouderaManagerHttpClient;
53+
this.apiURI = apiURI;
54+
}
55+
56+
/**
57+
* Discovers the active Spark History Server URL. Probes Spark 3 and Spark 2 endpoints to see
58+
* which one is alive.
59+
*/
60+
public Optional<String> resolveUrl(String clusterName, CloseableHttpClient knoxClient) {
61+
try {
62+
Optional<KnoxGatewayInfo> knoxInfo = getKnoxGatewayInfo(clusterName);
63+
64+
if (!knoxInfo.isPresent()) {
65+
logger.warn("Could not find Knox Service or Topology for cluster: {}", clusterName);
66+
return Optional.empty();
67+
}
68+
69+
KnoxGatewayInfo info = knoxInfo.get();
70+
71+
for (String context : CANDIDATE_PATHS) {
72+
String candidateUrl =
73+
String.format(
74+
"https://%s/%s/%s/%s/api/v1",
75+
info.hostname, info.gatewayPath, info.topologyName, context);
76+
77+
if (isReachable(candidateUrl, knoxClient)) {
78+
logger.info("Found active Spark History Server at: {}", candidateUrl);
79+
return Optional.of(candidateUrl);
80+
}
81+
}
82+
logger.warn(
83+
"Knox is active, but neither Spark 2 nor Spark 3 history endpoints are reachable.");
84+
} catch (IOException e) {
85+
logger.warn(
86+
"Cluster '{}': Failed to discover Spark History URL due to error: {}",
87+
clusterName,
88+
e.getMessage());
89+
}
90+
return Optional.empty();
91+
}
92+
93+
private boolean isReachable(String url, CloseableHttpClient knoxHttpClient) {
94+
String probeUrl = url + "/applications?limit=1";
95+
96+
try (CloseableHttpResponse response = knoxHttpClient.execute(new HttpGet(probeUrl))) {
97+
int status = response.getStatusLine().getStatusCode();
98+
return status == 200;
99+
} catch (IOException e) {
100+
logger.debug("Probe failed for URL: {}", url);
101+
return false;
102+
}
103+
}
104+
105+
private Optional<KnoxGatewayInfo> getKnoxGatewayInfo(String clusterName) throws IOException {
106+
Optional<String> knoxServiceName = getKnoxServiceName(clusterName);
107+
if (!knoxServiceName.isPresent()) {
108+
return Optional.empty();
109+
}
110+
111+
Optional<ApiRoleDTO> knoxRole = getKnoxRole(clusterName, knoxServiceName.get());
112+
if (!knoxRole.isPresent()) {
113+
return Optional.empty();
114+
}
115+
116+
ApiRoleDTO role = knoxRole.get();
117+
if (role.getHostRef() == null || role.getRoleConfigGroupRef() == null) {
118+
logger.warn("Knox role is missing hostRef or roleConfigGroupRef.");
119+
return Optional.empty();
120+
}
121+
122+
String hostname = role.getHostRef().getHostname();
123+
String roleConfigGroup = role.getRoleConfigGroupRef().getRoleConfigGroupName();
124+
if (hostname == null || roleConfigGroup == null) {
125+
logger.warn("Knox role has null hostname or roleConfigGroupName.");
126+
return Optional.empty();
127+
}
128+
129+
String gatewayPath = getGatewayPath(clusterName, knoxServiceName.get(), roleConfigGroup);
130+
String topologyName = getTopologyName(clusterName, knoxServiceName.get(), roleConfigGroup);
131+
132+
return Optional.of(new KnoxGatewayInfo(hostname, gatewayPath, topologyName));
133+
}
134+
135+
private Optional<String> getKnoxServiceName(String clusterName) throws IOException {
136+
String path = String.format("clusters/%s/services", clusterName);
137+
Optional<ApiServiceListDTO> serviceList = get(path, ApiServiceListDTO.class);
138+
return serviceList.flatMap(
139+
list ->
140+
list.getItems().stream()
141+
.filter(service -> "KNOX".equals(service.getType()))
142+
.map(ApiServiceDTO::getName)
143+
.findFirst());
144+
}
145+
146+
private Optional<ApiRoleDTO> getKnoxRole(String clusterName, String knoxServiceName)
147+
throws IOException {
148+
String path = String.format("clusters/%s/services/%s/roles", clusterName, knoxServiceName);
149+
Optional<ApiRoleListDTO> roleList = get(path, ApiRoleListDTO.class);
150+
return roleList.flatMap(list -> list.getItems().stream().findFirst());
151+
}
152+
153+
private String getGatewayPath(
154+
String clusterName, String knoxServiceName, String roleConfigGroupName) throws IOException {
155+
return getConfigValue(clusterName, knoxServiceName, roleConfigGroupName, "gateway_path")
156+
.orElse(clusterName);
157+
}
158+
159+
private String getTopologyName(
160+
String clusterName, String knoxServiceName, String roleConfigGroupName) throws IOException {
161+
return getConfigValue(
162+
clusterName, knoxServiceName, roleConfigGroupName, "gateway_default_api_topology_name")
163+
.orElse("cdp-proxy-api");
164+
}
165+
166+
private Optional<String> getConfigValue(
167+
String clusterName, String knoxServiceName, String roleConfigGroupName, String configName)
168+
throws IOException {
169+
String path =
170+
String.format(
171+
"clusters/%s/services/%s/roleConfigGroups/%s/config",
172+
clusterName, knoxServiceName, roleConfigGroupName);
173+
Optional<ApiConfigListDTO> configList = get(path, ApiConfigListDTO.class);
174+
return configList.flatMap(
175+
list ->
176+
list.getItems().stream()
177+
.filter(config -> configName.equals(config.getName()))
178+
.map(ApiConfigDTO::getValue)
179+
.findFirst());
180+
}
181+
182+
public <T> Optional<T> get(String path, Class<T> responseType) throws IOException {
183+
URI requestUri = apiURI.resolve(path);
184+
try (CloseableHttpResponse response = clouderaManagerHttpClient.execute(new HttpGet(requestUri))) {
185+
if (response.getStatusLine().getStatusCode() != 200) {
186+
throw new IOException(
187+
"Unexpected status code "
188+
+ response.getStatusLine().getStatusCode()
189+
+ " from "
190+
+ requestUri);
191+
}
192+
HttpEntity entity = response.getEntity();
193+
if (entity == null) {
194+
return Optional.empty();
195+
}
196+
return Optional.of(objectMapper.readValue(entity.getContent(), responseType));
197+
}
198+
}
199+
200+
private static class KnoxGatewayInfo {
201+
String hostname;
202+
String gatewayPath;
203+
String topologyName;
204+
205+
KnoxGatewayInfo(String hostname, String gatewayPath, String topologyName) {
206+
this.hostname = hostname;
207+
this.gatewayPath = gatewayPath;
208+
this.topologyName = topologyName;
209+
}
210+
}
211+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright 2022-2025 Google LLC
3+
* Copyright 2013-2021 CompilerWorks
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package com.google.edwmigration.dumper.application.dumper.connector.cloudera.manager.dto;
18+
19+
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
20+
import com.fasterxml.jackson.annotation.JsonProperty;
21+
22+
@JsonIgnoreProperties(ignoreUnknown = true)
23+
public class ApiConfigDTO {
24+
25+
@JsonProperty("name")
26+
private String name;
27+
28+
@JsonProperty("value")
29+
private String value;
30+
31+
public String getName() {
32+
return name;
33+
}
34+
35+
public String getValue() {
36+
return value;
37+
}
38+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright 2022-2025 Google LLC
3+
* Copyright 2013-2021 CompilerWorks
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package com.google.edwmigration.dumper.application.dumper.connector.cloudera.manager.dto;
18+
19+
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
20+
import com.fasterxml.jackson.annotation.JsonProperty;
21+
import java.util.List;
22+
23+
@JsonIgnoreProperties(ignoreUnknown = true)
24+
public class ApiConfigListDTO {
25+
26+
@JsonProperty("items")
27+
private List<ApiConfigDTO> items;
28+
29+
public List<ApiConfigDTO> getItems() {
30+
return items;
31+
}
32+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright 2022-2025 Google LLC
3+
* Copyright 2013-2021 CompilerWorks
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package com.google.edwmigration.dumper.application.dumper.connector.cloudera.manager.dto;
18+
19+
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
20+
import com.fasterxml.jackson.annotation.JsonProperty;
21+
22+
@JsonIgnoreProperties(ignoreUnknown = true)
23+
public class ApiHostRefDTO {
24+
25+
@JsonProperty("hostname")
26+
private String hostname;
27+
28+
public String getHostname() {
29+
return hostname;
30+
}
31+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
* Copyright 2022-2025 Google LLC
3+
* Copyright 2013-2021 CompilerWorks
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package com.google.edwmigration.dumper.application.dumper.connector.cloudera.manager.dto;
18+
19+
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
20+
import com.fasterxml.jackson.annotation.JsonProperty;
21+
22+
@JsonIgnoreProperties(ignoreUnknown = true)
23+
public class ApiRoleConfigGroupRefDTO {
24+
25+
@JsonProperty("roleConfigGroupName")
26+
private String roleConfigGroupName;
27+
28+
public String getRoleConfigGroupName() {
29+
return roleConfigGroupName;
30+
}
31+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright 2022-2025 Google LLC
3+
* Copyright 2013-2021 CompilerWorks
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package com.google.edwmigration.dumper.application.dumper.connector.cloudera.manager.dto;
18+
19+
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
20+
import com.fasterxml.jackson.annotation.JsonProperty;
21+
22+
@JsonIgnoreProperties(ignoreUnknown = true)
23+
public class ApiRoleDTO {
24+
25+
@JsonProperty("hostRef")
26+
private ApiHostRefDTO hostRef;
27+
28+
@JsonProperty("roleConfigGroupRef")
29+
private ApiRoleConfigGroupRefDTO roleConfigGroupRef;
30+
31+
public ApiHostRefDTO getHostRef() {
32+
return hostRef;
33+
}
34+
35+
public ApiRoleConfigGroupRefDTO getRoleConfigGroupRef() {
36+
return roleConfigGroupRef;
37+
}
38+
}

0 commit comments

Comments
 (0)