37
37
import java .util .Timer ;
38
38
import java .util .TimerTask ;
39
39
import java .util .concurrent .ConcurrentHashMap ;
40
- import java .util .concurrent .CopyOnWriteArrayList ;
41
40
import java .util .concurrent .atomic .AtomicInteger ;
42
41
import java .util .concurrent .atomic .LongAdder ;
43
42
import java .util .function .Function ;
@@ -67,13 +66,13 @@ public class DataFeedKeyServiceImpl implements DataFeedKeyService, Managed, HasS
67
66
// private final Map<CacheKey, List<CachedHashedDataFeedKey>> cacheKeyToDataFeedKeyMap = new ConcurrentHashMap<>();
68
67
// The owner will likely have >1 CachedHashedDataFeedKey due to the overlap of keys when
69
68
// new keys are being supplied, but not many. Use CIKey so we are not fussy on case.
70
- private final Map <CIKey , List <CachedHashedDataFeedKey >> keyOwnerToDataFeedKeyMap = new ConcurrentHashMap <>();
69
+ private final Map <CIKey , Set <CachedHashedDataFeedKey >> keyOwnerToDataFeedKeyMap = new ConcurrentHashMap <>();
71
70
72
- // Cache of the un-hashed key to validated DataFeedKey.
71
+ // Cache of the un-hashed key + owner to verified CachedHashedDataFeedKeys
73
72
// If the un-hashed key is in this map, then it means we have hashed it, checked it against
74
73
// cacheKeyToDataFeedKeyMap and found it to be valid (at the time of checking).
75
74
// This cache saves us the hashing cost on every data receipt, which can be expensive.
76
- private final LoadingStroomCache <UnHashedCacheKey , List <CachedHashedDataFeedKey >> unHashedKeyToDataFeedKeyCache ;
75
+ private final LoadingStroomCache <UnHashedCacheKey , Set <CachedHashedDataFeedKey >> unHashedKeyToDataFeedKeyCache ;
77
76
78
77
private final Provider <ReceiveDataConfig > receiveDataConfigProvider ;
79
78
private final Map <DataFeedKeyHashAlgorithm , DataFeedKeyHasher > hashFunctionMap ;
@@ -152,40 +151,43 @@ private Optional<HashedDataFeedKey> getDataFeedKey(final HttpServletRequest requ
152
151
@ Override
153
152
public synchronized int addDataFeedKeys (final HashedDataFeedKeys hashedDataFeedKeys ,
154
153
final Path sourceFile ) {
155
- int addedCount = 0 ;
154
+ final AtomicInteger addedCount = new AtomicInteger () ;
156
155
if (NullSafe .hasItems (hashedDataFeedKeys .getDataFeedKeys ())) {
157
156
LOGGER .debug (() -> LogUtil .message ("Adding {} dataFeedKeys" ,
158
157
hashedDataFeedKeys .getDataFeedKeys ().size ()));
159
158
160
159
final AtomicInteger invalidCount = new AtomicInteger ();
161
- final ReceiveDataConfig receiveDataConfig = receiveDataConfigProvider . get ();
162
- final String keyOwnerMetaKey = receiveDataConfig . getDataFeedKeyOwnerMetaKey ( );
160
+ final AtomicInteger dupCount = new AtomicInteger ();
161
+ final CIKey keyOwnerMetaKey = getOwnerMetaKey ( receiveDataConfigProvider . get () );
163
162
164
- addedCount = hashedDataFeedKeys .getDataFeedKeys ()
163
+ hashedDataFeedKeys .getDataFeedKeys ()
165
164
.stream ()
166
165
.filter (Objects ::nonNull )
167
166
.map (dataFeedKey ->
168
167
new CachedHashedDataFeedKey (dataFeedKey , sourceFile ))
169
168
.filter (dataFeedKey ->
170
169
isValidDataFeedKey (dataFeedKey , keyOwnerMetaKey , invalidCount ))
171
- .mapToInt (cachedHashedDataFeedKey -> {
172
- addDataFeedKey (cachedHashedDataFeedKey , keyOwnerMetaKey );
173
- return 1 ;
174
- })
175
- .sum ();
176
-
177
- LOGGER .debug ("Added: {}, ignored {} invalid data feed keys, file: {}" ,
178
- addedCount , invalidCount , sourceFile );
170
+ .forEach (cachedHashedDataFeedKey -> {
171
+ addDataFeedKey (cachedHashedDataFeedKey , keyOwnerMetaKey , addedCount , dupCount );
172
+ });
173
+
174
+ LOGGER .debug (() -> LogUtil .message (
175
+ "Added: {}, ignored {} data feed keys (invalid: {}, duplicate: {}), file: {}" ,
176
+ addedCount ,
177
+ invalidCount .get () + dupCount .get (),
178
+ invalidCount .get (),
179
+ dupCount .get (),
180
+ sourceFile ));
179
181
}
180
182
LOGGER .debug (() -> LogUtil .message ("Total cached keys: {}" , keyOwnerToDataFeedKeyMap .values ()
181
183
.stream ()
182
- .mapToInt (List ::size )
184
+ .mapToInt (Set ::size )
183
185
.sum ()));
184
- return addedCount ;
186
+ return addedCount . get () ;
185
187
}
186
188
187
189
private boolean isValidDataFeedKey (final CachedHashedDataFeedKey dataFeedKey ,
188
- final String ownerMetaKey ,
190
+ final CIKey ownerMetaKey ,
189
191
final AtomicInteger invalidCount ) {
190
192
191
193
if (dataFeedKey .isExpired ()) {
@@ -204,15 +206,22 @@ private boolean isValidDataFeedKey(final CachedHashedDataFeedKey dataFeedKey,
204
206
return true ;
205
207
}
206
208
207
- private void addDataFeedKey (final CachedHashedDataFeedKey cachedHashedDataFeedKey ,
208
- final String keyOwnerMetaKey ) {
209
+ private synchronized void addDataFeedKey (final CachedHashedDataFeedKey cachedHashedDataFeedKey ,
210
+ final CIKey keyOwnerMetaKey ,
211
+ final AtomicInteger addedCount ,
212
+ final AtomicInteger dupCount ) {
209
213
if (cachedHashedDataFeedKey != null ) {
210
214
final String keyOwner = cachedHashedDataFeedKey .getStreamMetaValue (keyOwnerMetaKey );
211
215
// Use CopyOnWriteArrayList as write are very infrequent
212
- keyOwnerToDataFeedKeyMap .computeIfAbsent (
216
+ final boolean success = keyOwnerToDataFeedKeyMap .computeIfAbsent (
213
217
CIKey .of (keyOwner ),
214
- k -> new CopyOnWriteArrayList <> ())
218
+ k -> ConcurrentHashMap . newKeySet ())
215
219
.add (cachedHashedDataFeedKey );
220
+ if (success ) {
221
+ addedCount .incrementAndGet ();
222
+ } else {
223
+ dupCount .incrementAndGet ();
224
+ }
216
225
}
217
226
}
218
227
@@ -295,7 +304,7 @@ public synchronized void removeKeysForFile(final Path sourceFile) {
295
304
}
296
305
LOGGER .debug (() -> LogUtil .message ("Total cached keys: {}" , keyOwnerToDataFeedKeyMap .values ()
297
306
.stream ()
298
- .mapToInt (List ::size )
307
+ .mapToInt (Set ::size )
299
308
.sum ()));
300
309
}
301
310
@@ -343,8 +352,11 @@ private Optional<HashedDataFeedKey> lookupAndValidateKey(final String unHashedKe
343
352
return Optional .empty ();
344
353
}
345
354
346
- final String keyOwnerKey = receiveDataConfig .getDataFeedKeyOwnerMetaKey ();
347
- final String keyOwner = NullSafe .get (attributeMap , map -> map .get (keyOwnerKey ), String ::trim );
355
+ final CIKey keyOwnerKey = getOwnerMetaKey (receiveDataConfig );
356
+ final String keyOwner = NullSafe .get (
357
+ attributeMap ,
358
+ map -> map .get (keyOwnerKey .get ()),
359
+ String ::trim );
348
360
if (NullSafe .isBlankString (keyOwner )) {
349
361
LOGGER .debug ("Blank keyOwner, attributeMap: {}" , attributeMap );
350
362
throw new StroomStreamException (
@@ -353,9 +365,9 @@ private Optional<HashedDataFeedKey> lookupAndValidateKey(final String unHashedKe
353
365
"Mandatory header '" + keyOwnerKey + "' must be provided to authenticate with a data feed key." );
354
366
}
355
367
356
- final UnHashedCacheKey unHashedCacheKey = new UnHashedCacheKey (unHashedKey , keyOwner );
368
+ final UnHashedCacheKey unHashedCacheKey = new UnHashedCacheKey (unHashedKey , CIKey . ofDynamicKey ( keyOwner ) );
357
369
358
- final List <CachedHashedDataFeedKey > dataFeedKeys = unHashedKeyToDataFeedKeyCache .get (unHashedCacheKey );
370
+ final Set <CachedHashedDataFeedKey > dataFeedKeys = unHashedKeyToDataFeedKeyCache .get (unHashedCacheKey );
359
371
if (NullSafe .isEmptyCollection (dataFeedKeys )) {
360
372
LOGGER .debug ("Unknown data feed key {}, attributeMap: {}" , unHashedKey , attributeMap );
361
373
// Data Feed Key is not known to us regardless of account ID
@@ -368,7 +380,7 @@ private Optional<HashedDataFeedKey> lookupAndValidateKey(final String unHashedKe
368
380
}
369
381
}
370
382
371
- final String keyOwnerMetaKey = receiveDataConfig . getDataFeedKeyOwnerMetaKey ( );
383
+ final CIKey keyOwnerMetaKey = getOwnerMetaKey ( receiveDataConfig );
372
384
final String ownerFromAttrMap = getAttribute (attributeMap , keyOwnerMetaKey )
373
385
.orElse (null );
374
386
final Predicate <CachedHashedDataFeedKey > filter = createKeyOwnerFilter (keyOwnerMetaKey , ownerFromAttrMap );
@@ -407,10 +419,10 @@ private Optional<HashedDataFeedKey> lookupAndValidateKey(final String unHashedKe
407
419
}
408
420
}
409
421
410
- private static Predicate <CachedHashedDataFeedKey > createKeyOwnerFilter (final String keyOwnerMetaKey ,
422
+ private static Predicate <CachedHashedDataFeedKey > createKeyOwnerFilter (final CIKey keyOwnerMetaKey ,
411
423
final String ownerFromAttrMap ) {
412
424
Predicate <CachedHashedDataFeedKey > filter ;
413
- if (NullSafe . isNonBlankString (keyOwnerMetaKey )) {
425
+ if (CIKey . isNonBlank (keyOwnerMetaKey )) {
414
426
filter = (CachedHashedDataFeedKey key ) -> {
415
427
final String ownerFromKey = key .getStreamMetaValue (keyOwnerMetaKey );
416
428
final boolean result = Objects .equals (ownerFromKey , ownerFromAttrMap );
@@ -436,23 +448,21 @@ private static Predicate<CachedHashedDataFeedKey> createKeyOwnerFilter(final Str
436
448
/**
437
449
* Loading function for an un-hashed key and its keyOwner.
438
450
*/
439
- private List <CachedHashedDataFeedKey > createHashedDataFeedKey (final UnHashedCacheKey unHashedCacheKey ) {
451
+ private Set <CachedHashedDataFeedKey > createHashedDataFeedKey (final UnHashedCacheKey unHashedCacheKey ) {
440
452
Objects .requireNonNull (unHashedCacheKey );
441
453
if (!DATA_FEED_KEY_PATTERN .matcher (unHashedCacheKey .unHashedKey ).matches ()) {
442
454
LOGGER .debug ("key '{}' does not look like a not a datafeed key" , unHashedCacheKey .unHashedKey );
443
- return Collections .emptyList ();
455
+ return Collections .emptySet ();
444
456
}
445
457
446
- final CIKey ciKey = CIKey .ofDynamicKey (unHashedCacheKey .keyOwner );
447
- final List <CachedHashedDataFeedKey > matchingKeys = new ArrayList <>();
458
+ final Set <CachedHashedDataFeedKey > matchingKeys ;
448
459
synchronized (this ) {
449
- final List <CachedHashedDataFeedKey > ownersKeys = keyOwnerToDataFeedKeyMap .get (ciKey );
450
- NullSafe .forEach (ownersKeys , cachedHashedDataFeedKey -> {
451
- final boolean isValid = verifyKey (unHashedCacheKey .unHashedKey , cachedHashedDataFeedKey );
452
- if (isValid ) {
453
- matchingKeys .add (cachedHashedDataFeedKey );
454
- }
455
- });
460
+ // Likely to only be 1-2 items per owner
461
+ final Set <CachedHashedDataFeedKey > ownersKeys = keyOwnerToDataFeedKeyMap .get (unHashedCacheKey .keyOwner );
462
+ matchingKeys = NullSafe .stream (ownersKeys )
463
+ .filter (cachedHashedDataFeedKey ->
464
+ verifyKey (unHashedCacheKey .unHashedKey , cachedHashedDataFeedKey ))
465
+ .collect (Collectors .toCollection (ConcurrentHashMap ::newKeySet ));
456
466
}
457
467
LOGGER .debug ("unHashedCacheKey: {}, matchingKeys: {}" , unHashedCacheKey , matchingKeys );
458
468
return matchingKeys ;
@@ -475,32 +485,31 @@ private boolean verifyKey(final String unHashedKey, final CachedHashedDataFeedKe
475
485
return isValid ;
476
486
}
477
487
478
- // private List<CachedHashedDataFeedKey> createHashedDataFeedKey(final String unHashedKey) {
479
- // final Optional<CacheKey> optCacheKey = getCacheKey(unHashedKey);
480
- // final List<CachedHashedDataFeedKey> cachedHashedDataFeedKeys;
481
- // if (optCacheKey.isEmpty()) {
482
- // cachedHashedDataFeedKeys = Collections.emptyList();
483
- // } else {
484
- // // If this returns null then the key is not known to us
485
- // final CacheKey cacheKey = optCacheKey.get();
486
- // synchronized (this) {
487
- // final List<CachedHashedDataFeedKey> dataFeedKeys = cacheKeyToDataFeedKeyMap.get(cacheKey);
488
- // LOGGER.debug("Lookup of cacheKey {}, found {}", cacheKey, dataFeedKeys);
489
- // cachedHashedDataFeedKeys = NullSafe.list(dataFeedKeys);
490
- // }
491
- // }
492
- // LOGGER.debug("unHashedKey: {}, cachedHashedDataFeedKeys: {}", unHashedKey, cachedHashedDataFeedKeys);
493
- // return cachedHashedDataFeedKeys;
494
- // }
495
-
496
488
/**
497
489
* @return An optional containing a non-blank attribute value, else empty.
498
490
*/
491
+ private Optional <String > getAttribute (final AttributeMap attributeMap , final CIKey header ) {
492
+ if (header == null ) {
493
+ return Optional .empty ();
494
+ }
495
+ return Optional .ofNullable (attributeMap .get (header .get ()))
496
+ .filter (StringUtils ::isNotBlank );
497
+ }
498
+
499
499
private Optional <String > getAttribute (final AttributeMap attributeMap , final String header ) {
500
500
return Optional .ofNullable (attributeMap .get (header ))
501
501
.filter (StringUtils ::isNotBlank );
502
502
}
503
503
504
+ private Optional <String > getRequestHeader (final HttpServletRequest request , final CIKey header ) {
505
+ if (header == null ) {
506
+ return Optional .empty ();
507
+ }
508
+ final String value = request .getHeader (header .get ());
509
+ return Optional .ofNullable (value )
510
+ .filter (NullSafe ::isNonBlankString );
511
+ }
512
+
504
513
private Optional <String > getRequestHeader (final HttpServletRequest request , final String header ) {
505
514
final String value = request .getHeader (header );
506
515
return Optional .ofNullable (value )
@@ -514,13 +523,13 @@ public Optional<UserIdentity> authenticate(final HttpServletRequest request,
514
523
final ReceiveDataConfig receiveDataConfig = receiveDataConfigProvider .get ();
515
524
final Optional <UserIdentity > optUserIdentity = getDataFeedKey (request , attributeMap , receiveDataConfig )
516
525
.map (dataFeedKey -> {
517
- final String keyOwnerMetaKey = receiveDataConfig . getDataFeedKeyOwnerMetaKey ( );
526
+ final CIKey keyOwnerMetaKey = getOwnerMetaKey ( receiveDataConfig );
518
527
final String keyOwner = dataFeedKey .getStreamMetaValue (keyOwnerMetaKey );
519
528
// Ensure the stream attributes from the data feed key are set in the attributeMap so
520
529
// that the AttributeMapFilters have access to them and any attributes that are static
521
530
// to this key are applied to all streams that use it, e.g. aws account number.
522
531
// Entries from the data feed key trump what is in the headers
523
- attributeMap .putAll (dataFeedKey .getStreamMetaData ());
532
+ attributeMap .putAll (dataFeedKey .getAttributeMap ());
524
533
return new DataFeedKeyUserIdentity (keyOwner );
525
534
});
526
535
LOGGER .debug ("Returning {}, attributeMap: {}" , optUserIdentity , attributeMap );
@@ -590,6 +599,10 @@ public SystemInfoResult getSystemInfo() {
590
599
.build ();
591
600
}
592
601
602
+ private CIKey getOwnerMetaKey (final ReceiveDataConfig receiveDataConfig ) {
603
+ return CIKey .of (receiveDataConfig .getDataFeedKeyOwnerMetaKey ());
604
+ }
605
+
593
606
// --------------------------------------------------------------------------------
594
607
595
608
@@ -602,7 +615,7 @@ public SystemInfoResult getSystemInfo() {
602
615
// --------------------------------------------------------------------------------
603
616
604
617
605
- private record UnHashedCacheKey (String unHashedKey , String keyOwner ) {
618
+ private record UnHashedCacheKey (String unHashedKey , CIKey keyOwner ) {
606
619
607
620
}
608
621
}
0 commit comments