-
Notifications
You must be signed in to change notification settings - Fork 23
Expand file tree
/
Copy pathNotificationQueuePopulator.js
More file actions
365 lines (350 loc) · 14.2 KB
/
NotificationQueuePopulator.js
File metadata and controls
365 lines (350 loc) · 14.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
const assert = require('assert');
const util = require('util');
const { isMasterKey } = require('arsenal').versioning;
const { mpuBucketPrefix, supportedNotificationEvents } = require('arsenal').constants;
const VID_SEPERATOR = require('arsenal').versioning.VersioningConstants.VersionId.Separator;
const configUtil = require('./utils/config');
const safeJsonParse = require('./utils/safeJsonParse');
const messageUtil = require('./utils/message');
const notifConstants = require('./constants');
const QueuePopulatorExtension =
require('../../lib/queuePopulator/QueuePopulatorExtension');
const { traceHeadersFromEntry } = require('../../lib/tracing/kafkaTraceContext');
class NotificationQueuePopulator extends QueuePopulatorExtension {
    /**
     * Queue populator extension that reads metadata log entries and
     * publishes bucket notification messages to the configured internal
     * Kafka topic(s).
     *
     * @constructor
     * @param {Object} params - constructor params
     * @param {Object} params.config - notification configuration object
     * @param {Logger} params.logger - logger object
     * @param {Object} params.bnConfigManager - bucket notification config
     * manager
     * @param {Object} params.metricsHandler - metrics handler used to count
     * processed notification events
     */
    constructor(params) {
        super(params);
        this.notificationConfig = params.config;
        this.bnConfigManager = params.bnConfigManager;
        assert(this.bnConfigManager, 'bucket notification configuration manager'
            + ' is not set');
        // callbackify functions: filterAsync() is callback based, while
        // _processObjectEntry() is async
        this._processObjectEntryCb = util.callbackify(this._processObjectEntry).bind(this);
        this._metricsStore = params.metricsHandler;
    }

    /**
     * Check if bucket entry based on bucket and key params
     *
     * @param {String} bucket - bucket
     * @param {String} key - object key
     * @return {boolean} - true if bucket entry
     */
    _isBucketEntry(bucket, key) {
        return ((bucket.toLowerCase() === this.notificationConfig.bucketMetastore && !!key)
            || key === undefined);
    }

    /**
     * Get bucket attributes from log entry
     *
     * @param {Object} value - log entry object
     * @return {Object|undefined} - bucket attributes if available
     */
    _getBucketAttributes(value) {
        if (value && value.attributes) {
            const { error, result } = safeJsonParse(value.attributes);
            if (error) {
                // malformed attributes payload: treat as unavailable
                return undefined;
            }
            return result;
        }
        return undefined;
    }

    /**
     * Get bucket name from bucket attributes
     *
     * @param {Object} attributes - bucket attributes from log entry
     * @return {String|undefined} - bucket name if available
     */
    _getBucketNameFromAttributes(attributes) {
        if (attributes && attributes.name) {
            return attributes.name;
        }
        return undefined;
    }

    /**
     * Get notification configuration from bucket attributes
     *
     * @param {Object} attributes - bucket attributes from log entry
     * @return {Object|undefined} - notification configuration if available
     */
    _getBucketNotificationConfiguration(attributes) {
        if (attributes && attributes.notificationConfiguration) {
            return attributes.notificationConfiguration;
        }
        return undefined;
    }

    /**
     * Process bucket entry from the log: keep the cached bucket
     * notification configuration in sync with the entry's contents.
     *
     * @param {string} bucket - bucket name from log entry
     * @param {Object} value - log entry object
     * @return {undefined}
     */
    _processBucketEntry(bucket, value) {
        const attributes = this._getBucketAttributes(value);
        const bucketName = this._getBucketNameFromAttributes(attributes);
        const notificationConfiguration
            = this._getBucketNotificationConfiguration(attributes);
        if (notificationConfiguration &&
            Object.keys(notificationConfiguration).length > 0) {
            const bnConfig = {
                bucket: bucketName,
                notificationConfiguration,
            };
            // bucket notification config is available, update node
            this.bnConfigManager.setConfig(bucketName, bnConfig);
            return undefined;
        }
        // bucket was deleted or notification conf has been removed, so remove zk node
        this.bnConfigManager.removeConfig(bucketName || bucket);
        return undefined;
    }

    /**
     * Returns the correct versionId
     * to display according to the
     * versioning state of the object
     * @param {Object} value log entry object
     * @param {Object} overheadFields - extra fields
     * @param {Object} overheadFields.versionId - object versionId
     * @return {String|null} versionId, or null for null/non-versioned objects
     */
    _getVersionId(value, overheadFields) {
        const versionId = value.versionId || (overheadFields && overheadFields.versionId);
        const isNullVersion = value.isNull;
        const isVersioned = !!versionId;
        // Versioning suspended objects have
        // a versionId, however it is internal
        // and should not be used to get the object
        if (isNullVersion || !isVersioned) {
            return null;
        } else {
            return versionId;
        }
    }

    /**
     * Decides if we should process the entry.
     * Since we get both master and version events,
     * we need to avoid pushing two notifications for
     * the same event.
     * - For non-versioned buckets, we process the master
     * objects' events.
     * - For versioned buckets, we process version events
     * and ignore all master events.
     * - For versioning suspended buckets, we need to process
     * both master and version events, as the master is considered
     * a separate version.
     * @param {String} key object key
     * @param {Object} value object metadata
     * @return {boolean} - true if entry is valid
     */
    _shouldProcessEntry(key, value) {
        const isMaster = isMasterKey(key);
        const isVersionedMaster = isMaster && !!value.versionId;
        const isNullVersion = isMaster && value.isNull;
        // skip only versioned masters that are not null versions: their
        // corresponding version event carries the same notification
        if (!isMaster
            || !isVersionedMaster
            || isNullVersion
        ) {
            return true;
        }
        return false;
    }

    /**
     * Notification rules are normally verified when setting the notification
     * configuration (even for wildcards), however we need an explicit check at
     * this level to filter out non standard events that might be valid for one
     * of the wildcard rules set.
     * @param {String} eventType - notification event type
     * @returns {boolean} - true if notification is supported
     */
    _isNotificationEventSupported(eventType) {
        return supportedNotificationEvents.has(eventType);
    }

    /**
     * Extract base key from versioned key
     *
     * @param {String} key - object key
     * @return {String} - versioned base key
     */
    _extractVersionedBaseKey(key) {
        // version keys are "<baseKey><separator><versionId>"
        return key.split(VID_SEPERATOR)[0];
    }

    /**
     * Returns the dateTime of the event
     * based on the event message property if existent
     * or overhead fields
     * @param {ObjectMD} value object metadata
     * @param {Object} overheadFields overhead fields
     * @param {Object} overheadFields.commitTimestamp - Kafka commit timestamp
     * @param {Object} overheadFields.opTimestamp - MongoDB operation timestamp
     * @returns {string|null} dateTime of the event
     */
    _getEventDateTime(value, overheadFields) {
        if (overheadFields) {
            // prefer the operation timestamp, it is closest to the actual event
            return overheadFields.opTimestamp || overheadFields.commitTimestamp || null;
        }
        return (value && value[notifConstants.eventMessageProperty.dateTime]) || null;
    }

    /**
     * Process object entry from the log: validate it against the bucket's
     * notification configuration and publish one message per matching
     * internal topic.
     *
     * @param {String} bucket - bucket
     * @param {String} key - object key
     * @param {Object} value - log entry object
     * @param {String} type - log entry type
     * @param {Object} overheadFields - extra fields
     * @param {Object} overheadFields.commitTimestamp - Kafka commit timestamp
     * @param {Object} overheadFields.opTimestamp - MongoDB operation timestamp
     * @return {undefined}
     * @throws {Error} - rethrows any processing error after logging it
     */
    async _processObjectEntry(bucket, key, value, type, overheadFields) {
        try {
            this._metricsStore.notifEvent();
            if (!this._shouldProcessEntry(key, value)) {
                return undefined;
            }
            const { eventMessageProperty, deleteEvent } = notifConstants;
            // Delete can have an overheadField originOp that should be used for lifecycle expiration (on metadata)
            // This takes precedence over the originOp from the value object
            let eventType = overheadFields?.[eventMessageProperty.eventType] || value[eventMessageProperty.eventType];
            if (eventType === undefined && type === 'del') {
                eventType = deleteEvent;
            }
            if (!this._isNotificationEventSupported(eventType)) {
                return undefined;
            }
            const baseKey = this._extractVersionedBaseKey(key);
            const versionId = this._getVersionId(value, overheadFields);
            const dateTime = this._getEventDateTime(value, overheadFields);
            const config = await this.bnConfigManager.getConfig(bucket);
            if (config && Object.keys(config).length > 0) {
                const ent = {
                    bucket,
                    key: baseKey,
                    eventType,
                    versionId,
                    dateTime,
                };
                this.log.debug('validating entry', {
                    method: 'NotificationQueuePopulator._processObjectEntry',
                    bucket,
                    key,
                    eventType,
                });
                // keep track of internal topics we have already pushed to,
                // to avoid publishing the same message twice
                const pushedToTopic = new Set();
                // validate and push kafka message foreach destination topic
                this.notificationConfig.destinations.forEach(destination => {
                    const topic = destination.internalTopic ||
                        this.notificationConfig.topic;
                    // avoid pushing a message multiple times to the
                    // same internal topic
                    if (pushedToTopic.has(topic)) {
                        return undefined;
                    }
                    // get destination specific notification config
                    const queueConfig = config.notificationConfiguration.queueConfig.filter(
                        c => c.queueArn.split(':').pop() === destination.resource
                    );
                    if (!queueConfig.length) {
                        // skip, if there is no config for the current
                        // destination resource
                        return undefined;
                    }
                    // pass only destination resource specific config to
                    // validate entry
                    const destConfig = {
                        bucket,
                        notificationConfiguration: {
                            queueConfig,
                        },
                    };
                    const { isValid, matchingConfig } = configUtil.validateEntry(destConfig, ent);
                    if (isValid) {
                        const message
                            = messageUtil.addLogAttributes(value, ent);
                        this.log.info('publishing message', {
                            method: 'NotificationQueuePopulator._processObjectEntry',
                            bucket,
                            key: message.key,
                            versionId,
                            eventType,
                            eventTime: message.dateTime,
                            matchingConfig,
                        });
                        // Propagate the oplog entry's trace context as
                        // Kafka headers so the notification-processor
                        // consumer becomes a child of the original S3
                        // trace (same pattern as ReplicationQueuePopulator).
                        const traceHeaders = traceHeadersFromEntry(value);
                        this.publish(topic,
                            // keeping all messages for same object
                            // in the same partition to keep the order.
                            // here we use the object name and not the
                            // "_id" which also includes the versionId
                            `${bucket}/${message.key}`,
                            JSON.stringify(message),
                            undefined,
                            traceHeaders);
                        // keep track of internal topics we have pushed to
                        pushedToTopic.add(topic);
                    }
                    return undefined;
                });
            }
            // skip if there is no bucket notification configuration
            return undefined;
        } catch (error) {
            this.log.error('Error processing object entry', {
                method: 'NotificationQueuePopulator._processObjectEntry',
                bucket,
                key,
                error: error.message,
            });
            throw error;
        }
    }

    /**
     * Filter one raw log entry: dispatch bucket entries to configuration
     * management and object entries to notification processing.
     *
     * @param {Object} entry - log entry
     * @param {Function} cb - callback
     * @return {undefined}
     */
    filterAsync(entry, cb) {
        const { bucket, key, type, overheadFields } = entry;
        const value = entry.value || '{}';
        const { error, result } = safeJsonParse(value);
        // ignore if entry's value is not valid
        if (error) {
            this.log.error('could not parse log entry', { value, error });
            return cb();
        }
        // ignore mpu's or if the entry has no bucket
        if (!bucket || (key && key.startsWith(mpuBucketPrefix))) {
            return cb();
        }
        // ignore internal buckets
        if (bucket.includes('..')) {
            this.log.trace('skipping internal bucket entry', { bucket });
            return cb();
        }
        // bucket notification configuration updates
        if (bucket && result && this._isBucketEntry(bucket, key)) {
            this._processBucketEntry(key, result);
            return cb();
        }
        // object entry processing - filter and publish
        if (key && result) {
            return this._processObjectEntryCb(bucket, key, result, type, overheadFields, cb);
        }
        return cb();
    }
}
module.exports = NotificationQueuePopulator;