Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 22 additions & 12 deletions src/main/java/com/aws/greengrass/tes/CredentialRequestHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,14 @@ public class CredentialRequestHandler implements HttpHandler {
public static final String AUTH_HEADER = "Authorization";
public static final String IOT_CREDENTIALS_HTTP_VERB = "GET";
public static final String SUPPORTED_REQUEST_VERB = "GET";
public static final int TIME_BEFORE_CACHE_EXPIRE_IN_MIN = 5;
public static final int CLOUD_4XX_ERROR_CACHE_IN_MIN = 2;
public static final int CLOUD_5XX_ERROR_CACHE_IN_MIN = 1;
public static final int UNKNOWN_ERROR_CACHE_IN_MIN = 5;
public static final int TIME_BEFORE_CACHE_EXPIRE_IN_SEC = 300;
public static final int CLOUD_4XX_ERROR_CACHE_IN_SEC = 120;
public static final int CLOUD_5XX_ERROR_CACHE_IN_SEC = 60;
public static final int UNKNOWN_ERROR_CACHE_IN_SEC = 300;

private int cloud4xxErrorCacheInSec = CLOUD_4XX_ERROR_CACHE_IN_SEC;
private int cloud5xxErrorCacheInSec = CLOUD_5XX_ERROR_CACHE_IN_SEC;
private int unknownErrorCacheInSec = UNKNOWN_ERROR_CACHE_IN_SEC;

private String iotCredentialsPath;

Expand Down Expand Up @@ -142,6 +146,12 @@ void setIotCredentialsPath(String iotRoleAlias) {
this.iotCredentialsPath = "/role-aliases/" + iotRoleAlias + "/credentials";
}

void configureCacheSettings(int cloud4xxErrorCache, int cloud5xxErrorCache, int unknownErrorCache) {
this.cloud4xxErrorCacheInSec = cloud4xxErrorCache;
this.cloud5xxErrorCacheInSec = cloud5xxErrorCache;
this.unknownErrorCacheInSec = unknownErrorCache;
}

@Override
@SuppressWarnings("PMD.AvoidCatchingThrowable")
public void handle(final HttpExchange exchange) throws IOException {
Expand Down Expand Up @@ -281,14 +291,14 @@ private byte[] getCredentialsBypassCache() {
LOGGER.atError().kv(IOT_CRED_PATH_KEY, iotCredentialsPath)
.log("Unable to cache expired credentials which expired at {}", expiry);
} else {
newExpiry = expiry.minus(Duration.ofMinutes(TIME_BEFORE_CACHE_EXPIRE_IN_MIN));
newExpiry = expiry.minus(Duration.ofSeconds(TIME_BEFORE_CACHE_EXPIRE_IN_SEC));
tesCache.get(iotCredentialsPath).responseCode = HttpURLConnection.HTTP_OK;

if (newExpiry.isBefore(Instant.now(clock))) {
LOGGER.atWarn().kv(IOT_CRED_PATH_KEY, iotCredentialsPath)
.log("Can't cache credentials as new credentials {} will "
+ "expire in less than {} minutes", expiry,
TIME_BEFORE_CACHE_EXPIRE_IN_MIN);
+ "expire in less than {} seconds", expiry,
TIME_BEFORE_CACHE_EXPIRE_IN_SEC);
} else {
LOGGER.atInfo().kv(IOT_CRED_PATH_KEY, iotCredentialsPath)
.log("Received IAM credentials that will be cached until {}", newExpiry);
Expand Down Expand Up @@ -318,7 +328,7 @@ private byte[] getCredentialsBypassCache() {
String responseString = "Failed to get connection";
response = responseString.getBytes(StandardCharsets.UTF_8);
// Use unknown error cache policy for SSL/TLS connection errors to prevent excessive retries
newExpiry = Instant.now(clock).plus(Duration.ofMinutes(UNKNOWN_ERROR_CACHE_IN_MIN));
newExpiry = Instant.now(clock).plus(Duration.ofSeconds(unknownErrorCacheInSec));
tesCache.get(iotCredentialsPath).responseCode = HttpURLConnection.HTTP_INTERNAL_ERROR;
tesCache.get(iotCredentialsPath).expiry = newExpiry;
tesCache.get(iotCredentialsPath).credentials = response;
Expand Down Expand Up @@ -421,16 +431,16 @@ private String parseExpiryFromResponse(final String credentials) throws AWSIotEx
}

private Instant getExpiryPolicyForErr(int statusCode) {
int expiryTime = UNKNOWN_ERROR_CACHE_IN_MIN; // In case of unrecognized cloud errors, back off
int expiryTime = unknownErrorCacheInSec; // In case of unrecognized cloud errors, back off
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: it was already like this, but expiryDuration is more appropriate than expiryTime

// Add caching Time-To-Live (TTL) for TES cloud errors
if (statusCode >= 400 && statusCode < 500) {
// 4xx retries are only meaningful unless a user action has been adopted, TTL should be longer
expiryTime = CLOUD_4XX_ERROR_CACHE_IN_MIN;
expiryTime = cloud4xxErrorCacheInSec;
} else if (statusCode >= 500 && statusCode < 600) {
// 5xx could be a temporary cloud unavailability, TTL should be shorter
expiryTime = CLOUD_5XX_ERROR_CACHE_IN_MIN;
expiryTime = cloud5xxErrorCacheInSec;
}
return Instant.now(clock).plus(Duration.ofMinutes(expiryTime));
return Instant.now(clock).plus(Duration.ofSeconds(expiryTime));
}

/**
Expand Down
95 changes: 85 additions & 10 deletions src/main/java/com/aws/greengrass/tes/TokenExchangeService.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.aws.greengrass.authorization.exceptions.AuthorizationException;
import com.aws.greengrass.config.Topic;
import com.aws.greengrass.config.Topics;
import com.aws.greengrass.config.WhatHappened;
import com.aws.greengrass.dependency.ImplementsService;
import com.aws.greengrass.dependency.State;
import com.aws.greengrass.deployment.DeviceConfiguration;
Expand Down Expand Up @@ -41,6 +42,14 @@ public class TokenExchangeService extends GreengrassService implements AwsCreden
private String iotRoleAlias;
private HttpServerImpl server;

public static final String CLOUD_4XX_ERROR_CACHE_TOPIC = "error4xxCredentialRetryInSec";
public static final String CLOUD_5XX_ERROR_CACHE_TOPIC = "error5xxCredentialRetryInSec";
public static final String UNKNOWN_ERROR_CACHE_TOPIC = "errorUnknownCredentialRetryInSec";
private static final int MINIMUM_ERROR_CACHE_IN_SEC = 10;
private int cloud4xxErrorCache;
private int cloud5xxErrorCache;
private int unknownErrorCache;

private final AuthorizationHandler authZHandler;
private final CredentialRequestHandler credentialRequestHandler;

Expand All @@ -57,24 +66,69 @@ public TokenExchangeService(Topics topics,
AuthorizationHandler authZHandler, DeviceConfiguration deviceConfiguration) {
super(topics);
port = Coerce.toInt(config.lookup(CONFIGURATION_CONFIG_KEY, PORT_TOPIC).dflt(DEFAULT_PORT));

deviceConfiguration.getIotRoleAlias().subscribe((why, newv) -> {
iotRoleAlias = Coerce.toString(newv);
});

this.authZHandler = authZHandler;
this.credentialRequestHandler = credentialRequestHandler;

cloud4xxErrorCache = validateCacheConfig(Coerce.toInt(config.findOrDefault(
CredentialRequestHandler.CLOUD_4XX_ERROR_CACHE_IN_SEC, CONFIGURATION_CONFIG_KEY,
CLOUD_4XX_ERROR_CACHE_TOPIC)), CLOUD_4XX_ERROR_CACHE_TOPIC);
cloud5xxErrorCache = validateCacheConfig(Coerce.toInt(config.findOrDefault(
CredentialRequestHandler.CLOUD_5XX_ERROR_CACHE_IN_SEC, CONFIGURATION_CONFIG_KEY,
CLOUD_5XX_ERROR_CACHE_TOPIC)), CLOUD_5XX_ERROR_CACHE_TOPIC);
unknownErrorCache = validateCacheConfig(Coerce.toInt(config.findOrDefault(
CredentialRequestHandler.UNKNOWN_ERROR_CACHE_IN_SEC, CONFIGURATION_CONFIG_KEY,
UNKNOWN_ERROR_CACHE_TOPIC)), UNKNOWN_ERROR_CACHE_TOPIC);

credentialRequestHandler.configureCacheSettings(cloud4xxErrorCache, cloud5xxErrorCache, unknownErrorCache);

// Subscribe to cache configuration changes
config.subscribe((why, node) -> {
if (node != null && node.childOf(PORT_TOPIC)) {
if (why.equals(WhatHappened.timestampUpdated)) {
return;
}
if (node != null && (node.childOf(PORT_TOPIC)
|| node.childOf(CLOUD_4XX_ERROR_CACHE_TOPIC)
|| node.childOf(CLOUD_5XX_ERROR_CACHE_TOPIC)
|| node.childOf(UNKNOWN_ERROR_CACHE_TOPIC))) {
Comment on lines +94 to +97
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: We don't need to execute this callback if the what happened events are irrelevant to this. For eg, see how we don't take an action if the what happened event doesn't show a change in value.

https://github.com/aws-greengrass/aws-greengrass-shadow-manager/blob/132bf7438a2dedbbf71c7787ad633a35db7fbd39/src/main/java/com/aws/greengrass/shadowmanager/ShadowManager.java#L247-L249

logger.atDebug("tes-config-change").kv("node", node).kv("why", why).log();

port = Coerce.toInt(node);
Topic activePortTopic = config.lookup(CONFIGURATION_CONFIG_KEY, ACTIVE_PORT_TOPIC);
if (port != Coerce.toInt(activePortTopic)) {
logger.atInfo("tes-config-change").kv(PORT_TOPIC, port).kv("node", node).kv("why", why)
.log("Restarting TES server due to port config change");

int newCloud4xxErrorCache = validateCacheConfig(Coerce.toInt(config.findOrDefault(
CredentialRequestHandler.CLOUD_4XX_ERROR_CACHE_IN_SEC, CONFIGURATION_CONFIG_KEY,
CLOUD_4XX_ERROR_CACHE_TOPIC)), CLOUD_4XX_ERROR_CACHE_TOPIC);
int newCloud5xxErrorCache = validateCacheConfig(Coerce.toInt(config.findOrDefault(
CredentialRequestHandler.CLOUD_5XX_ERROR_CACHE_IN_SEC, CONFIGURATION_CONFIG_KEY,
CLOUD_5XX_ERROR_CACHE_TOPIC)), CLOUD_5XX_ERROR_CACHE_TOPIC);
int newUnknownErrorCache = validateCacheConfig(Coerce.toInt(config.findOrDefault(
CredentialRequestHandler.UNKNOWN_ERROR_CACHE_IN_SEC, CONFIGURATION_CONFIG_KEY,
UNKNOWN_ERROR_CACHE_TOPIC)), UNKNOWN_ERROR_CACHE_TOPIC);

if (port != Coerce.toInt(activePortTopic)
|| cloud4xxErrorCache != newCloud4xxErrorCache
|| cloud5xxErrorCache != newCloud5xxErrorCache
|| unknownErrorCache != newUnknownErrorCache) {

cloud4xxErrorCache = newCloud4xxErrorCache;
cloud5xxErrorCache = newCloud5xxErrorCache;
unknownErrorCache = newUnknownErrorCache;

credentialRequestHandler.configureCacheSettings(
newCloud4xxErrorCache, newCloud5xxErrorCache, newUnknownErrorCache);

logger.atInfo("tes-config-change")
.kv("node", node).kv("why", why)
.log("Restarting TES server due to config change");
requestRestart();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe you should be able to change the error cache time during runtime and without restart. I don't see why not. This would change the subscription handler logic to only request restart if the port changes. This would be useful because TES restarts cause a lot of dependent component restarts, which we want to avoid if possible.

}
}
});
deviceConfiguration.getIotRoleAlias().subscribe((why, newv) -> {
iotRoleAlias = Coerce.toString(newv);
});

this.authZHandler = authZHandler;
this.credentialRequestHandler = credentialRequestHandler;
}

@Override
Expand All @@ -95,6 +149,7 @@ protected void startup() {
.log("Attempting to start server at configured port {}", port);
try {
validateConfig();
validateAllCacheConfigs();
server = new HttpServerImpl(port, credentialRequestHandler);
server.start();
logger.atInfo().log("Started server at port {}", server.getServerPort());
Expand Down Expand Up @@ -130,6 +185,26 @@ private void validateConfig() {
}
}

private void validateAllCacheConfigs() {
validateCacheConfig(Coerce.toInt(config.findOrDefault(
CredentialRequestHandler.CLOUD_4XX_ERROR_CACHE_IN_SEC, CONFIGURATION_CONFIG_KEY,
CLOUD_4XX_ERROR_CACHE_TOPIC)), CLOUD_4XX_ERROR_CACHE_TOPIC);
validateCacheConfig(Coerce.toInt(config.findOrDefault(
CredentialRequestHandler.CLOUD_5XX_ERROR_CACHE_IN_SEC, CONFIGURATION_CONFIG_KEY,
CLOUD_5XX_ERROR_CACHE_TOPIC)), CLOUD_5XX_ERROR_CACHE_TOPIC);
validateCacheConfig(Coerce.toInt(config.findOrDefault(
CredentialRequestHandler.UNKNOWN_ERROR_CACHE_IN_SEC, CONFIGURATION_CONFIG_KEY,
UNKNOWN_ERROR_CACHE_TOPIC)), UNKNOWN_ERROR_CACHE_TOPIC);
}

private int validateCacheConfig(int newCacheValue, String topic) {
if (newCacheValue < MINIMUM_ERROR_CACHE_IN_SEC) {
throw new IllegalArgumentException(
"Error cache value for " + topic + " must be at least " + MINIMUM_ERROR_CACHE_IN_SEC + " seconds");
}
return newCacheValue;
}

@Override
public AwsCredentials resolveCredentials() {
return credentialRequestHandler.getAwsCredentials();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import static com.aws.greengrass.tes.CredentialRequestHandler.CLOUD_4XX_ERROR_CACHE_IN_MIN;
import static com.aws.greengrass.tes.CredentialRequestHandler.CLOUD_5XX_ERROR_CACHE_IN_MIN;
import static com.aws.greengrass.tes.CredentialRequestHandler.TIME_BEFORE_CACHE_EXPIRE_IN_MIN;
import static com.aws.greengrass.tes.CredentialRequestHandler.UNKNOWN_ERROR_CACHE_IN_MIN;
import static com.aws.greengrass.tes.CredentialRequestHandler.CLOUD_4XX_ERROR_CACHE_IN_SEC;
import static com.aws.greengrass.tes.CredentialRequestHandler.CLOUD_5XX_ERROR_CACHE_IN_SEC;
import static com.aws.greengrass.tes.CredentialRequestHandler.TIME_BEFORE_CACHE_EXPIRE_IN_SEC;
import static com.aws.greengrass.tes.CredentialRequestHandler.UNKNOWN_ERROR_CACHE_IN_SEC;
import static com.aws.greengrass.testcommons.testutilities.ExceptionLogProtector.ignoreExceptionOfType;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
Expand Down Expand Up @@ -320,15 +320,15 @@ void GIVEN_credential_handler_WHEN_called_handle_THEN_caches_creds() throws Exce
verify(mockStream, times(1)).write(expectedResponse);

// Expiry time in recent future won't give error but there wil be no caching
expirationTime = Instant.now().plus(Duration.ofMinutes(TIME_BEFORE_CACHE_EXPIRE_IN_MIN - 1));
expirationTime = Instant.now().plus(Duration.ofSeconds(TIME_BEFORE_CACHE_EXPIRE_IN_SEC - 60));
responseStr = String.format(RESPONSE_STR, expirationTime.toString());
mockResponse = new IotCloudResponse(responseStr.getBytes(StandardCharsets.UTF_8), 200);
when(mockCloudHelper.sendHttpRequest(any(), any(), any(), any(), any())).thenReturn(mockResponse);
handler.handle(mockExchange);
verify(mockCloudHelper, times(2)).sendHttpRequest(any(), any(), any(), any(), any());

// Expiry time in future will result in credentials being cached
expirationTime = Instant.now().plus(Duration.ofMinutes(TIME_BEFORE_CACHE_EXPIRE_IN_MIN + 1));
expirationTime = Instant.now().plus(Duration.ofSeconds(TIME_BEFORE_CACHE_EXPIRE_IN_SEC + 60));
responseStr = String.format(RESPONSE_STR, expirationTime.toString());
mockResponse = new IotCloudResponse(responseStr.getBytes(StandardCharsets.UTF_8), 200);
when(mockCloudHelper.sendHttpRequest(any(), any(), any(), any(), any())).thenReturn(mockResponse);
Expand Down Expand Up @@ -401,7 +401,7 @@ void GIVEN_4xx_response_code_WHEN_called_handle_THEN_expire_in_2_minutes() throw
String.format("TES responded with status code: %d. Caching response. ", expectedStatus).getBytes();
// expire in 2 minutes
handler.getAwsCredentials();
Instant expirationTime = Instant.now().plus(Duration.ofMinutes(CLOUD_4XX_ERROR_CACHE_IN_MIN));
Instant expirationTime = Instant.now().plus(Duration.ofSeconds(CLOUD_4XX_ERROR_CACHE_IN_SEC));
Clock mockClock = Clock.fixed(expirationTime, ZoneId.of("UTC"));
handler.setClock(mockClock);
handler.getAwsCredentials();
Expand All @@ -425,7 +425,7 @@ void GIVEN_5xx_response_code_WHEN_called_handle_THEN_expire_in_1_minute() throws
String.format("TES responded with status code: %d. Caching response. ", expectedStatus).getBytes();
// expire in 1 minute
handler.getAwsCredentials();
Instant expirationTime = Instant.now().plus(Duration.ofMinutes(CLOUD_5XX_ERROR_CACHE_IN_MIN));
Instant expirationTime = Instant.now().plus(Duration.ofSeconds(CLOUD_5XX_ERROR_CACHE_IN_SEC));
Clock mockClock = Clock.fixed(expirationTime, ZoneId.of("UTC"));
handler.setClock(mockClock);
handler.getAwsCredentials();
Expand All @@ -449,7 +449,7 @@ void GIVEN_unknown_error_response_code_WHEN_called_handle_THEN_expire_in_5_minut
String.format("TES responded with status code: %d. Caching response. ", expectedStatus).getBytes();
// expire in 5 minutes
handler.getAwsCredentials();
Instant expirationTime = Instant.now().plus(Duration.ofMinutes(UNKNOWN_ERROR_CACHE_IN_MIN));
Instant expirationTime = Instant.now().plus(Duration.ofSeconds(UNKNOWN_ERROR_CACHE_IN_SEC));
Clock mockClock = Clock.fixed(expirationTime, ZoneId.of("UTC"));
handler.setClock(mockClock);
handler.getAwsCredentials();
Expand Down
Loading
Loading