Skip to content

Commit e169efb

Browse files
tiansiviveclaudejaredburgettelastic
authored andcommitted
[Entity Analytics][Watchlists] Fix source label override when adding entity via manual assign or CSV (elastic#264942)
## Summary When an entity was added via the manual API or CSV upload after it had already been sync'ed via index/integration, the used to query the watchlist index to check if a doc already existed for that entity. If that lookup missed, the write would replace the existing doc entirely instead of merging into it, which would drop all previously written source labels. The fix removes the lookup and always uses a scripted upsert, which merges source labels whether or not the doc already exists. ## How to test 1. Seed your ES instance with store data using the script and steps from [here](elastic#263058 (comment)). 2. Configure a watchlist with an integration or index source and trigger a sync so entities are populated in the watchlist index. Verify a doc exists: ``` GET .entity_analytics.watchlists.default/_search { "query": { "term": { "watchlist.id": "<your-watchlist-id>" } } } ``` Note down any `entity.id` you find in the response 3. Manually assign that same entity to the same watchlist: ``` POST kbn:/api/entity_analytics/watchlists/<watchlist-id>/entities/assign { "euids": ["<euid-from-step-2>"] } ``` 4. Re-fetch the doc and verify `labels.sources` contains **both** the original sync source label and `"manual"` — not just `"manual"`. 5. Repeat step 3 using CSV upload and verify the same merge behaviour. Co-authored-by: Claude Sonnet 4.6 (1M context) <noreply@anthropic.com> Co-authored-by: Jared Burgett <147995946+jaredburgettelastic@users.noreply.github.com>
1 parent 9589f4a commit e169efb

9 files changed

Lines changed: 49 additions & 147 deletions

File tree

x-pack/solutions/security/plugins/security_solution/server/lib/entity_analytics/watchlists/entities/utils.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ export const ENTITY_ANALYTICS_WATCHLISTS_PREFIX = '.entity_analytics.watchlists'
1313
export const getIndexForWatchlist = (namespace: string) =>
1414
`${ENTITY_ANALYTICS_WATCHLISTS_PREFIX}.${namespace}`;
1515

16-
/** Builds a composite document _id for the watchlist entity index. */
16+
// Design debt: this creates a per-(watchlist, entity) key instead of a per-entity key.
17+
// The intended design is one doc per entity across all watchlists; fixing this requires a migration.
1718
export const buildWatchlistDocId = (watchlistId: string, euid: string) => `${watchlistId}:${euid}`;
1819

1920
/** Extracts the euid from a composite watchlist doc _id ({watchlistId}:{euid}). */

x-pack/solutions/security/plugins/security_solution/server/lib/entity_analytics/watchlists/entity_sources/bulk/upsert.test.ts

Lines changed: 27 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -20,93 +20,69 @@ describe('bulkUpsertOperationsFactory', () => {
2020
jest.clearAllMocks();
2121
});
2222

23-
it('generates index operations for new entities (no existingEntityId)', () => {
23+
it('generates scripted upsert operations for new entities', () => {
2424
const entities: WatchlistBulkEntity[] = [
2525
{ euid: 'user:alice', type: 'user', name: 'alice', sourceId: 'source-1' },
2626
{ euid: 'user:bob', type: 'user', name: 'bob', sourceId: 'source-1' },
2727
];
2828

2929
const ops = buildOps({ entities, sourceLabel: 'index', targetIndex });
3030

31-
// Two entities → 4 elements (action + doc pairs)
31+
// Two entities → 4 elements (action + body pairs)
3232
expect(ops).toHaveLength(4);
3333

34-
// First entity — index action
35-
expect(ops[0]).toEqual({ index: { _index: targetIndex, _id: 'watchlist-1:user:alice' } });
36-
// First entity — doc body
34+
// First entity — update action with deterministic _id
35+
expect(ops[0]).toEqual({ update: { _index: targetIndex, _id: 'watchlist-1:user:alice' } });
36+
// First entity — scripted upsert body
3737
expect(ops[1]).toEqual(
3838
expect.objectContaining({
39-
entity: { id: 'user:alice', name: 'alice', type: 'user' },
40-
labels: { sources: ['index'], source_ids: ['source-1'] },
41-
watchlist,
39+
script: expect.objectContaining({
40+
source: UPDATE_SCRIPT_SOURCE,
41+
params: expect.objectContaining({ source_id: 'source-1', source_type: 'index' }),
42+
}),
43+
upsert: expect.objectContaining({
44+
entity: { id: 'user:alice', name: 'alice', type: 'user' },
45+
labels: { sources: ['index'], source_ids: ['source-1'] },
46+
watchlist,
47+
}),
4248
})
4349
);
4450

45-
// Second entity — index action
46-
expect(ops[2]).toEqual({ index: { _index: targetIndex, _id: 'watchlist-1:user:bob' } });
51+
// Second entity
52+
expect(ops[2]).toEqual({ update: { _index: targetIndex, _id: 'watchlist-1:user:bob' } });
4753
expect(ops[3]).toEqual(
4854
expect.objectContaining({
49-
entity: { id: 'user:bob', name: 'bob', type: 'user' },
50-
labels: { sources: ['index'], source_ids: ['source-1'] },
51-
watchlist,
55+
script: expect.objectContaining({ source: UPDATE_SCRIPT_SOURCE }),
56+
upsert: expect.objectContaining({
57+
entity: { id: 'user:bob', name: 'bob', type: 'user' },
58+
}),
5259
})
5360
);
5461
});
5562

56-
it('generates update operations for existing entities (with existingEntityId)', () => {
63+
it('generates scripted upsert operations for existing entities, merging source labels', () => {
5764
const entities: WatchlistBulkEntity[] = [
58-
{
59-
euid: 'user:alice',
60-
type: 'user',
61-
name: 'alice',
62-
existingEntityId: 'doc-123',
63-
sourceId: 'source-2',
64-
},
65+
{ euid: 'user:alice', type: 'user', name: 'alice', sourceId: 'source-2' },
6566
];
6667

6768
const ops = buildOps({ entities, sourceLabel: 'index', targetIndex });
6869

6970
expect(ops).toHaveLength(2);
7071

71-
// Update action
72-
expect(ops[0]).toEqual({ update: { _index: targetIndex, _id: 'doc-123' } });
73-
// Script body
72+
expect(ops[0]).toEqual({ update: { _index: targetIndex, _id: 'watchlist-1:user:alice' } });
7473
expect(ops[1]).toEqual(
7574
expect.objectContaining({
7675
script: expect.objectContaining({
7776
source: UPDATE_SCRIPT_SOURCE,
78-
params: expect.objectContaining({
79-
source_id: 'source-2',
80-
source_type: 'index',
81-
}),
77+
params: expect.objectContaining({ source_id: 'source-2', source_type: 'index' }),
78+
}),
79+
upsert: expect.objectContaining({
80+
labels: expect.objectContaining({ source_ids: ['source-2'] }),
8281
}),
8382
})
8483
);
8584
});
8685

87-
it('generates mixed operations for a mix of new and existing entities', () => {
88-
const entities: WatchlistBulkEntity[] = [
89-
{ euid: 'user:new-user', type: 'user', name: 'new-user', sourceId: 'src-1' },
90-
{
91-
euid: 'user:existing-user',
92-
type: 'user',
93-
name: 'existing-user',
94-
existingEntityId: 'doc-456',
95-
sourceId: 'src-1',
96-
},
97-
];
98-
99-
const ops = buildOps({ entities, sourceLabel: 'index', targetIndex });
100-
101-
expect(ops).toHaveLength(4);
102-
103-
// First is an index op
104-
expect(ops[0]).toEqual({ index: { _index: targetIndex, _id: 'watchlist-1:user:new-user' } });
105-
106-
// Second pair is an update op
107-
expect(ops[2]).toEqual({ update: { _index: targetIndex, _id: 'doc-456' } });
108-
});
109-
11086
it('returns an empty array when no entities are provided', () => {
11187
const ops = buildOps({ entities: [], sourceLabel: 'index', targetIndex });
11288
expect(ops).toHaveLength(0);

x-pack/solutions/security/plugins/security_solution/server/lib/entity_analytics/watchlists/entity_sources/bulk/upsert.ts

Lines changed: 13 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -69,26 +69,20 @@ export const bulkUpsertOperationsFactory =
6969
const ops: object[] = [];
7070
logger.debug(`[WatchlistSync] Building bulk operations for ${entities.length} entities`);
7171
for (const entity of entities) {
72-
if (entity.existingEntityId) {
73-
ops.push(
74-
{ update: { _index: targetIndex, _id: entity.existingEntityId } },
75-
{
76-
script: {
77-
source: UPDATE_SCRIPT_SOURCE,
78-
params: {
79-
now: new Date().toISOString(),
80-
source_id: entity.sourceId,
81-
source_type: sourceLabel,
82-
},
72+
ops.push(
73+
{ update: { _index: targetIndex, _id: buildWatchlistDocId(watchlist.id, entity.euid) } },
74+
{
75+
script: {
76+
source: UPDATE_SCRIPT_SOURCE,
77+
params: {
78+
now: new Date().toISOString(),
79+
source_id: entity.sourceId,
80+
source_type: sourceLabel,
8381
},
84-
}
85-
);
86-
} else {
87-
ops.push(
88-
{ index: { _index: targetIndex, _id: buildWatchlistDocId(watchlist.id, entity.euid) } },
89-
buildCreateDoc(entity, sourceLabel, watchlist)
90-
);
91-
}
82+
},
83+
upsert: buildCreateDoc(entity, sourceLabel, watchlist),
84+
}
85+
);
9286
}
9387
return ops;
9488
};

x-pack/solutions/security/plugins/security_solution/server/lib/entity_analytics/watchlists/entity_sources/csv/process_batch.ts

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import pMap from 'p-map';
1111
import type { WatchlistCsvUploadResponseItem } from '../../../../../../common/api/entity_analytics/watchlists/csv_upload/csv_upload.gen';
1212
import { bulkUpsertOperationsFactory } from '../bulk/upsert';
1313
import { addWatchlistAttributeToStore } from '../sync/entity_store_sync';
14-
import { getExistingEntitiesMap, getErrorFromBulkResponse, errorsMsg } from '../sync/utils';
14+
import { getErrorFromBulkResponse, errorsMsg } from '../sync/utils';
1515
import { MANUAL_SOURCE_ID } from '../manual/constants';
1616
import type { MatchedEntity, Watchlist } from './types';
1717
import { lookupEntitiesForRow } from './lookup';
@@ -68,18 +68,11 @@ const upsertToWatchlistIndex = async (
6868
currentWatchlists: m.currentWatchlists,
6969
}));
7070

71-
const existingMap = await getExistingEntitiesMap(
72-
esClient,
73-
watchlist,
74-
entities.map((e) => e.euid)
75-
);
76-
const enriched = entities.map((e) => ({ ...e, existingEntityId: existingMap.get(e.euid) }));
77-
7871
const operations = bulkUpsertOperationsFactory(
7972
logger,
8073
watchlist
8174
)({
82-
entities: enriched,
75+
entities,
8376
sourceLabel: MANUAL_SOURCE_ID,
8477
targetIndex: watchlist.index,
8578
});

x-pack/solutions/security/plugins/security_solution/server/lib/entity_analytics/watchlists/entity_sources/manual/service.ts

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,7 @@ import { uniq } from 'lodash';
1212

1313
import type { WatchlistEntityAssignResponseItem } from '../../../../../../common/api/entity_analytics/watchlists/entities/assign.gen';
1414
import type { WatchlistEntityUnassignResponseItem } from '../../../../../../common/api/entity_analytics/watchlists/entities/unassign.gen';
15-
import {
16-
getExistingEntitiesMap,
17-
getErrorFromBulkResponse,
18-
errorsMsg,
19-
partitionBulkResults,
20-
} from '../sync/utils';
15+
import { getErrorFromBulkResponse, errorsMsg, partitionBulkResults } from '../sync/utils';
2116
import { bulkUpsertOperationsFactory } from '../bulk/upsert';
2217
import { addWatchlistAttributeToStore } from '../sync/entity_store_sync';
2318
import { applyBulkRemoveSource } from '../bulk/soft_delete';
@@ -84,13 +79,11 @@ export const createManualEntityService = ({
8479
}
8580

8681
try {
87-
const existingMap = await getExistingEntitiesMap(esClient, watchlist, [...foundEuids]);
8882
const bulkEntities: WatchlistBulkEntity[] = foundEntities.map((e) => ({
8983
euid: e.euid,
9084
type: e.type,
9185
name: e.name,
9286
sourceId: MANUAL_SOURCE_ID,
93-
existingEntityId: existingMap.get(e.euid),
9487
currentWatchlists: e.currentWatchlists,
9588
}));
9689

x-pack/solutions/security/plugins/security_solution/server/lib/entity_analytics/watchlists/entity_sources/sync/__mocks__/utils.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77

88
const actual = jest.requireActual('../utils');
99

10-
export const getExistingEntitiesMap = jest.fn().mockResolvedValue(new Map());
1110
export const getErrorFromBulkResponse = jest.fn().mockReturnValue([]);
1211
export const errorsMsg = jest.fn().mockReturnValue('');
1312
export const isTimestampGreaterThan = jest.fn().mockReturnValue(false);

x-pack/solutions/security/plugins/security_solution/server/lib/entity_analytics/watchlists/entity_sources/sync/update_detection/update_detection.ts

Lines changed: 4 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import type { EntityStoreEntityIdsByType, WatchlistsByEuid } from '../../../enti
1414
import type { WatchlistBulkEntity } from '../../types';
1515
import { createWatchlistSyncMarkersService } from '../sync_markers';
1616
import type { WatchlistEntitySourceClient } from '../../infra';
17-
import { isTimestampGreaterThan, getExistingEntitiesMap } from '../utils';
17+
import { isTimestampGreaterThan } from '../utils';
1818
import { buildEntitiesSearchBody, buildIndexSourceSearchBody } from './queries';
1919
import { applyBulkUpsert } from '../../bulk/upsert';
2020
import { getEntityNameFromDoc } from './entity_utils';
@@ -55,8 +55,6 @@ const pickLaterTimestamp = (
5555
};
5656

5757
const paginatedDetection = async <B>(
58-
esClient: ElasticsearchClient,
59-
watchlist: { name: string; id: string; index: string },
6058
search: SearchPage<B>,
6159
mapBucket: MapBucket<B>
6260
): Promise<{ entities: WatchlistBulkEntity[]; maxTimestamp?: string }> => {
@@ -77,11 +75,7 @@ const paginatedDetection = async <B>(
7775
return acc;
7876
}, []);
7977

80-
const batchEuids = mapped.map((m) => m.euid);
81-
const existingMap = await getExistingEntitiesMap(esClient, watchlist, batchEuids);
82-
83-
for (const { euid, entity, timestamp } of mapped) {
84-
entity.existingEntityId = existingMap.get(euid);
78+
for (const { entity, timestamp } of mapped) {
8579
maxTimestamp = pickLaterTimestamp(maxTimestamp, timestamp);
8680
allEntities.push(entity);
8781
}
@@ -160,7 +154,7 @@ export const createUpdateDetectionService = ({
160154
return { euid, entity, timestamp: typeof ts === 'string' ? ts : undefined };
161155
};
162156

163-
return paginatedDetection(esClient, watchlist, search, mapBucket);
157+
return paginatedDetection(search, mapBucket);
164158
};
165159

166160
const detectForIndexSource = async (
@@ -207,7 +201,7 @@ export const createUpdateDetectionService = ({
207201
};
208202
};
209203

210-
return paginatedDetection(esClient, watchlist, search, mapBucket);
204+
return paginatedDetection(search, mapBucket);
211205
};
212206

213207
const detectForStoreSource = async (
@@ -222,21 +216,6 @@ export const createUpdateDetectionService = ({
222216
}
223217
}
224218

225-
if (allEntities.length === 0) {
226-
return { entities: [] as WatchlistBulkEntity[] };
227-
}
228-
229-
// Check which entities already exist in the target index
230-
const pageSize = 100;
231-
for (let start = 0; start < allEntities.length; start += pageSize) {
232-
const batch = allEntities.slice(start, start + pageSize);
233-
const batchEuids = batch.map((e) => e.euid);
234-
const existingMap = await getExistingEntitiesMap(esClient, watchlist, batchEuids);
235-
for (const entity of batch) {
236-
entity.existingEntityId = existingMap.get(entity.euid);
237-
}
238-
}
239-
240219
return { entities: allEntities };
241220
};
242221

x-pack/solutions/security/plugins/security_solution/server/lib/entity_analytics/watchlists/entity_sources/sync/utils.ts

Lines changed: 0 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@
55
* 2.0.
66
*/
77

8-
import { uniq } from 'lodash';
9-
import type { ElasticsearchClient } from '@kbn/core/server';
108
import type { BulkResponse, ErrorCause } from '@elastic/elasticsearch/lib/api/types';
119
import moment from 'moment';
1210

@@ -50,33 +48,3 @@ export const isTimestampGreaterThan = (date1: string, date2: string) => {
5048
if (!m2.isValid()) return true;
5149
return m1.isAfter(m2);
5250
};
53-
54-
/**
55-
* Looks up which EUIDs already have a document in the watchlist index,
56-
* returning a Map from EUID → ES document _id.
57-
*/
58-
export const getExistingEntitiesMap = async (
59-
esClient: ElasticsearchClient,
60-
watchlist: { id: string; index: string },
61-
euids: string[]
62-
): Promise<Map<string, string>> => {
63-
if (euids.length === 0) return new Map();
64-
65-
const uniqueEuids = uniq(euids);
66-
const response = await esClient.search<{ entity?: { id?: string } }>({
67-
index: watchlist.index,
68-
size: uniqueEuids.length,
69-
query: {
70-
bool: {
71-
must: [{ terms: { 'entity.id': uniqueEuids } }, { term: { 'watchlist.id': watchlist.id } }],
72-
},
73-
},
74-
_source: ['entity.id'],
75-
});
76-
77-
return response.hits.hits.reduce((map, hit) => {
78-
const euid = hit._source?.entity?.id;
79-
if (euid && hit._id) map.set(euid, hit._id);
80-
return map;
81-
}, new Map<string, string>());
82-
};

x-pack/solutions/security/plugins/security_solution/server/lib/entity_analytics/watchlists/entity_sources/types.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@ export interface WatchlistBulkEntity {
1313
type: EntityType;
1414
name?: string;
1515
sourceId: string;
16-
existingEntityId?: string;
1716
/** Current watchlist names from the entity store, used for store sync */
1817
currentWatchlists?: string[];
1918
}

0 commit comments

Comments
 (0)