Skip to content

Commit cf216d8

Browse files
[fleet] Roll forward PIT id in agent paging (#252536)
## Summary - Roll forward refreshed `pit_id` in Fleet agent paging helpers/callers and close the PIT using the most recent id. ## Test plan - `yarn test:jest x-pack/platform/plugins/shared/fleet/server/services/agents/crud.test.ts` Refs: ralph issue-236; #252458 Made with [Cursor](https://cursor.com) Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent d83e0ba commit cf216d8

4 files changed

Lines changed: 59 additions & 8 deletions

File tree

x-pack/platform/plugins/shared/fleet/server/services/agents/action_runner.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ export abstract class ActionRunner {
213213

214214
async processAgentsInBatches(): Promise<{ actionId: string }> {
215215
const start = Date.now();
216-
const pitId = this.retryParams.pitId;
216+
let pitId = this.retryParams.pitId;
217217

218218
const perPage = this.actionParams.batchSize ?? SO_SEARCH_LIMIT;
219219

@@ -239,6 +239,10 @@ export abstract class ActionRunner {
239239
};
240240

241241
const res = await getAgents();
242+
if (res.pit) {
243+
pitId = res.pit;
244+
this.retryParams.pitId = pitId;
245+
}
242246

243247
let currentAgents = res.agents;
244248
if (currentAgents.length === 0) {
@@ -255,6 +259,10 @@ export abstract class ActionRunner {
255259
const lastAgent = currentAgents[currentAgents.length - 1];
256260
this.retryParams.searchAfter = lastAgent.sort!;
257261
const nextPage = await getAgents();
262+
if (nextPage.pit) {
263+
pitId = nextPage.pit;
264+
this.retryParams.pitId = pitId;
265+
}
258266
currentAgents = nextPage.agents;
259267
if (currentAgents.length === 0) {
260268
appContextService

x-pack/platform/plugins/shared/fleet/server/services/agents/crud.test.ts

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,9 +81,11 @@ describe('Agents CRUD test', () => {
8181
ids: string[],
8282
total: number,
8383
status: AgentStatus,
84-
generateSource: (id: string) => Partial<Agent> = () => ({})
84+
generateSource: (id: string) => Partial<Agent> = () => ({}),
85+
pitId?: string
8586
) {
8687
return {
88+
...(pitId ? { pit_id: pitId } : {}),
8789
hits: {
8890
total,
8991
hits: ids.map((id: string) => ({
@@ -175,6 +177,38 @@ describe('Agents CRUD test', () => {
175177
});
176178

177179
describe('getAgentsByKuery', () => {
180+
it('should roll forward PIT id from search responses', async () => {
181+
searchMock.mockResolvedValueOnce(getEsResponse(['1'], 1, 'online', () => ({}), 'pit-2'));
182+
183+
const firstRes = await getAgentsByKuery(esClientMock, soClientMock, {
184+
showAgentless: true,
185+
showInactive: false,
186+
pitId: 'pit-1',
187+
});
188+
189+
expect(searchMock).toHaveBeenCalledWith(
190+
expect.objectContaining({
191+
pit: expect.objectContaining({ id: 'pit-1' }),
192+
})
193+
);
194+
expect(firstRes.pit).toBe('pit-2');
195+
196+
searchMock.mockResolvedValueOnce(getEsResponse(['2'], 1, 'online', () => ({}), 'pit-3'));
197+
198+
const secondRes = await getAgentsByKuery(esClientMock, soClientMock, {
199+
showAgentless: true,
200+
showInactive: false,
201+
pitId: firstRes.pit,
202+
});
203+
204+
expect(searchMock).toHaveBeenLastCalledWith(
205+
expect.objectContaining({
206+
pit: expect.objectContaining({ id: 'pit-2' }),
207+
})
208+
);
209+
expect(secondRes.pit).toBe('pit-3');
210+
});
211+
178212
it('should return upgradeable on first page', async () => {
179213
searchMock
180214
.mockImplementationOnce(() =>

x-pack/platform/plugins/shared/fleet/server/services/agents/crud.ts

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -224,6 +224,7 @@ export async function getAgentsByKuery(
224224
total: number;
225225
page: number;
226226
perPage: number;
227+
pit?: string;
227228
statusSummary?: Record<AgentStatus, number>;
228229
aggregations?: Record<string, estypes.AggregationsAggregate>;
229230
}> {
@@ -294,7 +295,9 @@ export async function getAgentsByKuery(
294295
uninstalled: 0,
295296
};
296297

297-
const pitIdToUse = pitId || (openPit ? await openPointInTime(esClient, pitKeepAlive) : undefined);
298+
const initialPitId =
299+
pitId || (openPit ? await openPointInTime(esClient, pitKeepAlive) : undefined);
300+
let currentPitId = initialPitId;
298301

299302
const queryAgents = async (
300303
queryOptions: { from: number; size: number } | { searchAfter: SortResults; size: number }
@@ -334,10 +337,10 @@ export async function getAgentsByKuery(
334337
fields: Object.keys(runtimeFields),
335338
sort,
336339
query: kueryNode ? toElasticsearchQuery(kueryNode) : undefined,
337-
...(pitIdToUse
340+
...(currentPitId
338341
? {
339342
pit: {
340-
id: pitIdToUse,
343+
id: currentPitId,
341344
keep_alive: pitKeepAlive,
342345
},
343346
}
@@ -361,6 +364,8 @@ export async function getAgentsByKuery(
361364
throw err;
362365
}
363366

367+
currentPitId = res.pit_id ?? currentPitId;
368+
364369
let agents = res.hits.hits.map(searchHitToAgent);
365370
let total = res.hits.total as number;
366371
// filtering for a range on the version string will not work,
@@ -372,6 +377,7 @@ export async function getAgentsByKuery(
372377
// if there are more than SO_SEARCH_LIMIT agents, the logic falls back to same as before
373378
if (total < SO_SEARCH_LIMIT) {
374379
const response = await queryAgents({ from: 0, size: SO_SEARCH_LIMIT });
380+
currentPitId = response.pit_id ?? currentPitId;
375381
agents = response.hits.hits
376382
.map(searchHitToAgent)
377383
.filter((agent) => isAgentUpgradeAvailable(agent, latestAgentVersion));
@@ -404,7 +410,7 @@ export async function getAgentsByKuery(
404410
total,
405411
...(searchAfter ? { page: 0 } : { page }),
406412
perPage,
407-
...(pitIdToUse ? { pit: pitIdToUse } : {}),
413+
...(initialPitId ? { pit: currentPitId } : {}),
408414
...(aggregations ? { aggregations: res.aggregations } : {}),
409415
...(getStatusSummary ? { statusSummary } : {}),
410416
};

x-pack/platform/plugins/shared/fleet/server/services/spaces/agent_policy.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -168,19 +168,22 @@ export async function updateAgentPolicySpaces({
168168

169169
// Update agent actions
170170
if (agentIndexExists) {
171-
const pitId = await openPointInTime(esClient);
171+
let pitId = await openPointInTime(esClient);
172172

173173
try {
174174
let hasMore = true;
175175
let searchAfter: SortResults | undefined;
176176
while (hasMore) {
177-
const { agents } = await getAgentsByKuery(esClient, newSpaceSoClient, {
177+
const { agents, pit } = await getAgentsByKuery(esClient, newSpaceSoClient, {
178178
kuery: `policy_id:"${agentPolicyId}"`,
179179
showInactive: true,
180180
perPage: UPDATE_AGENT_BATCH_SIZE,
181181
pitId,
182182
searchAfter,
183183
});
184+
if (pit) {
185+
pitId = pit;
186+
}
184187

185188
if (agents.length === 0) {
186189
hasMore = false;

0 commit comments

Comments
 (0)