Skip to content

Commit d8f46dd

Browse files
author
Sahith Reddy Adudodla
committed
feat(deployment): report terminal deployment status to source account after endpoint switch
After a successful endpoint switch (or FAILED_UNABLE_TO_ROLLBACK), the main MQTT client is connected to the destination endpoint and cannot report status back to the source endpoint's IoT Job or deployment shadow. This adds source endpoint status reporting via a standalone MQTT connection. - EndpointSwitchState: new class managing persisted state (source endpoint, deployment ID) for in-flight endpoint-switch deployments - IotJobsHelper: consumer detects endpoint-switch deployment, publishes SUCCEEDED to source IoT Job via standalone MQTT, clears keys - ShadowDeploymentListener: same pattern for shadow deployments (publishes unconditional shadow update without version field) - StandaloneMqttConnector.publish(): reusable publish with exponential backoff - IN_PROGRESS guard: dropped after nucleus restart to avoid stale publishes to destination where the IoT Job/shadow doesn't exist
1 parent a6291f1 commit d8f46dd

14 files changed

Lines changed: 1001 additions & 60 deletions

src/main/java/com/aws/greengrass/deployment/DeploymentConfigMerger.java

Lines changed: 20 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -62,10 +62,7 @@ public class DeploymentConfigMerger {
6262
public static final String DEPLOYMENT_ID_LOG_KEY = "deploymentId";
6363
public static final String SERVICE_NAME_LOG_KEY = "serviceName";
6464
protected static final int WAIT_SVC_START_POLL_INTERVAL_MILLISEC = 1000;
65-
static final long DEFAULT_PREFLIGHT_MQTT_TIMEOUT_MS = 60_000;
66-
static final String STANDALONE_MQTT_TIMEOUT_KEY = "standaloneMqttTimeoutMs";
6765
private static final String ENDPOINT_LOG_KEY = "endpoint";
68-
private static final String ENDPOINT_SWITCH_CLIENT_SUFFIX = "#endpoint-switch";
6966

7067
private static final Logger logger = LogManager.getLogger(DeploymentConfigMerger.class);
7168

@@ -161,10 +158,20 @@ private void updateActionForDeployment(Map<String, Object> newConfig, Deployment
161158

162159
// Endpoint-switch deployments: pre-flight connectivity check, then persist source endpoint.
163160
// Pre-flight runs BEFORE persisting so that on failure, no device state has changed.
164-
// Source endpoint is stored under services.DeploymentService.runtime (internal deployment state,
165-
// not customer config). Persisted to config.tlog so it survives device crashes mid-endpoint-switch.
166-
// Step 5 uses this value to report deployment status back to the source account.
167-
// Cleared after status reporting; stale keys are harmless and overwritten by the next switch.
161+
//
162+
// Before switching the IoT endpoint, save two values to runtime config:
163+
//
164+
// 1. sourceIotDataEndpoint — the current IoT data endpoint (where the device is connected
165+
// now). After the switch, the status consumer uses this to publish the deployment result
166+
// back to the originating account via a standalone MQTT connection.
167+
//
168+
// 2. sourceDeploymentId — this deployment's ID (job ID, config ARN, or UUID depending on
169+
// type). The status consumer matches this against the persisted status entry to identify
170+
// which entry belongs to the endpoint-switch deployment.
171+
//
172+
// Both are stored under services.DeploymentService.runtime (internal deployment state, not
173+
// customer config) and persisted to config.tlog (survives crashes). Removed by the status
174+
// consumer after it successfully publishes the deployment result to the source endpoint.
168175
EndpointSwitchPreflightValidator preflightValidator =
169176
kernel.getContext().get(EndpointSwitchPreflightValidator.class);
170177
if (isEndpointSwitchDeployment(nucleusConfig, deviceConfiguration)) {
@@ -180,24 +187,24 @@ private void updateActionForDeployment(Map<String, Object> newConfig, Deployment
180187
.kv(ENDPOINT_LOG_KEY, newDataEndpoint)
181188
.log("Starting pre-flight MQTT connectivity check");
182189
long preflightTimeout = Coerce.toLong(deviceConfiguration.getMQTTNamespace()
183-
.findOrDefault(DEFAULT_PREFLIGHT_MQTT_TIMEOUT_MS, STANDALONE_MQTT_TIMEOUT_KEY));
190+
.findOrDefault(EndpointSwitchState.DEFAULT_STANDALONE_MQTT_TIMEOUT_MS,
191+
EndpointSwitchState.STANDALONE_MQTT_TIMEOUT_KEY));
184192
if (preflightTimeout <= 0) {
185193
logger.atWarn().kv("configured", preflightTimeout)
186-
.kv("using", DEFAULT_PREFLIGHT_MQTT_TIMEOUT_MS)
194+
.kv("using", EndpointSwitchState.DEFAULT_STANDALONE_MQTT_TIMEOUT_MS)
187195
.log("Invalid standaloneMqttTimeoutMs, using default");
188-
preflightTimeout = DEFAULT_PREFLIGHT_MQTT_TIMEOUT_MS;
196+
preflightTimeout = EndpointSwitchState.DEFAULT_STANDALONE_MQTT_TIMEOUT_MS;
189197
}
190198
if (!preflightValidator.verifyMqttConnectivity(totallyCompleteFuture, newDataEndpoint,
191-
ENDPOINT_SWITCH_CLIENT_SUFFIX, preflightTimeout)) {
199+
EndpointSwitchState.ENDPOINT_SWITCH_CLIENT_SUFFIX, preflightTimeout)) {
192200
return;
193201
}
194202
logger.atInfo().setEventType(MERGE_CONFIG_EVENT_KEY)
195203
.kv(ENDPOINT_LOG_KEY, newDataEndpoint)
196204
.log("Pre-flight MQTT connectivity check passed");
197205

198206
// Persist source endpoint only after pre-flight succeeds — no state change on failure
199-
kernel.getContext().get(DeploymentService.class).getRuntimeConfig()
200-
.lookup(DeploymentService.SOURCE_IOT_DATA_ENDPOINT_KEY).withValue(currentDataEndpoint);
207+
kernel.getContext().get(EndpointSwitchState.class).persist(currentDataEndpoint, deployment.getId());
201208
}
202209

203210
// Pre-flight credential endpoint check — uses the device certificate for mTLS authentication,

src/main/java/com/aws/greengrass/deployment/DeploymentService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,6 @@ public class DeploymentService extends GreengrassService {
9898
public static final String GROUP_TO_LAST_DEPLOYMENT_CONFIG_ARN_KEY = "configArn";
9999
public static final String GROUP_TO_ROOT_COMPONENTS_VERSION_KEY = "version";
100100
public static final String GROUP_TO_ROOT_COMPONENTS_GROUP_CONFIG_ARN = "groupConfigArn";
101-
public static final String SOURCE_IOT_DATA_ENDPOINT_KEY = "sourceIotDataEndpoint";
102101
public static final String GROUP_TO_ROOT_COMPONENTS_GROUP_NAME = "groupConfigName";
103102
public static final String DEPLOYMENT_DETAILED_STATUS_KEY = "detailed-deployment-status";
104103
public static final String DEPLOYMENT_FAILURE_CAUSE_KEY = "deployment-failure-cause";
@@ -422,6 +421,7 @@ private void finishCurrentDeployment() throws InterruptedException {
422421
logger.atInfo().kv(DEPLOYMENT_ID_LOG_KEY_NAME, currentDeploymentTaskMetadata.getDeploymentId())
423422
.kv(GG_DEPLOYMENT_ID_LOG_KEY_NAME, ggDeploymentId).log("Deployment task is cancelled");
424423
}
424+
425425
// Setting this to null to indicate there is no current deployment being processed
426426
// Did not use optionals over null due to performance
427427
currentDeploymentTaskMetadata = null;
@@ -1059,6 +1059,7 @@ public Set<String> getAllGroupNames() {
10591059
return allGroupNames;
10601060
}
10611061

1062+
10621063
/**
10631064
* Checks whether a component is a root component or not.
10641065
*
Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,107 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
package com.aws.greengrass.deployment;
7+
8+
import com.aws.greengrass.config.Topic;
9+
import com.aws.greengrass.config.Topics;
10+
import com.aws.greengrass.util.Coerce;
11+
import com.aws.greengrass.util.Utils;
12+
13+
import javax.inject.Inject;
14+
15+
/**
16+
* Manages persisted state for in-flight endpoint-switch deployments.
17+
*
18+
* <p>Before an endpoint switch, {@link DeploymentConfigMerger} persists the source IoT data endpoint and
19+
* deployment ID into DeploymentService's runtime config. After the switch completes, the status consumers
20+
* ({@link IotJobsHelper}, {@link ShadowDeploymentListener}) use this class to identify the endpoint-switch
21+
* deployment and report terminal status back to the source endpoint via a standalone MQTT connection.</p>
22+
*
23+
* <p>Keys are stored under {@code services.DeploymentService.runtime} and persisted to config.tlog
24+
* (survives crashes). Cleared after terminal status is reported.</p>
25+
*/
26+
public class EndpointSwitchState {
27+
28+
static final String SOURCE_IOT_DATA_ENDPOINT_KEY = "sourceIotDataEndpoint";
29+
static final String SOURCE_DEPLOYMENT_ID_KEY = "sourceDeploymentId";
30+
static final String ENDPOINT_SWITCH_CLIENT_SUFFIX = "#endpoint-switch";
31+
static final long DEFAULT_STANDALONE_MQTT_TIMEOUT_MS = 60_000;
32+
static final String STANDALONE_MQTT_TIMEOUT_KEY = "standaloneMqttTimeoutMs";
33+
34+
private final DeploymentService deploymentService;
35+
36+
@Inject
37+
EndpointSwitchState(DeploymentService deploymentService) {
38+
this.deploymentService = deploymentService;
39+
}
40+
41+
/**
42+
* Check if the given deployment is the in-flight endpoint-switch deployment by matching
43+
* the deployment ID against the persisted source deployment ID.
44+
*
45+
* @param deploymentId the deployment ID to check (job ID for IoT Jobs, config ARN for Shadow)
46+
* @return true if this is the endpoint-switch deployment
47+
*/
48+
public boolean isEndpointSwitchDeployment(String deploymentId) {
49+
String sourceEndpoint = getSourceIotDataEndpoint();
50+
if (sourceEndpoint == null) {
51+
return false;
52+
}
53+
String sourceDepId = getSourceDeploymentId();
54+
return deploymentId != null && deploymentId.equals(sourceDepId);
55+
}
56+
57+
/**
58+
* Read the IoT data endpoint where the endpoint-switch deployment originated.
59+
* This is the endpoint the device was connected to before the switch, used to report terminal
60+
* deployment status back to the source endpoint via a standalone MQTT connection.
61+
*
62+
* @return the source IoT data endpoint, or null if no endpoint switch is in progress
63+
*/
64+
public String getSourceIotDataEndpoint() {
65+
String sourceEndpoint = Coerce.toString(getRuntimeConfig().find(SOURCE_IOT_DATA_ENDPOINT_KEY));
66+
if (Utils.isEmpty(sourceEndpoint)) {
67+
return null;
68+
}
69+
return sourceEndpoint;
70+
}
71+
72+
/**
73+
* Persist the source endpoint and deployment ID before applying the endpoint switch.
74+
* Called by {@link DeploymentConfigMerger} after pre-flight passes.
75+
*
76+
* @param sourceEndpoint the current IoT data endpoint (before switch)
77+
* @param deploymentId the deployment ID initiating the switch
78+
*/
79+
public void persist(String sourceEndpoint, String deploymentId) {
80+
getRuntimeConfig().lookup(SOURCE_IOT_DATA_ENDPOINT_KEY).withValue(sourceEndpoint);
81+
getRuntimeConfig().lookup(SOURCE_DEPLOYMENT_ID_KEY).withValue(deploymentId);
82+
}
83+
84+
/**
85+
* Remove the persisted source IoT data endpoint and deployment ID from runtime config.
86+
* Called after the endpoint-switch deployment's terminal status has been published to the
87+
* source endpoint (or publish failed — keys are cleared regardless to avoid stale state).
88+
*/
89+
public void clear() {
90+
Topic sourceEndpointTopic = getRuntimeConfig().find(SOURCE_IOT_DATA_ENDPOINT_KEY);
91+
if (sourceEndpointTopic != null) {
92+
sourceEndpointTopic.remove();
93+
}
94+
Topic sourceDeploymentIdTopic = getRuntimeConfig().find(SOURCE_DEPLOYMENT_ID_KEY);
95+
if (sourceDeploymentIdTopic != null) {
96+
sourceDeploymentIdTopic.remove();
97+
}
98+
}
99+
100+
private String getSourceDeploymentId() {
101+
return Coerce.toString(getRuntimeConfig().find(SOURCE_DEPLOYMENT_ID_KEY));
102+
}
103+
104+
private Topics getRuntimeConfig() {
105+
return deploymentService.getRuntimeConfig();
106+
}
107+
}

src/main/java/com/aws/greengrass/deployment/IotJobsClientWrapper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
@SuppressWarnings("PMD.AvoidCatchingGenericException")
4141
@SuppressFBWarnings("NM_METHOD_NAMING_CONVENTION")
4242
public class IotJobsClientWrapper extends IotJobsClient {
43-
private static final String UPDATE_JOB_TOPIC =
43+
static final String UPDATE_JOB_TOPIC =
4444
"$aws/things/%s/jobs/%s/namespace-aws-gg-deployment/update";
4545
static final String JOB_UPDATE_ACCEPTED_TOPIC =
4646
"$aws/things/%s/jobs/%s/namespace-aws-gg-deployment/update/accepted";

src/main/java/com/aws/greengrass/deployment/IotJobsHelper.java

Lines changed: 94 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717
import com.aws.greengrass.logging.api.Logger;
1818
import com.aws.greengrass.logging.impl.LogManager;
1919
import com.aws.greengrass.mqttclient.MqttClient;
20+
import com.aws.greengrass.mqttclient.StandaloneMqttConnector;
2021
import com.aws.greengrass.mqttclient.WrapperMqttClientConnection;
22+
import com.aws.greengrass.security.SecurityService;
2123
import com.aws.greengrass.status.FleetStatusService;
2224
import com.aws.greengrass.status.model.Trigger;
2325
import com.aws.greengrass.util.Coerce;
@@ -26,6 +28,8 @@
2628
import com.aws.greengrass.util.SerializerFactory;
2729
import com.aws.greengrass.util.Utils;
2830
import com.fasterxml.jackson.core.JsonProcessingException;
31+
import com.google.gson.GsonBuilder;
32+
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
2933
import lombok.AccessLevel;
3034
import lombok.Getter;
3135
import lombok.NoArgsConstructor;
@@ -46,6 +50,7 @@
4650
import software.amazon.awssdk.iot.iotjobs.model.UpdateJobExecutionRequest;
4751
import software.amazon.awssdk.iot.iotjobs.model.UpdateJobExecutionSubscriptionRequest;
4852

53+
import java.nio.charset.StandardCharsets;
4954
import java.time.Duration;
5055
import java.time.Instant;
5156
import java.util.HashMap;
@@ -97,6 +102,10 @@ public class IotJobsHelper implements InjectionActions {
97102
"Interrupted while subscribing to Iot jobs event notifications topic";
98103
private static final String JOB_ID_LOG_KEY_NAME = "JobId";
99104
private static final String NEXT_JOB_LITERAL = "$next";
105+
/**
106+
* IoT Jobs status detail values are limited to 1024 characters by the service.
107+
*/
108+
private static final int JOB_STATUS_DETAIL_MAX_LENGTH = 1024;
100109
// Sometimes when we are notified that a new job is queued and request the next pending job document immediately,
101110
// we get an empty response. This unprocessedJobs is to track the number of new queued jobs that we are notified
102111
// with, and keep retrying the request until we get a non-empty response.
@@ -361,6 +370,34 @@ private void setupCommWithIotJobs(Boolean isConfigurationUpdate) {
361370
private Boolean deploymentStatusChanged(Map<String, Object> deploymentDetails) {
362371
String jobId = (String) deploymentDetails.get(DEPLOYMENT_ID_KEY_NAME);
363372
String status = (String) deploymentDetails.get(DEPLOYMENT_STATUS_KEY_NAME);
373+
374+
// Endpoint-switch deployment: report SUCCEEDED to the source endpoint via standalone MQTT.
375+
//
376+
// After a successful switch the main MQTT client is on the destination endpoint, so the
377+
// IoT Job at the source endpoint can only be updated via a standalone connection.
378+
//
379+
// FAILED (rollback): the device is still on the source endpoint (sourceEndpoint ==
380+
// currentEndpoint), so the block clears keys and falls through — the existing
381+
// updateJobStatus() below publishes FAILED via the main MQTT client which is still
382+
// connected to the source endpoint.
383+
//
384+
// IN_PROGRESS: only reaches this block after a nucleus restart (the source endpoint
385+
// already received IN_PROGRESS before the restart). Dropped to avoid publishing to the
386+
// destination endpoint where the IoT Job doesn't exist.
387+
EndpointSwitchState endpointSwitchState = kernel.getContext().get(EndpointSwitchState.class);
388+
if (endpointSwitchState.isEndpointSwitchDeployment(jobId)) {
389+
if (JobStatus.IN_PROGRESS.toString().equals(status)) {
390+
return true;
391+
}
392+
String sourceEndpoint = endpointSwitchState.getSourceIotDataEndpoint();
393+
if (!sourceEndpoint.equals(Coerce.toString(deviceConfiguration.getIotDataEndpoint()))) {
394+
publishStatusToSourceEndpoint(sourceEndpoint, jobId, deploymentDetails);
395+
endpointSwitchState.clear();
396+
return true;
397+
}
398+
endpointSwitchState.clear();
399+
}
400+
364401
Map<String, Object> statusDetails =
365402
(Map<String, Object>) deploymentDetails.get(DEPLOYMENT_STATUS_DETAILS_KEY_NAME);
366403
HashMap<String, String> jobStatusDetails = new HashMap<>();
@@ -435,7 +472,8 @@ public void updateJobStatus(String jobId, JobStatus status, HashMap<String, Stri
435472
});
436473

437474
// Truncate status detail map values longer than the 1024 characters limit
438-
statusDetailsMap.entrySet().forEach(e -> statusDetailsMap.put(e.getKey(), Utils.truncate(e.getValue(), 1024)));
475+
statusDetailsMap.entrySet().forEach(e -> statusDetailsMap.put(e.getKey(),
476+
Utils.truncate(e.getValue(), JOB_STATUS_DETAIL_MAX_LENGTH)));
439477

440478
UpdateJobExecutionRequest updateJobRequest = new UpdateJobExecutionRequest();
441479
updateJobRequest.jobId = jobId;
@@ -664,6 +702,61 @@ private void evaluateCancellationAndCancelDeploymentIfNeeded() {
664702
}
665703
}
666704

705+
@SuppressWarnings("PMD.AvoidCatchingGenericException")
706+
@SuppressFBWarnings("REC_CATCH_EXCEPTION")
707+
private void publishStatusToSourceEndpoint(String sourceEndpoint, String jobId,
708+
Map<String, Object> deploymentDetails) {
709+
String status = (String) deploymentDetails.get(DEPLOYMENT_STATUS_KEY_NAME);
710+
Map<String, Object> statusDetails =
711+
(Map<String, Object>) deploymentDetails.get(DEPLOYMENT_STATUS_DETAILS_KEY_NAME);
712+
HashMap<String, String> jobStatusDetails = new HashMap<>();
713+
statusDetails.forEach((k, v) -> {
714+
if (v instanceof String) {
715+
jobStatusDetails.put(k, (String) v);
716+
} else {
717+
try {
718+
jobStatusDetails.put(k, SerializerFactory.getFailSafeJsonObjectMapper().writeValueAsString(v));
719+
} catch (JsonProcessingException e) {
720+
logger.atWarn().setCause(e).log("Failed to serialize status detail");
721+
}
722+
}
723+
});
724+
725+
jobStatusDetails.entrySet().forEach(e -> jobStatusDetails.put(e.getKey(),
726+
Utils.truncate(e.getValue(), JOB_STATUS_DETAIL_MAX_LENGTH)));
727+
728+
String thing = Coerce.toString(deviceConfiguration.getThingName());
729+
String topic = String.format(IotJobsClientWrapper.UPDATE_JOB_TOPIC, thing, jobId);
730+
long connectTimeout = Coerce.toLong(deviceConfiguration.getMQTTNamespace()
731+
.findOrDefault(EndpointSwitchState.DEFAULT_STANDALONE_MQTT_TIMEOUT_MS,
732+
EndpointSwitchState.STANDALONE_MQTT_TIMEOUT_KEY));
733+
if (connectTimeout <= 0) {
734+
connectTimeout = EndpointSwitchState.DEFAULT_STANDALONE_MQTT_TIMEOUT_MS;
735+
}
736+
try (StandaloneMqttConnector connector = StandaloneMqttConnector.of(
737+
kernel.getContext().get(SecurityService.class), deviceConfiguration,
738+
sourceEndpoint, EndpointSwitchState.ENDPOINT_SWITCH_CLIENT_SUFFIX)) {
739+
connector.connect(connectTimeout);
740+
UpdateJobExecutionRequest request = new UpdateJobExecutionRequest();
741+
request.status = JobStatus.valueOf(status);
742+
request.statusDetails = jobStatusDetails;
743+
request.thingName = thing;
744+
request.jobId = jobId;
745+
// Use Gson with HTML escaping disabled to match the IoT Jobs SDK's wire format
746+
// (IotJobsClientWrapper uses the same GsonBuilder configuration)
747+
byte[] payload = new GsonBuilder().disableHtmlEscaping().create()
748+
.toJson(request).getBytes(StandardCharsets.UTF_8);
749+
connector.publish(topic, payload, QualityOfService.AT_LEAST_ONCE,
750+
EndpointSwitchState.DEFAULT_STANDALONE_MQTT_TIMEOUT_MS);
751+
logger.atInfo().kv("jobId", jobId).kv("status", status)
752+
.kv("sourceEndpoint", sourceEndpoint)
753+
.log("Reported terminal job status to source endpoint");
754+
} catch (Exception e) {
755+
logger.atWarn().kv("jobId", jobId).kv("status", status).kv("sourceEndpoint", sourceEndpoint)
756+
.setCause(e).log("Failed to report terminal job status to source endpoint");
757+
}
758+
}
759+
667760
public static class WrapperMqttConnectionFactory {
668761
public WrapperMqttClientConnection getAwsIotMqttConnection(MqttClient mqttClient) {
669762
return new WrapperMqttClientConnection(mqttClient);

0 commit comments

Comments
 (0)