-
Notifications
You must be signed in to change notification settings - Fork 4k
Expand file tree
/
Copy pathamqp.js
More file actions
99 lines (96 loc) · 3.08 KB
/
amqp.js
File metadata and controls
99 lines (96 loc) · 3.08 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
var container = require('rhea') // https://github.com/amqp/rhea
var fs = require('fs');
var path = require('path');
const {log, error} = require('./utils.js')
var connectionOptions = getConnectionOptions()
function getAmqpConnectionOptions() {
return {
'scheme': process.env.RABBITMQ_AMQP_SCHEME || 'amqp',
'host': process.env.RABBITMQ_HOSTNAME || 'rabbitmq',
'port': process.env.RABBITMQ_AMQP_PORT || 5672,
'username' : process.env.RABBITMQ_AMQP_USERNAME || 'guest',
'password' : process.env.RABBITMQ_AMQP_PASSWORD || 'guest',
'id': "selenium-connection-id",
'container_id': "selenium-container-id"
}
}
function getAmqpsConnectionOptions() {
let options = getAmqpConnectionOptions()
let useMtls = process.env.AMQP_USE_MTLS || false
if (useMtls) {
options['enable_sasl_external'] = true
}
options['transport'] = 'tls'
let certsLocation = process.env.RABBITMQ_CERTS
options['key'] = fs.readFileSync(path.resolve(certsLocation,'client_rabbitmq_key.pem'))
options['cert'] = fs.readFileSync(path.resolve(certsLocation,'client_rabbitmq_certificate.pem'))
options['ca'] = fs.readFileSync(path.resolve(certsLocation,'ca_rabbitmq_certificate.pem'))
return options
}
function getConnectionOptions() {
let scheme = process.env.RABBITMQ_AMQP_SCHEME || 'amqp'
log("Using AMQP protocol: " + scheme)
switch(scheme){
case "amqp":
return getAmqpConnectionOptions()
case "amqps":
return getAmqpsConnectionOptions()
}
}
module.exports = {
getAmqpConnectionOptions: () => { return connectionOptions },
getAmqpUrl: () => {
return connectionOptions.scheme + '://' +
connectionOptions.username + ":" + connectionOptions.password + "@" +
connectionOptions.host + ":" + connectionOptions.port
},
open: (queueName = "/queues/my-queue") => {
let promise = new Promise((resolve, reject) => {
container.on('connection_open', function(context) {
resolve()
})
})
log("Opening amqp connection using " + JSON.stringify(connectionOptions,
(key, value) => {
// Omit the private key from the log output.
if (key === "key") return undefined;
return value;
}
))
let connection = container.connect(connectionOptions)
let receiver = connection.open_receiver({
source: queueName,
target: 'receiver-target',
name: 'receiver-link'
})
let sender = connection.open_sender({
target: queueName,
source: 'sender-source',
name: 'sender-link'
})
return {
'connection': connection,
'promise' : promise,
'receiver' : receiver,
'sender' : sender
}
},
openReceiver: (handler, queueName = "/queues/my-queue") => {
return handler.connection.open_receiver({
source: queueName,
target: 'receiver-target',
name: 'receiver-link'
})
},
close: (connection) => {
if (connection != null) {
connection.close()
}
},
once: (event, callback) => {
container.once(event, callback)
},
on: (event, callback) => {
container.on(event, callback)
}
}