Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,11 @@ protected void doRun(
hosts.add(ClouderaHostDTO.create(apiHost.getId(), apiHost.getName()));
}
}
handle.initHostsIfNull(hosts);
if (hosts.isEmpty()) {
throw new MetadataDumperUsageException(
"No hosts were found in any of the initialized Cloudera clusters.");
}
handle.initHosts(hosts);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,11 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.io.ByteSink;
import com.google.edwmigration.dumper.application.dumper.connector.cloudera.manager.ClouderaManagerHandle.ClouderaClusterDTO;
import com.google.edwmigration.dumper.application.dumper.connector.cloudera.manager.ClouderaManagerHandle.ClouderaHostDTO;
import com.google.edwmigration.dumper.application.dumper.connector.cloudera.manager.dto.CMFHostDTO;
import com.google.edwmigration.dumper.application.dumper.connector.cloudera.manager.dto.CMFHostListDTO;
import com.google.edwmigration.dumper.application.dumper.task.TaskCategory;
import com.google.edwmigration.dumper.application.dumper.task.TaskRunContext;
import java.io.Writer;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nonnull;
import org.apache.http.client.methods.CloseableHttpResponse;
Expand Down Expand Up @@ -67,7 +63,6 @@ protected void doRun(
}

final URI baseURI = handle.getBaseURI();
List<ClouderaHostDTO> hosts = new ArrayList<>();
try (Writer writer = sink.asCharSink(StandardCharsets.UTF_8).openBufferedStream()) {
for (ClouderaClusterDTO cluster : clusters) {
if (cluster.getId() == null) {
Expand Down Expand Up @@ -95,14 +90,7 @@ protected void doRun(
String stringifiedHosts = hostsJson.toString();
writer.write(stringifiedHosts);
writer.write('\n');

CMFHostListDTO apiHosts = parseJsonStringToObject(stringifiedHosts, CMFHostListDTO.class);
for (CMFHostDTO apiHost : apiHosts.getHosts()) {
hosts.add(ClouderaHostDTO.create(apiHost.getId(), apiHost.getName()));
}
}
}

handle.initHostsIfNull(hosts);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class ClouderaManagerHandle implements Handle {

private ImmutableList<ClouderaClusterDTO> clusters;
private ImmutableList<ClouderaHostDTO> hosts;
private ImmutableList<ClouderaYarnApplicationDTO> sparkYarnApplications;

public ClouderaManagerHandle(URI apiURI, CloseableHttpClient httpClient) {
Preconditions.checkNotNull(apiURI, "Cloudera's apiURI can't be null.");
Expand Down Expand Up @@ -85,10 +86,8 @@ public synchronized void initClusters(List<ClouderaClusterDTO> clusters) {
Preconditions.checkNotNull(clusters, "Clusters can't be initialised to null list.");
Preconditions.checkArgument(
!clusters.isEmpty(), "Clusters can't be initialised to empty list.");
Preconditions.checkState(this.clusters == null, "The cluster already initialized.");

if (this.clusters != null) {
throw new IllegalStateException("The cluster already initialized!");
}
this.clusters = ImmutableList.copyOf(clusters);
}

Expand All @@ -97,14 +96,27 @@ public synchronized ImmutableList<ClouderaHostDTO> getHosts() {
return hosts;
}

public synchronized void initHostsIfNull(List<ClouderaHostDTO> hosts) {
// Todo
// Preconditions.checkNotNull(hosts, "Hosts can't be initialised to null list.");
// Preconditions.checkArgument(!hosts.isEmpty(), "Hosts can't be initialised to empty list.");
public synchronized void initHosts(List<ClouderaHostDTO> hosts) {
Preconditions.checkNotNull(hosts, "Hosts can't be initialised to null list.");
Preconditions.checkArgument(!hosts.isEmpty(), "Hosts can't be initialised to empty list.");
Preconditions.checkState(this.hosts == null, "Hosts already initialized.");

if (this.hosts == null) {
this.hosts = ImmutableList.copyOf(hosts);
}
this.hosts = ImmutableList.copyOf(hosts);
}

/**
 * Returns the Spark YARN applications currently stored on this handle.
 *
 * @return the stored immutable list, or {@code null} if no list has been stored yet
 */
@CheckForNull
public synchronized ImmutableList<ClouderaYarnApplicationDTO> getSparkYarnApplications() {
  return this.sparkYarnApplications;
}

/**
 * Stores an immutable copy of the given Spark YARN applications on this handle. The list may be
 * set only once; an empty list is accepted.
 *
 * @param sparkYarnApplications the applications to store; must not be {@code null}
 * @throws NullPointerException if {@code sparkYarnApplications} is {@code null}
 * @throws IllegalStateException if a list has already been stored on this handle
 */
public synchronized void initSparkYarnApplications(
    List<ClouderaYarnApplicationDTO> sparkYarnApplications) {
  // Validate the argument first, then the handle state, so a null argument is always
  // reported as such even when the handle has already been initialised.
  List<ClouderaYarnApplicationDTO> source =
      Preconditions.checkNotNull(
          sparkYarnApplications, "Spark YARN applications can't be initialised to null list.");
  boolean notYetInitialized = this.sparkYarnApplications == null;
  Preconditions.checkState(notYetInitialized, "Spark YARN applications already initialized.");

  this.sparkYarnApplications = ImmutableList.copyOf(source);
}

@Override
Expand Down Expand Up @@ -145,4 +157,17 @@ public static ClouderaHostDTO create(String id, String name) {

abstract String getName();
}

/**
 * AutoValue value type pairing a YARN application id with the name of the cluster it was read
 * from.
 */
@AutoValue
public abstract static class ClouderaYarnApplicationDTO {
  /**
   * Creates a new instance.
   *
   * @param id the YARN application id; may be {@code null}, matching the nullable {@link
   *     #getId()} property
   * @param clusterName the name of the cluster the application belongs to
   * @return the new value object
   */
  public static ClouderaYarnApplicationDTO create(@Nullable String id, String clusterName) {
    return new AutoValue_ClouderaManagerHandle_ClouderaYarnApplicationDTO(id, clusterName);
  }

  /** Returns the YARN application id, which may be {@code null}. */
  @CheckForNull
  @Nullable
  abstract String getId();

  /** Returns the name of the cluster the application belongs to. */
  abstract String getClusterName();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,19 @@
*/
package com.google.edwmigration.dumper.application.dumper.connector.cloudera.manager;

import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.util.Arrays.stream;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.ByteSink;
import com.google.edwmigration.dumper.application.dumper.MetadataDumperUsageException;
import com.google.edwmigration.dumper.application.dumper.connector.cloudera.manager.ClouderaManagerHandle.ClouderaClusterDTO;
import com.google.edwmigration.dumper.application.dumper.connector.cloudera.manager.ClouderaManagerHandle.ClouderaYarnApplicationDTO;
import com.google.edwmigration.dumper.application.dumper.connector.cloudera.manager.dto.ApiYARNApplicationDTO;
import com.google.edwmigration.dumper.application.dumper.connector.cloudera.manager.model.YarnApplicationType;
import com.google.edwmigration.dumper.application.dumper.task.TaskCategory;
import com.google.edwmigration.dumper.application.dumper.task.TaskRunContext;
import java.io.IOException;
Expand All @@ -34,7 +39,6 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import javax.annotation.Nonnull;
import org.apache.http.client.methods.CloseableHttpResponse;
Expand All @@ -47,9 +51,6 @@ public class ClouderaYarnApplicationTypeTask extends AbstractClouderaYarnApplica
private static final Logger logger =
LoggerFactory.getLogger(ClouderaYarnApplicationTypeTask.class);

private final ImmutableList<String> predefinedAppTypes =
ImmutableList.of("MAPREDUCE", "SPARK", "Oozie Launcher");

public ClouderaYarnApplicationTypeTask(
ZonedDateTime startDate, ZonedDateTime endDate, TaskCategory taskCategory) {
super("yarn-application-types.jsonl", startDate, endDate, taskCategory);
Expand All @@ -73,21 +74,28 @@ protected void doRun(
new PaginatedClouderaYarnApplicationsLoader(
handle, context.getArguments().getPaginationPageSize());

List<ClouderaYarnApplicationDTO> sparkYarnApplications = new ArrayList<>();
try (Writer writer = sink.asCharSink(StandardCharsets.UTF_8).openBufferedStream()) {
for (ClouderaClusterDTO cluster : clusters) {
final String clusterName = cluster.getName();
Set<String> yarnAppTypes = new HashSet<>(fetchYARNApplicationTypes(handle, clusterName));
yarnAppTypes.addAll(predefinedAppTypes);
yarnAppTypes.addAll(context.getArguments().getYarnApplicationTypes());
for (String yarnAppType : yarnAppTypes) {
for (String yarnAppType : collectYarnApplicationTypes(context, handle, clusterName)) {
logger.info(
"Dump YARN applications with {} type from {} cluster", yarnAppType, clusterName);
int loadedAppsCount =
appLoader.load(
clusterName,
yarnAppType,
yarnAppsPage ->
writeYarnAppTypes(writer, yarnAppsPage, yarnAppType, clusterName));
yarnAppsPage -> {
writeYarnAppTypes(writer, yarnAppsPage, yarnAppType, clusterName);
if (yarnAppType.equals(YarnApplicationType.SPARK.getValue())) {
yarnAppsPage.stream()
.map(
yarnApp ->
ClouderaYarnApplicationDTO.create(
yarnApp.getApplicationId(), clusterName))
.forEach(sparkYarnApplications::add);
}
});
logger.info(
"Dumped {} YARN applications with {} type from {} cluster",
loadedAppsCount,
Expand All @@ -96,6 +104,7 @@ protected void doRun(
}
}
}
handle.initSparkYarnApplications(sparkYarnApplications);
}

private void writeYarnAppTypes(
Expand All @@ -115,22 +124,37 @@ private void writeYarnAppTypes(
}
}

private List<String> fetchYARNApplicationTypes(ClouderaManagerHandle handle, String clusterName) {
String yarnAppTypesUrl =
/**
 * Builds the set of YARN application types to dump for one cluster. The result is the union of
 * every {@link YarnApplicationType} value, the types returned by {@code
 * fetchClusterServiceTypes} for the cluster, and the types supplied through the run arguments.
 * Duplicates collapse because the result is a set.
 *
 * @param context the task run context whose arguments provide additional application types
 * @param handle the Cloudera Manager handle used to query the cluster
 * @param clusterName the name of the cluster to query
 * @return a mutable set holding all collected application type names
 */
private Set<String> collectYarnApplicationTypes(
    TaskRunContext context, ClouderaManagerHandle handle, String clusterName) {
  Set<String> collectedTypes = new HashSet<>();
  for (YarnApplicationType predefinedType : YarnApplicationType.values()) {
    collectedTypes.add(predefinedType.getValue());
  }
  collectedTypes.addAll(fetchClusterServiceTypes(handle, clusterName));
  collectedTypes.addAll(context.getArguments().getYarnApplicationTypes());
  return collectedTypes;
}

private ImmutableList<String> fetchClusterServiceTypes(
ClouderaManagerHandle handle, String clusterName) {
String serviceTypesUrl =
handle.getApiURI().toString() + "clusters/" + clusterName + "/serviceTypes";
CloseableHttpClient httpClient = handle.getHttpClient();
try (CloseableHttpResponse appTypesResp = httpClient.execute(new HttpGet(yarnAppTypesUrl))) {
int statusCode = appTypesResp.getStatusLine().getStatusCode();
try (CloseableHttpResponse serviceTypesResp =
httpClient.execute(new HttpGet(serviceTypesUrl))) {
int statusCode = serviceTypesResp.getStatusLine().getStatusCode();
if (!isStatusCodeOK(statusCode)) {
throw new ClouderaConnectorException(
String.format(
"Cloudera API returned bad http status: %d. Message: %s",
statusCode, readFromStream(appTypesResp.getEntity().getContent())));
statusCode, readFromStream(serviceTypesResp.getEntity().getContent())));
}
JsonNode appTypesJson = readJsonTree(appTypesResp.getEntity().getContent());
return StreamSupport.stream(appTypesJson.get("items").spliterator(), false)
JsonNode serviceTypesJson = readJsonTree(serviceTypesResp.getEntity().getContent());
return StreamSupport.stream(serviceTypesJson.get("items").spliterator(), false)
.map(JsonNode::asText)
.collect(Collectors.toList());
.collect(toImmutableList());
} catch (IOException ex) {
throw new ClouderaConnectorException(ex.getMessage(), ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,28 +14,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.edwmigration.dumper.application.dumper.connector.cloudera.manager.dto;
package com.google.edwmigration.dumper.application.dumper.connector.cloudera.manager.model;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
public enum YarnApplicationType {
MAPREDUCE("MAPREDUCE"),
SPARK("SPARK"),
OOZIE_LAUNCHER("Oozie Launcher");

/**
* DTO class for the unofficial UI part of Cloudera Management. Display the host from a Memory Usage
* chart.
*/
@JsonIgnoreProperties(ignoreUnknown = true)
public class CMFHostDTO {
@JsonProperty(required = true)
private String hostName;

@JsonProperty(required = true)
private String hostId;
private final String value;

public String getName() {
return hostName;
YarnApplicationType(String value) {
this.value = value;
}

public String getId() {
return hostId;
public String getValue() {
return value;
}
}
Loading
Loading