Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
"migration:test-idempotency": "tsx src/scripts/test-migration-idempotency.ts",
"migrations:types": "tsx src/scripts/migrations-types.ts",
"docs:export": "tsx ./src/scripts/export-docs.ts",
"jobs:list": "tsx src/scripts/jobs-client.ts list",
"jobs:backup": "tsx src/scripts/jobs-client.ts backup",
"jobs:restore": "tsx src/scripts/jobs-client.ts restore",
"pprof:capture": "tsx src/scripts/pprof-client.ts",
"test:dummy-data": "tsx -r dotenv/config ./src/test/db/import-dummy-data.ts",
"test:unit": "vitest run --config vitest.unit.config.ts",
Expand Down
163 changes: 163 additions & 0 deletions src/http/routes/admin/queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,14 @@ import { MoveJobs, UpgradePgBossV10 } from '@storage/events'
import { FastifyInstance, RequestGenericInterface } from 'fastify'
import { FromSchema } from 'json-schema-to-ts'
import { getConfig } from '../../../config'
import {
backupQueueOverflow,
JOB_OVERFLOW_LIST_LIMIT_DEFAULT,
JOB_OVERFLOW_RESTORE_LIMIT_DEFAULT,
listQueueOverflow,
parseQueueOverflowCsv,
restoreQueueOverflow,
} from '../../../internal/queue/overflow'
import apiKey from '../../plugins/apikey'

const { pgQueueEnable } = getConfig()
Expand All @@ -25,10 +33,130 @@ const moveJobsSchema = {
},
} as const

const listQueueOverflowSchema = {
description: 'List created pgBoss jobs from the live queue table or overflow backup table.',
querystring: {
type: 'object',
properties: {
source: {
type: 'string',
enum: ['job', 'backup'],
default: 'job',
},
groupBy: {
type: 'string',
enum: ['summary', 'tenant'],
default: 'summary',
},
name: {
type: 'string',
minLength: 1,
},
eventTypes: {
type: 'string',
minLength: 1,
description: 'Comma-separated event types to filter on.',
},
tenantRefs: {
type: 'string',
minLength: 1,
description: 'Comma-separated tenant refs to filter on.',
},
limit: {
type: 'integer',
minimum: 1,
default: JOB_OVERFLOW_LIST_LIMIT_DEFAULT,
},
},
additionalProperties: false,
},
} as const

const backupQueueOverflowSchema = {
description: 'Move created pgBoss jobs into the overflow backup table.',
body: {
type: 'object',
properties: {
name: {
type: 'string',
minLength: 1,
},
eventTypes: {
type: 'array',
minItems: 1,
items: {
type: 'string',
minLength: 1,
},
},
tenantRefs: {
type: 'array',
minItems: 1,
items: {
type: 'string',
minLength: 1,
},
},
limit: {
type: 'integer',
minimum: 1,
},
},
additionalProperties: false,
},
} as const

const restoreQueueOverflowSchema = {
description: 'Restore created pgBoss jobs from the overflow backup table in batches.',
body: {
type: 'object',
properties: {
name: {
type: 'string',
minLength: 1,
},
eventTypes: {
type: 'array',
minItems: 1,
items: {
type: 'string',
minLength: 1,
},
},
tenantRefs: {
type: 'array',
minItems: 1,
items: {
type: 'string',
minLength: 1,
},
},
limit: {
type: 'integer',
minimum: 1,
default: JOB_OVERFLOW_RESTORE_LIMIT_DEFAULT,
},
},
additionalProperties: false,
},
} as const

interface MoveJobsRequestInterface extends RequestGenericInterface {
Body: FromSchema<typeof moveJobsSchema.body>
}

interface ListQueueOverflowRequestInterface extends RequestGenericInterface {
Querystring: FromSchema<typeof listQueueOverflowSchema.querystring>
}

interface BackupQueueOverflowRequestInterface extends RequestGenericInterface {
Body: FromSchema<typeof backupQueueOverflowSchema.body>
}

interface RestoreQueueOverflowRequestInterface extends RequestGenericInterface {
Body: FromSchema<typeof restoreQueueOverflowSchema.body>
}

export default async function routes(fastify: FastifyInstance) {
fastify.register(apiKey)

Expand Down Expand Up @@ -63,4 +191,39 @@ export default async function routes(fastify: FastifyInstance) {
return reply.send({ message: 'Move jobs scheduled' })
}
)

fastify.get<ListQueueOverflowRequestInterface>(
'/overflow',
{ schema: { ...listQueueOverflowSchema, tags: ['queue'] } },
async (req, reply) => {
const data = await listQueueOverflow({
source: req.query.source,
groupBy: req.query.groupBy,
name: req.query.name,
eventTypes: parseQueueOverflowCsv(req.query.eventTypes),
tenantRefs: parseQueueOverflowCsv(req.query.tenantRefs),
limit: req.query.limit,
})

return reply.send(data)
}
)

fastify.post<BackupQueueOverflowRequestInterface>(
'/overflow/backup',
{ schema: { ...backupQueueOverflowSchema, tags: ['queue'] } },
async (req, reply) => {
const data = await backupQueueOverflow(req.body)
return reply.send(data)
}
)

fastify.post<RestoreQueueOverflowRequestInterface>(
'/overflow/restore',
{ schema: { ...restoreQueueOverflowSchema, tags: ['queue'] } },
async (req, reply) => {
const data = await restoreQueueOverflow(req.body)
return reply.send(data)
}
)
}
1 change: 1 addition & 0 deletions src/internal/queue/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export * from './event'
export * from './overflow'
export * from './queue'
63 changes: 63 additions & 0 deletions src/internal/queue/overflow.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import {
buildQueueOverflowWhereClause,
normalizeQueueOverflowFilters,
parseQueueOverflowCsv,
} from './overflow'

describe('parseQueueOverflowCsv', () => {
it('returns undefined for empty input', () => {
expect(parseQueueOverflowCsv(undefined)).toBeUndefined()
expect(parseQueueOverflowCsv(' , , ')).toBeUndefined()
})

it('trims, de-duplicates, and preserves order', () => {
expect(
parseQueueOverflowCsv(' ObjectRemoved:Delete, ObjectCreated:Put,ObjectRemoved:Delete ')
).toEqual(['ObjectRemoved:Delete', 'ObjectCreated:Put'])
})
})

describe('normalizeQueueOverflowFilters', () => {
it('trims values and removes empty strings', () => {
expect(
normalizeQueueOverflowFilters({
name: ' webhooks ',
eventTypes: [' ObjectRemoved:Delete ', ''],
tenantRefs: [' tenant-a ', 'tenant-a', ' '],
})
).toEqual({
name: 'webhooks',
eventTypes: ['ObjectRemoved:Delete'],
tenantRefs: ['tenant-a'],
})
})
})

describe('buildQueueOverflowWhereClause', () => {
it('always scopes queries to created jobs', () => {
expect(buildQueueOverflowWhereClause({})).toEqual({
sql: 'state = ?',
bindings: ['created'],
})
})

it('adds queue, event-type, and tenant filters in a stable order', () => {
expect(
buildQueueOverflowWhereClause({
name: ' webhooks ',
eventTypes: ['ObjectRemoved:Delete', ' ObjectCreated:Put '],
tenantRefs: ['tenant-b', 'tenant-a'],
})
).toEqual({
sql: "state = ? AND name = ? AND data->'event'->>'type' IN (?, ?) AND data->'tenant'->>'ref' IN (?, ?)",
bindings: [
'created',
'webhooks',
'ObjectRemoved:Delete',
'ObjectCreated:Put',
'tenant-b',
'tenant-a',
],
})
})
})
Loading