-
Notifications
You must be signed in to change notification settings - Fork 229
Expand file tree
/
Copy pathbullmq.js
More file actions
133 lines (112 loc) · 3 KB
/
Copy pathbullmq.js
File metadata and controls
133 lines (112 loc) · 3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
const Arena = require('../');
const IORedis = require('ioredis');
const {Queue, Worker, FlowProducer} = require('bullmq');
// Port handed to Arena for its HTTP server; picked to be unlikely to clash with
// other services a developer might be running locally.
const HTTP_SERVER_PORT = 4735;
// Port of the Redis server this example connects to (6379 is the Redis default port).
const REDIS_SERVER_PORT = 6379;
/**
 * Boots a self-contained BullMQ demo and mounts the Arena UI on top of it.
 *
 * Steps, in order:
 *  1. Creates a child queue, a parent queue and a flow producer against the
 *     Redis server on REDIS_SERVER_PORT.
 *  2. Starts one worker per queue; each job sleeps and then randomly fails so
 *     that both the "completed" and "failed" lists get populated.
 *  3. Seeds a parent job with 65 children, two delayed jobs and one repeatable job.
 *  4. Calls Arena with both queues and HTTP_SERVER_PORT.
 *
 * @returns {Promise<void>} Settles after every seed job has been enqueued and
 *   Arena has been invoked; rejects if any of the awaited Redis operations fail.
 */
async function main() {
  const queueName = 'name_of_my_queue';
  const parentQueueName = 'name_of_my_parent_queue';

  // Shared ioredis instance, handed to Arena for the first queue definition below.
  const connection = new IORedis({port: REDIS_SERVER_PORT});

  const queue = new Queue(queueName, {
    connection: {port: REDIS_SERVER_PORT},
  });
  new Queue(parentQueueName, {
    connection: {port: REDIS_SERVER_PORT},
  });

  const flow = new FlowProducer({
    connection: {port: REDIS_SERVER_PORT},
  });

  const sleep = (ms) => new Promise((res) => setTimeout(res, ms));

  // An 'error' event emitted on an EventEmitter without a listener is thrown,
  // so every worker gets one that simply logs the error.
  const logWorkerError = (err) => {
    console.error(err);
  };

  new Worker(
    queueName,
    async function (job) {
      await job.updateProgress(20);

      // Wait 5sec
      await sleep(5000);

      // Randomly succeeds or fails the job to put some jobs in completed and some in failed.
      if (Math.random() > 0.5) {
        throw new Error('fake error');
      }
    },
    {
      concurrency: 10,
      connection: {port: REDIS_SERVER_PORT},
    }
  ).on('error', logWorkerError);

  new Worker(
    parentQueueName,
    async function () {
      // Wait 10sec
      await sleep(10000);

      // Randomly succeeds or fails the job to put some jobs in completed and some in failed.
      if (Math.random() > 0.5) {
        throw new Error('fake error');
      }
    },
    {
      connection: {port: REDIS_SERVER_PORT},
    }
  ).on('error', logWorkerError);

  // 65 child jobs (idx 0..64) that all live in the child queue.
  const children = Array.from({length: 65}, (_, index) => ({
    name: 'child',
    data: {idx: index, foo: 'bar'},
    queueName,
  }));

  await flow.add({
    name: 'parent-job',
    queueName: parentQueueName,
    data: {},
    children,
  });

  // adding delayed jobs
  const delayedJob = await queue.add('delayed', {}, {delay: 60 * 1000});
  await queue.add(
    'delayed',
    {},
    {
      delay: 1000,
      attempts: 4,
      backoff: {
        type: 'exponential',
        delay: 60000,
      },
    }
  );

  await queue.add('cron', {}, {repeat: {pattern: '* 1 * 1 *'}});

  // Awaited so that a failure rejects main() and reaches the caller's handler
  // instead of surfacing as an unhandled promise rejection.
  await delayedJob.log('Log message');

  Arena(
    {
      BullMQ: Queue,

      queues: [
        {
          // Required for each queue definition.
          name: queueName,

          // User-readable display name for the host. Required.
          hostId: 'Queue Server 1',

          // Queue type (Bull or Bullmq or Bee - default Bull).
          type: 'bullmq',

          redis: connection,
        },
        {
          // Required for each queue definition.
          name: parentQueueName,

          // User-readable display name for the host. Required.
          hostId: 'Queue Server 2',

          // Queue type (Bull or Bullmq or Bee - default Bull).
          type: 'bullmq',

          redis: {
            // host: 'localhost',
            port: REDIS_SERVER_PORT,
          },
        },
      ],
    },
    {
      port: HTTP_SERVER_PORT,
    }
  );
}
/**
 * Last-resort handler for a failed start-up: prints the error and terminates
 * the process with exit code 1.
 */
function exitOnFailure(error) {
  console.error(error);
  process.exit(1);
}

// Kick off the example; any rejection from main() ends the process.
main().catch(exitOnFailure);