Skip to content

Commit 3d79bf6

Browse files
committed
Add de-dup of datafeed keys
1 parent 27a2f21 commit 3d79bf6

File tree

8 files changed

+446
-130
lines changed

8 files changed

+446
-130
lines changed

stroom-receive/stroom-receive-common/src/main/java/stroom/receive/common/CachedHashedDataFeedKey.java

+15-18
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
package stroom.receive.common;
22

3-
import com.fasterxml.jackson.annotation.JsonIgnore;
3+
import stroom.meta.api.AttributeMap;
4+
import stroom.util.shared.string.CIKey;
5+
46
import jakarta.validation.constraints.Min;
5-
import jakarta.validation.constraints.NotBlank;
67

78
import java.nio.file.Path;
89
import java.time.Instant;
@@ -14,7 +15,6 @@ public class CachedHashedDataFeedKey {
1415
private final HashedDataFeedKey hashedDataFeedKey;
1516
private final Path sourceFile;
1617

17-
@JsonIgnore
1818
private final int hashCode;
1919

2020
public CachedHashedDataFeedKey(final HashedDataFeedKey hashedDataFeedKey,
@@ -39,7 +39,6 @@ public Path getSourceFile() {
3939
* @return The hash of the data feed key. The hash algorithm used is defined by
4040
* {@link CachedHashedDataFeedKey#getHashAlgorithm()}
4141
*/
42-
@NotBlank
4342
public String getHash() {
4443
return hashedDataFeedKey.getHash();
4544
}
@@ -51,30 +50,30 @@ public String getSalt() {
5150
return hashedDataFeedKey.getSalt();
5251
}
5352

54-
@NotBlank
5553
public DataFeedKeyHashAlgorithm getHashAlgorithm() {
5654
return hashedDataFeedKey.getHashAlgorithm();
5755
}
5856

5957
/**
6058
* @return The value of a specified meta key.
6159
*/
62-
@NotBlank
6360
public String getStreamMetaValue(final String metaKey) {
6461
return hashedDataFeedKey.getStreamMetaValue(metaKey);
6562
}
6663

67-
// @NotBlank
68-
// public String getSubjectId() {
69-
// return hashedDataFeedKey.getSubjectId();
70-
// }
71-
//
72-
// public String getDisplayName() {
73-
// return hashedDataFeedKey.getDisplayName();
74-
// }
64+
/**
65+
* @return The value of a specified meta key.
66+
*/
67+
public String getStreamMetaValue(final CIKey metaKey) {
68+
return hashedDataFeedKey.getStreamMetaValue(metaKey);
69+
}
70+
71+
public Map<CIKey, String> getStreamMetaData() {
72+
return hashedDataFeedKey.getCIStreamMetaData();
73+
}
7574

76-
public Map<String, String> getStreamMetaData() {
77-
return hashedDataFeedKey.getStreamMetaData();
75+
public AttributeMap getAttributeMap() {
76+
return hashedDataFeedKey.getAttributeMap();
7877
}
7978

8079
@Min(0)
@@ -85,15 +84,13 @@ public long getExpiryDateEpochMs() {
8584
/**
8685
* @return The expiry date of this data feed key
8786
*/
88-
@JsonIgnore
8987
public Instant getExpiryDate() {
9088
return hashedDataFeedKey.getExpiryDate();
9189
}
9290

9391
/**
9492
* @return True if this data feed key has expired
9593
*/
96-
@JsonIgnore
9794
public boolean isExpired() {
9895
return hashedDataFeedKey.isExpired();
9996
}

stroom-receive/stroom-receive-common/src/main/java/stroom/receive/common/DataFeedKeyServiceImpl.java

+76-63
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@
3737
import java.util.Timer;
3838
import java.util.TimerTask;
3939
import java.util.concurrent.ConcurrentHashMap;
40-
import java.util.concurrent.CopyOnWriteArrayList;
4140
import java.util.concurrent.atomic.AtomicInteger;
4241
import java.util.concurrent.atomic.LongAdder;
4342
import java.util.function.Function;
@@ -67,13 +66,13 @@ public class DataFeedKeyServiceImpl implements DataFeedKeyService, Managed, HasS
6766
// private final Map<CacheKey, List<CachedHashedDataFeedKey>> cacheKeyToDataFeedKeyMap = new ConcurrentHashMap<>();
6867
// The owner will likely have >1 CachedHashedDataFeedKey due to the overlap of keys when
6968
// new keys are being supplied, but not many. Use CIKey so we are not fussy on case.
70-
private final Map<CIKey, List<CachedHashedDataFeedKey>> keyOwnerToDataFeedKeyMap = new ConcurrentHashMap<>();
69+
private final Map<CIKey, Set<CachedHashedDataFeedKey>> keyOwnerToDataFeedKeyMap = new ConcurrentHashMap<>();
7170

72-
// Cache of the un-hashed key to validated DataFeedKey.
71+
// Cache of the un-hashed key + owner to verified CachedHashedDataFeedKeys
7372
// If the un-hashed key is in this map, then it means we have hashed it, checked it against
7473
// cacheKeyToDataFeedKeyMap and found it to be valid (at the time of checking).
7574
// This cache saves us the hashing cost on every data receipt, which can be expensive.
76-
private final LoadingStroomCache<UnHashedCacheKey, List<CachedHashedDataFeedKey>> unHashedKeyToDataFeedKeyCache;
75+
private final LoadingStroomCache<UnHashedCacheKey, Set<CachedHashedDataFeedKey>> unHashedKeyToDataFeedKeyCache;
7776

7877
private final Provider<ReceiveDataConfig> receiveDataConfigProvider;
7978
private final Map<DataFeedKeyHashAlgorithm, DataFeedKeyHasher> hashFunctionMap;
@@ -152,40 +151,43 @@ private Optional<HashedDataFeedKey> getDataFeedKey(final HttpServletRequest requ
152151
@Override
153152
public synchronized int addDataFeedKeys(final HashedDataFeedKeys hashedDataFeedKeys,
154153
final Path sourceFile) {
155-
int addedCount = 0;
154+
final AtomicInteger addedCount = new AtomicInteger();
156155
if (NullSafe.hasItems(hashedDataFeedKeys.getDataFeedKeys())) {
157156
LOGGER.debug(() -> LogUtil.message("Adding {} dataFeedKeys",
158157
hashedDataFeedKeys.getDataFeedKeys().size()));
159158

160159
final AtomicInteger invalidCount = new AtomicInteger();
161-
final ReceiveDataConfig receiveDataConfig = receiveDataConfigProvider.get();
162-
final String keyOwnerMetaKey = receiveDataConfig.getDataFeedKeyOwnerMetaKey();
160+
final AtomicInteger dupCount = new AtomicInteger();
161+
final CIKey keyOwnerMetaKey = getOwnerMetaKey(receiveDataConfigProvider.get());
163162

164-
addedCount = hashedDataFeedKeys.getDataFeedKeys()
163+
hashedDataFeedKeys.getDataFeedKeys()
165164
.stream()
166165
.filter(Objects::nonNull)
167166
.map(dataFeedKey ->
168167
new CachedHashedDataFeedKey(dataFeedKey, sourceFile))
169168
.filter(dataFeedKey ->
170169
isValidDataFeedKey(dataFeedKey, keyOwnerMetaKey, invalidCount))
171-
.mapToInt(cachedHashedDataFeedKey -> {
172-
addDataFeedKey(cachedHashedDataFeedKey, keyOwnerMetaKey);
173-
return 1;
174-
})
175-
.sum();
176-
177-
LOGGER.debug("Added: {}, ignored {} invalid data feed keys, file: {}",
178-
addedCount, invalidCount, sourceFile);
170+
.forEach(cachedHashedDataFeedKey -> {
171+
addDataFeedKey(cachedHashedDataFeedKey, keyOwnerMetaKey, addedCount, dupCount);
172+
});
173+
174+
LOGGER.debug(() -> LogUtil.message(
175+
"Added: {}, ignored {} data feed keys (invalid: {}, duplicate: {}), file: {}",
176+
addedCount,
177+
invalidCount.get() + dupCount.get(),
178+
invalidCount.get(),
179+
dupCount.get(),
180+
sourceFile));
179181
}
180182
LOGGER.debug(() -> LogUtil.message("Total cached keys: {}", keyOwnerToDataFeedKeyMap.values()
181183
.stream()
182-
.mapToInt(List::size)
184+
.mapToInt(Set::size)
183185
.sum()));
184-
return addedCount;
186+
return addedCount.get();
185187
}
186188

187189
private boolean isValidDataFeedKey(final CachedHashedDataFeedKey dataFeedKey,
188-
final String ownerMetaKey,
190+
final CIKey ownerMetaKey,
189191
final AtomicInteger invalidCount) {
190192

191193
if (dataFeedKey.isExpired()) {
@@ -204,15 +206,22 @@ private boolean isValidDataFeedKey(final CachedHashedDataFeedKey dataFeedKey,
204206
return true;
205207
}
206208

207-
private void addDataFeedKey(final CachedHashedDataFeedKey cachedHashedDataFeedKey,
208-
final String keyOwnerMetaKey) {
209+
private synchronized void addDataFeedKey(final CachedHashedDataFeedKey cachedHashedDataFeedKey,
210+
final CIKey keyOwnerMetaKey,
211+
final AtomicInteger addedCount,
212+
final AtomicInteger dupCount) {
209213
if (cachedHashedDataFeedKey != null) {
210214
final String keyOwner = cachedHashedDataFeedKey.getStreamMetaValue(keyOwnerMetaKey);
211215
// Use CopyOnWriteArrayList as write are very infrequent
212-
keyOwnerToDataFeedKeyMap.computeIfAbsent(
216+
final boolean success = keyOwnerToDataFeedKeyMap.computeIfAbsent(
213217
CIKey.of(keyOwner),
214-
k -> new CopyOnWriteArrayList<>())
218+
k -> ConcurrentHashMap.newKeySet())
215219
.add(cachedHashedDataFeedKey);
220+
if (success) {
221+
addedCount.incrementAndGet();
222+
} else {
223+
dupCount.incrementAndGet();
224+
}
216225
}
217226
}
218227

@@ -295,7 +304,7 @@ public synchronized void removeKeysForFile(final Path sourceFile) {
295304
}
296305
LOGGER.debug(() -> LogUtil.message("Total cached keys: {}", keyOwnerToDataFeedKeyMap.values()
297306
.stream()
298-
.mapToInt(List::size)
307+
.mapToInt(Set::size)
299308
.sum()));
300309
}
301310

@@ -343,8 +352,11 @@ private Optional<HashedDataFeedKey> lookupAndValidateKey(final String unHashedKe
343352
return Optional.empty();
344353
}
345354

346-
final String keyOwnerKey = receiveDataConfig.getDataFeedKeyOwnerMetaKey();
347-
final String keyOwner = NullSafe.get(attributeMap, map -> map.get(keyOwnerKey), String::trim);
355+
final CIKey keyOwnerKey = getOwnerMetaKey(receiveDataConfig);
356+
final String keyOwner = NullSafe.get(
357+
attributeMap,
358+
map -> map.get(keyOwnerKey.get()),
359+
String::trim);
348360
if (NullSafe.isBlankString(keyOwner)) {
349361
LOGGER.debug("Blank keyOwner, attributeMap: {}", attributeMap);
350362
throw new StroomStreamException(
@@ -353,9 +365,9 @@ private Optional<HashedDataFeedKey> lookupAndValidateKey(final String unHashedKe
353365
"Mandatory header '" + keyOwnerKey + "' must be provided to authenticate with a data feed key.");
354366
}
355367

356-
final UnHashedCacheKey unHashedCacheKey = new UnHashedCacheKey(unHashedKey, keyOwner);
368+
final UnHashedCacheKey unHashedCacheKey = new UnHashedCacheKey(unHashedKey, CIKey.ofDynamicKey(keyOwner));
357369

358-
final List<CachedHashedDataFeedKey> dataFeedKeys = unHashedKeyToDataFeedKeyCache.get(unHashedCacheKey);
370+
final Set<CachedHashedDataFeedKey> dataFeedKeys = unHashedKeyToDataFeedKeyCache.get(unHashedCacheKey);
359371
if (NullSafe.isEmptyCollection(dataFeedKeys)) {
360372
LOGGER.debug("Unknown data feed key {}, attributeMap: {}", unHashedKey, attributeMap);
361373
// Data Feed Key is not known to us regardless of account ID
@@ -368,7 +380,7 @@ private Optional<HashedDataFeedKey> lookupAndValidateKey(final String unHashedKe
368380
}
369381
}
370382

371-
final String keyOwnerMetaKey = receiveDataConfig.getDataFeedKeyOwnerMetaKey();
383+
final CIKey keyOwnerMetaKey = getOwnerMetaKey(receiveDataConfig);
372384
final String ownerFromAttrMap = getAttribute(attributeMap, keyOwnerMetaKey)
373385
.orElse(null);
374386
final Predicate<CachedHashedDataFeedKey> filter = createKeyOwnerFilter(keyOwnerMetaKey, ownerFromAttrMap);
@@ -407,10 +419,10 @@ private Optional<HashedDataFeedKey> lookupAndValidateKey(final String unHashedKe
407419
}
408420
}
409421

410-
private static Predicate<CachedHashedDataFeedKey> createKeyOwnerFilter(final String keyOwnerMetaKey,
422+
private static Predicate<CachedHashedDataFeedKey> createKeyOwnerFilter(final CIKey keyOwnerMetaKey,
411423
final String ownerFromAttrMap) {
412424
Predicate<CachedHashedDataFeedKey> filter;
413-
if (NullSafe.isNonBlankString(keyOwnerMetaKey)) {
425+
if (CIKey.isNonBlank(keyOwnerMetaKey)) {
414426
filter = (CachedHashedDataFeedKey key) -> {
415427
final String ownerFromKey = key.getStreamMetaValue(keyOwnerMetaKey);
416428
final boolean result = Objects.equals(ownerFromKey, ownerFromAttrMap);
@@ -436,23 +448,21 @@ private static Predicate<CachedHashedDataFeedKey> createKeyOwnerFilter(final Str
436448
/**
437449
* Loading function for an un-hashed key and its keyOwner.
438450
*/
439-
private List<CachedHashedDataFeedKey> createHashedDataFeedKey(final UnHashedCacheKey unHashedCacheKey) {
451+
private Set<CachedHashedDataFeedKey> createHashedDataFeedKey(final UnHashedCacheKey unHashedCacheKey) {
440452
Objects.requireNonNull(unHashedCacheKey);
441453
if (!DATA_FEED_KEY_PATTERN.matcher(unHashedCacheKey.unHashedKey).matches()) {
442454
LOGGER.debug("key '{}' does not look like a not a datafeed key", unHashedCacheKey.unHashedKey);
443-
return Collections.emptyList();
455+
return Collections.emptySet();
444456
}
445457

446-
final CIKey ciKey = CIKey.ofDynamicKey(unHashedCacheKey.keyOwner);
447-
final List<CachedHashedDataFeedKey> matchingKeys = new ArrayList<>();
458+
final Set<CachedHashedDataFeedKey> matchingKeys;
448459
synchronized (this) {
449-
final List<CachedHashedDataFeedKey> ownersKeys = keyOwnerToDataFeedKeyMap.get(ciKey);
450-
NullSafe.forEach(ownersKeys, cachedHashedDataFeedKey -> {
451-
final boolean isValid = verifyKey(unHashedCacheKey.unHashedKey, cachedHashedDataFeedKey);
452-
if (isValid) {
453-
matchingKeys.add(cachedHashedDataFeedKey);
454-
}
455-
});
460+
// Likely to only be 1-2 items per owner
461+
final Set<CachedHashedDataFeedKey> ownersKeys = keyOwnerToDataFeedKeyMap.get(unHashedCacheKey.keyOwner);
462+
matchingKeys = NullSafe.stream(ownersKeys)
463+
.filter(cachedHashedDataFeedKey ->
464+
verifyKey(unHashedCacheKey.unHashedKey, cachedHashedDataFeedKey))
465+
.collect(Collectors.toCollection(ConcurrentHashMap::newKeySet));
456466
}
457467
LOGGER.debug("unHashedCacheKey: {}, matchingKeys: {}", unHashedCacheKey, matchingKeys);
458468
return matchingKeys;
@@ -475,32 +485,31 @@ private boolean verifyKey(final String unHashedKey, final CachedHashedDataFeedKe
475485
return isValid;
476486
}
477487

478-
// private List<CachedHashedDataFeedKey> createHashedDataFeedKey(final String unHashedKey) {
479-
// final Optional<CacheKey> optCacheKey = getCacheKey(unHashedKey);
480-
// final List<CachedHashedDataFeedKey> cachedHashedDataFeedKeys;
481-
// if (optCacheKey.isEmpty()) {
482-
// cachedHashedDataFeedKeys = Collections.emptyList();
483-
// } else {
484-
// // If this returns null then the key is not known to us
485-
// final CacheKey cacheKey = optCacheKey.get();
486-
// synchronized (this) {
487-
// final List<CachedHashedDataFeedKey> dataFeedKeys = cacheKeyToDataFeedKeyMap.get(cacheKey);
488-
// LOGGER.debug("Lookup of cacheKey {}, found {}", cacheKey, dataFeedKeys);
489-
// cachedHashedDataFeedKeys = NullSafe.list(dataFeedKeys);
490-
// }
491-
// }
492-
// LOGGER.debug("unHashedKey: {}, cachedHashedDataFeedKeys: {}", unHashedKey, cachedHashedDataFeedKeys);
493-
// return cachedHashedDataFeedKeys;
494-
// }
495-
496488
/**
497489
* @return An optional containing a non-blank attribute value, else empty.
498490
*/
491+
private Optional<String> getAttribute(final AttributeMap attributeMap, final CIKey header) {
492+
if (header == null) {
493+
return Optional.empty();
494+
}
495+
return Optional.ofNullable(attributeMap.get(header.get()))
496+
.filter(StringUtils::isNotBlank);
497+
}
498+
499499
private Optional<String> getAttribute(final AttributeMap attributeMap, final String header) {
500500
return Optional.ofNullable(attributeMap.get(header))
501501
.filter(StringUtils::isNotBlank);
502502
}
503503

504+
private Optional<String> getRequestHeader(final HttpServletRequest request, final CIKey header) {
505+
if (header == null) {
506+
return Optional.empty();
507+
}
508+
final String value = request.getHeader(header.get());
509+
return Optional.ofNullable(value)
510+
.filter(NullSafe::isNonBlankString);
511+
}
512+
504513
private Optional<String> getRequestHeader(final HttpServletRequest request, final String header) {
505514
final String value = request.getHeader(header);
506515
return Optional.ofNullable(value)
@@ -514,13 +523,13 @@ public Optional<UserIdentity> authenticate(final HttpServletRequest request,
514523
final ReceiveDataConfig receiveDataConfig = receiveDataConfigProvider.get();
515524
final Optional<UserIdentity> optUserIdentity = getDataFeedKey(request, attributeMap, receiveDataConfig)
516525
.map(dataFeedKey -> {
517-
final String keyOwnerMetaKey = receiveDataConfig.getDataFeedKeyOwnerMetaKey();
526+
final CIKey keyOwnerMetaKey = getOwnerMetaKey(receiveDataConfig);
518527
final String keyOwner = dataFeedKey.getStreamMetaValue(keyOwnerMetaKey);
519528
// Ensure the stream attributes from the data feed key are set in the attributeMap so
520529
// that the AttributeMapFilters have access to them and any attributes that are static
521530
// to this key are applied to all streams that use it, e.g. aws account number.
522531
// Entries from the data feed key trump what is in the headers
523-
attributeMap.putAll(dataFeedKey.getStreamMetaData());
532+
attributeMap.putAll(dataFeedKey.getAttributeMap());
524533
return new DataFeedKeyUserIdentity(keyOwner);
525534
});
526535
LOGGER.debug("Returning {}, attributeMap: {}", optUserIdentity, attributeMap);
@@ -590,6 +599,10 @@ public SystemInfoResult getSystemInfo() {
590599
.build();
591600
}
592601

602+
private CIKey getOwnerMetaKey(final ReceiveDataConfig receiveDataConfig) {
603+
return CIKey.of(receiveDataConfig.getDataFeedKeyOwnerMetaKey());
604+
}
605+
593606
// --------------------------------------------------------------------------------
594607

595608

@@ -602,7 +615,7 @@ public SystemInfoResult getSystemInfo() {
602615
// --------------------------------------------------------------------------------
603616

604617

605-
private record UnHashedCacheKey(String unHashedKey, String keyOwner) {
618+
private record UnHashedCacheKey(String unHashedKey, CIKey keyOwner) {
606619

607620
}
608621
}

0 commit comments

Comments
 (0)