Skip to content

Commit a6bb015

Browse files
committed
fix: refactor search functions to use fetchAllPages for pagination and add integration tests for pagination beyond CI_PAGE_SIZE
1 parent 1f5e1bd commit a6bb015

File tree

3 files changed

+232
-28
lines changed

3 files changed

+232
-28
lines changed

src/commands/search.ts

Lines changed: 36 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -238,8 +238,6 @@ export async function searchProcessDefinitions(options: {
238238
},
239239
};
240240

241-
if (hasCiFilter) filter.page = { limit: CI_PAGE_SIZE };
242-
243241
if (options.processDefinitionId) {
244242
filter.filter.processDefinitionId = toStringFilter(options.processDefinitionId);
245243
}
@@ -256,7 +254,12 @@ export async function searchProcessDefinitions(options: {
256254
filter.filter.processDefinitionKey = options.key;
257255
}
258256

259-
const result = await client.searchProcessDefinitions(filter, { consistency: { waitUpToMs: 0 } });
257+
const allItems = await fetchAllPages(
258+
(f, opts) => client.searchProcessDefinitions(f, opts),
259+
filter,
260+
hasCiFilter ? CI_PAGE_SIZE : undefined,
261+
);
262+
const result = { items: allItems } as any;
260263

261264
if (result.items?.length) {
262265
result.items = [...result.items].sort((left: any, right: any) => {
@@ -356,8 +359,6 @@ export async function searchProcessInstances(options: {
356359
},
357360
};
358361

359-
if (hasCiFilter) filter.page = { limit: CI_PAGE_SIZE };
360-
361362
if (options.processDefinitionId) {
362363
filter.filter.processDefinitionId = toStringFilter(options.processDefinitionId);
363364
}
@@ -389,7 +390,12 @@ export async function searchProcessInstances(options: {
389390
}
390391
}
391392

392-
const result = await client.searchProcessInstances(filter, { consistency: { waitUpToMs: 0 } });
393+
const allItems = await fetchAllPages(
394+
(f, opts) => client.searchProcessInstances(f, opts),
395+
filter,
396+
hasCiFilter ? CI_PAGE_SIZE : undefined,
397+
);
398+
const result = { items: allItems } as any;
393399

394400
if (hasCiFilter && result.items) {
395401
result.items = result.items.filter((pi: any) => {
@@ -475,8 +481,6 @@ export async function searchUserTasks(options: {
475481
},
476482
};
477483

478-
if (hasCiFilter) filter.page = { limit: CI_PAGE_SIZE };
479-
480484
if (options.state) {
481485
filter.filter.state = options.state;
482486
}
@@ -508,7 +512,12 @@ export async function searchUserTasks(options: {
508512
}
509513
}
510514

511-
const result = await client.searchUserTasks(filter, { consistency: { waitUpToMs: 0 } });
515+
const allItems = await fetchAllPages(
516+
(f, opts) => client.searchUserTasks(f, opts),
517+
filter,
518+
hasCiFilter ? CI_PAGE_SIZE : undefined,
519+
);
520+
const result = { items: allItems } as any;
512521

513522
if (hasCiFilter && result.items) {
514523
result.items = result.items.filter((task: any) => {
@@ -603,8 +612,6 @@ export async function searchIncidents(options: {
603612
},
604613
};
605614

606-
if (hasCiFilter) filter.page = { limit: CI_PAGE_SIZE };
607-
608615
if (options.state) {
609616
filter.filter.state = options.state;
610617
}
@@ -639,7 +646,12 @@ export async function searchIncidents(options: {
639646
}
640647
}
641648

642-
const result = await client.searchIncidents(filter, { consistency: { waitUpToMs: 0 } });
649+
const allItems = await fetchAllPages(
650+
(f, opts) => client.searchIncidents(f, opts),
651+
filter,
652+
hasCiFilter ? CI_PAGE_SIZE : undefined,
653+
);
654+
const result = { items: allItems } as any;
643655

644656
if (hasCiFilter && result.items) {
645657
result.items = result.items.filter((incident: any) => {
@@ -724,8 +736,6 @@ export async function searchJobs(options: {
724736
},
725737
};
726738

727-
if (hasCiFilter) filter.page = { limit: CI_PAGE_SIZE };
728-
729739
if (options.state) {
730740
filter.filter.state = options.state;
731741
}
@@ -753,7 +763,12 @@ export async function searchJobs(options: {
753763
}
754764
}
755765

756-
const result = await client.searchJobs(filter, { consistency: { waitUpToMs: 0 } });
766+
const allItems = await fetchAllPages(
767+
(f, opts) => client.searchJobs(f, opts),
768+
filter,
769+
hasCiFilter ? CI_PAGE_SIZE : undefined,
770+
);
771+
const result = { items: allItems } as any;
757772

758773
if (hasCiFilter && result.items) {
759774
result.items = result.items.filter((job: any) => {
@@ -839,8 +854,6 @@ export async function searchVariables(options: {
839854
},
840855
};
841856

842-
if (hasCiFilter) filter.page = { limit: CI_PAGE_SIZE };
843-
844857
if (options.name) {
845858
filter.filter.name = toStringFilter(options.name);
846859
}
@@ -860,17 +873,12 @@ export async function searchVariables(options: {
860873
// By default, truncate values unless --fullValue is specified
861874
const truncateValues = !options.fullValue;
862875

863-
const allItems = hasCiFilter
864-
? await fetchAllPages(
865-
(f, opts) => client.searchVariables({ ...f, truncateValues }, opts),
866-
filter,
867-
CI_PAGE_SIZE,
868-
options.limit,
869-
)
870-
: (await client.searchVariables(
871-
{ ...filter, truncateValues },
872-
{ consistency: { waitUpToMs: 0 } },
873-
)).items || [];
876+
const allItems = await fetchAllPages(
877+
(f, opts) => client.searchVariables({ ...f, truncateValues }, opts),
878+
filter,
879+
hasCiFilter ? CI_PAGE_SIZE : undefined,
880+
options.limit,
881+
);
874882

875883
let result = { items: allItems } as any;
876884

tests/fixtures/mini-process.bpmn

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" xmlns:modeler="http://camunda.org/schema/modeler/1.0" id="Definitions_0trqknu" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="5.43.1" modeler:executionPlatform="Camunda Cloud" modeler:executionPlatformVersion="8.8.0">
3+
<bpmn:process id="mini-process-1" isExecutable="true">
4+
<bpmn:startEvent id="StartEvent_1">
5+
<bpmn:outgoing>Flow_1nz6e26</bpmn:outgoing>
6+
</bpmn:startEvent>
7+
<bpmn:task id="Activity_1kkoh0e" name="do sth">
8+
<bpmn:incoming>Flow_1nz6e26</bpmn:incoming>
9+
<bpmn:outgoing>Flow_0kbjfrm</bpmn:outgoing>
10+
</bpmn:task>
11+
<bpmn:sequenceFlow id="Flow_1nz6e26" sourceRef="StartEvent_1" targetRef="Activity_1kkoh0e" />
12+
<bpmn:endEvent id="Event_1lp3py7">
13+
<bpmn:incoming>Flow_0kbjfrm</bpmn:incoming>
14+
</bpmn:endEvent>
15+
<bpmn:sequenceFlow id="Flow_0kbjfrm" sourceRef="Activity_1kkoh0e" targetRef="Event_1lp3py7" />
16+
</bpmn:process>
17+
<bpmndi:BPMNDiagram id="BPMNDiagram_1">
18+
<bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="mini-process-1">
19+
<bpmndi:BPMNShape id="StartEvent_1_di" bpmnElement="StartEvent_1">
20+
<dc:Bounds x="182" y="102" width="36" height="36" />
21+
</bpmndi:BPMNShape>
22+
<bpmndi:BPMNShape id="Activity_1kkoh0e_di" bpmnElement="Activity_1kkoh0e">
23+
<dc:Bounds x="270" y="80" width="100" height="80" />
24+
<bpmndi:BPMNLabel />
25+
</bpmndi:BPMNShape>
26+
<bpmndi:BPMNShape id="Event_1lp3py7_di" bpmnElement="Event_1lp3py7">
27+
<dc:Bounds x="422" y="102" width="36" height="36" />
28+
</bpmndi:BPMNShape>
29+
<bpmndi:BPMNEdge id="Flow_1nz6e26_di" bpmnElement="Flow_1nz6e26">
30+
<di:waypoint x="218" y="120" />
31+
<di:waypoint x="270" y="120" />
32+
</bpmndi:BPMNEdge>
33+
<bpmndi:BPMNEdge id="Flow_0kbjfrm_di" bpmnElement="Flow_0kbjfrm">
34+
<di:waypoint x="370" y="120" />
35+
<di:waypoint x="422" y="120" />
36+
</bpmndi:BPMNEdge>
37+
</bpmndi:BPMNPlane>
38+
</bpmndi:BPMNDiagram>
39+
</bpmn:definitions>
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
/**
2+
* Integration tests for pagination beyond CI_PAGE_SIZE (1000)
3+
*
4+
* Deploys > 1000 unique process definitions from a mini-process BPMN template,
5+
* then verifies that `search pd` and `list pd` via the CLI return ALL of them
6+
* rather than silently truncating at the API default page size (100) or CI_PAGE_SIZE (1000).
7+
*
8+
* NOTE: These tests require a running Camunda 8 instance at http://localhost:8080
9+
* and take considerable time due to the volume of deployments.
10+
*/
11+
12+
import { test, describe, before, after } from 'node:test';
13+
import assert from 'node:assert';
14+
import { spawnSync } from 'node:child_process';
15+
import { readFileSync, mkdirSync, writeFileSync, rmSync, existsSync } from 'node:fs';
16+
import { join, resolve } from 'node:path';
17+
import { tmpdir } from 'node:os';
18+
import { pollUntil } from '../utils/polling.ts';
19+
20+
const PROJECT_ROOT = resolve(import.meta.dirname, '..', '..');
21+
const CLI = join(PROJECT_ROOT, 'src', 'index.ts');
22+
const TEMPLATE_BPMN = readFileSync(join(PROJECT_ROOT, 'tests', 'fixtures', 'mini-process.bpmn'), 'utf-8');
23+
24+
/** Number of unique process definitions to deploy (must be > CI_PAGE_SIZE of 1000) */
25+
const DEPLOY_COUNT = 1010;
26+
27+
/** Polling configuration — indexing a large batch may take a while */
28+
const POLL_TIMEOUT_MS = 120_000;
29+
const POLL_INTERVAL_MS = 3_000;
30+
31+
/** Spawn timeout for CLI commands */
32+
const SPAWN_TIMEOUT_MS = 300_000;
33+
34+
/** Shared temp directory + data dir for this test suite */
35+
let bpmnDir: string;
36+
let dataDir: string;
37+
38+
/**
39+
* Invoke the CLI as a subprocess, returning { stdout, stderr, status }.
40+
* Uses a dedicated C8CTL_DATA_DIR so session state is isolated.
41+
*/
42+
function cli(...args: string[]) {
43+
const result = spawnSync('node', [CLI, ...args], {
44+
encoding: 'utf-8',
45+
timeout: SPAWN_TIMEOUT_MS,
46+
cwd: PROJECT_ROOT,
47+
env: {
48+
...process.env,
49+
C8CTL_DATA_DIR: dataDir,
50+
},
51+
});
52+
return result;
53+
}
54+
55+
/**
56+
* Generate a BPMN string with a given process id by replacing the template's id.
57+
*/
58+
function bpmnWithId(id: string): string {
59+
return TEMPLATE_BPMN
60+
.replace(/id="mini-process-1"/g, `id="${id}"`)
61+
.replace(/bpmnElement="mini-process-1"/g, `bpmnElement="${id}"`);
62+
}
63+
64+
describe('Pagination beyond CI_PAGE_SIZE (requires Camunda 8 at localhost:8080)', { timeout: 600_000 }, () => {
65+
before(() => {
66+
// Create temp directories for BPMN files and CLI data dir
67+
const base = join(tmpdir(), `c8ctl-pagination-test-${Date.now()}`);
68+
bpmnDir = join(base, 'bpmn');
69+
dataDir = join(base, 'data');
70+
mkdirSync(bpmnDir, { recursive: true });
71+
mkdirSync(dataDir, { recursive: true });
72+
73+
// Generate BPMN files with ids mini-process-1 .. mini-process-<DEPLOY_COUNT>
74+
for (let i = 1; i <= DEPLOY_COUNT; i++) {
75+
const id = `mini-process-${i}`;
76+
writeFileSync(join(bpmnDir, `${id}.bpmn`), bpmnWithId(id));
77+
}
78+
});
79+
80+
after(() => {
81+
// Clean up temp directories
82+
const base = join(bpmnDir, '..');
83+
if (existsSync(base)) {
84+
rmSync(base, { recursive: true, force: true });
85+
}
86+
});
87+
88+
test(`deploy ${DEPLOY_COUNT} process definitions via CLI`, { timeout: SPAWN_TIMEOUT_MS }, () => {
89+
const result = cli('deploy', bpmnDir);
90+
assert.strictEqual(
91+
result.status, 0,
92+
`Deploy should exit 0. stderr: ${result.stderr}`,
93+
);
94+
});
95+
96+
test(`search pd --id=mini-process-* returns all ${DEPLOY_COUNT} definitions`, { timeout: POLL_TIMEOUT_MS + 30_000 }, async () => {
97+
// Switch output to JSON for easy parsing
98+
const outputResult = cli('output', 'json');
99+
assert.strictEqual(outputResult.status, 0, `output json should succeed. stderr: ${outputResult.stderr}`);
100+
101+
// Poll until Elasticsearch has indexed all deployed definitions
102+
const allFound = await pollUntil(async () => {
103+
const result = cli('search', 'pd', '--id=mini-process-*');
104+
if (result.status !== 0) return false;
105+
try {
106+
const items = JSON.parse(result.stdout);
107+
return Array.isArray(items) && items.length >= DEPLOY_COUNT;
108+
} catch {
109+
return false;
110+
}
111+
}, POLL_TIMEOUT_MS, POLL_INTERVAL_MS);
112+
113+
// Final assertion with the actual count
114+
const finalResult = cli('search', 'pd', '--id=mini-process-*');
115+
assert.strictEqual(finalResult.status, 0, `search should exit 0. stderr: ${finalResult.stderr}`);
116+
117+
const items = JSON.parse(finalResult.stdout);
118+
assert.ok(Array.isArray(items), 'Output should be a JSON array');
119+
assert.ok(
120+
items.length >= DEPLOY_COUNT,
121+
`Expected at least ${DEPLOY_COUNT} process definitions, got ${items.length}`,
122+
);
123+
124+
// Verify pagination actually worked (i.e. we went beyond API_DEFAULT_PAGE_SIZE=100 and CI_PAGE_SIZE=1000)
125+
assert.ok(items.length > 1000, `Result count (${items.length}) should exceed CI_PAGE_SIZE (1000)`);
126+
});
127+
128+
test(`list pd returns all ${DEPLOY_COUNT} definitions`, { timeout: POLL_TIMEOUT_MS + 30_000 }, async () => {
129+
// Ensure JSON output mode is set
130+
const outputResult = cli('output', 'json');
131+
assert.strictEqual(outputResult.status, 0, `output json should succeed. stderr: ${outputResult.stderr}`);
132+
133+
// The previous test already confirmed indexing, so a single call should suffice.
134+
// Still poll briefly in case the test order changes.
135+
const allFound = await pollUntil(async () => {
136+
const result = cli('list', 'pd');
137+
if (result.status !== 0) return false;
138+
try {
139+
const items = JSON.parse(result.stdout);
140+
return Array.isArray(items) && items.length >= DEPLOY_COUNT;
141+
} catch {
142+
return false;
143+
}
144+
}, POLL_TIMEOUT_MS, POLL_INTERVAL_MS);
145+
146+
const finalResult = cli('list', 'pd');
147+
assert.strictEqual(finalResult.status, 0, `list pd should exit 0. stderr: ${finalResult.stderr}`);
148+
149+
const items = JSON.parse(finalResult.stdout);
150+
assert.ok(Array.isArray(items), 'Output should be a JSON array');
151+
assert.ok(
152+
items.length >= DEPLOY_COUNT,
153+
`Expected at least ${DEPLOY_COUNT} process definitions, got ${items.length}`,
154+
);
155+
assert.ok(items.length > 1000, `Result count (${items.length}) should exceed CI_PAGE_SIZE (1000)`);
156+
});
157+
});

0 commit comments

Comments
 (0)