Skip to content

Add de-dup of datafeed keys #4872

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 11, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package stroom.receive.common;

import com.fasterxml.jackson.annotation.JsonIgnore;
import stroom.meta.api.AttributeMap;
import stroom.util.shared.string.CIKey;

import jakarta.validation.constraints.Min;
import jakarta.validation.constraints.NotBlank;

import java.nio.file.Path;
import java.time.Instant;
Expand All @@ -14,7 +15,6 @@ public class CachedHashedDataFeedKey {
private final HashedDataFeedKey hashedDataFeedKey;
private final Path sourceFile;

@JsonIgnore
private final int hashCode;

public CachedHashedDataFeedKey(final HashedDataFeedKey hashedDataFeedKey,
Expand All @@ -39,7 +39,6 @@ public Path getSourceFile() {
* @return The hash of the data feed key. The hash algorithm used is defined by
* {@link CachedHashedDataFeedKey#getHashAlgorithm()}
*/
@NotBlank
public String getHash() {
return hashedDataFeedKey.getHash();
}
Expand All @@ -51,30 +50,30 @@ public String getSalt() {
return hashedDataFeedKey.getSalt();
}

@NotBlank
public DataFeedKeyHashAlgorithm getHashAlgorithm() {
return hashedDataFeedKey.getHashAlgorithm();
}

/**
* @return The value of a specified meta key.
*/
@NotBlank
public String getStreamMetaValue(final String metaKey) {
return hashedDataFeedKey.getStreamMetaValue(metaKey);
}

// @NotBlank
// public String getSubjectId() {
// return hashedDataFeedKey.getSubjectId();
// }
//
// public String getDisplayName() {
// return hashedDataFeedKey.getDisplayName();
// }
/**
* @return The value of a specified meta key.
*/
public String getStreamMetaValue(final CIKey metaKey) {
return hashedDataFeedKey.getStreamMetaValue(metaKey);
}

public Map<CIKey, String> getStreamMetaData() {
return hashedDataFeedKey.getCIStreamMetaData();
}

public Map<String, String> getStreamMetaData() {
return hashedDataFeedKey.getStreamMetaData();
public AttributeMap getAttributeMap() {
return hashedDataFeedKey.getAttributeMap();
}

@Min(0)
Expand All @@ -85,15 +84,13 @@ public long getExpiryDateEpochMs() {
/**
* @return The expiry date of this data feed key
*/
@JsonIgnore
public Instant getExpiryDate() {
return hashedDataFeedKey.getExpiryDate();
}

/**
* @return True if this data feed key has expired
*/
@JsonIgnore
public boolean isExpired() {
return hashedDataFeedKey.isExpired();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;
Expand Down Expand Up @@ -67,13 +66,13 @@ public class DataFeedKeyServiceImpl implements DataFeedKeyService, Managed, HasS
// private final Map<CacheKey, List<CachedHashedDataFeedKey>> cacheKeyToDataFeedKeyMap = new ConcurrentHashMap<>();
// The owner will likely have >1 CachedHashedDataFeedKey due to the overlap of keys when
// new keys are being supplied, but not many. Use CIKey so we are not fussy on case.
private final Map<CIKey, List<CachedHashedDataFeedKey>> keyOwnerToDataFeedKeyMap = new ConcurrentHashMap<>();
private final Map<CIKey, Set<CachedHashedDataFeedKey>> keyOwnerToDataFeedKeyMap = new ConcurrentHashMap<>();

// Cache of the un-hashed key to validated DataFeedKey.
// Cache of the un-hashed key + owner to verified CachedHashedDataFeedKeys
// If the un-hashed key is in this map, then it means we have hashed it, checked it against
// keyOwnerToDataFeedKeyMap and found it to be valid (at the time of checking).
// This cache saves us the hashing cost on every data receipt, which can be expensive.
private final LoadingStroomCache<UnHashedCacheKey, List<CachedHashedDataFeedKey>> unHashedKeyToDataFeedKeyCache;
private final LoadingStroomCache<UnHashedCacheKey, Set<CachedHashedDataFeedKey>> unHashedKeyToDataFeedKeyCache;

private final Provider<ReceiveDataConfig> receiveDataConfigProvider;
private final Map<DataFeedKeyHashAlgorithm, DataFeedKeyHasher> hashFunctionMap;
Expand Down Expand Up @@ -152,40 +151,43 @@ private Optional<HashedDataFeedKey> getDataFeedKey(final HttpServletRequest requ
@Override
public synchronized int addDataFeedKeys(final HashedDataFeedKeys hashedDataFeedKeys,
final Path sourceFile) {
int addedCount = 0;
final AtomicInteger addedCount = new AtomicInteger();
if (NullSafe.hasItems(hashedDataFeedKeys.getDataFeedKeys())) {
LOGGER.debug(() -> LogUtil.message("Adding {} dataFeedKeys",
hashedDataFeedKeys.getDataFeedKeys().size()));

final AtomicInteger invalidCount = new AtomicInteger();
final ReceiveDataConfig receiveDataConfig = receiveDataConfigProvider.get();
final String keyOwnerMetaKey = receiveDataConfig.getDataFeedKeyOwnerMetaKey();
final AtomicInteger dupCount = new AtomicInteger();
final CIKey keyOwnerMetaKey = getOwnerMetaKey(receiveDataConfigProvider.get());

addedCount = hashedDataFeedKeys.getDataFeedKeys()
hashedDataFeedKeys.getDataFeedKeys()
.stream()
.filter(Objects::nonNull)
.map(dataFeedKey ->
new CachedHashedDataFeedKey(dataFeedKey, sourceFile))
.filter(dataFeedKey ->
isValidDataFeedKey(dataFeedKey, keyOwnerMetaKey, invalidCount))
.mapToInt(cachedHashedDataFeedKey -> {
addDataFeedKey(cachedHashedDataFeedKey, keyOwnerMetaKey);
return 1;
})
.sum();

LOGGER.debug("Added: {}, ignored {} invalid data feed keys, file: {}",
addedCount, invalidCount, sourceFile);
.forEach(cachedHashedDataFeedKey -> {
addDataFeedKey(cachedHashedDataFeedKey, keyOwnerMetaKey, addedCount, dupCount);
});

LOGGER.debug(() -> LogUtil.message(
"Added: {}, ignored {} data feed keys (invalid: {}, duplicate: {}), file: {}",
addedCount,
invalidCount.get() + dupCount.get(),
invalidCount.get(),
dupCount.get(),
sourceFile));
}
LOGGER.debug(() -> LogUtil.message("Total cached keys: {}", keyOwnerToDataFeedKeyMap.values()
.stream()
.mapToInt(List::size)
.mapToInt(Set::size)
.sum()));
return addedCount;
return addedCount.get();
}

private boolean isValidDataFeedKey(final CachedHashedDataFeedKey dataFeedKey,
final String ownerMetaKey,
final CIKey ownerMetaKey,
final AtomicInteger invalidCount) {

if (dataFeedKey.isExpired()) {
Expand All @@ -204,15 +206,22 @@ private boolean isValidDataFeedKey(final CachedHashedDataFeedKey dataFeedKey,
return true;
}

private void addDataFeedKey(final CachedHashedDataFeedKey cachedHashedDataFeedKey,
final String keyOwnerMetaKey) {
private synchronized void addDataFeedKey(final CachedHashedDataFeedKey cachedHashedDataFeedKey,
final CIKey keyOwnerMetaKey,
final AtomicInteger addedCount,
final AtomicInteger dupCount) {
if (cachedHashedDataFeedKey != null) {
final String keyOwner = cachedHashedDataFeedKey.getStreamMetaValue(keyOwnerMetaKey);
// Use a concurrent Set as writes are very infrequent; Set.add() returns false for duplicates.
keyOwnerToDataFeedKeyMap.computeIfAbsent(
final boolean success = keyOwnerToDataFeedKeyMap.computeIfAbsent(
CIKey.of(keyOwner),
k -> new CopyOnWriteArrayList<>())
k -> ConcurrentHashMap.newKeySet())
.add(cachedHashedDataFeedKey);
if (success) {
addedCount.incrementAndGet();
} else {
dupCount.incrementAndGet();
}
}
}

Expand Down Expand Up @@ -295,7 +304,7 @@ public synchronized void removeKeysForFile(final Path sourceFile) {
}
LOGGER.debug(() -> LogUtil.message("Total cached keys: {}", keyOwnerToDataFeedKeyMap.values()
.stream()
.mapToInt(List::size)
.mapToInt(Set::size)
.sum()));
}

Expand Down Expand Up @@ -343,8 +352,11 @@ private Optional<HashedDataFeedKey> lookupAndValidateKey(final String unHashedKe
return Optional.empty();
}

final String keyOwnerKey = receiveDataConfig.getDataFeedKeyOwnerMetaKey();
final String keyOwner = NullSafe.get(attributeMap, map -> map.get(keyOwnerKey), String::trim);
final CIKey keyOwnerKey = getOwnerMetaKey(receiveDataConfig);
final String keyOwner = NullSafe.get(
attributeMap,
map -> map.get(keyOwnerKey.get()),
String::trim);
if (NullSafe.isBlankString(keyOwner)) {
LOGGER.debug("Blank keyOwner, attributeMap: {}", attributeMap);
throw new StroomStreamException(
Expand All @@ -353,9 +365,9 @@ private Optional<HashedDataFeedKey> lookupAndValidateKey(final String unHashedKe
"Mandatory header '" + keyOwnerKey + "' must be provided to authenticate with a data feed key.");
}

final UnHashedCacheKey unHashedCacheKey = new UnHashedCacheKey(unHashedKey, keyOwner);
final UnHashedCacheKey unHashedCacheKey = new UnHashedCacheKey(unHashedKey, CIKey.ofDynamicKey(keyOwner));

final List<CachedHashedDataFeedKey> dataFeedKeys = unHashedKeyToDataFeedKeyCache.get(unHashedCacheKey);
final Set<CachedHashedDataFeedKey> dataFeedKeys = unHashedKeyToDataFeedKeyCache.get(unHashedCacheKey);
if (NullSafe.isEmptyCollection(dataFeedKeys)) {
LOGGER.debug("Unknown data feed key {}, attributeMap: {}", unHashedKey, attributeMap);
// Data Feed Key is not known to us regardless of account ID
Expand All @@ -368,7 +380,7 @@ private Optional<HashedDataFeedKey> lookupAndValidateKey(final String unHashedKe
}
}

final String keyOwnerMetaKey = receiveDataConfig.getDataFeedKeyOwnerMetaKey();
final CIKey keyOwnerMetaKey = getOwnerMetaKey(receiveDataConfig);
final String ownerFromAttrMap = getAttribute(attributeMap, keyOwnerMetaKey)
.orElse(null);
final Predicate<CachedHashedDataFeedKey> filter = createKeyOwnerFilter(keyOwnerMetaKey, ownerFromAttrMap);
Expand Down Expand Up @@ -407,10 +419,10 @@ private Optional<HashedDataFeedKey> lookupAndValidateKey(final String unHashedKe
}
}

private static Predicate<CachedHashedDataFeedKey> createKeyOwnerFilter(final String keyOwnerMetaKey,
private static Predicate<CachedHashedDataFeedKey> createKeyOwnerFilter(final CIKey keyOwnerMetaKey,
final String ownerFromAttrMap) {
Predicate<CachedHashedDataFeedKey> filter;
if (NullSafe.isNonBlankString(keyOwnerMetaKey)) {
if (CIKey.isNonBlank(keyOwnerMetaKey)) {
filter = (CachedHashedDataFeedKey key) -> {
final String ownerFromKey = key.getStreamMetaValue(keyOwnerMetaKey);
final boolean result = Objects.equals(ownerFromKey, ownerFromAttrMap);
Expand All @@ -436,23 +448,21 @@ private static Predicate<CachedHashedDataFeedKey> createKeyOwnerFilter(final Str
/**
* Loading function for an un-hashed key and its keyOwner.
*/
private List<CachedHashedDataFeedKey> createHashedDataFeedKey(final UnHashedCacheKey unHashedCacheKey) {
private Set<CachedHashedDataFeedKey> createHashedDataFeedKey(final UnHashedCacheKey unHashedCacheKey) {
Objects.requireNonNull(unHashedCacheKey);
if (!DATA_FEED_KEY_PATTERN.matcher(unHashedCacheKey.unHashedKey).matches()) {
LOGGER.debug("key '{}' does not look like a not a datafeed key", unHashedCacheKey.unHashedKey);
return Collections.emptyList();
return Collections.emptySet();
}

final CIKey ciKey = CIKey.ofDynamicKey(unHashedCacheKey.keyOwner);
final List<CachedHashedDataFeedKey> matchingKeys = new ArrayList<>();
final Set<CachedHashedDataFeedKey> matchingKeys;
synchronized (this) {
final List<CachedHashedDataFeedKey> ownersKeys = keyOwnerToDataFeedKeyMap.get(ciKey);
NullSafe.forEach(ownersKeys, cachedHashedDataFeedKey -> {
final boolean isValid = verifyKey(unHashedCacheKey.unHashedKey, cachedHashedDataFeedKey);
if (isValid) {
matchingKeys.add(cachedHashedDataFeedKey);
}
});
// Likely to only be 1-2 items per owner
final Set<CachedHashedDataFeedKey> ownersKeys = keyOwnerToDataFeedKeyMap.get(unHashedCacheKey.keyOwner);
matchingKeys = NullSafe.stream(ownersKeys)
.filter(cachedHashedDataFeedKey ->
verifyKey(unHashedCacheKey.unHashedKey, cachedHashedDataFeedKey))
.collect(Collectors.toCollection(ConcurrentHashMap::newKeySet));
}
LOGGER.debug("unHashedCacheKey: {}, matchingKeys: {}", unHashedCacheKey, matchingKeys);
return matchingKeys;
Expand All @@ -475,32 +485,31 @@ private boolean verifyKey(final String unHashedKey, final CachedHashedDataFeedKe
return isValid;
}

// private List<CachedHashedDataFeedKey> createHashedDataFeedKey(final String unHashedKey) {
// final Optional<CacheKey> optCacheKey = getCacheKey(unHashedKey);
// final List<CachedHashedDataFeedKey> cachedHashedDataFeedKeys;
// if (optCacheKey.isEmpty()) {
// cachedHashedDataFeedKeys = Collections.emptyList();
// } else {
// // If this returns null then the key is not known to us
// final CacheKey cacheKey = optCacheKey.get();
// synchronized (this) {
// final List<CachedHashedDataFeedKey> dataFeedKeys = cacheKeyToDataFeedKeyMap.get(cacheKey);
// LOGGER.debug("Lookup of cacheKey {}, found {}", cacheKey, dataFeedKeys);
// cachedHashedDataFeedKeys = NullSafe.list(dataFeedKeys);
// }
// }
// LOGGER.debug("unHashedKey: {}, cachedHashedDataFeedKeys: {}", unHashedKey, cachedHashedDataFeedKeys);
// return cachedHashedDataFeedKeys;
// }

/**
* @return An optional containing a non-blank attribute value, else empty.
*/
private Optional<String> getAttribute(final AttributeMap attributeMap, final CIKey header) {
if (header == null) {
return Optional.empty();
}
return Optional.ofNullable(attributeMap.get(header.get()))
.filter(StringUtils::isNotBlank);
}

private Optional<String> getAttribute(final AttributeMap attributeMap, final String header) {
return Optional.ofNullable(attributeMap.get(header))
.filter(StringUtils::isNotBlank);
}

private Optional<String> getRequestHeader(final HttpServletRequest request, final CIKey header) {
if (header == null) {
return Optional.empty();
}
final String value = request.getHeader(header.get());
return Optional.ofNullable(value)
.filter(NullSafe::isNonBlankString);
}

private Optional<String> getRequestHeader(final HttpServletRequest request, final String header) {
final String value = request.getHeader(header);
return Optional.ofNullable(value)
Expand All @@ -514,13 +523,13 @@ public Optional<UserIdentity> authenticate(final HttpServletRequest request,
final ReceiveDataConfig receiveDataConfig = receiveDataConfigProvider.get();
final Optional<UserIdentity> optUserIdentity = getDataFeedKey(request, attributeMap, receiveDataConfig)
.map(dataFeedKey -> {
final String keyOwnerMetaKey = receiveDataConfig.getDataFeedKeyOwnerMetaKey();
final CIKey keyOwnerMetaKey = getOwnerMetaKey(receiveDataConfig);
final String keyOwner = dataFeedKey.getStreamMetaValue(keyOwnerMetaKey);
// Ensure the stream attributes from the data feed key are set in the attributeMap so
// that the AttributeMapFilters have access to them and any attributes that are static
// to this key are applied to all streams that use it, e.g. aws account number.
// Entries from the data feed key trump what is in the headers
attributeMap.putAll(dataFeedKey.getStreamMetaData());
attributeMap.putAll(dataFeedKey.getAttributeMap());
return new DataFeedKeyUserIdentity(keyOwner);
});
LOGGER.debug("Returning {}, attributeMap: {}", optUserIdentity, attributeMap);
Expand Down Expand Up @@ -590,6 +599,10 @@ public SystemInfoResult getSystemInfo() {
.build();
}

private CIKey getOwnerMetaKey(final ReceiveDataConfig receiveDataConfig) {
return CIKey.of(receiveDataConfig.getDataFeedKeyOwnerMetaKey());
}

// --------------------------------------------------------------------------------


Expand All @@ -602,7 +615,7 @@ public SystemInfoResult getSystemInfo() {
// --------------------------------------------------------------------------------


private record UnHashedCacheKey(String unHashedKey, String keyOwner) {
private record UnHashedCacheKey(String unHashedKey, CIKey keyOwner) {

}
}
Loading