Skip to content

Commit 11a94ea

Browse files
committed
Remove messageId and duplicate detection
Remove messageId generation and propagation throughout the lifecycle pipeline. Remove the duplicate message detection logic from the bucket processor: duplicate detection is a complex topic that requires a dedicated PR, and the current implementation was incomplete (it didn't handle parallel scans).

Changes:
- Remove messageId generation in LifecycleConductor._taskToMessage()
- Remove messageId extraction and logging in LifecycleBucketProcessor
- Remove the _currentScanMessageIds Set and the duplicate detection logic
- Remove messageId from all addContext() and addDefaultFields() calls
- Remove uniqueMessagesProcessed from the scan completion logs

Issue: BB-740
1 parent aefb677 commit 11a94ea

File tree

7 files changed

+11
-53
lines changed

7 files changed

+11
-53
lines changed

extensions/lifecycle/bucketProcessor/LifecycleBucketProcessor.js

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,6 @@ class LifecycleBucketProcessor {
100100

101101
// scan tracking for metrics
102102
this._currentScanId = null;
103-
this._currentScanMessageIds = new Set();
104103

105104
this.clientManager = new ClientManager({
106105
id: 'lifecycle',
@@ -163,11 +162,9 @@ class LifecycleBucketProcessor {
163162
this._log.info('bucket processor scan complete', {
164163
method: 'LifecycleBucketProcessor.drain',
165164
conductorScanId: this._currentScanId,
166-
uniqueMessagesProcessed: this._currentScanMessageIds.size,
167165
});
168166
LifecycleMetrics.onBucketProcessorScanEnd(this._log);
169167
this._currentScanId = null;
170-
this._currentScanMessageIds.clear();
171168
}
172169
});
173170

@@ -296,12 +293,10 @@ class LifecycleBucketProcessor {
296293
}
297294
const { bucket, owner, accountId, taskVersion } = result.target;
298295
const conductorScanId = result.contextInfo && result.contextInfo.conductorScanId;
299-
const messageId = result.contextInfo && result.contextInfo.messageId;
300296

301297
// detect scan boundary: new scanId → new scan started
302298
if (conductorScanId && conductorScanId !== this._currentScanId) {
303299
this._currentScanId = conductorScanId;
304-
this._currentScanMessageIds.clear(); // reset message tracking for new scan
305300
// Use current timestamp for scan start time gauge
306301
LifecycleMetrics.onBucketProcessorScanStart(
307302
this._log, Date.now()
@@ -313,28 +308,13 @@ class LifecycleBucketProcessor {
313308
});
314309
}
315310

316-
// Detect duplicate messages
317-
if (messageId) {
318-
if (this._currentScanMessageIds.has(messageId)) {
319-
this._log.warn('duplicate message detected', {
320-
method: 'LifecycleBucketProcessor._processBucketEntry',
321-
conductorScanId,
322-
messageId,
323-
bucket,
324-
});
325-
} else {
326-
this._currentScanMessageIds.add(messageId);
327-
}
328-
}
329-
330311
if (!bucket || !owner || (!accountId && this._authConfig.type === authTypeAssumeRole)) {
331312
this._log.error('kafka bucket entry missing required fields', {
332313
method: 'LifecycleBucketProcessor._processBucketEntry',
333314
bucket,
334315
owner,
335316
accountId,
336317
conductorScanId,
337-
messageId,
338318
});
339319
return process.nextTick(() => cb(errors.InternalError));
340320
}
@@ -344,7 +324,6 @@ class LifecycleBucketProcessor {
344324
owner,
345325
accountId,
346326
conductorScanId,
347-
messageId,
348327
});
349328

350329
const s3Client = this.clientManager.getS3Client(accountId);
@@ -402,7 +381,6 @@ class LifecycleBucketProcessor {
402381
details: result.details,
403382
taskName: task.constructor.name,
404383
conductorScanId,
405-
messageId,
406384
});
407385
return this._internalTaskScheduler.push({
408386
task,

extensions/lifecycle/conductor/LifecycleConductor.js

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,6 @@ class LifecycleConductor {
317317
contextInfo: {
318318
reqId: log.getSerializedUids(),
319319
conductorScanId: this._scanId,
320-
messageId: uuid(),
321320
},
322321
target: {
323322
bucket: task.bucketName,
@@ -487,14 +486,14 @@ class LifecycleConductor {
487486
const unknownCanonicalIds = this._accountIdCache.getMisses();
488487

489488
if (err) {
490-
log.error('lifecycle batch failed', {
491-
error: err,
492-
unknownCanonicalIds,
493-
conductorScanId: this._scanId,
494-
fullScanElapsedMs: elapsedMs,
495-
nBucketsListed,
496-
workflowsQueued: messageDeliveryReports,
497-
});
489+
log.error('lifecycle batch failed', {
490+
error: err,
491+
unknownCanonicalIds,
492+
conductorScanId: this._scanId,
493+
fullScanElapsedMs: elapsedMs,
494+
nBucketsListed,
495+
workflowsQueued: messageDeliveryReports,
496+
});
498497
if (cb) {
499498
cb(err);
500499
}

extensions/lifecycle/tasks/LifecycleDeleteObjectTask.js

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -306,9 +306,8 @@ class LifecycleDeleteObjectTask extends BackbeatTask {
306306
const log = this.logger.newRequestLogger();
307307
const conductorScanId = entry.getContextAttribute(
308308
'conductorScanId');
309-
const messageId = entry.getContextAttribute('messageId');
310309
if (conductorScanId && typeof log.addDefaultFields === 'function') {
311-
log.addDefaultFields({ conductorScanId, messageId });
310+
log.addDefaultFields({ conductorScanId });
312311
}
313312
entry.addLoggedAttributes({
314313
bucketName: 'target.bucket',

extensions/lifecycle/tasks/LifecycleTask.js

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,6 @@ class LifecycleTask extends BackbeatTask {
239239
contextInfo: {
240240
reqId: log.getSerializedUids(),
241241
conductorScanId: bucketData.contextInfo && bucketData.contextInfo.conductorScanId,
242-
messageId: bucketData.contextInfo && bucketData.contextInfo.messageId,
243242
},
244243
details: { marker },
245244
});
@@ -417,7 +416,6 @@ class LifecycleTask extends BackbeatTask {
417416
reqId: log.getSerializedUids(),
418417
conductorScanId: bucketData.contextInfo
419418
&& bucketData.contextInfo.conductorScanId,
420-
messageId: bucketData.contextInfo && bucketData.contextInfo.messageId,
421419
},
422420
details: {
423421
keyMarker: data.NextKeyMarker,
@@ -506,7 +504,6 @@ class LifecycleTask extends BackbeatTask {
506504
reqId: log.getSerializedUids(),
507505
conductorScanId: bucketData.contextInfo
508506
&& bucketData.contextInfo.conductorScanId,
509-
messageId: bucketData.contextInfo && bucketData.contextInfo.messageId,
510507
},
511508
details: {
512509
keyMarker: data.NextKeyMarker,
@@ -1052,7 +1049,6 @@ class LifecycleTask extends BackbeatTask {
10521049
reqId: log.getSerializedUids(),
10531050
conductorScanId: bucketData.contextInfo
10541051
&& bucketData.contextInfo.conductorScanId,
1055-
messageId: bucketData.contextInfo && bucketData.contextInfo.messageId,
10561052
})
10571053
.setAttribute('target.owner', bucketData.target.owner)
10581054
.setAttribute('target.bucket', bucketData.target.bucket)
@@ -1083,7 +1079,6 @@ class LifecycleTask extends BackbeatTask {
10831079
reqId: log.getSerializedUids(),
10841080
conductorScanId: bucketData.contextInfo
10851081
&& bucketData.contextInfo.conductorScanId,
1086-
messageId: bucketData.contextInfo && bucketData.contextInfo.messageId,
10871082
})
10881083
.setAttribute('target.owner', bucketData.target.owner)
10891084
.setAttribute('target.bucket', bucketData.target.bucket)
@@ -1191,7 +1186,6 @@ class LifecycleTask extends BackbeatTask {
11911186
ruleType: 'transition',
11921187
reqId: log.getSerializedUids(),
11931188
conductorScanId: params.conductorScanId,
1194-
messageId: params.messageId,
11951189
});
11961190
entry.setAttribute('source', {
11971191
bucket: params.bucket,
@@ -1394,7 +1388,6 @@ class LifecycleTask extends BackbeatTask {
13941388
{ Days: rules[ncvt][ncd] }, staleDate),
13951389
conductorScanId: bucketData.contextInfo
13961390
&& bucketData.contextInfo.conductorScanId,
1397-
messageId: bucketData.contextInfo && bucketData.contextInfo.messageId,
13981391
}, log, cb);
13991392
return;
14001393
}
@@ -1473,7 +1466,6 @@ class LifecycleTask extends BackbeatTask {
14731466
reqId: log.getSerializedUids(),
14741467
conductorScanId: bucketData.contextInfo
14751468
&& bucketData.contextInfo.conductorScanId,
1476-
messageId: bucketData.contextInfo && bucketData.contextInfo.messageId,
14771469
})
14781470
.setAttribute('target.owner', bucketData.target.owner)
14791471
.setAttribute('target.bucket', bucketData.target.bucket)
@@ -1545,7 +1537,6 @@ class LifecycleTask extends BackbeatTask {
15451537
reqId: log.getSerializedUids(),
15461538
conductorScanId: bucketData.contextInfo
15471539
&& bucketData.contextInfo.conductorScanId,
1548-
messageId: bucketData.contextInfo && bucketData.contextInfo.messageId,
15491540
})
15501541
.setAttribute('target.owner', bucketData.target.owner)
15511542
.setAttribute('target.bucket', bucketData.target.bucket)
@@ -1635,7 +1626,6 @@ class LifecycleTask extends BackbeatTask {
16351626
rules.Transition, object.LastModified),
16361627
conductorScanId: bucketData.contextInfo
16371628
&& bucketData.contextInfo.conductorScanId,
1638-
messageId: bucketData.contextInfo && bucketData.contextInfo.messageId,
16391629
}, log, done);
16401630
}
16411631
return done();
@@ -1792,7 +1782,6 @@ class LifecycleTask extends BackbeatTask {
17921782
reqId: log.getSerializedUids(),
17931783
conductorScanId: bucketData.contextInfo
17941784
&& bucketData.contextInfo.conductorScanId,
1795-
messageId: bucketData.contextInfo && bucketData.contextInfo.messageId,
17961785
})
17971786
.setAttribute('target.owner', bucketData.target.owner)
17981787
.setAttribute('target.bucket', bucketData.target.bucket)

extensions/lifecycle/tasks/LifecycleTaskV2.js

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,6 @@ class LifecycleTaskV2 extends LifecycleTask {
5656
requestId: log.getSerializedUids(),
5757
conductorScanId: bucketData.contextInfo
5858
&& bucketData.contextInfo.conductorScanId,
59-
messageId: bucketData.contextInfo && bucketData.contextInfo.messageId,
6059
},
6160
details: { beforeDate, prefix, listType, storageClass },
6261
});
@@ -124,7 +123,6 @@ class LifecycleTaskV2 extends LifecycleTask {
124123
requestId: log.getSerializedUids(),
125124
conductorScanId: bucketData.contextInfo
126125
&& bucketData.contextInfo.conductorScanId,
127-
messageId: bucketData.contextInfo && bucketData.contextInfo.messageId,
128126
},
129127
details: {
130128
beforeDate: params.BeforeDate,
@@ -213,7 +211,6 @@ class LifecycleTaskV2 extends LifecycleTask {
213211
requestId: log.getSerializedUids(),
214212
conductorScanId: bucketData.contextInfo
215213
&& bucketData.contextInfo.conductorScanId,
216-
messageId: bucketData.contextInfo && bucketData.contextInfo.messageId,
217214
},
218215
details: {
219216
beforeDate: params.BeforeDate,
@@ -367,7 +364,6 @@ class LifecycleTaskV2 extends LifecycleTask {
367364
rules.Transition, obj.LastModified),
368365
conductorScanId: bucketData.contextInfo
369366
&& bucketData.contextInfo.conductorScanId,
370-
messageId: bucketData.contextInfo && bucketData.contextInfo.messageId,
371367
}, log, cb);
372368
}
373369

@@ -446,7 +442,6 @@ class LifecycleTaskV2 extends LifecycleTask {
446442
reqId: log.getSerializedUids(),
447443
conductorScanId: bucketData.contextInfo
448444
&& bucketData.contextInfo.conductorScanId,
449-
messageId: bucketData.contextInfo && bucketData.contextInfo.messageId,
450445
})
451446
.setAttribute('target.owner', bucketData.target.owner)
452447
.setAttribute('target.bucket', bucketData.target.bucket)

extensions/lifecycle/tasks/LifecycleUpdateExpirationTask.js

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -152,9 +152,8 @@ class LifecycleUpdateExpirationTask extends BackbeatTask {
152152
const log = this.logger.newRequestLogger();
153153
const conductorScanId = entry.getContextAttribute(
154154
'conductorScanId');
155-
const messageId = entry.getContextAttribute('messageId');
156155
if (conductorScanId && typeof log.addDefaultFields === 'function') {
157-
log.addDefaultFields({ conductorScanId, messageId });
156+
log.addDefaultFields({ conductorScanId });
158157
}
159158
entry.addLoggedAttributes({
160159
bucketName: 'target.bucket',

extensions/lifecycle/tasks/LifecycleUpdateTransitionTask.js

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -262,9 +262,8 @@ class LifecycleUpdateTransitionTask extends BackbeatTask {
262262
const log = this.logger.newRequestLogger();
263263
const conductorScanId = entry.getContextAttribute(
264264
'conductorScanId');
265-
const messageId = entry.getContextAttribute('messageId');
266265
if (conductorScanId && typeof log.addDefaultFields === 'function') {
267-
log.addDefaultFields({ conductorScanId, messageId });
266+
log.addDefaultFields({ conductorScanId });
268267
}
269268
entry.addLoggedAttributes({
270269
bucketName: 'target.bucket',

0 commit comments

Comments
 (0)