Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.here.xyz.util.service.aws.dynamo.IndexDefinition;
import io.vertx.core.Future;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
Expand Down Expand Up @@ -64,6 +65,8 @@ public final class DynamoDataReferenceConfigClient extends DataReferenceConfigCl

private static final String ID_INDEX_ATTRIBUTE_NAME = "id";

private static final long KEEP_UNTIL_GRACE_PERIOD_SECONDS = 48 * 3600;

private static final IndexDefinition idIndex = new IndexDefinition(ID_INDEX_ATTRIBUTE_NAME);

private final DynamoClient dynamoClient;
Expand Down Expand Up @@ -96,10 +99,18 @@ protected Future<UUID> doStore(DataReference dataReference) {
private static Map<String, Object> dynamoItemAsMap(DataReference dataReference) {
Map<String, Object> dataReferenceAsMap = dataReference.toMap();
dataReferenceAsMap.put(SORT_KEY_NAME, sortKeyValue(dataReference));

// Converts ms to seconds (required by DynamoDB TTL) and adds grace period
dataReferenceAsMap.put("keepUntil", ((Number) dataReferenceAsMap.get("keepUntil")).longValue() / 1000 + KEEP_UNTIL_GRACE_PERIOD_SECONDS);
return dataReferenceAsMap;
}

private static <T> T fromDynamoMap(Map<String, Object> itemAsMap, Class<T> resultItemClass) {
Map<String, Object> mutableMap = new HashMap<>(itemAsMap);
// Subtracts grace period and converts seconds back to ms for the API
mutableMap.put("keepUntil", (((Number) mutableMap.get("keepUntil")).longValue() - KEEP_UNTIL_GRACE_PERIOD_SECONDS) * 1000);
return fromMap(mutableMap, resultItemClass);
}

private static String sortKeyValue(DataReference dataReference) {
return SORT_KEY_PATTERN.formatted(dataReference.getEndVersion(), dataReference.getId());
}
Expand All @@ -110,7 +121,7 @@ protected Future<Optional<DataReference>> doLoad(UUID id) {
queryIndex(dataReferenceTable, idIndex, id.toString())
.stream()
.findFirst()
.map(item -> fromMap(item.asMap(), DataReference.class))
.map(item -> fromDynamoMap(item.asMap(), DataReference.class))
);
}

Expand Down Expand Up @@ -175,7 +186,7 @@ private static QueryFilter toQueryFilter(Entry<String, Object> filteringParamete
private static <T> List<T> itemCollectionToList(ItemCollection<QueryOutcome> itemCollection, Class<T> resultItemClass) {
return StreamSupport.stream(itemCollection.spliterator(), false)
.map(Item::asMap)
.map(itemAsMap -> fromMap(itemAsMap, resultItemClass))
.map(itemAsMap -> fromDynamoMap(itemAsMap, resultItemClass))
.toList();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.here.xyz.hub.config.SpaceConfigClient;
import com.here.xyz.hub.connectors.models.Space;
import com.here.xyz.models.hub.DataReference;
import com.here.xyz.util.service.Core;
import io.vertx.core.Future;
import org.apache.logging.log4j.Marker;

Expand Down Expand Up @@ -65,6 +66,10 @@ public Future<Optional<DataReference>> loadById(Marker marker, UUID referenceId,

DataReference ref = maybeRef.get();

if (isExpired(ref)) {
return Future.succeededFuture(Optional.empty());
}

if (onlyStale) {
return Future.succeededFuture(Optional.of(ref));
}
Expand All @@ -78,18 +83,19 @@ public Future<List<DataReference>> filterStaleForEntity(Marker marker, String en
}

public Future<List<DataReference>> filterForEntity(Marker marker, String entityId, List<DataReference> refs, boolean onlyStale) {
List<DataReference> nonExpiredRefs = refs.stream().filter(r -> !isExpired(r)).toList();
return resolveAnchorSpace(marker, entityId)
.map(maybeAnchor -> {
if (maybeAnchor.isEmpty()) {
if (onlyStale) {
return List.of();
}

return distinctNewestByUniquenessKey(refs);
return distinctNewestByUniquenessKey(nonExpiredRefs);
}

long minCreatedAt = maybeAnchor.get().createdAt();
List<DataReference> filtered = refs.stream()
List<DataReference> filtered = nonExpiredRefs.stream()
.filter(r -> onlyStale
? ts(r.getCreatedAt()) < minCreatedAt
: ts(r.getCreatedAt()) >= minCreatedAt)
Expand Down Expand Up @@ -224,6 +230,10 @@ private static long ts(Long v) {
return v == null ? 0L : v;
}

private static boolean isExpired(DataReference r) {
return r.getKeepUntil() != null && r.getKeepUntil() < Core.currentTimeMillis();
}

private record Anchor(String spaceId, long createdAt) {}

private record UniquenessKey(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,36 @@ void filterForEntity_shouldReturnDistinctNewestPerUniquenessKey_whenAnchorIsMiss
assertThat(result).containsExactlyInAnyOrder(keyANew, keyB);
}

@Test
void loadById_shouldReturnEmpty_whenReferenceIsExpired() {
UUID id = UUID.randomUUID();
long expiredKeepUntil = System.currentTimeMillis() - 1000;
DataReference expired = ref("entity-id-1", 100L).withId(id).withKeepUntil(expiredKeepUntil);

when(references.load(id)).thenReturn(Future.succeededFuture(Optional.of(expired)));

Optional<DataReference> result = await(resolver.loadById(marker, id, false));

assertThat(result).isEmpty();
verifyNoInteractions(spaces);
}

@Test
void filterForEntity_shouldExcludeExpiredReferences() {
String entityId = "entity-id-1";
long expiredKeepUntil = System.currentTimeMillis() - 1000;
long futureKeepUntil = System.currentTimeMillis() + 60_000;

DataReference expired = ref(entityId, 100L).withKeepUntil(expiredKeepUntil);
DataReference valid = ref(entityId, 200L).withKeepUntil(futureKeepUntil);

when(spaces.get(marker, entityId)).thenReturn(Future.succeededFuture(null));

List<DataReference> result = await(resolver.filterForEntity(marker, entityId, List.of(expired, valid), false));

assertThat(result).containsExactly(valid);
}

private static DataReference ref(String entityId, Long createdAt) {
DataReference r = new DataReference().withEntityId(entityId);
if (createdAt != null) {
Expand Down
Loading