Skip to content

Commit 6792120

Browse files
[b456377401] Cache Spark YARN applications
1. Collected Spark YARN applications need to be cached as they will be needed in the upcoming task connecting to Spark Server History and fetching their source and used Spark version from event logs. 2. Make ClouderaAPIHostsTask as the only source of truth for caching hosts as this is more reliable than CMF endpoint. It simplifies the logic and make it less error-prone. 3. Move common methods used in multiple tests into utils class.
1 parent 00f1c4d commit 6792120

19 files changed

+542
-319
lines changed

dumper/app/src/main/java/com/google/edwmigration/dumper/application/dumper/connector/cloudera/manager/ClouderaAPIHostsTask.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,11 @@ protected void doRun(
8080
hosts.add(ClouderaHostDTO.create(apiHost.getId(), apiHost.getName()));
8181
}
8282
}
83-
handle.initHostsIfNull(hosts);
83+
if (hosts.isEmpty()) {
84+
throw new MetadataDumperUsageException(
85+
"No hosts were found in any of the initialized Cloudera clusters.");
86+
}
87+
handle.initHosts(hosts);
8488
}
8589
}
8690
}

dumper/app/src/main/java/com/google/edwmigration/dumper/application/dumper/connector/cloudera/manager/ClouderaCMFHostsTask.java

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,11 @@
2020
import com.fasterxml.jackson.databind.JsonNode;
2121
import com.google.common.io.ByteSink;
2222
import com.google.edwmigration.dumper.application.dumper.connector.cloudera.manager.ClouderaManagerHandle.ClouderaClusterDTO;
23-
import com.google.edwmigration.dumper.application.dumper.connector.cloudera.manager.ClouderaManagerHandle.ClouderaHostDTO;
24-
import com.google.edwmigration.dumper.application.dumper.connector.cloudera.manager.dto.CMFHostDTO;
25-
import com.google.edwmigration.dumper.application.dumper.connector.cloudera.manager.dto.CMFHostListDTO;
2623
import com.google.edwmigration.dumper.application.dumper.task.TaskCategory;
2724
import com.google.edwmigration.dumper.application.dumper.task.TaskRunContext;
2825
import java.io.Writer;
2926
import java.net.URI;
3027
import java.nio.charset.StandardCharsets;
31-
import java.util.ArrayList;
3228
import java.util.List;
3329
import javax.annotation.Nonnull;
3430
import org.apache.http.client.methods.CloseableHttpResponse;
@@ -67,7 +63,6 @@ protected void doRun(
6763
}
6864

6965
final URI baseURI = handle.getBaseURI();
70-
List<ClouderaHostDTO> hosts = new ArrayList<>();
7166
try (Writer writer = sink.asCharSink(StandardCharsets.UTF_8).openBufferedStream()) {
7267
for (ClouderaClusterDTO cluster : clusters) {
7368
if (cluster.getId() == null) {
@@ -95,14 +90,7 @@ protected void doRun(
9590
String stringifiedHosts = hostsJson.toString();
9691
writer.write(stringifiedHosts);
9792
writer.write('\n');
98-
99-
CMFHostListDTO apiHosts = parseJsonStringToObject(stringifiedHosts, CMFHostListDTO.class);
100-
for (CMFHostDTO apiHost : apiHosts.getHosts()) {
101-
hosts.add(ClouderaHostDTO.create(apiHost.getId(), apiHost.getName()));
102-
}
10393
}
10494
}
105-
106-
handle.initHostsIfNull(hosts);
10795
}
10896
}

dumper/app/src/main/java/com/google/edwmigration/dumper/application/dumper/connector/cloudera/manager/ClouderaManagerHandle.java

Lines changed: 35 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ public class ClouderaManagerHandle implements Handle {
3737

3838
private ImmutableList<ClouderaClusterDTO> clusters;
3939
private ImmutableList<ClouderaHostDTO> hosts;
40+
private ImmutableList<ClouderaYarnApplicationDTO> sparkYarnApplications;
4041

4142
public ClouderaManagerHandle(URI apiURI, CloseableHttpClient httpClient) {
4243
Preconditions.checkNotNull(apiURI, "Cloudera's apiURI can't be null.");
@@ -85,10 +86,8 @@ public synchronized void initClusters(List<ClouderaClusterDTO> clusters) {
8586
Preconditions.checkNotNull(clusters, "Clusters can't be initialised to null list.");
8687
Preconditions.checkArgument(
8788
!clusters.isEmpty(), "Clusters can't be initialised to empty list.");
89+
Preconditions.checkState(this.clusters == null, "The cluster already initialized.");
8890

89-
if (this.clusters != null) {
90-
throw new IllegalStateException("The cluster already initialized!");
91-
}
9291
this.clusters = ImmutableList.copyOf(clusters);
9392
}
9493

@@ -97,14 +96,27 @@ public synchronized ImmutableList<ClouderaHostDTO> getHosts() {
9796
return hosts;
9897
}
9998

100-
public synchronized void initHostsIfNull(List<ClouderaHostDTO> hosts) {
101-
// Todo
102-
// Preconditions.checkNotNull(hosts, "Hosts can't be initialised to null list.");
103-
// Preconditions.checkArgument(!hosts.isEmpty(), "Hosts can't be initialised to empty list.");
99+
public synchronized void initHosts(List<ClouderaHostDTO> hosts) {
100+
Preconditions.checkNotNull(hosts, "Hosts can't be initialised to null list.");
101+
Preconditions.checkArgument(!hosts.isEmpty(), "Hosts can't be initialised to empty list.");
102+
Preconditions.checkState(this.hosts == null, "Hosts already initialized.");
104103

105-
if (this.hosts == null) {
106-
this.hosts = ImmutableList.copyOf(hosts);
107-
}
104+
this.hosts = ImmutableList.copyOf(hosts);
105+
}
106+
107+
@CheckForNull
108+
public synchronized ImmutableList<ClouderaYarnApplicationDTO> getSparkYarnApplications() {
109+
return sparkYarnApplications;
110+
}
111+
112+
public synchronized void initSparkYarnApplications(
113+
List<ClouderaYarnApplicationDTO> sparkYarnApplications) {
114+
Preconditions.checkNotNull(
115+
sparkYarnApplications, "Spark YARN applications can't be initialised to null list.");
116+
Preconditions.checkState(
117+
this.sparkYarnApplications == null, "Spark YARN applications already initialized.");
118+
119+
this.sparkYarnApplications = ImmutableList.copyOf(sparkYarnApplications);
108120
}
109121

110122
@Override
@@ -145,4 +157,17 @@ public static ClouderaHostDTO create(String id, String name) {
145157

146158
abstract String getName();
147159
}
160+
161+
@AutoValue
162+
public abstract static class ClouderaYarnApplicationDTO {
163+
public static ClouderaYarnApplicationDTO create(String id, String clusterName) {
164+
return new AutoValue_ClouderaManagerHandle_ClouderaYarnApplicationDTO(id, clusterName);
165+
}
166+
167+
@CheckForNull
168+
@Nullable
169+
abstract String getId();
170+
171+
abstract String getClusterName();
172+
}
148173
}

dumper/app/src/main/java/com/google/edwmigration/dumper/application/dumper/connector/cloudera/manager/ClouderaYarnApplicationTypeTask.java

Lines changed: 42 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,19 @@
1616
*/
1717
package com.google.edwmigration.dumper.application.dumper.connector.cloudera.manager;
1818

19+
import static com.google.common.collect.ImmutableList.toImmutableList;
20+
import static java.util.Arrays.stream;
21+
1922
import com.fasterxml.jackson.databind.JsonNode;
2023
import com.google.common.base.Preconditions;
2124
import com.google.common.collect.ImmutableList;
2225
import com.google.common.collect.ImmutableMap;
2326
import com.google.common.io.ByteSink;
2427
import com.google.edwmigration.dumper.application.dumper.MetadataDumperUsageException;
2528
import com.google.edwmigration.dumper.application.dumper.connector.cloudera.manager.ClouderaManagerHandle.ClouderaClusterDTO;
29+
import com.google.edwmigration.dumper.application.dumper.connector.cloudera.manager.ClouderaManagerHandle.ClouderaYarnApplicationDTO;
2630
import com.google.edwmigration.dumper.application.dumper.connector.cloudera.manager.dto.ApiYARNApplicationDTO;
31+
import com.google.edwmigration.dumper.application.dumper.connector.cloudera.manager.model.YarnApplicationType;
2732
import com.google.edwmigration.dumper.application.dumper.task.TaskCategory;
2833
import com.google.edwmigration.dumper.application.dumper.task.TaskRunContext;
2934
import java.io.IOException;
@@ -34,7 +39,6 @@
3439
import java.util.HashSet;
3540
import java.util.List;
3641
import java.util.Set;
37-
import java.util.stream.Collectors;
3842
import java.util.stream.StreamSupport;
3943
import javax.annotation.Nonnull;
4044
import org.apache.http.client.methods.CloseableHttpResponse;
@@ -47,9 +51,6 @@ public class ClouderaYarnApplicationTypeTask extends AbstractClouderaYarnApplica
4751
private static final Logger logger =
4852
LoggerFactory.getLogger(ClouderaYarnApplicationTypeTask.class);
4953

50-
private final ImmutableList<String> predefinedAppTypes =
51-
ImmutableList.of("MAPREDUCE", "SPARK", "Oozie Launcher");
52-
5354
public ClouderaYarnApplicationTypeTask(
5455
ZonedDateTime startDate, ZonedDateTime endDate, TaskCategory taskCategory) {
5556
super("yarn-application-types.jsonl", startDate, endDate, taskCategory);
@@ -73,21 +74,28 @@ protected void doRun(
7374
new PaginatedClouderaYarnApplicationsLoader(
7475
handle, context.getArguments().getPaginationPageSize());
7576

77+
List<ClouderaYarnApplicationDTO> sparkYarnApplications = new ArrayList<>();
7678
try (Writer writer = sink.asCharSink(StandardCharsets.UTF_8).openBufferedStream()) {
7779
for (ClouderaClusterDTO cluster : clusters) {
7880
final String clusterName = cluster.getName();
79-
Set<String> yarnAppTypes = new HashSet<>(fetchYARNApplicationTypes(handle, clusterName));
80-
yarnAppTypes.addAll(predefinedAppTypes);
81-
yarnAppTypes.addAll(context.getArguments().getYarnApplicationTypes());
82-
for (String yarnAppType : yarnAppTypes) {
81+
for (String yarnAppType : collectYarnApplicationTypes(context, handle, clusterName)) {
8382
logger.info(
8483
"Dump YARN applications with {} type from {} cluster", yarnAppType, clusterName);
8584
int loadedAppsCount =
8685
appLoader.load(
8786
clusterName,
8887
yarnAppType,
89-
yarnAppsPage ->
90-
writeYarnAppTypes(writer, yarnAppsPage, yarnAppType, clusterName));
88+
yarnAppsPage -> {
89+
writeYarnAppTypes(writer, yarnAppsPage, yarnAppType, clusterName);
90+
if (yarnAppType.equals(YarnApplicationType.SPARK.getValue())) {
91+
yarnAppsPage.stream()
92+
.map(
93+
yarnApp ->
94+
ClouderaYarnApplicationDTO.create(
95+
yarnApp.getApplicationId(), clusterName))
96+
.forEach(sparkYarnApplications::add);
97+
}
98+
});
9199
logger.info(
92100
"Dumped {} YARN applications with {} type from {} cluster",
93101
loadedAppsCount,
@@ -96,6 +104,7 @@ protected void doRun(
96104
}
97105
}
98106
}
107+
handle.initSparkYarnApplications(sparkYarnApplications);
99108
}
100109

101110
private void writeYarnAppTypes(
@@ -115,22 +124,37 @@ private void writeYarnAppTypes(
115124
}
116125
}
117126

118-
private List<String> fetchYARNApplicationTypes(ClouderaManagerHandle handle, String clusterName) {
119-
String yarnAppTypesUrl =
127+
private Set<String> collectYarnApplicationTypes(
128+
TaskRunContext context, ClouderaManagerHandle handle, String clusterName) {
129+
Set<String> yarnApplicationTypes = new HashSet<>();
130+
ImmutableList<String> predefinedYarnAppTypes =
131+
stream(YarnApplicationType.values())
132+
.map(YarnApplicationType::getValue)
133+
.collect(toImmutableList());
134+
yarnApplicationTypes.addAll(predefinedYarnAppTypes);
135+
yarnApplicationTypes.addAll(fetchClusterServiceTypes(handle, clusterName));
136+
yarnApplicationTypes.addAll(context.getArguments().getYarnApplicationTypes());
137+
return yarnApplicationTypes;
138+
}
139+
140+
private ImmutableList<String> fetchClusterServiceTypes(
141+
ClouderaManagerHandle handle, String clusterName) {
142+
String serviceTypesUrl =
120143
handle.getApiURI().toString() + "clusters/" + clusterName + "/serviceTypes";
121144
CloseableHttpClient httpClient = handle.getHttpClient();
122-
try (CloseableHttpResponse appTypesResp = httpClient.execute(new HttpGet(yarnAppTypesUrl))) {
123-
int statusCode = appTypesResp.getStatusLine().getStatusCode();
145+
try (CloseableHttpResponse serviceTypesResp =
146+
httpClient.execute(new HttpGet(serviceTypesUrl))) {
147+
int statusCode = serviceTypesResp.getStatusLine().getStatusCode();
124148
if (!isStatusCodeOK(statusCode)) {
125149
throw new ClouderaConnectorException(
126150
String.format(
127151
"Cloudera API returned bad http status: %d. Message: %s",
128-
statusCode, readFromStream(appTypesResp.getEntity().getContent())));
152+
statusCode, readFromStream(serviceTypesResp.getEntity().getContent())));
129153
}
130-
JsonNode appTypesJson = readJsonTree(appTypesResp.getEntity().getContent());
131-
return StreamSupport.stream(appTypesJson.get("items").spliterator(), false)
154+
JsonNode serviceTypesJson = readJsonTree(serviceTypesResp.getEntity().getContent());
155+
return StreamSupport.stream(serviceTypesJson.get("items").spliterator(), false)
132156
.map(JsonNode::asText)
133-
.collect(Collectors.toList());
157+
.collect(toImmutableList());
134158
} catch (IOException ex) {
135159
throw new ClouderaConnectorException(ex.getMessage(), ex);
136160
}

dumper/app/src/main/java/com/google/edwmigration/dumper/application/dumper/connector/cloudera/manager/dto/CMFHostDTO.java renamed to dumper/app/src/main/java/com/google/edwmigration/dumper/application/dumper/connector/cloudera/manager/model/YarnApplicationType.java

Lines changed: 10 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -14,28 +14,20 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17-
package com.google.edwmigration.dumper.application.dumper.connector.cloudera.manager.dto;
17+
package com.google.edwmigration.dumper.application.dumper.connector.cloudera.manager.model;
1818

19-
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
20-
import com.fasterxml.jackson.annotation.JsonProperty;
19+
public enum YarnApplicationType {
20+
MAPREDUCE("MAPREDUCE"),
21+
SPARK("SPARK"),
22+
OOZIE_LAUNCHER("Oozie Launcher");
2123

22-
/**
23-
* DTO class for the unofficial UI part of Cloudera Management. Display the host from a Memory Usage
24-
* chart.
25-
*/
26-
@JsonIgnoreProperties(ignoreUnknown = true)
27-
public class CMFHostDTO {
28-
@JsonProperty(required = true)
29-
private String hostName;
30-
31-
@JsonProperty(required = true)
32-
private String hostId;
24+
private final String value;
3325

34-
public String getName() {
35-
return hostName;
26+
YarnApplicationType(String value) {
27+
this.value = value;
3628
}
3729

38-
public String getId() {
39-
return hostId;
30+
public String getValue() {
31+
return value;
4032
}
4133
}

0 commit comments

Comments
 (0)