-
Notifications
You must be signed in to change notification settings - Fork 0
fix: refactor search functions to use fetchAllPages for pagination and add integration tests for pagination beyond CI_PAGE_SIZE #113
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3,7 +3,7 @@ | |
| */ | ||
|
|
||
| import { getLogger, Logger, sortTableData, type SortOrder } from '../logger.ts'; | ||
| import { createClient, fetchAllPages } from '../client.ts'; | ||
| import { createClient, fetchAllPages, DEFAULT_PAGE_SIZE } from '../client.ts'; | ||
| import { resolveTenantId } from '../config.ts'; | ||
| import { parseBetween, buildDateFilter } from '../date-filter.ts'; | ||
|
|
||
|
|
@@ -238,8 +238,6 @@ export async function searchProcessDefinitions(options: { | |
| }, | ||
| }; | ||
|
|
||
| if (hasCiFilter) filter.page = { limit: CI_PAGE_SIZE }; | ||
|
|
||
| if (options.processDefinitionId) { | ||
| filter.filter.processDefinitionId = toStringFilter(options.processDefinitionId); | ||
| } | ||
|
|
@@ -256,7 +254,12 @@ export async function searchProcessDefinitions(options: { | |
| filter.filter.processDefinitionKey = options.key; | ||
| } | ||
|
|
||
| const result = await client.searchProcessDefinitions(filter, { consistency: { waitUpToMs: 0 } }); | ||
| const allItems = await fetchAllPages( | ||
| (f, opts) => client.searchProcessDefinitions(f, opts), | ||
| filter, | ||
| ...(hasCiFilter ? [CI_PAGE_SIZE] as const : []), | ||
| ); | ||
| const result = { items: allItems } as any; | ||
|
|
||
| if (result.items?.length) { | ||
| result.items = [...result.items].sort((left: any, right: any) => { | ||
|
|
@@ -356,8 +359,6 @@ export async function searchProcessInstances(options: { | |
| }, | ||
| }; | ||
|
|
||
| if (hasCiFilter) filter.page = { limit: CI_PAGE_SIZE }; | ||
|
|
||
| if (options.processDefinitionId) { | ||
| filter.filter.processDefinitionId = toStringFilter(options.processDefinitionId); | ||
| } | ||
|
|
@@ -389,7 +390,12 @@ export async function searchProcessInstances(options: { | |
| } | ||
| } | ||
|
|
||
| const result = await client.searchProcessInstances(filter, { consistency: { waitUpToMs: 0 } }); | ||
| const allItems = await fetchAllPages( | ||
| (f, opts) => client.searchProcessInstances(f, opts), | ||
| filter, | ||
| ...(hasCiFilter ? [CI_PAGE_SIZE] as const : []), | ||
| ); | ||
| const result = { items: allItems } as any; | ||
|
|
||
| if (hasCiFilter && result.items) { | ||
| result.items = result.items.filter((pi: any) => { | ||
|
|
@@ -475,8 +481,6 @@ export async function searchUserTasks(options: { | |
| }, | ||
| }; | ||
|
|
||
| if (hasCiFilter) filter.page = { limit: CI_PAGE_SIZE }; | ||
|
|
||
| if (options.state) { | ||
| filter.filter.state = options.state; | ||
| } | ||
|
|
@@ -508,7 +512,12 @@ export async function searchUserTasks(options: { | |
| } | ||
| } | ||
|
|
||
| const result = await client.searchUserTasks(filter, { consistency: { waitUpToMs: 0 } }); | ||
| const allItems = await fetchAllPages( | ||
| (f, opts) => client.searchUserTasks(f, opts), | ||
| filter, | ||
| ...(hasCiFilter ? [CI_PAGE_SIZE] as const : []), | ||
| ); | ||
| const result = { items: allItems } as any; | ||
|
|
||
| if (hasCiFilter && result.items) { | ||
| result.items = result.items.filter((task: any) => { | ||
|
|
@@ -603,8 +612,6 @@ export async function searchIncidents(options: { | |
| }, | ||
| }; | ||
|
|
||
| if (hasCiFilter) filter.page = { limit: CI_PAGE_SIZE }; | ||
|
|
||
| if (options.state) { | ||
| filter.filter.state = options.state; | ||
| } | ||
|
|
@@ -639,7 +646,12 @@ export async function searchIncidents(options: { | |
| } | ||
| } | ||
|
|
||
| const result = await client.searchIncidents(filter, { consistency: { waitUpToMs: 0 } }); | ||
| const allItems = await fetchAllPages( | ||
| (f, opts) => client.searchIncidents(f, opts), | ||
| filter, | ||
| ...(hasCiFilter ? [CI_PAGE_SIZE] as const : []), | ||
| ); | ||
| const result = { items: allItems } as any; | ||
|
|
||
| if (hasCiFilter && result.items) { | ||
| result.items = result.items.filter((incident: any) => { | ||
|
|
@@ -724,8 +736,6 @@ export async function searchJobs(options: { | |
| }, | ||
| }; | ||
|
|
||
| if (hasCiFilter) filter.page = { limit: CI_PAGE_SIZE }; | ||
|
|
||
| if (options.state) { | ||
| filter.filter.state = options.state; | ||
| } | ||
|
|
@@ -753,7 +763,12 @@ export async function searchJobs(options: { | |
| } | ||
| } | ||
|
|
||
| const result = await client.searchJobs(filter, { consistency: { waitUpToMs: 0 } }); | ||
| const allItems = await fetchAllPages( | ||
| (f, opts) => client.searchJobs(f, opts), | ||
| filter, | ||
| ...(hasCiFilter ? [CI_PAGE_SIZE] as const : []), | ||
| ); | ||
| const result = { items: allItems } as any; | ||
|
|
||
| if (hasCiFilter && result.items) { | ||
| result.items = result.items.filter((job: any) => { | ||
|
|
@@ -839,8 +854,6 @@ export async function searchVariables(options: { | |
| }, | ||
| }; | ||
|
|
||
| if (hasCiFilter) filter.page = { limit: CI_PAGE_SIZE }; | ||
|
|
||
| if (options.name) { | ||
| filter.filter.name = toStringFilter(options.name); | ||
| } | ||
|
|
@@ -860,17 +873,12 @@ export async function searchVariables(options: { | |
| // By default, truncate values unless --fullValue is specified | ||
| const truncateValues = !options.fullValue; | ||
|
|
||
| const allItems = hasCiFilter | ||
| ? await fetchAllPages( | ||
| (f, opts) => client.searchVariables({ ...f, truncateValues }, opts), | ||
| filter, | ||
| CI_PAGE_SIZE, | ||
| options.limit, | ||
| ) | ||
| : (await client.searchVariables( | ||
| { ...filter, truncateValues }, | ||
| { consistency: { waitUpToMs: 0 } }, | ||
| )).items || []; | ||
| const allItems = await fetchAllPages( | ||
| (f, opts) => client.searchVariables({ ...f, truncateValues }, opts), | ||
| filter, | ||
| hasCiFilter ? CI_PAGE_SIZE : DEFAULT_PAGE_SIZE, | ||
| options.limit, | ||
| ); | ||
|
Comment on lines
+876
to
+881
|
||
|
|
||
| let result = { items: allItems } as any; | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,39 @@ | ||
| <?xml version="1.0" encoding="UTF-8"?> | ||
| <bpmn:definitions xmlns:bpmn="http://www.omg.org/spec/BPMN/20100524/MODEL" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI" xmlns:dc="http://www.omg.org/spec/DD/20100524/DC" xmlns:di="http://www.omg.org/spec/DD/20100524/DI" xmlns:modeler="http://camunda.org/schema/modeler/1.0" id="Definitions_0trqknu" targetNamespace="http://bpmn.io/schema/bpmn" exporter="Camunda Modeler" exporterVersion="5.43.1" modeler:executionPlatform="Camunda Cloud" modeler:executionPlatformVersion="8.8.0"> | ||
| <bpmn:process id="mini-process-1" isExecutable="true"> | ||
| <bpmn:startEvent id="StartEvent_1"> | ||
| <bpmn:outgoing>Flow_1nz6e26</bpmn:outgoing> | ||
| </bpmn:startEvent> | ||
| <bpmn:task id="Activity_1kkoh0e" name="do sth"> | ||
| <bpmn:incoming>Flow_1nz6e26</bpmn:incoming> | ||
| <bpmn:outgoing>Flow_0kbjfrm</bpmn:outgoing> | ||
| </bpmn:task> | ||
| <bpmn:sequenceFlow id="Flow_1nz6e26" sourceRef="StartEvent_1" targetRef="Activity_1kkoh0e" /> | ||
| <bpmn:endEvent id="Event_1lp3py7"> | ||
| <bpmn:incoming>Flow_0kbjfrm</bpmn:incoming> | ||
| </bpmn:endEvent> | ||
| <bpmn:sequenceFlow id="Flow_0kbjfrm" sourceRef="Activity_1kkoh0e" targetRef="Event_1lp3py7" /> | ||
| </bpmn:process> | ||
| <bpmndi:BPMNDiagram id="BPMNDiagram_1"> | ||
| <bpmndi:BPMNPlane id="BPMNPlane_1" bpmnElement="mini-process-1"> | ||
| <bpmndi:BPMNShape id="StartEvent_1_di" bpmnElement="StartEvent_1"> | ||
| <dc:Bounds x="182" y="102" width="36" height="36" /> | ||
| </bpmndi:BPMNShape> | ||
| <bpmndi:BPMNShape id="Activity_1kkoh0e_di" bpmnElement="Activity_1kkoh0e"> | ||
| <dc:Bounds x="270" y="80" width="100" height="80" /> | ||
| <bpmndi:BPMNLabel /> | ||
| </bpmndi:BPMNShape> | ||
| <bpmndi:BPMNShape id="Event_1lp3py7_di" bpmnElement="Event_1lp3py7"> | ||
| <dc:Bounds x="422" y="102" width="36" height="36" /> | ||
| </bpmndi:BPMNShape> | ||
| <bpmndi:BPMNEdge id="Flow_1nz6e26_di" bpmnElement="Flow_1nz6e26"> | ||
| <di:waypoint x="218" y="120" /> | ||
| <di:waypoint x="270" y="120" /> | ||
| </bpmndi:BPMNEdge> | ||
| <bpmndi:BPMNEdge id="Flow_0kbjfrm_di" bpmnElement="Flow_0kbjfrm"> | ||
| <di:waypoint x="370" y="120" /> | ||
| <di:waypoint x="422" y="120" /> | ||
| </bpmndi:BPMNEdge> | ||
| </bpmndi:BPMNPlane> | ||
| </bpmndi:BPMNDiagram> | ||
| </bpmn:definitions> |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,139 @@ | ||
| /** | ||
| * Integration tests for pagination beyond CI_PAGE_SIZE (1000) | ||
| * | ||
| * Deploys > 1000 unique process definitions from a mini-process BPMN template, | ||
| * then verifies that `search pd` and `list pd` via the CLI return ALL of them | ||
| * rather than silently truncating at the API default page size (100) or CI_PAGE_SIZE (1000). | ||
| * | ||
| * NOTE: These tests require a running Camunda 8 instance at http://localhost:8080 | ||
| * and take considerable time due to the volume of deployments. | ||
| */ | ||
|
|
||
| import { test, describe, before, after } from 'node:test'; | ||
| import assert from 'node:assert'; | ||
| import { spawnSync } from 'node:child_process'; | ||
| import { readFileSync, mkdirSync, writeFileSync, rmSync, existsSync, readdirSync } from 'node:fs'; | ||
| import { join, resolve } from 'node:path'; | ||
| import { tmpdir } from 'node:os'; | ||
| import { pollUntil } from '../utils/polling.ts'; | ||
|
|
||
| const PROJECT_ROOT = resolve(import.meta.dirname, '..', '..'); | ||
| const CLI = join(PROJECT_ROOT, 'src', 'index.ts'); | ||
| const TEMPLATE_BPMN = readFileSync(join(PROJECT_ROOT, 'tests', 'fixtures', 'mini-process.bpmn'), 'utf-8'); | ||
|
|
||
| /** Number of unique process definitions to deploy (must be > CI_PAGE_SIZE of 1000) */ | ||
| const DEPLOY_COUNT = 1010; | ||
|
|
||
| /** Max BPMN files per single deploy call (avoids multipart request size limits) */ | ||
| const DEPLOY_BATCH_SIZE = 25; | ||
|
|
||
| /** Polling configuration — indexing a large batch may take a while */ | ||
| const POLL_TIMEOUT_MS = 120_000; | ||
| const POLL_INTERVAL_MS = 3_000; | ||
|
|
||
| /** Spawn timeout for CLI commands */ | ||
| const SPAWN_TIMEOUT_MS = 300_000; | ||
|
|
||
| /** Shared temp directory + data dir for this test suite */ | ||
| let bpmnDir: string; | ||
| let dataDir: string; | ||
|
|
||
| /** | ||
| * Invoke the CLI as a subprocess, returning { stdout, stderr, status }. | ||
| * Uses a dedicated C8CTL_DATA_DIR so session state is isolated. | ||
| */ | ||
| function cli(...args: string[]) { | ||
| return spawnSync('node', [CLI, ...args], { | ||
| encoding: 'utf-8', | ||
| timeout: SPAWN_TIMEOUT_MS, | ||
| cwd: PROJECT_ROOT, | ||
| env: { ...process.env, C8CTL_DATA_DIR: dataDir }, | ||
| }); | ||
| } | ||
|
|
||
| /** | ||
| * Generate a BPMN string with a given process id by replacing the template's id. | ||
| */ | ||
| function bpmnWithId(id: string): string { | ||
| return TEMPLATE_BPMN | ||
| .replace(/id="mini-process-1"/g, `id="${id}"`) | ||
| .replace(/bpmnElement="mini-process-1"/g, `bpmnElement="${id}"`); | ||
| } | ||
|
|
||
| describe('Pagination beyond CI_PAGE_SIZE (requires Camunda 8 at localhost:8080)', { timeout: 600_000 }, () => { | ||
| before(() => { | ||
| // Create temp directories for BPMN files and CLI data dir | ||
| const base = join(tmpdir(), `c8ctl-pagination-test-${Date.now()}`); | ||
| bpmnDir = join(base, 'bpmn'); | ||
| dataDir = join(base, 'data'); | ||
| mkdirSync(bpmnDir, { recursive: true }); | ||
| mkdirSync(dataDir, { recursive: true }); | ||
|
|
||
| // Generate BPMN files with ids mini-process-1 .. mini-process-<DEPLOY_COUNT> | ||
| for (let i = 1; i <= DEPLOY_COUNT; i++) { | ||
| const id = `mini-process-${i}`; | ||
| writeFileSync(join(bpmnDir, `${id}.bpmn`), bpmnWithId(id)); | ||
| } | ||
|
|
||
| // Deploy in batches to avoid multipart request size limits | ||
| const allFiles = readdirSync(bpmnDir).filter(f => f.endsWith('.bpmn')).sort(); | ||
|
|
||
| for (let i = 0; i < allFiles.length; i += DEPLOY_BATCH_SIZE) { | ||
| const batch = allFiles.slice(i, i + DEPLOY_BATCH_SIZE); | ||
| const batchPaths = batch.map(f => join(bpmnDir, f)); | ||
| const result = cli('deploy', ...batchPaths); | ||
| assert.strictEqual( | ||
| result.status, 0, | ||
| `Deploy batch ${Math.floor(i / DEPLOY_BATCH_SIZE) + 1} should exit 0. stderr: ${result.stderr}`, | ||
| ); | ||
| } | ||
| }); | ||
|
|
||
| after(() => { | ||
| const base = join(bpmnDir, '..'); | ||
| if (existsSync(base)) { | ||
| rmSync(base, { recursive: true, force: true }); | ||
| } | ||
| }); | ||
|
|
||
| test(`search pd --id=mini-process-* returns all ${DEPLOY_COUNT} definitions`, { timeout: POLL_TIMEOUT_MS + 30_000 }, async () => { | ||
| // Switch output to JSON for easy parsing | ||
| cli('output', 'json'); | ||
|
|
||
| // Poll until Elasticsearch has indexed all deployed definitions | ||
| await pollUntil(async () => { | ||
| const result = cli('search', 'pd', '--id=mini-process-*'); | ||
| if (result.status !== 0) return false; | ||
| try { | ||
| return JSON.parse(result.stdout).length >= DEPLOY_COUNT; | ||
| } catch { return false; } | ||
| }, POLL_TIMEOUT_MS, POLL_INTERVAL_MS); | ||
|
|
||
| // Final assertion | ||
| const finalResult = cli('search', 'pd', '--id=mini-process-*'); | ||
| assert.strictEqual(finalResult.status, 0, `search should exit 0. stderr: ${finalResult.stderr}`); | ||
|
|
||
| const items = JSON.parse(finalResult.stdout); | ||
| assert.ok(items.length >= DEPLOY_COUNT, `Expected >= ${DEPLOY_COUNT} definitions, got ${items.length}`); | ||
| assert.ok(items.length > 1000, `Result count (${items.length}) should exceed CI_PAGE_SIZE (1000)`); | ||
| }); | ||
|
|
||
| test(`list pd returns all ${DEPLOY_COUNT} definitions`, { timeout: POLL_TIMEOUT_MS + 30_000 }, async () => { | ||
| cli('output', 'json'); | ||
|
|
||
| await pollUntil(async () => { | ||
| const result = cli('list', 'pd'); | ||
| if (result.status !== 0) return false; | ||
| try { | ||
| return JSON.parse(result.stdout).length >= DEPLOY_COUNT; | ||
| } catch { return false; } | ||
| }, POLL_TIMEOUT_MS, POLL_INTERVAL_MS); | ||
|
|
||
| const finalResult = cli('list', 'pd'); | ||
| assert.strictEqual(finalResult.status, 0, `list pd should exit 0. stderr: ${finalResult.stderr}`); | ||
|
|
||
| const items = JSON.parse(finalResult.stdout); | ||
| assert.ok(items.length >= DEPLOY_COUNT, `Expected >= ${DEPLOY_COUNT} definitions, got ${items.length}`); | ||
| assert.ok(items.length > 1000, `Result count (${items.length}) should exceed CI_PAGE_SIZE (1000)`); | ||
| }); | ||
| }); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fetchAllPages'spageSizeparameter is typed asnumber, but this passesundefinedwhenhasCiFilteris false. Instrictmode this won’t compile. Either callfetchAllPageswithout thepageSizeargument when you want the default, or provide a concretenumber.