Skip to content

Commit 9384236

Browse files
[b456377401] Cache Spark YARN applications
1. Collected Spark YARN applications need to be cached as they will be needed in the upcoming task connecting to Spark Server History and fetching their source and used Spark version from event logs. 2. Make ClouderaAPIHostsTask as the only source of truth for caching hosts as this is more reliable than CMF endpoint. It simplifies the logic and make it less error-prone. 3. Move common methods used in multiple tests into utils class.
1 parent 9624a94 commit 9384236

File tree

18 files changed

+543
-319
lines changed

18 files changed

+543
-319
lines changed

dumper/app/src/main/java/com/google/edwmigration/dumper/application/dumper/connector/cloudera/manager/ClouderaAPIHostsTask.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,11 @@ protected void doRun(
8080
hosts.add(ClouderaHostDTO.create(apiHost.getId(), apiHost.getName()));
8181
}
8282
}
83-
handle.initHostsIfNull(hosts);
83+
if (hosts.isEmpty()) {
84+
throw new MetadataDumperUsageException(
85+
"No hosts were found in any of the initialized Cloudera clusters.");
86+
}
87+
handle.initHosts(hosts);
8488
}
8589
}
8690
}

dumper/app/src/main/java/com/google/edwmigration/dumper/application/dumper/connector/cloudera/manager/ClouderaCMFHostsTask.java

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,11 @@
2020
import com.fasterxml.jackson.databind.JsonNode;
2121
import com.google.common.io.ByteSink;
2222
import com.google.edwmigration.dumper.application.dumper.connector.cloudera.manager.ClouderaManagerHandle.ClouderaClusterDTO;
23-
import com.google.edwmigration.dumper.application.dumper.connector.cloudera.manager.ClouderaManagerHandle.ClouderaHostDTO;
24-
import com.google.edwmigration.dumper.application.dumper.connector.cloudera.manager.dto.CMFHostDTO;
25-
import com.google.edwmigration.dumper.application.dumper.connector.cloudera.manager.dto.CMFHostListDTO;
2623
import com.google.edwmigration.dumper.application.dumper.task.TaskCategory;
2724
import com.google.edwmigration.dumper.application.dumper.task.TaskRunContext;
2825
import java.io.Writer;
2926
import java.net.URI;
3027
import java.nio.charset.StandardCharsets;
31-
import java.util.ArrayList;
3228
import java.util.List;
3329
import javax.annotation.Nonnull;
3430
import org.apache.http.client.methods.CloseableHttpResponse;
@@ -67,7 +63,6 @@ protected void doRun(
6763
}
6864

6965
final URI baseURI = handle.getBaseURI();
70-
List<ClouderaHostDTO> hosts = new ArrayList<>();
7166
try (Writer writer = sink.asCharSink(StandardCharsets.UTF_8).openBufferedStream()) {
7267
for (ClouderaClusterDTO cluster : clusters) {
7368
if (cluster.getId() == null) {
@@ -95,14 +90,7 @@ protected void doRun(
9590
String stringifiedHosts = hostsJson.toString();
9691
writer.write(stringifiedHosts);
9792
writer.write('\n');
98-
99-
CMFHostListDTO apiHosts = parseJsonStringToObject(stringifiedHosts, CMFHostListDTO.class);
100-
for (CMFHostDTO apiHost : apiHosts.getHosts()) {
101-
hosts.add(ClouderaHostDTO.create(apiHost.getId(), apiHost.getName()));
102-
}
10393
}
10494
}
105-
106-
handle.initHostsIfNull(hosts);
10795
}
10896
}

dumper/app/src/main/java/com/google/edwmigration/dumper/application/dumper/connector/cloudera/manager/ClouderaManagerHandle.java

Lines changed: 35 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ public class ClouderaManagerHandle implements Handle {
3737

3838
private ImmutableList<ClouderaClusterDTO> clusters;
3939
private ImmutableList<ClouderaHostDTO> hosts;
40+
private ImmutableList<ClouderaYarnApplicationDTO> sparkYarnApplications;
4041

4142
public ClouderaManagerHandle(URI apiURI, CloseableHttpClient httpClient) {
4243
Preconditions.checkNotNull(apiURI, "Cloudera's apiURI can't be null.");
@@ -85,10 +86,8 @@ public synchronized void initClusters(List<ClouderaClusterDTO> clusters) {
8586
Preconditions.checkNotNull(clusters, "Clusters can't be initialised to null list.");
8687
Preconditions.checkArgument(
8788
!clusters.isEmpty(), "Clusters can't be initialised to empty list.");
89+
Preconditions.checkState(this.clusters == null, "The cluster already initialized.");
8890

89-
if (this.clusters != null) {
90-
throw new IllegalStateException("The cluster already initialized!");
91-
}
9291
this.clusters = ImmutableList.copyOf(clusters);
9392
}
9493

@@ -97,14 +96,27 @@ public synchronized ImmutableList<ClouderaHostDTO> getHosts() {
9796
return hosts;
9897
}
9998

100-
public synchronized void initHostsIfNull(List<ClouderaHostDTO> hosts) {
101-
// Todo
102-
// Preconditions.checkNotNull(hosts, "Hosts can't be initialised to null list.");
103-
// Preconditions.checkArgument(!hosts.isEmpty(), "Hosts can't be initialised to empty list.");
99+
public synchronized void initHosts(List<ClouderaHostDTO> hosts) {
100+
Preconditions.checkNotNull(hosts, "Hosts can't be initialised to null list.");
101+
Preconditions.checkArgument(!hosts.isEmpty(), "Hosts can't be initialised to empty list.");
102+
Preconditions.checkState(this.hosts == null, "Hosts already initialized.");
104103

105-
if (this.hosts == null) {
106-
this.hosts = ImmutableList.copyOf(hosts);
107-
}
104+
this.hosts = ImmutableList.copyOf(hosts);
105+
}
106+
107+
@CheckForNull
108+
public synchronized ImmutableList<ClouderaYarnApplicationDTO> getSparkYarnApplications() {
109+
return sparkYarnApplications;
110+
}
111+
112+
public synchronized void initSparkYarnApplications(
113+
List<ClouderaYarnApplicationDTO> sparkYarnApplications) {
114+
Preconditions.checkNotNull(
115+
sparkYarnApplications, "Spark YARN applications can't be initialised to null list.");
116+
Preconditions.checkState(
117+
this.sparkYarnApplications == null, "Spark YARN applications already initialized.");
118+
119+
this.sparkYarnApplications = ImmutableList.copyOf(sparkYarnApplications);
108120
}
109121

110122
@Override
@@ -145,4 +157,17 @@ public static ClouderaHostDTO create(String id, String name) {
145157

146158
abstract String getName();
147159
}
160+
161+
@AutoValue
162+
public abstract static class ClouderaYarnApplicationDTO {
163+
public static ClouderaYarnApplicationDTO create(String id, String clusterName) {
164+
return new AutoValue_ClouderaManagerHandle_ClouderaYarnApplicationDTO(id, clusterName);
165+
}
166+
167+
@CheckForNull
168+
@Nullable
169+
abstract String getId();
170+
171+
abstract String getClusterName();
172+
}
148173
}

dumper/app/src/main/java/com/google/edwmigration/dumper/application/dumper/connector/cloudera/manager/ClouderaYarnApplicationTypeTask.java

Lines changed: 44 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,19 @@
1616
*/
1717
package com.google.edwmigration.dumper.application.dumper.connector.cloudera.manager;
1818

19+
import static com.google.common.collect.ImmutableList.toImmutableList;
20+
import static java.util.Arrays.stream;
21+
1922
import com.fasterxml.jackson.databind.JsonNode;
2023
import com.google.common.base.Preconditions;
2124
import com.google.common.collect.ImmutableList;
2225
import com.google.common.collect.ImmutableMap;
2326
import com.google.common.io.ByteSink;
2427
import com.google.edwmigration.dumper.application.dumper.MetadataDumperUsageException;
2528
import com.google.edwmigration.dumper.application.dumper.connector.cloudera.manager.ClouderaManagerHandle.ClouderaClusterDTO;
29+
import com.google.edwmigration.dumper.application.dumper.connector.cloudera.manager.ClouderaManagerHandle.ClouderaYarnApplicationDTO;
2630
import com.google.edwmigration.dumper.application.dumper.connector.cloudera.manager.dto.ApiYARNApplicationDTO;
31+
import com.google.edwmigration.dumper.application.dumper.connector.cloudera.manager.model.YarnApplicationType;
2732
import com.google.edwmigration.dumper.application.dumper.task.TaskCategory;
2833
import com.google.edwmigration.dumper.application.dumper.task.TaskRunContext;
2934
import java.io.IOException;
@@ -34,7 +39,6 @@
3439
import java.util.HashSet;
3540
import java.util.List;
3641
import java.util.Set;
37-
import java.util.stream.Collectors;
3842
import java.util.stream.StreamSupport;
3943
import javax.annotation.Nonnull;
4044
import org.apache.http.client.methods.CloseableHttpResponse;
@@ -44,10 +48,8 @@
4448
import org.slf4j.LoggerFactory;
4549

4650
public class ClouderaYarnApplicationTypeTask extends AbstractClouderaYarnApplicationTask {
47-
private static final Logger logger = LoggerFactory.getLogger(ClouderaYarnApplicationsTask.class);
48-
49-
private final ImmutableList<String> predefinedAppTypes =
50-
ImmutableList.of("MAPREDUCE", "SPARK", "Oozie Launcher");
51+
private static final Logger logger =
52+
LoggerFactory.getLogger(ClouderaYarnApplicationTypeTask.class);
5153

5254
public ClouderaYarnApplicationTypeTask(
5355
ZonedDateTime startDate, ZonedDateTime endDate, TaskCategory taskCategory) {
@@ -72,21 +74,28 @@ protected void doRun(
7274
new PaginatedClouderaYarnApplicationsLoader(
7375
handle, context.getArguments().getPaginationPageSize());
7476

77+
List<ClouderaYarnApplicationDTO> sparkYarnApplications = new ArrayList<>();
7578
try (Writer writer = sink.asCharSink(StandardCharsets.UTF_8).openBufferedStream()) {
7679
for (ClouderaClusterDTO cluster : clusters) {
7780
final String clusterName = cluster.getName();
78-
Set<String> yarnAppTypes = new HashSet<>(fetchYARNApplicationTypes(handle, clusterName));
79-
yarnAppTypes.addAll(predefinedAppTypes);
80-
yarnAppTypes.addAll(context.getArguments().getYarnApplicationTypes());
81-
for (String yarnAppType : yarnAppTypes) {
81+
for (String yarnAppType : collectYarnApplicationTypes(context, handle, clusterName)) {
8282
logger.info(
8383
"Dump YARN applications with {} type from {} cluster", yarnAppType, clusterName);
8484
int loadedAppsCount =
8585
appLoader.load(
8686
clusterName,
8787
yarnAppType,
88-
yarnAppsPage ->
89-
writeYarnAppTypes(writer, yarnAppsPage, yarnAppType, clusterName));
88+
yarnAppsPage -> {
89+
writeYarnAppTypes(writer, yarnAppsPage, yarnAppType, clusterName);
90+
if (yarnAppType.equals(YarnApplicationType.SPARK.getValue())) {
91+
yarnAppsPage.stream()
92+
.map(
93+
yarnApp ->
94+
ClouderaYarnApplicationDTO.create(
95+
yarnApp.getApplicationId(), clusterName))
96+
.forEach(sparkYarnApplications::add);
97+
}
98+
});
9099
logger.info(
91100
"Dumped {} YARN applications with {} type from {} cluster",
92101
loadedAppsCount,
@@ -95,6 +104,7 @@ protected void doRun(
95104
}
96105
}
97106
}
107+
handle.initSparkYarnApplications(sparkYarnApplications);
98108
}
99109

100110
private void writeYarnAppTypes(
@@ -114,22 +124,37 @@ private void writeYarnAppTypes(
114124
}
115125
}
116126

117-
private List<String> fetchYARNApplicationTypes(ClouderaManagerHandle handle, String clusterName) {
118-
String yarnAppTypesUrl =
127+
private Set<String> collectYarnApplicationTypes(
128+
TaskRunContext context, ClouderaManagerHandle handle, String clusterName) {
129+
Set<String> yarnApplicationTypes = new HashSet<>();
130+
ImmutableList<String> predefinedYarnAppTypes =
131+
stream(YarnApplicationType.values())
132+
.map(YarnApplicationType::getValue)
133+
.collect(toImmutableList());
134+
yarnApplicationTypes.addAll(predefinedYarnAppTypes);
135+
yarnApplicationTypes.addAll(fetchClusterServiceTypes(handle, clusterName));
136+
yarnApplicationTypes.addAll(context.getArguments().getYarnApplicationTypes());
137+
return yarnApplicationTypes;
138+
}
139+
140+
private ImmutableList<String> fetchClusterServiceTypes(
141+
ClouderaManagerHandle handle, String clusterName) {
142+
String serviceTypesUrl =
119143
handle.getApiURI().toString() + "clusters/" + clusterName + "/serviceTypes";
120144
CloseableHttpClient httpClient = handle.getHttpClient();
121-
try (CloseableHttpResponse appTypesResp = httpClient.execute(new HttpGet(yarnAppTypesUrl))) {
122-
int statusCode = appTypesResp.getStatusLine().getStatusCode();
145+
try (CloseableHttpResponse serviceTypesResp =
146+
httpClient.execute(new HttpGet(serviceTypesUrl))) {
147+
int statusCode = serviceTypesResp.getStatusLine().getStatusCode();
123148
if (!isStatusCodeOK(statusCode)) {
124149
throw new ClouderaConnectorException(
125150
String.format(
126151
"Cloudera API returned bad http status: %d. Message: %s",
127-
statusCode, readFromStream(appTypesResp.getEntity().getContent())));
152+
statusCode, readFromStream(serviceTypesResp.getEntity().getContent())));
128153
}
129-
JsonNode appTypesJson = readJsonTree(appTypesResp.getEntity().getContent());
130-
return StreamSupport.stream(appTypesJson.get("items").spliterator(), false)
154+
JsonNode serviceTypesJson = readJsonTree(serviceTypesResp.getEntity().getContent());
155+
return StreamSupport.stream(serviceTypesJson.get("items").spliterator(), false)
131156
.map(JsonNode::asText)
132-
.collect(Collectors.toList());
157+
.collect(toImmutableList());
133158
} catch (IOException ex) {
134159
throw new ClouderaConnectorException(ex.getMessage(), ex);
135160
}

dumper/app/src/main/java/com/google/edwmigration/dumper/application/dumper/connector/cloudera/manager/dto/CMFHostDTO.java renamed to dumper/app/src/main/java/com/google/edwmigration/dumper/application/dumper/connector/cloudera/manager/model/YarnApplicationType.java

Lines changed: 10 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -14,28 +14,20 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17-
package com.google.edwmigration.dumper.application.dumper.connector.cloudera.manager.dto;
17+
package com.google.edwmigration.dumper.application.dumper.connector.cloudera.manager.model;
1818

19-
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
20-
import com.fasterxml.jackson.annotation.JsonProperty;
19+
public enum YarnApplicationType {
20+
MAPREDUCE("MAPREDUCE"),
21+
SPARK("SPARK"),
22+
OOZIE_LAUNCHER("Oozie Launcher");
2123

22-
/**
23-
* DTO class for the unofficial UI part of Cloudera Management. Display the host from a Memory Usage
24-
* chart.
25-
*/
26-
@JsonIgnoreProperties(ignoreUnknown = true)
27-
public class CMFHostDTO {
28-
@JsonProperty(required = true)
29-
private String hostName;
30-
31-
@JsonProperty(required = true)
32-
private String hostId;
24+
private final String value;
3325

34-
public String getName() {
35-
return hostName;
26+
YarnApplicationType(String value) {
27+
this.value = value;
3628
}
3729

38-
public String getId() {
39-
return hostId;
30+
public String getValue() {
31+
return value;
4032
}
4133
}

0 commit comments

Comments
 (0)