diff --git a/xyz-hub-service/src/main/java/com/here/xyz/hub/config/dynamo/DynamoDataReferenceConfigClient.java b/xyz-hub-service/src/main/java/com/here/xyz/hub/config/dynamo/DynamoDataReferenceConfigClient.java index 3fb99f74ac..75b8bd0df1 100644 --- a/xyz-hub-service/src/main/java/com/here/xyz/hub/config/dynamo/DynamoDataReferenceConfigClient.java +++ b/xyz-hub-service/src/main/java/com/here/xyz/hub/config/dynamo/DynamoDataReferenceConfigClient.java @@ -37,6 +37,7 @@ import com.here.xyz.util.service.aws.dynamo.IndexDefinition; import io.vertx.core.Future; import java.util.AbstractMap.SimpleImmutableEntry; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -64,6 +65,8 @@ public final class DynamoDataReferenceConfigClient extends DataReferenceConfigCl private static final String ID_INDEX_ATTRIBUTE_NAME = "id"; + private static final long KEEP_UNTIL_GRACE_PERIOD_SECONDS = 48 * 3600; + private static final IndexDefinition idIndex = new IndexDefinition(ID_INDEX_ATTRIBUTE_NAME); private final DynamoClient dynamoClient; @@ -96,10 +99,22 @@ protected Future doStore(DataReference dataReference) { private static Map dynamoItemAsMap(DataReference dataReference) { Map dataReferenceAsMap = dataReference.toMap(); dataReferenceAsMap.put(SORT_KEY_NAME, sortKeyValue(dataReference)); - + // Converts ms to seconds (required by DynamoDB TTL) and adds grace period + if (dataReferenceAsMap.get("keepUntil") != null) { + dataReferenceAsMap.put("keepUntil", ((Number) dataReferenceAsMap.get("keepUntil")).longValue() / 1000 + KEEP_UNTIL_GRACE_PERIOD_SECONDS); + } return dataReferenceAsMap; } + private static T fromDynamoMap(Map itemAsMap, Class resultItemClass) { + Map mutableMap = new HashMap<>(itemAsMap); + // Subtracts grace period and converts seconds back to ms for the API + if (mutableMap.get("keepUntil") != null) { + mutableMap.put("keepUntil", (((Number) mutableMap.get("keepUntil")).longValue() - KEEP_UNTIL_GRACE_PERIOD_SECONDS) * 1000); + } + return fromMap(mutableMap, resultItemClass); + } + private static String sortKeyValue(DataReference dataReference) { return SORT_KEY_PATTERN.formatted(dataReference.getEndVersion(), dataReference.getId()); } @@ -110,7 +125,7 @@ protected Future> doLoad(UUID id) { queryIndex(dataReferenceTable, idIndex, id.toString()) .stream() .findFirst() - .map(item -> fromMap(item.asMap(), DataReference.class)) + .map(item -> fromDynamoMap(item.asMap(), DataReference.class)) ); } @@ -175,7 +190,7 @@ private static QueryFilter toQueryFilter(Entry filteringParamete private static List itemCollectionToList(ItemCollection itemCollection, Class resultItemClass) { return StreamSupport.stream(itemCollection.spliterator(), false) .map(Item::asMap) - .map(itemAsMap -> fromMap(itemAsMap, resultItemClass)) + .map(itemAsMap -> fromDynamoMap(itemAsMap, resultItemClass)) .toList(); } diff --git a/xyz-hub-service/src/main/java/com/here/xyz/hub/util/DataReferenceResolver.java b/xyz-hub-service/src/main/java/com/here/xyz/hub/util/DataReferenceResolver.java index a257b04a35..d37948f3c1 100644 --- a/xyz-hub-service/src/main/java/com/here/xyz/hub/util/DataReferenceResolver.java +++ b/xyz-hub-service/src/main/java/com/here/xyz/hub/util/DataReferenceResolver.java @@ -24,6 +24,7 @@ import com.here.xyz.hub.config.SpaceConfigClient; import com.here.xyz.hub.connectors.models.Space; import com.here.xyz.models.hub.DataReference; +import com.here.xyz.util.service.Core; import io.vertx.core.Future; import org.apache.logging.log4j.Marker; @@ -65,6 +66,10 @@ public Future> loadById(Marker marker, UUID referenceId, DataReference ref = maybeRef.get(); + if (isExpired(ref)) { + return Future.succeededFuture(Optional.empty()); + } + if (onlyStale) { return Future.succeededFuture(Optional.of(ref)); } @@ -78,6 +83,7 @@ public Future> filterStaleForEntity(Marker marker, String en } public Future> filterForEntity(Marker marker, String entityId, List refs, boolean onlyStale) { + List nonExpiredRefs = refs.stream().filter(r -> !isExpired(r)).toList(); return resolveAnchorSpace(marker, entityId) .map(maybeAnchor -> { if (maybeAnchor.isEmpty()) { @@ -85,11 +91,11 @@ public Future> filterForEntity(Marker marker, String entityI return List.of(); } - return distinctNewestByUniquenessKey(refs); + return distinctNewestByUniquenessKey(nonExpiredRefs); } long minCreatedAt = maybeAnchor.get().createdAt(); - List filtered = refs.stream() + List filtered = nonExpiredRefs.stream() .filter(r -> onlyStale ? ts(r.getCreatedAt()) < minCreatedAt : ts(r.getCreatedAt()) >= minCreatedAt) @@ -224,6 +230,10 @@ private static long ts(Long v) { return v == null ? 0L : v; } + private static boolean isExpired(DataReference r) { + return r.getKeepUntil() != null && r.getKeepUntil() < Core.currentTimeMillis(); + } + private record Anchor(String spaceId, long createdAt) {} private record UniquenessKey( diff --git a/xyz-hub-service/src/test/java/com/here/xyz/hub/util/DataReferenceResolverTest.java b/xyz-hub-service/src/test/java/com/here/xyz/hub/util/DataReferenceResolverTest.java index 9b59fea132..280fd2dd68 100644 --- a/xyz-hub-service/src/test/java/com/here/xyz/hub/util/DataReferenceResolverTest.java +++ b/xyz-hub-service/src/test/java/com/here/xyz/hub/util/DataReferenceResolverTest.java @@ -370,6 +370,36 @@ void filterForEntity_shouldReturnDistinctNewestPerUniquenessKey_whenAnchorIsMiss assertThat(result).containsExactlyInAnyOrder(keyANew, keyB); } + @Test + void loadById_shouldReturnEmpty_whenReferenceIsExpired() { + UUID id = UUID.randomUUID(); + long expiredKeepUntil = System.currentTimeMillis() - 1000; + DataReference expired = ref("entity-id-1", 100L).withId(id).withKeepUntil(expiredKeepUntil); + + when(references.load(id)).thenReturn(Future.succeededFuture(Optional.of(expired))); + + Optional result = await(resolver.loadById(marker, id, false)); + + assertThat(result).isEmpty(); + verifyNoInteractions(spaces); + } + + @Test + void filterForEntity_shouldExcludeExpiredReferences() { + String entityId = "entity-id-1"; + long expiredKeepUntil = System.currentTimeMillis() - 1000; + long futureKeepUntil = System.currentTimeMillis() + 60_000; + + DataReference expired = ref(entityId, 100L).withKeepUntil(expiredKeepUntil); + DataReference valid = ref(entityId, 200L).withKeepUntil(futureKeepUntil); + + when(spaces.get(marker, entityId)).thenReturn(Future.succeededFuture(null)); + + List result = await(resolver.filterForEntity(marker, entityId, List.of(expired, valid), false)); + + assertThat(result).containsExactly(valid); + } + private static DataReference ref(String entityId, Long createdAt) { DataReference r = new DataReference().withEntityId(entityId); if (createdAt != null) {