'use strict';
const { Kafka, logLevel, CompressionTypes, CompressionCodecs } = require('kafkajs');
const fs = require('fs');
const tmp = require('tmp');
const RegexParser = require('regex-parser');
const SnappyCodec = require('kafkajs-snappy');
CompressionCodecs[CompressionTypes.Snappy] = SnappyCodec;
// Note that we cannot simply define `const KAFKA_CONNECTOR_OPEN_CONNECTIONS = {};` at the module level.
// This is because the Appmixer engine clears the `require` cache when loading individual component code.
// Therefore, different Kafka components would not share the same OPEN_CONNECTIONS object, even on the same node!
// Instead, we take advantage of the global `process` object to get the variable if it exists, or create it if it doesn't.
// Shape: { [connectionId]: consumer/producer object }
let KAFKA_CONNECTOR_OPEN_CONNECTIONS;
if (process.KAFKA_CONNECTOR_OPEN_CONNECTIONS) {
KAFKA_CONNECTOR_OPEN_CONNECTIONS = process.KAFKA_CONNECTOR_OPEN_CONNECTIONS;
} else {
process.KAFKA_CONNECTOR_OPEN_CONNECTIONS = KAFKA_CONNECTOR_OPEN_CONNECTIONS = {};
}
const initClient = async (context, auth, connectionId) => {
let tmpDir;
let tmpFile;
const {
clientId,
brokers,
ssl,
saslMechanism,
saslUsername,
saslPassword,
sslRejectUnauthorized,
connectionTimeout = 10000
} = auth;
// Define the Kafka client configuration
const config = {
clientId,
logLevel: context.config?.logLevel ? logLevel[context.config.logLevel.toUpperCase()] : logLevel.INFO,
logCreator: (level) => {
return ({ namespace, level, label, log }) => {
if (context.log) {
let logString;
try {
if (typeof log === 'string') {
logString = log;
} else {
logString = JSON.stringify(log);
}
} catch (e) {
logString = Object.keys(log || {}).join(', ');
}
context.log('info', '[KAFKA] ' + [
'namespace: ' + namespace,
'level: ' + level,
'label: ' + label,
'gridInstanceId: ' + context.gridInstanceId,
'log: ' + logString
].join('; '));
}
};
},
brokers: brokers.split(',').map(broker => broker.trim()),
connectionTimeout: context.config?.connectionTimeout || connectionTimeout,
ssl: ssl ? ssl.toLowerCase() === 'true' : !!saslMechanism,
sasl: saslMechanism
? {
mechanism: saslMechanism,
username: saslUsername,
password: saslPassword
}
: undefined
};
// Additional SSL options. They can override the default SSL options.
if (auth.tlsCA || auth.tlsKey || auth.tlsCert || sslRejectUnauthorized === 'true' || sslRejectUnauthorized === 'false') {
config.ssl = {};
}
if (sslRejectUnauthorized === 'true' || sslRejectUnauthorized === 'false') {
config.ssl = {
rejectUnauthorized: sslRejectUnauthorized === 'true'
};
}
if (auth.tlsCA) {
try {
tmpDir = tmp.dirSync();
tmpFile = `${tmpDir.name}/ca.pem`;
fs.writeFileSync(tmpFile, auth.tlsCA);
config.ssl.ca = [auth.tlsCA];
} catch (err) {
if (connectionId) {
await context.service.stateUnset(connectionId);
}
throw new Error(`Failed to create CA certificate: ${err.message}`);
}
}
if (auth.tlsKey) {
try {
tmpDir = tmp.dirSync();
tmpFile = `${tmpDir.name}/key.pem`;
fs.writeFileSync(tmpFile, auth.tlsKey);
config.ssl.key = fs.readFileSync(tmpFile, 'utf-8');
} catch (err) {
if (connectionId) {
await context.service.stateUnset(connectionId);
}
throw new Error(`Failed to create Access Key: ${err.message}`);
}
}
if (auth.tlsPassphrase) {
config.ssl.passphrase = auth.tlsPassphrase;
}
if (auth.tlsCert) {
try {
tmpDir = tmp.dirSync();
tmpFile = `${tmpDir.name}/cert.pem`;
fs.writeFileSync(tmpFile, auth.tlsCert);
config.ssl.cert = fs.readFileSync(tmpFile, 'utf-8');
} catch (err) {
if (connectionId) {
await context.service.stateUnset(connectionId);
}
throw new Error(`Failed to create Access Certificate: ${err.message}`);
}
}
// If any SASL options are provided, ignore the SSL certificate options. Keep `rejectUnauthorized` if provided.
if (saslMechanism) {
delete config.ssl?.ca;
delete config.ssl?.key;
delete config.ssl?.cert;
}
// Return the Kafka client initialized with the configuration
return new Kafka(config);
};
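/*
Illustrative sketch only (not executed): a possible `auth` object accepted by initClient.
The broker addresses and credentials below are placeholders/assumptions, not values defined by this connector.

const exampleAuth = {
    clientId: 'appmixer-kafka',
    brokers: 'broker-1.example.com:9092, broker-2.example.com:9092',
    ssl: 'true',
    saslMechanism: 'scram-sha-512',
    saslUsername: 'user',
    saslPassword: 'secret',
    sslRejectUnauthorized: 'true'
};
const kafka = await initClient(context, exampleAuth, connectionId);
*/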
const cleanupEmpty = (obj) => {
return Object.keys(obj).reduce((res, key) => {
const value = obj[key];
// Keep defined strings and booleans; drop undefined values and numbers that failed to parse (NaN).
if (value !== undefined && !(typeof value === 'number' && isNaN(value))) { res[key] = value; }
return res;
}, {});
};
const loadConsumerOptions = function(context) {
let retry = {
maxRetryTime: parseInt(context.config.consumerRetryMaxRetryTime, 10),
initialRetryTime: parseInt(context.config.consumerRetryInitialRetryTime, 10),
factor: parseFloat(context.config.consumerRetryFactor),
multiplier: parseFloat(context.config.consumerRetryMultiplier),
retries: parseInt(context.config.consumerRetryRetries, 10)
};
let options = {
sessionTimeout: parseInt(context.config.consumerSessionTimeout, 10),
rebalanceTimeout: parseInt(context.config.consumerRebalanceTimeout, 10),
heartbeatInterval: parseInt(context.config.consumerHeartbeatInterval, 10),
metadataMaxAge: parseInt(context.config.consumerMetadataMaxAge, 10),
allowAutoTopicCreation: context.config.consumerAllowAutoTopicCreation !== undefined ? context.config.consumerAllowAutoTopicCreation === 'true' : undefined,
maxBytesPerPartition: parseInt(context.config.consumerMaxBytesPerPartition, 10),
minBytes: parseInt(context.config.consumerMinBytes, 10),
maxBytes: parseInt(context.config.consumerMaxBytes, 10),
maxWaitTimeInMs: parseInt(context.config.consumerMaxWaitTimeInMs, 10),
readUncommitted: context.config.consumerReadUncommitted !== undefined ? context.config.consumerReadUncommitted === 'true' : undefined,
maxInFlightRequests: parseInt(context.config.consumerMaxInFlightRequests, 10),
rackId: context.config.consumerRackId
};
options = cleanupEmpty(options);
retry = cleanupEmpty(retry);
if (Object.keys(retry).length) {
options.retry = retry;
}
return options;
};
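/*
Illustrative sketch only (not executed): how consumer options are derived from `context.config`.
The values below are hypothetical examples, not defaults of this connector.

context.config = { consumerSessionTimeout: '30000', consumerMaxBytes: '1048576', consumerRetryRetries: '8' }
loadConsumerOptions(context)
// => { sessionTimeout: 30000, maxBytes: 1048576, retry: { retries: 8 } }
*/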
/**
* Adds a Kafka consumer with the specified configuration.
* @param {Object} context The application context
* @param {Object} topics The topics to subscribe to, e.g. { AND: [{ topic: 'orders' }] }; topic names starting with '/' are parsed as regular expressions
* @param {string} flowId The flow ID
* @param {string} componentId The component ID
* @param {string} groupId The consumer group ID
* @param {boolean} fromBeginning Whether to consume from the beginning of the topic
* @param {Object} auth Authentication configuration
* @param {string} connId Optional connection ID
* @param {number} partitionsConsumedConcurrently Number of partitions to consume concurrently (default: 3)
* @returns {Promise<string>} The connection ID of the created consumer
*/
const addConsumer = async (
context, topics, flowId, componentId, groupId, fromBeginning, auth, connId, partitionsConsumedConcurrently = 3
) => {
// Generate a unique consumer connection ID that differs between flow runs. This way, a consumer set up
// before a flow stop and restart will not lead the job or message consumption
// to consider the old consumer still valid.
const connectionId = connId || `consumer:${flowId}:${componentId}:${Math.random().toString(36).substring(7)}`;
await context.service.stateSet(connectionId, {
topics, flowId, componentId, groupId, fromBeginning, auth
});
const topicSubscriptions = topics?.AND.map(topic =>
topic.topic.startsWith('/') ? RegexParser(topic.topic) : topic.topic
);
const client = await initClient(context, auth, connectionId);
const consumerOptions = {
...loadConsumerOptions(context),
groupId
};
await context.log('info', '[KAFKA] Kafka consumer options: ' + JSON.stringify(consumerOptions));
const consumer = client.consumer(consumerOptions);
await consumer.connect();
KAFKA_CONNECTOR_OPEN_CONNECTIONS[connectionId] = consumer;
await consumer.subscribe({
topics: topicSubscriptions,
fromBeginning: fromBeginning || false
});
consumer.on(consumer.events.CRASH, async (error) => {
await context.log('info', '[KAFKA] Kafka consumer CRASH (' + connectionId + '). Removing consumer from local connections.');
await consumer.disconnect();
delete KAFKA_CONNECTOR_OPEN_CONNECTIONS[connectionId];
});
await consumer.run({
eachBatchAutoResolve: false,
partitionsConsumedConcurrently: Math.max(1, parseInt(partitionsConsumedConcurrently, 10) || 3),
// eachBatch has to be used instead of eachMessage because we don't want to resolve the
// offset if the connection to the consumer was removed from the cluster state.
eachBatch: async ({ batch, resolveOffset, heartbeat, isRunning, isStale }) => {
// First, make sure the consumer is still needed. The flow might have stopped,
// which disconnected the consumer from the open connections, but only on one node in the cluster.
// The connection check is intentionally done once per batch, outside of the message loop below.
const connection = await context.service.stateGet(connectionId);
if (!connection) {
await context.log('info', '[KAFKA] Kafka consumer connection is not available (' + connectionId + ').');
return connectionId;
}
for (let message of batch.messages) {
if (!isRunning() || isStale()) break;
const normalizedMessage = normalizeMessageData(message);
await context.triggerComponent(
flowId,
componentId,
normalizedMessage,
{ enqueueOnly: 'true' }
);
resolveOffset(message.offset);
await heartbeat();
}
}
});
return connectionId;
};
const normalizeMessageHeaders = (headers) => {
const normalizedHeaders = {};
if (headers) {
Object.entries(headers).forEach(([key, value]) => {
normalizedHeaders[key] = Buffer.isBuffer(value) ? value.toString('utf8') : value || '';
});
}
return normalizedHeaders;
};
const normalizeMessageData = (message) => {
return {
key: message.key ? message.key.toString() : null,
value: message.value ? message.value.toString() : null,
headers: normalizeMessageHeaders(message.headers || {})
};
};
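/*
Illustrative sketch only (not executed): normalizing a raw kafkajs message (key, value and headers are Buffers).

normalizeMessageData({
    key: Buffer.from('order-42'),
    value: Buffer.from('{"total":100}'),
    headers: { 'trace-id': Buffer.from('abc123') }
});
// => { key: 'order-42', value: '{"total":100}', headers: { 'trace-id': 'abc123' } }
*/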
const addProducer = async (context, flowId, componentId, auth, connId) => {
const connectionId = connId || `producer:${flowId}:${componentId}:${Math.random().toString(36).substring(7)}`;
await context.service.stateSet(connectionId, {
flowId, componentId, auth
});
const client = await initClient(context, auth, connectionId);
const producer = client.producer();
await producer.connect();
KAFKA_CONNECTOR_OPEN_CONNECTIONS[connectionId] = producer;
return connectionId;
};
const sendMessage = async (context, flowId, componentId, connectionId, payload) => {
let producer = KAFKA_CONNECTOR_OPEN_CONNECTIONS[connectionId];
if (!producer) {
const connection = await context.service.stateGet(connectionId);
await addProducer(context, flowId, componentId, connection.auth, connectionId);
producer = KAFKA_CONNECTOR_OPEN_CONNECTIONS[connectionId];
}
/**
When acks is set to 0, the promise is not resolved, causing the `connection.sendMessage` call to time out.
Given the nature of `acks=0`, we can assume the message has been sent (provided no exception is thrown).
From the Kafka docs:
If (acks is) set to zero then the producer will not wait for any acknowledgment from the server at all.
The record will be immediately added to the socket buffer and considered sent.
No guarantee can be made that the server has received the record in this case, and the
retries configuration will not take effect (as the client won't generally know of any failures).
https://kafka.apache.org/documentation/#producerconfigs_acks
*/
if (payload.acks === '0') {
// Fire-and-forget: don't await the send, but log failures to avoid unhandled promise rejections.
producer.send(payload).catch(err => context.log('error', '[KAFKA] acks=0 send failed: ' + err.message));
return Promise.resolve({});
}
return await producer.send(payload);
};
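/*
Illustrative sketch only (not executed): a possible `payload` for sendMessage, following the kafkajs
producer.send() shape. The topic name and message contents are placeholders; note that `acks` is
compared as a string ('0') by this module.

await sendMessage(context, flowId, componentId, connectionId, {
    topic: 'orders',
    acks: '0',
    messages: [
        { key: 'order-42', value: JSON.stringify({ total: 100 }), headers: { 'trace-id': 'abc123' } }
    ]
});
*/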
const removeConnection = async (context, connectionId) => {
await context.log('info', `[KAFKA] Removing connection ${connectionId}.`);
await context.service.stateUnset(connectionId);
const connection = KAFKA_CONNECTOR_OPEN_CONNECTIONS[connectionId];
if (!connection) return; // Connection doesn't exist, do nothing
await connection.disconnect();
delete KAFKA_CONNECTOR_OPEN_CONNECTIONS[connectionId];
};
const listConnections = () => { return KAFKA_CONNECTOR_OPEN_CONNECTIONS; };
const isConsumerConnection = (connectionId) => connectionId.startsWith('consumer:');
module.exports = {
initClient,
addConsumer,
addProducer,
sendMessage,
removeConnection,
listConnections,
isConsumerConnection
};
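/*
Illustrative sketch only (not executed): how a trigger/action component might use this module.
The `context.flowId`/`context.componentId` properties and the `auth`/`payload` variables are assumptions
made for illustration; component code in this repository may wire these differently.

const connections = require('./connections');

// Trigger component start(): subscribe to topics (topics starting with '/' are parsed as regular expressions).
// const connectionId = await connections.addConsumer(
//     context, { AND: [{ topic: 'orders' }] }, context.flowId, context.componentId, 'my-group', false, auth);

// Action component: lazily create a producer and send a message.
// const producerId = await connections.addProducer(context, context.flowId, context.componentId, auth);
// await connections.sendMessage(context, context.flowId, context.componentId, producerId, payload);

// Component stop(): tear the connection down on this node and in the cluster state.
// await connections.removeConnection(context, connectionId);
*/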