Skip to content

Commit 7d78757

Browse files
committed
fixup! Refactor: pass scanId as parameter instead of instance field and fix metric timing
fix(tests): update tests to reflect messageId removal and scanId parameter changes
1 parent d2d32b3 commit 7d78757

File tree

3 files changed

+48
-13
lines changed

3 files changed

+48
-13
lines changed

extensions/lifecycle/LifecycleMetrics.js

Lines changed: 39 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ const LIFECYCLE_LABEL_OP = 'op';
55
const LIFECYCLE_LABEL_STATUS = 'status';
66
const LIFECYCLE_LABEL_LOCATION = 'location';
77
const LIFECYCLE_LABEL_TYPE = 'type';
8+
const LIFECYCLE_LABEL_BUCKET_SOURCE = 'bucket_source';
89

910
const LIFECYCLE_MARKER_METRICS_LOCATION = '-delete-marker-';
1011

@@ -59,7 +60,7 @@ const conductorFullScanElapsedSeconds = ZenkoMetrics.createGauge({
5960
const conductorScanCount = ZenkoMetrics.createGauge({
6061
name: 's3_lifecycle_conductor_scan_count',
6162
help: 'Number of buckets and queued workflows for the latest full lifecycle conductor scan',
62-
labelNames: [LIFECYCLE_LABEL_ORIGIN, LIFECYCLE_LABEL_TYPE],
63+
labelNames: [LIFECYCLE_LABEL_ORIGIN, LIFECYCLE_LABEL_TYPE, LIFECYCLE_LABEL_BUCKET_SOURCE],
6364
});
6465

6566
const bucketProcessorScanStartTime = ZenkoMetrics.createGauge({
@@ -207,7 +208,7 @@ class LifecycleMetrics {
207208
* @param {number} lifecycleBucketCount - buckets that have lifecycle enabled
208209
*/
209210
static onConductorFullScan(
210-
log, elapsedMs, bucketCount, workflowCount, lifecycleBucketCount
211+
log, elapsedMs, bucketCount, workflowCount, lifecycleBucketCount, bucketSource
211212
) {
212213
try {
213214
conductorFullScanElapsedSeconds.set(
@@ -217,20 +218,35 @@ class LifecycleMetrics {
217218
conductorScanCount.set({
218219
[LIFECYCLE_LABEL_ORIGIN]: 'conductor',
219220
[LIFECYCLE_LABEL_TYPE]: 'bucket',
221+
[LIFECYCLE_LABEL_BUCKET_SOURCE]: bucketSource || 'unknown',
220222
}, bucketCount);
221-
conductorScanCount.set({
222-
[LIFECYCLE_LABEL_ORIGIN]: 'conductor',
223-
[LIFECYCLE_LABEL_TYPE]: 'lifecycle_bucket',
224-
}, lifecycleBucketCount);
223+
// For bucketd, we can't know lifecycle buckets at conductor level,
224+
// so we don't set the metric here. The bucket processor will
225+
// increment it when it discovers buckets with lifecycle.
226+
if (bucketSource !== 'bucketd') {
227+
conductorScanCount.set({
228+
[LIFECYCLE_LABEL_ORIGIN]: 'conductor',
229+
[LIFECYCLE_LABEL_TYPE]: 'lifecycle_bucket',
230+
[LIFECYCLE_LABEL_BUCKET_SOURCE]: bucketSource || 'unknown',
231+
}, lifecycleBucketCount);
232+
} else {
233+
// Reset to 0 for bucketd at start of scan, bucket processor will increment
234+
conductorScanCount.set({
235+
[LIFECYCLE_LABEL_ORIGIN]: 'conductor',
236+
[LIFECYCLE_LABEL_TYPE]: 'lifecycle_bucket',
237+
[LIFECYCLE_LABEL_BUCKET_SOURCE]: 'bucketd',
238+
}, 0);
239+
}
225240
conductorScanCount.set({
226241
[LIFECYCLE_LABEL_ORIGIN]: 'conductor',
227242
[LIFECYCLE_LABEL_TYPE]: 'workflow',
243+
[LIFECYCLE_LABEL_BUCKET_SOURCE]: bucketSource || 'unknown',
228244
}, workflowCount);
229245
} catch (err) {
230246
LifecycleMetrics.handleError(
231247
log, err, 'LifecycleMetrics.onConductorFullScan', {
232248
elapsedMs, bucketCount, workflowCount,
233-
lifecycleBucketCount,
249+
lifecycleBucketCount, bucketSource,
234250
}
235251
);
236252
}
@@ -266,17 +282,31 @@ class LifecycleMetrics {
266282
* Increment the count of lifecycle-enabled buckets successfully processed
267283
* during the current scan. This is called after the bucket task completes
268284
* successfully, not when it is dispatched to the scheduler.
285+
* Also updates the conductor metric for bucketd source since we can't know
286+
* at conductor level which buckets have lifecycle.
269287
* @param {Object} log - logger
288+
* @param {string} bucketSource - source of bucket (mongodb, zookeeper, bucketd)
289+
* @param {string} conductorScanId - scan ID from conductor
270290
*/
271-
static onBucketProcessorBucketDone(log) {
291+
static onBucketProcessorBucketDone(log, bucketSource, conductorScanId) {
272292
try {
273293
bucketProcessorBucketsCount.inc(
274294
{ [LIFECYCLE_LABEL_ORIGIN]: 'bucket_processor' }
275295
);
296+
// For bucketd source, update conductor metric since we can't know
297+
// at conductor level which buckets have lifecycle
298+
if (bucketSource === 'bucketd' && conductorScanId) {
299+
conductorScanCount.inc({
300+
[LIFECYCLE_LABEL_ORIGIN]: 'conductor',
301+
[LIFECYCLE_LABEL_TYPE]: 'lifecycle_bucket',
302+
[LIFECYCLE_LABEL_BUCKET_SOURCE]: 'bucketd',
303+
});
304+
}
276305
} catch (err) {
277306
LifecycleMetrics.handleError(
278307
log, err,
279-
'LifecycleMetrics.onBucketProcessorBucketDone'
308+
'LifecycleMetrics.onBucketProcessorBucketDone',
309+
{ bucketSource, conductorScanId }
280310
);
281311
}
282312
}

extensions/lifecycle/bucketProcessor/LifecycleBucketProcessor.js

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -388,7 +388,10 @@ class LifecycleBucketProcessor {
388388
// This ensures we count actual processing completion, not just dispatch
389389
// Only count successful completions, not failures
390390
if (!err) {
391-
LifecycleMetrics.onBucketProcessorBucketDone(this._log);
391+
const bucketSource = result.contextInfo && result.contextInfo.bucketSource;
392+
LifecycleMetrics.onBucketProcessorBucketDone(
393+
this._log, bucketSource, conductorScanId
394+
);
392395
}
393396
return cb(err);
394397
});

extensions/lifecycle/conductor/LifecycleConductor.js

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -316,6 +316,7 @@ class LifecycleConductor {
316316
contextInfo: {
317317
reqId: log.getSerializedUids(),
318318
conductorScanId: scanId,
319+
bucketSource: this._bucketSource,
319320
},
320321
target: {
321322
bucket: task.bucketName,
@@ -415,7 +416,7 @@ class LifecycleConductor {
415416
(err, messages) => {
416417
nBucketsQueued += tasks.length;
417418
nLifecycledBuckets += tasks.filter(
418-
t => t.isLifecycled !== false
419+
t => t.isLifecycled === true
419420
).length;
420421

421422
log.info('bucket push progress', {
@@ -498,7 +499,8 @@ class LifecycleConductor {
498499
elapsedMs,
499500
nBucketsListed,
500501
messageDeliveryReports,
501-
nLifecycledBuckets
502+
nLifecycledBuckets,
503+
this._bucketSource
502504
);
503505

504506
log.info('finished pushing lifecycle batch', {
@@ -673,7 +675,7 @@ class LifecycleConductor {
673675
const [canonicalId, bucketName] = marker.split(constants.splitter);
674676
if (!this._isBlacklisted(canonicalId, bucketName)) {
675677
nEnqueued += 1;
676-
queue.push({ canonicalId, bucketName, isLifecycled: true });
678+
queue.push({ canonicalId, bucketName });
677679
this.lastSentId = o.key;
678680
// Optimization:
679681
// If we only blacklist by accounts, and the last bucket is blacklisted

0 commit comments

Comments
 (0)