Skip to content

Commit 242ec95

Browse files
author
Utsav Vyas
committed
feat: wire EmfFileWriter into NucleusEmitter publish loop
1 parent dbb7449 commit 242ec95

5 files changed

Lines changed: 292 additions & 16 deletions

File tree

pom.xml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -477,9 +477,6 @@
477477
</goals>
478478
<configuration>
479479
<dataFile>${project.build.directory}/jacoco-it.exec</dataFile>
480-
<excludes>
481-
<exclude>com/aws/greengrass/telemetry/nucleus/emitter/emf/**</exclude>
482-
</excludes>
483480
<rules>
484481
<rule>
485482
<element>BUNDLE</element>

src/integrationtests/java/com/aws/greengrass/integrationtests/telemetry/nucleus/emitter/NucleusEmitterIntegrationTest.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,9 @@
3030

3131
import java.io.IOException;
3232
import java.nio.charset.StandardCharsets;
33+
import java.nio.file.Files;
3334
import java.nio.file.Path;
35+
import java.nio.file.Paths;
3436
import java.util.List;
3537
import java.util.Objects;
3638
import java.util.concurrent.CountDownLatch;
@@ -625,6 +627,40 @@ void GIVEN_output_mode_both_WHEN_component_started_THEN_it_works() throws Except
625627
}
626628
}
627629

630+
@Test
631+
void GIVEN_output_mode_both_WHEN_publish_fires_THEN_emf_file_created() throws Exception {
632+
final CountDownLatch pubsubLog = new CountDownLatch(1);
633+
try (AutoCloseable listener = TestUtils.createCloseableLogListener((m) -> {
634+
String stdoutStr = m.getMessage();
635+
if (stdoutStr != null && stdoutStr.contains(PUBSUB_PUBLISH_STARTING)) {
636+
pubsubLog.countDown();
637+
}
638+
})) {
639+
startKernelWithConfig(Objects.requireNonNull(NucleusEmitterTestUtils.class.getResource(
640+
OUTPUT_MODE_BOTH_NUCLEUS_EMITTER_KERNEL_CONFIG)).toString(), kernel, rootDir);
641+
assertTrue(pubsubLog.await(30, TimeUnit.SECONDS), "Pub/sub publish log detected.");
642+
643+
// Poll for EMF file creation (publish fires immediately with initialDelay=0)
644+
Path emfDir = Paths.get("/tmp/greengrass/telemetry");
645+
long deadline = System.currentTimeMillis() + 30000;
646+
long emfCount = 0;
647+
while (System.currentTimeMillis() < deadline) {
648+
if (Files.isDirectory(emfDir)) {
649+
try (java.util.stream.Stream<Path> files = Files.list(emfDir)) {
650+
emfCount = files
651+
.filter(p -> p.toString().endsWith(".emf.json"))
652+
.count();
653+
}
654+
if (emfCount > 0) {
655+
break;
656+
}
657+
}
658+
Thread.sleep(1000);
659+
}
660+
assertTrue(emfCount > 0, "EMF file created in output directory.");
661+
}
662+
}
663+
628664
@Test
629665
void GIVEN_default_config_WHEN_metricsLevel_changed_to_detailed_THEN_sme_recreated() throws Exception {
630666
final CountDownLatch startupLog = new CountDownLatch(1);

src/main/java/com/aws/greengrass/telemetry/nucleus/emitter/NucleusEmitter.java

Lines changed: 54 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,19 +9,23 @@
99
import com.aws.greengrass.config.Topics;
1010
import com.aws.greengrass.dependency.ImplementsService;
1111
import com.aws.greengrass.dependency.State;
12+
import com.aws.greengrass.deployment.DeviceConfiguration;
1213
import com.aws.greengrass.lifecyclemanager.PluginService;
1314
import com.aws.greengrass.telemetry.impl.Metric;
15+
import com.aws.greengrass.telemetry.nucleus.emitter.emf.EmfFileWriter;
1416
import com.aws.greengrass.telemetry.nucleus.emitter.metrics.KernelMetricsEmitter;
1517
import com.aws.greengrass.telemetry.nucleus.emitter.metrics.SystemMetricsEmitter;
1618
import com.aws.greengrass.telemetry.nucleus.emitter.publisher.MqttPublisher;
1719
import com.aws.greengrass.telemetry.nucleus.emitter.publisher.PubSubPublisher;
20+
import com.aws.greengrass.util.Coerce;
1821
import com.aws.greengrass.util.SerializerFactory;
1922
import com.aws.greengrass.util.Utils;
2023
import com.fasterxml.jackson.core.JsonProcessingException;
2124
import com.fasterxml.jackson.databind.ObjectMapper;
2225
import lombok.AccessLevel;
2326
import lombok.Getter;
2427

28+
import java.nio.file.Paths;
2529
import java.util.Collection;
2630
import java.util.Collections;
2731
import java.util.List;
@@ -64,6 +68,8 @@ public class NucleusEmitter extends PluginService {
6468
//Metric publishers
6569
private final PubSubPublisher pubSubPublisher;
6670
private final MqttPublisher mqttPublisher;
71+
// volatile ensures visibility across config subscriber and publish threads
72+
private volatile EmfFileWriter emfFileWriter;
6773

6874
private final ChildChanged subscribeToConfigChanges = (what, topic) ->
6975
handleConfiguration(this.config.lookupTopics(CONFIGURATION_CONFIG_KEY));
@@ -108,9 +114,14 @@ private void handleConfiguration(Topics configurationTopics) {
108114
.equals(newConfiguration.getExcludeMounts());
109115
boolean excludeInterfacesChanged = !configuration.getExcludeInterfaces()
110116
.equals(newConfiguration.getExcludeInterfaces());
117+
boolean outputModeChanged = !configuration.getOutputMode()
118+
.equals(newConfiguration.getOutputMode());
119+
boolean outputDirectoryChanged = !configuration.getOutputDirectory()
120+
.equals(newConfiguration.getOutputDirectory());
111121

112122
if (!pubSubPublishChanged && !mqttTopicChanged && !telemetryPublishIntervalMsChanged
113-
&& !metricsLevelChanged && !excludeMountsChanged && !excludeInterfacesChanged) {
123+
&& !metricsLevelChanged && !excludeMountsChanged && !excludeInterfacesChanged
124+
&& !outputModeChanged && !outputDirectoryChanged) {
114125
return;
115126
}
116127

@@ -138,6 +149,14 @@ private void handleConfiguration(Topics configurationTopics) {
138149
newConfiguration.getExcludeMounts(),
139150
newConfiguration.getExcludeInterfaces());
140151
}
152+
if (outputModeChanged || outputDirectoryChanged) {
153+
if (newConfiguration.isEmfEnabled()) {
154+
this.emfFileWriter = new EmfFileWriter(getThingName(),
155+
Paths.get(newConfiguration.getOutputDirectory()));
156+
} else {
157+
this.emfFileWriter = null; // NOPMD NullAssignment - disables EMF output
158+
}
159+
}
141160
scheduleTelemetryPublish();
142161
}
143162

@@ -151,16 +170,36 @@ public void startup() {
151170
scheduleTelemetryPublish();
152171
}
153172

154-
private void publishTelemetry(boolean pubSubPublish, String pubSubTopic, boolean mqttPublish, String mqttTopic) {
155-
String jsonString = retrieveMetricsJson(jsonMapper);
156-
if (pubSubPublish) {
157-
this.pubSubPublisher.publishMessage(jsonString, pubSubTopic);
173+
private void publishTelemetry(boolean pubSubPublish, String pubSubTopic,
174+
boolean mqttPublish, String mqttTopic) {
175+
List<Metric> metrics = collectMetrics();
176+
if (pubSubPublish || mqttPublish) {
177+
String jsonString = null; // NOPMD - set in try, checked before use
178+
try {
179+
jsonString = jsonMapper.writeValueAsString(metrics);
180+
} catch (JsonProcessingException e) {
181+
logger.error(JSON_PARSE_ERROR_LOG, e);
182+
}
183+
if (pubSubPublish && jsonString != null) {
184+
this.pubSubPublisher.publishMessage(jsonString, pubSubTopic);
185+
}
186+
if (mqttPublish && jsonString != null) {
187+
this.mqttPublisher.publishMessage(jsonString, mqttTopic);
188+
}
158189
}
159-
if (mqttPublish) {
160-
this.mqttPublisher.publishMessage(jsonString, mqttTopic);
190+
EmfFileWriter localEmf = this.emfFileWriter;
191+
if (localEmf != null) {
192+
localEmf.write(metrics);
161193
}
162194
}
163195

196+
private List<Metric> collectMetrics() {
197+
SystemMetricsEmitter localSme = this.sme;
198+
return Stream.of(localSme.getMetrics(), kme.getMetrics())
199+
.flatMap(Collection::stream)
200+
.collect(Collectors.toList());
201+
}
202+
164203
private void scheduleTelemetryPublish() {
165204
final NucleusEmitterConfiguration configuration = currentConfiguration.get();
166205
final boolean newPubPublish = configuration.isPubsubPublish();
@@ -192,13 +231,9 @@ private void scheduleTelemetryPublish() {
192231
}
193232

194233
protected String retrieveMetricsJson(ObjectMapper jsonMapper) {
195-
196234
String jsonString = null;
197235
try {
198-
SystemMetricsEmitter localSme = this.sme;
199-
List<Metric> metrics = Stream.of(localSme.getMetrics(), kme.getMetrics())
200-
.flatMap(Collection::stream)
201-
.collect(Collectors.toList());
236+
List<Metric> metrics = collectMetrics();
202237
jsonString = jsonMapper.writeValueAsString(metrics);
203238
} catch (JsonProcessingException e) {
204239
logger.error(JSON_PARSE_ERROR_LOG, e);
@@ -209,6 +244,13 @@ protected String retrieveMetricsJson(ObjectMapper jsonMapper) {
209244
@Override
210245
public void shutdown() {
211246
cancelJob(telemetryPublishFuture, telemetryPublishInProgressLock, true);
247+
this.emfFileWriter = null; // NOPMD NullAssignment - release on shutdown
248+
}
249+
250+
private String getThingName() {
251+
return Coerce.toString(
252+
this.context.get(DeviceConfiguration.class)
253+
.getThingName());
212254
}
213255

214256
private void cancelJob(ScheduledFuture<?> future, Object lock, boolean immediately) {

src/main/java/com/aws/greengrass/telemetry/nucleus/emitter/metrics/SystemMetricsEmitter.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@
1212

1313
package com.aws.greengrass.telemetry.nucleus.emitter.metrics;
1414

15+
import com.aws.greengrass.logging.api.Logger;
16+
import com.aws.greengrass.logging.impl.LogManager;
1517
import com.aws.greengrass.telemetry.impl.Metric;
1618
import com.aws.greengrass.telemetry.models.TelemetryAggregation;
1719
import com.aws.greengrass.telemetry.models.TelemetryUnit;
@@ -30,6 +32,7 @@
3032
import java.util.Map;
3133

3234
public class SystemMetricsEmitter extends PeriodicMetricsEmitter {
35+
private static final Logger logger = LogManager.getLogger(SystemMetricsEmitter.class);
3336
private static final int MB_CONVERTER = 1024 * 1024;
3437
private static final int PERCENTAGE_CONVERTER = 100;
3538
public static final String NAMESPACE = "SystemMetrics";
@@ -144,7 +147,8 @@ private List<Metric> collectNetworkMetrics(long timestamp) {
144147
continue;
145148
}
146149
} catch (SocketException e) {
147-
// Interface unavailable (e.g. being removed); skip this cycle, next call retries
150+
logger.atDebug().kv("interface", name)
151+
.cause(e).log("Skipping unavailable network interface");
148152
continue;
149153
}
150154
if (excludeInterfaces.contains(name)) {

0 commit comments

Comments
 (0)