Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,15 @@
<jar.name>aws.greengrass.telemetry.NucleusEmitter</jar.name>

<nucleus.version>2.4.0-SNAPSHOT</nucleus.version>
<logging.version>2.4.0-SNAPSHOT</logging.version>
<junit.version>5.6.2</junit.version>
<mockito.version>3.2.4</mockito.version>
<lombok.version>1.18.10</lombok.version>
<spotbugs.version>4.0.1</spotbugs.version>
<hamcrest.core.version>2.2</hamcrest.core.version>
<surefire.version>3.0.0-M5</surefire.version>
<aws.maven.version>1.4.2</aws.maven.version>
<emf.version>4.2.0</emf.version>
</properties>
<repositories>
<repository>
Expand All @@ -61,12 +63,36 @@
<version>${nucleus.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.aws.greengrass</groupId>
<artifactId>logging</artifactId>
<version>${logging.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>software.amazon.cloudwatchlogs</groupId>
<artifactId>aws-embedded-metrics</artifactId>
<version>${emf.version}</version>
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>*</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -625,6 +627,40 @@ void GIVEN_output_mode_both_WHEN_component_started_THEN_it_works() throws Except
}
}

@Test
void GIVEN_output_mode_both_WHEN_publish_fires_THEN_emf_file_created() throws Exception {
final CountDownLatch pubsubLog = new CountDownLatch(1);
try (AutoCloseable listener = TestUtils.createCloseableLogListener((m) -> {
String stdoutStr = m.getMessage();
if (stdoutStr != null && stdoutStr.contains(PUBSUB_PUBLISH_STARTING)) {
pubsubLog.countDown();
}
})) {
startKernelWithConfig(Objects.requireNonNull(NucleusEmitterTestUtils.class.getResource(
OUTPUT_MODE_BOTH_NUCLEUS_EMITTER_KERNEL_CONFIG)).toString(), kernel, rootDir);
assertTrue(pubsubLog.await(30, TimeUnit.SECONDS), "Pub/sub publish log detected.");

// Poll for EMF file creation (publish fires immediately with initialDelay=0)
Path emfDir = Paths.get("/tmp/greengrass/telemetry");
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will this tmp file get cleanup after the test?

long deadline = System.currentTimeMillis() + 30000;
long emfCount = 0;
while (System.currentTimeMillis() < deadline) {
if (Files.isDirectory(emfDir)) {
try (java.util.stream.Stream<Path> files = Files.list(emfDir)) {
emfCount = files
.filter(p -> p.getFileName().toString().startsWith("emf-metrics"))
.count();
}
if (emfCount > 0) {
break;
}
}
Thread.sleep(1000);
}
assertTrue(emfCount > 0, "EMF file created in output directory.");
}
}

@Test
void GIVEN_default_config_WHEN_metricsLevel_changed_to_detailed_THEN_sme_recreated() throws Exception {
final CountDownLatch startupLog = new CountDownLatch(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,19 +9,23 @@
import com.aws.greengrass.config.Topics;
import com.aws.greengrass.dependency.ImplementsService;
import com.aws.greengrass.dependency.State;
import com.aws.greengrass.deployment.DeviceConfiguration;
import com.aws.greengrass.lifecyclemanager.PluginService;
import com.aws.greengrass.telemetry.impl.Metric;
import com.aws.greengrass.telemetry.nucleus.emitter.emf.EmfFileWriter;
import com.aws.greengrass.telemetry.nucleus.emitter.metrics.KernelMetricsEmitter;
import com.aws.greengrass.telemetry.nucleus.emitter.metrics.SystemMetricsEmitter;
import com.aws.greengrass.telemetry.nucleus.emitter.publisher.MqttPublisher;
import com.aws.greengrass.telemetry.nucleus.emitter.publisher.PubSubPublisher;
import com.aws.greengrass.util.Coerce;
import com.aws.greengrass.util.SerializerFactory;
import com.aws.greengrass.util.Utils;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.AccessLevel;
import lombok.Getter;

import java.nio.file.Paths;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -64,6 +68,8 @@ public class NucleusEmitter extends PluginService {
//Metric publishers
private final PubSubPublisher pubSubPublisher;
private final MqttPublisher mqttPublisher;
// volatile ensures visibility across config subscriber and publish threads
private volatile EmfFileWriter emfFileWriter;

private final ChildChanged subscribeToConfigChanges = (what, topic) ->
handleConfiguration(this.config.lookupTopics(CONFIGURATION_CONFIG_KEY));
Expand Down Expand Up @@ -108,9 +114,14 @@ private void handleConfiguration(Topics configurationTopics) {
.equals(newConfiguration.getExcludeMounts());
boolean excludeInterfacesChanged = !configuration.getExcludeInterfaces()
.equals(newConfiguration.getExcludeInterfaces());
boolean outputModeChanged = !configuration.getOutputMode()
.equals(newConfiguration.getOutputMode());
boolean outputDirectoryChanged = !configuration.getOutputDirectory()
.equals(newConfiguration.getOutputDirectory());

if (!pubSubPublishChanged && !mqttTopicChanged && !telemetryPublishIntervalMsChanged
&& !metricsLevelChanged && !excludeMountsChanged && !excludeInterfacesChanged) {
&& !metricsLevelChanged && !excludeMountsChanged && !excludeInterfacesChanged
&& !outputModeChanged && !outputDirectoryChanged) {
return;
}

Expand Down Expand Up @@ -138,6 +149,14 @@ private void handleConfiguration(Topics configurationTopics) {
newConfiguration.getExcludeMounts(),
newConfiguration.getExcludeInterfaces());
}
if (outputModeChanged || outputDirectoryChanged) {
if (newConfiguration.isEmfEnabled()) {
this.emfFileWriter = new EmfFileWriter(getThingName(),
Paths.get(newConfiguration.getOutputDirectory()));
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Recommendation generated by Amazon CodeGuru Reviewer. Leave feedback on this recommendation by replying to the comment or by reacting to the comment using emoji.

Path traversal vulnerability detected. User-controlled input in file paths
can allow attackers to access files outside intended directories using
../ sequences. Secure your code by using framework functions like
getCanonicalPath(), normalize(), or validating paths with
.startsWith() checks. Learn more:
https://cwe.mitre.org/data/definitions/22.html

} else {
this.emfFileWriter = null; // NOPMD NullAssignment - disables EMF output
}
}
scheduleTelemetryPublish();
}

Expand All @@ -151,16 +170,36 @@ public void startup() {
scheduleTelemetryPublish();
}

private void publishTelemetry(boolean pubSubPublish, String pubSubTopic, boolean mqttPublish, String mqttTopic) {
String jsonString = retrieveMetricsJson(jsonMapper);
if (pubSubPublish) {
this.pubSubPublisher.publishMessage(jsonString, pubSubTopic);
private void publishTelemetry(boolean pubSubPublish, String pubSubTopic,
boolean mqttPublish, String mqttTopic) {
List<Metric> metrics = collectMetrics();
if (pubSubPublish || mqttPublish) {
String jsonString = null; // NOPMD - set in try, checked before use
try {
jsonString = jsonMapper.writeValueAsString(metrics);
} catch (JsonProcessingException e) {
logger.error(JSON_PARSE_ERROR_LOG, e);
}
if (pubSubPublish && jsonString != null) {
this.pubSubPublisher.publishMessage(jsonString, pubSubTopic);
}
if (mqttPublish && jsonString != null) {
this.mqttPublisher.publishMessage(jsonString, mqttTopic);
}
}
if (mqttPublish) {
this.mqttPublisher.publishMessage(jsonString, mqttTopic);
EmfFileWriter localEmf = this.emfFileWriter;
if (localEmf != null) {
localEmf.write(metrics);
}
}

private List<Metric> collectMetrics() {
SystemMetricsEmitter localSme = this.sme;
return Stream.of(localSme.getMetrics(), kme.getMetrics())
.flatMap(Collection::stream)
.collect(Collectors.toList());
}

private void scheduleTelemetryPublish() {
final NucleusEmitterConfiguration configuration = currentConfiguration.get();
final boolean newPubPublish = configuration.isPubsubPublish();
Expand Down Expand Up @@ -192,13 +231,9 @@ private void scheduleTelemetryPublish() {
}

protected String retrieveMetricsJson(ObjectMapper jsonMapper) {

String jsonString = null;
try {
SystemMetricsEmitter localSme = this.sme;
List<Metric> metrics = Stream.of(localSme.getMetrics(), kme.getMetrics())
.flatMap(Collection::stream)
.collect(Collectors.toList());
List<Metric> metrics = collectMetrics();
jsonString = jsonMapper.writeValueAsString(metrics);
} catch (JsonProcessingException e) {
logger.error(JSON_PARSE_ERROR_LOG, e);
Expand All @@ -209,6 +244,13 @@ protected String retrieveMetricsJson(ObjectMapper jsonMapper) {
@Override
public void shutdown() {
cancelJob(telemetryPublishFuture, telemetryPublishInProgressLock, true);
this.emfFileWriter = null; // NOPMD NullAssignment - release on shutdown
}

private String getThingName() {
return Coerce.toString(
this.context.get(DeviceConfiguration.class)
.getThingName());
}

private void cancelJob(ScheduledFuture<?> future, Object lock, boolean immediately) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0
*/

package com.aws.greengrass.telemetry.nucleus.emitter.emf;

import com.aws.greengrass.logging.api.Logger;
import com.aws.greengrass.logging.impl.LogManager;
import com.aws.greengrass.telemetry.impl.Metric;
import com.aws.greengrass.telemetry.models.TelemetryUnit;
import com.fasterxml.jackson.core.JsonProcessingException;
import software.amazon.cloudwatchlogs.emf.model.DimensionSet;
import software.amazon.cloudwatchlogs.emf.model.MetricsContext;
import software.amazon.cloudwatchlogs.emf.model.Unit;

import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
* Serializes metrics in EMF (Embedded Metric Format) and writes them via a
* raw Greengrass logger (no envelope). The GG Log Manager handles file rotation
* and upload to CloudWatch Logs, where EMF is auto-parsed into CloudWatch Metrics.
*/
public class EmfFileWriter {

private static final Logger logger = LogManager.getLogger(EmfFileWriter.class);

private static final Map<TelemetryUnit, Unit> UNIT_MAP;

static {
Map<TelemetryUnit, Unit> m = new LinkedHashMap<>();
m.put(TelemetryUnit.Percent, Unit.PERCENT);
m.put(TelemetryUnit.Bytes, Unit.BYTES);
m.put(TelemetryUnit.Megabytes, Unit.MEGABYTES);
m.put(TelemetryUnit.Count, Unit.COUNT);
UNIT_MAP = Collections.unmodifiableMap(m);
}

private final String thingName;
private final Logger emfLogger;

/**
* Creates an EmfFileWriter.
*
* @param thingName thing name for dimensions
* @param outputDirectory directory for EMF output files
*/
public EmfFileWriter(String thingName, Path outputDirectory) {
this.thingName = thingName;
this.emfLogger = LogManager.getRawLogger("emf-metrics", outputDirectory);
}

/**
* Serializes metrics to EMF JSON and writes them via the raw logger.
*
* @param metrics list of metrics to write
*/
public void write(List<Metric> metrics) {
if (metrics == null || metrics.isEmpty()) {
return;
}
Map<String, List<Metric>> byNamespace = groupByNamespace(metrics);
for (Map.Entry<String, List<Metric>> entry : byNamespace.entrySet()) {
MetricsContext context = new MetricsContext();
context.setNamespace(entry.getKey());
context.putDimension(DimensionSet.of("ThingName", thingName));
for (Metric metric : entry.getValue()) {
if (!(metric.getValue() instanceof Number)) {
continue;
}
context.putMetric(metric.getName(),
((Number) metric.getValue()).doubleValue(),
toEmfUnit(metric.getUnit()));
}
try {
for (String line : context.serialize()) {
emfLogger.atInfo().log(line);
}
} catch (JsonProcessingException e) {
logger.error("Failed to serialize EMF metrics", e);
}
}
}

private Map<String, List<Metric>> groupByNamespace(List<Metric> metrics) {
Map<String, List<Metric>> result = new LinkedHashMap<>();
for (Metric m : metrics) {
result.computeIfAbsent(m.getNamespace(), k -> new ArrayList<>()).add(m);
}
return result;
}

private static Unit toEmfUnit(TelemetryUnit unit) {
return UNIT_MAP.getOrDefault(unit, Unit.NONE);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@

package com.aws.greengrass.telemetry.nucleus.emitter.metrics;

import com.aws.greengrass.logging.api.Logger;
import com.aws.greengrass.logging.impl.LogManager;
import com.aws.greengrass.telemetry.impl.Metric;
import com.aws.greengrass.telemetry.models.TelemetryAggregation;
import com.aws.greengrass.telemetry.models.TelemetryUnit;
Expand All @@ -30,6 +32,7 @@
import java.util.Map;

public class SystemMetricsEmitter extends PeriodicMetricsEmitter {
private static final Logger logger = LogManager.getLogger(SystemMetricsEmitter.class);
private static final int MB_CONVERTER = 1024 * 1024;
private static final int PERCENTAGE_CONVERTER = 100;
public static final String NAMESPACE = "SystemMetrics";
Expand Down Expand Up @@ -144,7 +147,8 @@ private List<Metric> collectNetworkMetrics(long timestamp) {
continue;
}
} catch (SocketException e) {
// Interface unavailable (e.g. being removed); skip this cycle, next call retries
logger.atDebug().kv("interface", name)
.cause(e).log("Skipping unavailable network interface");
continue;
}
if (excludeInterfaces.contains(name)) {
Expand All @@ -159,6 +163,9 @@ private List<Metric> collectNetworkMetrics(long timestamp) {
continue;
}
long timeDelta = current.timestamp - prev.timestamp;
if (timeDelta <= 0) {
continue;
}
double seconds = timeDelta / 1000.0;

addNetworkMetric(metrics, "BytesRecvPerSec", name, TelemetryUnit.Bytes,
Expand Down
Loading
Loading