|
69 | 69 | public class AtlasEntityChangeNotifier implements IAtlasEntityChangeNotifier { |
70 | 70 | private static final Logger LOG = LoggerFactory.getLogger(AtlasEntityChangeNotifier.class); |
71 | 71 |
|
72 | | - private static final Predicate<AtlasEntityHeader> PRED_IS_NOT_TYPE_AUDIT_ENTITY = obj -> !obj.getTypeName().equals(AtlasAuditService.ENTITY_TYPE_AUDIT_ENTRY); |
73 | | - |
74 | 72 | private final Set<EntityChangeListener> entityChangeListeners; |
75 | 73 | private final Set<EntityChangeListenerV2> entityChangeListenersV2; |
76 | 74 | private final AtlasInstanceConverter instanceConverter; |
@@ -401,7 +399,26 @@ private String getListenerName(EntityChangeListener listener) { |
401 | 399 | } |
402 | 400 |
|
403 | 401 | private boolean skipAuditEntries(List<AtlasEntityHeader> entityHeaders) { |
404 | | - return CollectionUtils.isEmpty(entityHeaders) || entityHeaders.stream().noneMatch(PRED_IS_NOT_TYPE_AUDIT_ENTITY); |
| 402 | + if (CollectionUtils.isEmpty(entityHeaders)) { |
| 403 | + return true; |
| 404 | + } |
| 405 | + |
| 406 | + // Skip audit if ALL entities are internal types (__AtlasAuditEntry, __AtlasMetricsStat, etc.) |
| 407 | + // Return true (skip) only if ALL entities are internal types |
| 408 | + // Return false (don't skip) if at least one entity is NOT an internal type |
| 409 | + boolean allInternal = entityHeaders.stream().allMatch(entityHeader -> { |
| 410 | + String typeName = entityHeader.getTypeName(); |
| 411 | + AtlasEntityType entityType = atlasTypeRegistry.getEntityTypeByName(typeName); |
| 412 | + |
| 413 | + if (entityType != null) { |
| 414 | + return entityType.isInternalType(); //return true if its -> internal type else return false |
| 415 | + } |
| 416 | + |
| 417 | + // Fallback: if type not found in registry, check if it starts with "__" (like GraphHelper.isInternalType) |
| 418 | + return GraphHelper.isInternalType(typeName); |
| 419 | + }); |
| 420 | + |
| 421 | + return allInternal; |
405 | 422 | } |
406 | 423 |
|
407 | 424 | private void notifyListeners(List<AtlasEntityHeader> entityHeaders, EntityOperation operation, boolean isImport) throws AtlasBaseException { |
|
0 commit comments