Skip to content

Commit f591167

Browse files
committed
feat: queues for postman email, tele and slack
1 parent 8a4019d commit f591167

File tree

6 files changed

+95
-0
lines changed

6 files changed

+95
-0
lines changed

packages/backend/src/apps/postman/index.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import { IApp } from '@plumber/types'
22

3+
import { getGenericAppQueue } from '@/queues/helpers/get-generic-app-queue'
4+
35
import actions from './actions'
46

57
const app: IApp = {
@@ -18,6 +20,7 @@ const app: IApp = {
1820
url: 'https://demo.arcade.software/VppMAbGKfFXFEsKxnKiw?embed&show_copy_link=true',
1921
title: 'Setting up Email by Postman',
2022
},
23+
queue: getGenericAppQueue('POSTMAN_EMAIL'),
2124
}
2225

2326
export default app

packages/backend/src/apps/slack/index.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import { IApp } from '@plumber/types'
22

3+
import { getGenericAppQueue } from '@/queues/helpers/get-generic-app-queue'
4+
35
import addAuthHeader from './common/add-auth-header'
46
import actions from './actions'
57
import auth from './auth'
@@ -17,6 +19,7 @@ const app: IApp = {
1719
auth,
1820
actions,
1921
dynamicData,
22+
queue: getGenericAppQueue('SLACK'),
2023
}
2124

2225
export default app

packages/backend/src/apps/telegram-bot/index.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import { IApp } from '@plumber/types'
22

3+
import { getGenericAppQueue } from '@/queues/helpers/get-generic-app-queue'
4+
35
import addAuthHeader from './common/add-auth-header'
46
import rateLimitHandler from './common/interceptor/rate-limit'
57
import actions from './actions'
@@ -19,6 +21,7 @@ const app: IApp = {
1921
dynamicData,
2022
auth,
2123
actions,
24+
queue: getGenericAppQueue('TELEGRAM'),
2225
}
2326

2427
export default app
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
export const QUEUE_CONCURRENCY = {
2+
POSTMAN_EMAIL: 3,
3+
SLACK: 3,
4+
TELEGRAM: 3,
5+
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
import { describe, expect, it } from 'vitest'
2+
3+
import { QUEUE_CONCURRENCY } from '@/config/queues'
4+
5+
import { getGenericAppQueue } from '../helpers/get-generic-app-queue'
6+
7+
describe('tests for apps with generic queue', () => {
8+
// apps that have a generic queue
9+
const apps = Object.keys(QUEUE_CONCURRENCY)
10+
11+
it.each(apps)(
12+
'should return a generic queue with the correct group config for %s',
13+
async (app) => {
14+
const appKey = app as keyof typeof QUEUE_CONCURRENCY
15+
const queue = getGenericAppQueue(appKey)
16+
const groupConfig = await queue.getGroupConfigForJob({
17+
flowId: 'some-flow-id',
18+
executionId: 'some-execution-id',
19+
stepId: 'some-step-id',
20+
})
21+
expect(groupConfig).toEqual({ id: app })
22+
},
23+
)
24+
25+
it.each(apps)(
26+
'should return a generic queue with the correct concurrency for %s',
27+
(app) => {
28+
const appKey = app as keyof typeof QUEUE_CONCURRENCY
29+
const queue = getGenericAppQueue(appKey)
30+
expect(queue.groupLimits.type).toBe('concurrency')
31+
if (queue.groupLimits.type === 'concurrency') {
32+
expect(queue.groupLimits.concurrency).toBe(QUEUE_CONCURRENCY[appKey])
33+
}
34+
},
35+
)
36+
37+
it('should return a generic queue with concurrency 2 if the app is not in the QUEUE_CONCURRENCY object', async () => {
38+
const queue = getGenericAppQueue(
39+
'NOT_A_REAL_APP' as keyof typeof QUEUE_CONCURRENCY,
40+
)
41+
const groupConfig = await queue.getGroupConfigForJob({
42+
flowId: 'some-flow-id',
43+
executionId: 'some-execution-id',
44+
stepId: 'some-step-id',
45+
})
46+
expect(groupConfig).toEqual({ id: 'NOT_A_REAL_APP' })
47+
48+
expect(queue.groupLimits.type).toBe('concurrency')
49+
if (queue.groupLimits.type === 'concurrency') {
50+
expect(queue.groupLimits.concurrency).toBe(2)
51+
}
52+
})
53+
})
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
import type { IAppQueue } from '@plumber/types'
2+
3+
import { QUEUE_CONCURRENCY } from '@/config/queues'
4+
5+
/**
6+
* NOTE: this creates a generic queue for apps that do not require
7+
* any special queue configuration.
8+
* It adds a generic group id to the jobs to ensure that the
9+
* concurrency limit is applied.
10+
*/
11+
export function getGenericAppQueue(
12+
app: keyof typeof QUEUE_CONCURRENCY,
13+
): IAppQueue {
14+
const concurrency = QUEUE_CONCURRENCY[app] || 2
15+
16+
const getGroupConfigForJob = async () => {
17+
return { id: app }
18+
}
19+
20+
return {
21+
getGroupConfigForJob,
22+
groupLimits: {
23+
type: 'concurrency',
24+
concurrency: concurrency,
25+
},
26+
isQueueDelayable: false,
27+
} satisfies IAppQueue
28+
}

0 commit comments

Comments
 (0)