diff --git a/extensions/lifecycle/LifecycleMetrics.js b/extensions/lifecycle/LifecycleMetrics.js index eac0d1346..e023054d0 100644 --- a/extensions/lifecycle/LifecycleMetrics.js +++ b/extensions/lifecycle/LifecycleMetrics.js @@ -5,6 +5,7 @@ const LIFECYCLE_LABEL_OP = 'op'; const LIFECYCLE_LABEL_STATUS = 'status'; const LIFECYCLE_LABEL_LOCATION = 'location'; const LIFECYCLE_LABEL_TYPE = 'type'; +const LIFECYCLE_LABEL_BUCKET_SOURCE = 'bucket_source'; const LIFECYCLE_MARKER_METRICS_LOCATION = '-delete-marker-'; @@ -50,6 +51,32 @@ const lifecycleLegacyTask = ZenkoMetrics.createCounter({ labelNames: [LIFECYCLE_LABEL_ORIGIN, LIFECYCLE_LABEL_STATUS], }); +const conductorFullScanElapsedSeconds = ZenkoMetrics.createGauge({ + name: 's3_lifecycle_conductor_full_scan_elapsed_seconds', + help: 'Elapsed time for the latest full lifecycle conductor scan, in seconds', + labelNames: [LIFECYCLE_LABEL_ORIGIN], +}); + +const conductorScanCount = ZenkoMetrics.createGauge({ + name: 's3_lifecycle_conductor_scan_count', + help: 'Number of buckets and queued workflows for the latest full lifecycle conductor scan', + labelNames: [LIFECYCLE_LABEL_ORIGIN, LIFECYCLE_LABEL_TYPE, LIFECYCLE_LABEL_BUCKET_SOURCE], +}); + +const bucketProcessorScanStartTime = ZenkoMetrics.createGauge({ + name: 's3_lifecycle_bucket_processor_scan_start_time', + help: 'Timestamp (ms) of the scan currently being processed ' + + 'by this bucket processor (0 when idle)', + labelNames: [LIFECYCLE_LABEL_ORIGIN], +}); + +const bucketProcessorBucketsCount = ZenkoMetrics.createGauge({ + name: 's3_lifecycle_bucket_processor_buckets_count', + help: 'Number of buckets successfully processed by ' + + 'this bucket processor during the current scan', + labelNames: [LIFECYCLE_LABEL_ORIGIN, LIFECYCLE_LABEL_BUCKET_SOURCE], +}); + const lifecycleS3Operations = ZenkoMetrics.createCounter({ name: 's3_lifecycle_s3_operations_total', help: 'Total number of S3 operations by the lifecycle processes', @@ -172,6 +199,122 @@ class LifecycleMetrics { } } + /** + * Record metrics at the end of a full conductor scan. + * @param {Object} log - logger + * @param {number} elapsedMs - wall-clock time for the full scan + * @param {number} bucketCount - total buckets scanned + * @param {number} workflowCount - messages delivered to bucket-tasks topic + * @param {number} lifecycleBucketCount - buckets that have lifecycle enabled + * @param {string} bucketSource - source of buckets (mongodb, zookeeper, bucketd) + */ + static onConductorFullScan( + log, elapsedMs, bucketCount, workflowCount, lifecycleBucketCount, bucketSource + ) { + try { + conductorFullScanElapsedSeconds.set( + { [LIFECYCLE_LABEL_ORIGIN]: 'conductor' }, + elapsedMs / 1000 + ); + conductorScanCount.set({ + [LIFECYCLE_LABEL_ORIGIN]: 'conductor', + [LIFECYCLE_LABEL_TYPE]: 'bucket', + [LIFECYCLE_LABEL_BUCKET_SOURCE]: bucketSource || 'unknown', + }, bucketCount); + // For bucketd, we can't know lifecycle buckets at conductor level, + // so lifecycleBucketCount will be 0. For accurate bucketd counts, + // use bucketProcessorBucketsCount aggregated across all bucket processor + // instances in Grafana dashboards. + conductorScanCount.set({ + [LIFECYCLE_LABEL_ORIGIN]: 'conductor', + [LIFECYCLE_LABEL_TYPE]: 'lifecycle_bucket', + [LIFECYCLE_LABEL_BUCKET_SOURCE]: bucketSource || 'unknown', + }, lifecycleBucketCount); + conductorScanCount.set({ + [LIFECYCLE_LABEL_ORIGIN]: 'conductor', + [LIFECYCLE_LABEL_TYPE]: 'workflow', + [LIFECYCLE_LABEL_BUCKET_SOURCE]: bucketSource || 'unknown', + }, workflowCount); + } catch (err) { + LifecycleMetrics.handleError( + log, err, 'LifecycleMetrics.onConductorFullScan', { + elapsedMs, bucketCount, workflowCount, + lifecycleBucketCount, bucketSource, + } + ); + } + } + + /** + * Signal that a bucket processor has started working on a scan. + * Called when the first message of a new conductorScanId arrives. + * @param {Object} log - logger + * @param {number} scanStartTimestamp - timestamp (ms) from the + * conductor scan start + */ + static onBucketProcessorScanStart(log, scanStartTimestamp) { + try { + bucketProcessorScanStartTime.set( + { [LIFECYCLE_LABEL_ORIGIN]: 'bucket_processor' }, + scanStartTimestamp + ); + bucketProcessorBucketsCount.reset(); + } catch (err) { + LifecycleMetrics.handleError( + log, err, + 'LifecycleMetrics.onBucketProcessorScanStart', + { scanStartTimestamp } + ); + } + } + + /** + * Increment the count of lifecycle-enabled buckets successfully processed + * during the current scan. This is called after the bucket task completes + * successfully, not when it is dispatched to the scheduler. + * + * Note: For bucketd source, the conductor cannot know which buckets have + * lifecycle at listing time. + * @param {Object} log - logger + * @param {string} bucketSource - source of bucket (mongodb, zookeeper, bucketd) + * @param {string} conductorScanId - scan ID from conductor + */ + static onBucketProcessorBucketDone(log, bucketSource, conductorScanId) { + try { + bucketProcessorBucketsCount.inc({ + [LIFECYCLE_LABEL_ORIGIN]: 'bucket_processor', + [LIFECYCLE_LABEL_BUCKET_SOURCE]: bucketSource || 'unknown', + }); + } catch (err) { + LifecycleMetrics.handleError( + log, err, + 'LifecycleMetrics.onBucketProcessorBucketDone', + { bucketSource, conductorScanId } + ); + } + } + + /** + * Signal that a bucket processor has finished all tasks for + * the current scan (queue drained). + * Resets the scan timestamp to 0 so Prometheus can detect + * that this processor is idle. + * @param {Object} log - logger + */ + static onBucketProcessorScanEnd(log) { + try { + bucketProcessorScanStartTime.set( + { [LIFECYCLE_LABEL_ORIGIN]: 'bucket_processor' }, + 0 + ); + } catch (err) { + LifecycleMetrics.handleError( + log, err, + 'LifecycleMetrics.onBucketProcessorScanEnd' + ); + } + } + static onLifecycleTriggered(log, process, type, location, latencyMs) { try { lifecycleTriggerLatency.observe({ diff --git a/extensions/lifecycle/bucketProcessor/LifecycleBucketProcessor.js b/extensions/lifecycle/bucketProcessor/LifecycleBucketProcessor.js index 6135b4186..dcfe13797 100644 --- a/extensions/lifecycle/bucketProcessor/LifecycleBucketProcessor.js +++ b/extensions/lifecycle/bucketProcessor/LifecycleBucketProcessor.js @@ -26,10 +26,16 @@ const { extractBucketProcessorCircuitBreakerConfigs, } = require('../CircuitBreakerGroup'); const { lifecycleTaskVersions } = require('../../../lib/constants'); +const { LifecycleMetrics } = require('../LifecycleMetrics'); const locations = require('../../../conf/locationConfig.json'); const PROCESS_OBJECTS_ACTION = 'processObjects'; +// Delay before declaring a scan finished after the internal task +// queue drains. This prevents premature idle signals when the +// queue empties between Kafka message batches within the same scan. +const SCAN_END_DEBOUNCE_DELAY_MS = 30000; + /** * @class LifecycleBucketProcessor * @@ -97,6 +103,12 @@ class LifecycleBucketProcessor { this._pausedLocations = new Set(); this._locationStatusStream = null; + // Conductor scan ID for metrics; kept across internal queue drains so + // Kafka batches from the same scan do not retrigger onBucketProcessorScanStart. + this._metricsScanId = null; + this._processorScanMetricsActive = false; + this._scanEndTimeout = null; + this.clientManager = new ClientManager({ id: 'lifecycle', authConfig: lcConfig.bucketProcessor.auth || lcConfig.auth, @@ -145,12 +157,32 @@ class LifecycleBucketProcessor { }, this._lcConfig.bucketProcessor.concurrency); // Listen for errors from any task being processed. + // When the internal task queue empties, schedule a delayed + // reset of the scan-in-progress gauge. The delay avoids + // false idle signals when the queue drains between Kafka + // message batches that belong to the same conductor scan. this._internalTaskScheduler.drain(err => { if (err) { this._log.error('error occurred during task processing', { error: err, }); } + if (this._processorScanMetricsActive && !this._scanEndTimeout) { + this._log.info('bucket processor task queue drained, scheduling scan end', { + method: 'LifecycleBucketProcessor.drain', + conductorScanId: this._metricsScanId, + debounceMs: SCAN_END_DEBOUNCE_DELAY_MS, + }); + this._scanEndTimeout = setTimeout(() => { + this._scanEndTimeout = null; + this._log.info('bucket processor scan end confirmed', { + method: 'LifecycleBucketProcessor.drain', + conductorScanId: this._metricsScanId, + }); + LifecycleMetrics.onBucketProcessorScanEnd(this._log); + this._processorScanMetricsActive = false; + }, SCAN_END_DEBOUNCE_DELAY_MS); + } }); const globalCircuitBreakerConfig = extractBucketProcessorCircuitBreakerConfigs( @@ -277,12 +309,43 @@ class LifecycleBucketProcessor { return process.nextTick(() => cb(errors.InternalError)); } const { bucket, owner, accountId, taskVersion } = result.target; + const conductorScanId = result.contextInfo && result.contextInfo.conductorScanId; + const scanStartTimestamp = (result.contextInfo + && result.contextInfo.scanStartTimestamp) || Date.now(); + if (conductorScanId && conductorScanId !== this._metricsScanId) { + // New scan: cancel any pending debounce, full metric reset + if (this._scanEndTimeout) { + clearTimeout(this._scanEndTimeout); + this._scanEndTimeout = null; + } + this._metricsScanId = conductorScanId; + LifecycleMetrics.onBucketProcessorScanStart( + this._log, scanStartTimestamp + ); + this._processorScanMetricsActive = true; + this._log.info('new conductor scan detected', { + method: + 'LifecycleBucketProcessor._processBucketEntry', + conductorScanId, + }); + } else if (conductorScanId && this._scanEndTimeout) { + // Same scan, but the debounce was pending: cancel it + // because more messages are still arriving. + clearTimeout(this._scanEndTimeout); + this._scanEndTimeout = null; + this._log.debug('cancelled scan end debounce, more messages arriving', { + method: 'LifecycleBucketProcessor._processBucketEntry', + conductorScanId, + }); + } + if (!bucket || !owner || (!accountId && this._authConfig.type === authTypeAssumeRole)) { this._log.error('kafka bucket entry missing required fields', { method: 'LifecycleBucketProcessor._processBucketEntry', bucket, owner, accountId, + conductorScanId, }); return process.nextTick(() => cb(errors.InternalError)); } @@ -291,6 +354,7 @@ class LifecycleBucketProcessor { bucket, owner, accountId, + conductorScanId, }); const s3Client = this.clientManager.getS3Client(accountId); @@ -345,6 +409,7 @@ class LifecycleBucketProcessor { owner, details: result.details, taskName: task.constructor.name, + conductorScanId, }); return this._internalTaskScheduler.push({ task, @@ -352,7 +417,18 @@ class LifecycleBucketProcessor { value: result, s3target: s3Client, backbeatMetadataProxy, - }, cb); + }, err => { + // Increment metric after task successfully completes + // This ensures we count actual processing completion, not just dispatch + // Only count successful completions, not failures + if (!err) { + const bucketSource = result.contextInfo && result.contextInfo.bucketSource; + LifecycleMetrics.onBucketProcessorBucketDone( + this._log, bucketSource, conductorScanId + ); + } + return cb(err); + }); }); } @@ -510,6 +586,10 @@ class LifecycleBucketProcessor { * @return {undefined} */ close(cb) { + if (this._scanEndTimeout) { + clearTimeout(this._scanEndTimeout); + this._scanEndTimeout = null; + } if (this._deleteInactiveCredentialsInterval) { clearInterval(this._deleteInactiveCredentialsInterval); } diff --git a/extensions/lifecycle/conductor/LifecycleConductor.js b/extensions/lifecycle/conductor/LifecycleConductor.js index fbbee05ba..eee724bdf 100644 --- a/extensions/lifecycle/conductor/LifecycleConductor.js +++ b/extensions/lifecycle/conductor/LifecycleConductor.js @@ -3,6 +3,7 @@ const async = require('async'); const schedule = require('node-schedule'); const zookeeper = require('node-zookeeper-client'); +const { v4: uuid } = require('uuid'); const { constants, errors } = require('arsenal'); const Logger = require('werelogs').Logger; @@ -308,12 +309,15 @@ class LifecycleConductor { }); } - _taskToMessage(task, taskVersion, log) { + _taskToMessage(task, taskVersion, scanId, scanStartTimestamp, log) { return { message: JSON.stringify({ action: 'processObjects', contextInfo: { reqId: log.getSerializedUids(), + conductorScanId: scanId, + scanStartTimestamp, + bucketSource: this._bucketSource, }, target: { bucket: task.bucketName, @@ -354,9 +358,10 @@ class LifecycleConductor { cb); } - _createBucketTaskMessages(tasks, log, cb) { + _createBucketTaskMessages(tasks, scanId, scanStartTimestamp, log, cb) { if (this.lcConfig.forceLegacyListing) { - return process.nextTick(cb, null, tasks.map(t => this._taskToMessage(t, lifecycleTaskVersions.v1, log))); + return process.nextTick(cb, null, tasks.map(t => + this._taskToMessage(t, lifecycleTaskVersions.v1, scanId, scanStartTimestamp, log))); } return async.mapLimit(tasks, 10, (t, taskDone) => @@ -364,16 +369,25 @@ class LifecycleConductor { if (err) { // should not happen as indexes methods would // ignore the errors and fallback to v1 listing - return taskDone(null, this._taskToMessage(t, lifecycleTaskVersions.v1, log)); + return taskDone(null, this._taskToMessage(t, lifecycleTaskVersions.v1, scanId, + scanStartTimestamp, log)); } - return taskDone(null, this._taskToMessage(t, taskVersion, log)); + return taskDone(null, this._taskToMessage(t, taskVersion, scanId, + scanStartTimestamp, log)); }), cb); } processBuckets(cb) { const log = this.logger.newRequestLogger(); - const start = new Date(); + const start = Date.now(); + const scanId = uuid(); + if (typeof log.addDefaultFields === 'function') { + log.addDefaultFields({ conductorScanId: scanId }); + } + LifecycleMetrics.onProcessBuckets(log); let nBucketsQueued = 0; + let nBucketsListed = 0; + let nLifecycledBuckets = 0; let messageDeliveryReports = 0; const messageSendQueue = async.cargo((tasks, done) => { @@ -400,16 +414,21 @@ class LifecycleConductor { } return true; }))), - (tasksWithAccountId, next) => this._createBucketTaskMessages(tasksWithAccountId, log, next), + (tasksWithAccountId, next) => this._createBucketTaskMessages( + tasksWithAccountId, scanId, start, log, next), ], (err, messages) => { nBucketsQueued += tasks.length; + nLifecycledBuckets += tasks.filter( + t => t.isLifecycled === true + ).length; log.info('bucket push progress', { + conductorScanId: scanId, nBucketsQueued, bucketsInCargo: tasks.length, kafkaBucketMessagesDeliveryReports: messageDeliveryReports, - kafkaEnqueueRateHz: Math.round(nBucketsQueued * 1000 / (new Date() - start)), + kafkaEnqueueRateHz: Math.round(nBucketsQueued * 1000 / (Date.now() - start)), }); this._accountIdCache.expireOldest(); @@ -429,21 +448,30 @@ class LifecycleConductor { next => this._indexesGetInProgressJobs(log, () => next(null)), next => { this._batchInProgress = true; - log.info('starting new lifecycle batch', { bucketSource: this._bucketSource }); - this.listBuckets(messageSendQueue, log, (err, nBucketsListed) => { + log.info('starting new lifecycle batch', { + bucketSource: this._bucketSource, + conductorScanId: scanId, + }); + this.listBuckets(messageSendQueue, scanId, log, (err, listedBucketsCount) => { LifecycleMetrics.onBucketListing(log, err); - return next(err, nBucketsListed); + nBucketsListed = listedBucketsCount; + return next(err, listedBucketsCount); }); }, - (nBucketsListed, next) => { + (listedBucketsCount, next) => { async.until( - () => nBucketsQueued === nBucketsListed, + () => nBucketsQueued === listedBucketsCount, unext => setTimeout(unext, 1000), next); }, ], err => { + const elapsedMs = Date.now() - start; if (err && err.Throttling) { - log.info('not starting new lifecycle batch', { reason: err }); + log.info('not starting new lifecycle batch', { + reason: err, + conductorScanId: scanId, + fullScanElapsedMs: elapsedMs, + }); if (cb) { cb(err); } @@ -456,28 +484,51 @@ class LifecycleConductor { const unknownCanonicalIds = this._accountIdCache.getMisses(); if (err) { - log.error('lifecycle batch failed', { error: err, unknownCanonicalIds }); + log.error('lifecycle batch failed', { + error: err, + unknownCanonicalIds, + conductorScanId: scanId, + fullScanElapsedMs: elapsedMs, + nBucketsListed, + workflowsQueued: messageDeliveryReports, + }); if (cb) { cb(err); } return; } - log.info('finished pushing lifecycle batch', { nBucketsQueued, unknownCanonicalIds }); - LifecycleMetrics.onProcessBuckets(log); + LifecycleMetrics.onConductorFullScan( + log, + elapsedMs, + nBucketsListed, + messageDeliveryReports, + nLifecycledBuckets, + this._bucketSource + ); + + log.info('finished pushing lifecycle batch', { + nBucketsQueued, + nLifecycledBuckets, + unknownCanonicalIds, + conductorScanId: scanId, + fullScanElapsedMs: elapsedMs, + nBucketsListed, + workflowsQueued: messageDeliveryReports, + }); if (cb) { cb(null, nBucketsQueued); } }); } - listBuckets(queue, log, cb) { + listBuckets(queue, scanId, log, cb) { if (this._bucketSource === 'zookeeper') { - return this.listZookeeperBuckets(queue, log, cb); + return this.listZookeeperBuckets(queue, scanId, log, cb); } if (this._bucketSource === 'mongodb') { - return this.listMongodbBuckets(queue, log, cb); + return this.listMongodbBuckets(queue, scanId, log, cb); } return this.restoreBucketCheckpoint((err, marker) => { @@ -485,11 +536,11 @@ class LifecycleConductor { return cb(err); } - return this.listBucketdBuckets(queue, marker || null, log, cb); + return this.listBucketdBuckets(queue, marker || null, scanId, log, cb); }); } - listZookeeperBuckets(queue, log, cb) { + listZookeeperBuckets(queue, scanId, log, cb) { const zkBucketsPath = this.getBucketsZkPath(); this._zkClient.getChildren( zkBucketsPath, @@ -498,7 +549,7 @@ class LifecycleConductor { if (err) { log.error( 'error getting list of buckets from zookeeper', - { zkPath: zkBucketsPath, error: err.message }); + { zkPath: zkBucketsPath, error: err.message, conductorScanId: scanId }); return cb(err); } @@ -509,7 +560,7 @@ class LifecycleConductor { if (!canonicalId || !bucketUID || !bucketName) { log.error( 'malformed zookeeper bucket entry, skipping', - { zkPath: zkBucketsPath, bucket }); + { zkPath: zkBucketsPath, bucket, conductorScanId: scanId }); return false; } @@ -524,7 +575,7 @@ class LifecycleConductor { const canonicalId = split[0]; const bucketName = split[2]; - return { canonicalId, bucketName }; + return { canonicalId, bucketName, isLifecycled: true }; }); queue.push(batch); @@ -572,7 +623,7 @@ class LifecycleConductor { }); } - listBucketdBuckets(queue, initMarker, log, cb) { + listBucketdBuckets(queue, initMarker, scanId, log, cb) { let isTruncated = true; let marker = initMarker; let nEnqueued = 0; @@ -591,6 +642,7 @@ class LifecycleConductor { maxInFlight: this._maxInFlightBatchSize, bucketListingPushRateHz: Math.round(nEnqueued * 1000 / (new Date() - start)), breakerState, + conductorScanId: scanId, }; if (queue.length() > this._maxInFlightBatchSize || @@ -672,7 +724,7 @@ class LifecycleConductor { ); } - listMongodbBuckets(queue, log, cb) { + listMongodbBuckets(queue, scanId, log, cb) { let nEnqueued = 0; const start = new Date(); @@ -746,6 +798,7 @@ class LifecycleConductor { maxInFlight: this._maxInFlightBatchSize, enqueueRateHz: Math.round(nEnqueued * 1000 / (new Date() - start)), breakerState, + conductorScanId: scanId, }; if (queue.length() > this._maxInFlightBatchSize || diff --git a/extensions/lifecycle/tasks/LifecycleDeleteObjectTask.js b/extensions/lifecycle/tasks/LifecycleDeleteObjectTask.js index 7139d8724..e05e8f0f1 100644 --- a/extensions/lifecycle/tasks/LifecycleDeleteObjectTask.js +++ b/extensions/lifecycle/tasks/LifecycleDeleteObjectTask.js @@ -304,6 +304,11 @@ class LifecycleDeleteObjectTask extends BackbeatTask { processActionEntry(entry, done) { const startTime = Date.now(); const log = this.logger.newRequestLogger(); + const conductorScanId = entry.getContextAttribute( + 'conductorScanId'); + if (conductorScanId && typeof log.addDefaultFields === 'function') { + log.addDefaultFields({ conductorScanId }); + } entry.addLoggedAttributes({ bucketName: 'target.bucket', objectKey: 'target.key', diff --git a/extensions/lifecycle/tasks/LifecycleTask.js b/extensions/lifecycle/tasks/LifecycleTask.js index 591ce63e8..73e149c89 100644 --- a/extensions/lifecycle/tasks/LifecycleTask.js +++ b/extensions/lifecycle/tasks/LifecycleTask.js @@ -236,7 +236,10 @@ class LifecycleTask extends BackbeatTask { } const entry = Object.assign({}, bucketData, { - contextInfo: { reqId: log.getSerializedUids() }, + contextInfo: { + reqId: log.getSerializedUids(), + conductorScanId: bucketData.contextInfo && bucketData.contextInfo.conductorScanId, + }, details: { marker }, }); this._sendBucketEntry(entry, err => { @@ -411,6 +414,8 @@ class LifecycleTask extends BackbeatTask { const entry = Object.assign({}, bucketData, { contextInfo: { reqId: log.getSerializedUids(), + conductorScanId: bucketData.contextInfo + && bucketData.contextInfo.conductorScanId, }, details: { keyMarker: data.NextKeyMarker, @@ -497,6 +502,8 @@ class LifecycleTask extends BackbeatTask { const entry = Object.assign({}, bucketData, { contextInfo: { reqId: log.getSerializedUids(), + conductorScanId: bucketData.contextInfo + && bucketData.contextInfo.conductorScanId, }, details: { keyMarker: data.NextKeyMarker, @@ -1040,6 +1047,8 @@ class LifecycleTask extends BackbeatTask { origin: 'lifecycle', ruleType: 'expiration', reqId: log.getSerializedUids(), + conductorScanId: bucketData.contextInfo + && bucketData.contextInfo.conductorScanId, }) .setAttribute('target.owner', bucketData.target.owner) .setAttribute('target.bucket', bucketData.target.bucket) @@ -1068,6 +1077,8 @@ class LifecycleTask extends BackbeatTask { origin: 'lifecycle', ruleType: 'expiration', reqId: log.getSerializedUids(), + conductorScanId: bucketData.contextInfo + && bucketData.contextInfo.conductorScanId, }) .setAttribute('target.owner', bucketData.target.owner) .setAttribute('target.bucket', bucketData.target.bucket) @@ -1174,6 +1185,7 @@ class LifecycleTask extends BackbeatTask { origin: 'lifecycle', ruleType: 'transition', reqId: log.getSerializedUids(), + conductorScanId: params.conductorScanId, }); entry.setAttribute('source', { bucket: params.bucket, @@ -1374,6 +1386,8 @@ class LifecycleTask extends BackbeatTask { site: rules[ncvt].StorageClass, transitionTime: this._lifecycleDateTime.getTransitionTimestamp( { Days: rules[ncvt][ncd] }, staleDate), + conductorScanId: bucketData.contextInfo + && bucketData.contextInfo.conductorScanId, }, log, cb); return; } @@ -1450,6 +1464,8 @@ class LifecycleTask extends BackbeatTask { origin: 'lifecycle', ruleType: 'expiration', reqId: log.getSerializedUids(), + conductorScanId: bucketData.contextInfo + && bucketData.contextInfo.conductorScanId, }) .setAttribute('target.owner', bucketData.target.owner) .setAttribute('target.bucket', bucketData.target.bucket) @@ -1519,6 +1535,8 @@ class LifecycleTask extends BackbeatTask { origin: 'lifecycle', ruleType: 'expiration', reqId: log.getSerializedUids(), + conductorScanId: bucketData.contextInfo + && bucketData.contextInfo.conductorScanId, }) .setAttribute('target.owner', bucketData.target.owner) .setAttribute('target.bucket', bucketData.target.bucket) @@ -1606,6 +1624,8 @@ class LifecycleTask extends BackbeatTask { site: rules.Transition.StorageClass, transitionTime: this._lifecycleDateTime.getTransitionTimestamp( rules.Transition, object.LastModified), + conductorScanId: bucketData.contextInfo + && bucketData.contextInfo.conductorScanId, }, log, done); } return done(); @@ -1709,6 +1729,8 @@ class LifecycleTask extends BackbeatTask { encodedVersionId: undefined, transitionTime: this._lifecycleDateTime.getTransitionTimestamp( rules.Transition, version.LastModified), + conductorScanId: bucketData.contextInfo + && bucketData.contextInfo.conductorScanId, }, log, done); } @@ -1758,6 +1780,8 @@ class LifecycleTask extends BackbeatTask { origin: 'lifecycle', ruleType: 'expiration', reqId: log.getSerializedUids(), + conductorScanId: bucketData.contextInfo + && bucketData.contextInfo.conductorScanId, }) .setAttribute('target.owner', bucketData.target.owner) .setAttribute('target.bucket', bucketData.target.bucket) @@ -1827,6 +1851,11 @@ class LifecycleTask extends BackbeatTask { processBucketEntry(bucketLCRules, bucketData, s3target, backbeatMetadataProxy, nbRetries, done) { const log = this.log.newRequestLogger(); + const conductorScanId = bucketData.contextInfo + && bucketData.contextInfo.conductorScanId; + if (conductorScanId && typeof log.addDefaultFields === 'function') { + log.addDefaultFields({ conductorScanId }); + } this.s3target = s3target; this.backbeatMetadataProxy = backbeatMetadataProxy; if (!this.backbeatMetadataProxy) { diff --git a/extensions/lifecycle/tasks/LifecycleTaskV2.js b/extensions/lifecycle/tasks/LifecycleTaskV2.js index f7ca26652..4a4d8b0ac 100644 --- a/extensions/lifecycle/tasks/LifecycleTaskV2.js +++ b/extensions/lifecycle/tasks/LifecycleTaskV2.js @@ -52,7 +52,11 @@ class LifecycleTaskV2 extends LifecycleTask { } = l; const entry = Object.assign({}, bucketData, { - contextInfo: { requestId: log.getSerializedUids() }, + contextInfo: { + requestId: log.getSerializedUids(), + conductorScanId: bucketData.contextInfo + && bucketData.contextInfo.conductorScanId, + }, details: { beforeDate, prefix, listType, storageClass }, }); @@ -115,7 +119,11 @@ class LifecycleTaskV2 extends LifecycleTask { // re-queue truncated listing only once. if (isTruncated && nbRetries === 0) { const entry = Object.assign({}, bucketData, { - contextInfo: { requestId: log.getSerializedUids() }, + contextInfo: { + requestId: log.getSerializedUids(), + conductorScanId: bucketData.contextInfo + && bucketData.contextInfo.conductorScanId, + }, details: { beforeDate: params.BeforeDate, prefix: params.Prefix, @@ -199,7 +207,11 @@ class LifecycleTaskV2 extends LifecycleTask { // re-queue truncated listing only once. if (isTruncated && nbRetries === 0) { const entry = Object.assign({}, bucketData, { - contextInfo: { requestId: log.getSerializedUids() }, + contextInfo: { + requestId: log.getSerializedUids(), + conductorScanId: bucketData.contextInfo + && bucketData.contextInfo.conductorScanId, + }, details: { beforeDate: params.BeforeDate, prefix: params.Prefix, @@ -350,6 +362,8 @@ class LifecycleTaskV2 extends LifecycleTask { site: rules.Transition.StorageClass, transitionTime: this._lifecycleDateTime.getTransitionTimestamp( rules.Transition, obj.LastModified), + conductorScanId: bucketData.contextInfo + && bucketData.contextInfo.conductorScanId, }, log, cb); } @@ -426,6 +440,8 @@ class LifecycleTaskV2 extends LifecycleTask { origin: 'lifecycle', ruleType: 'expiration', reqId: log.getSerializedUids(), + conductorScanId: bucketData.contextInfo + && bucketData.contextInfo.conductorScanId, }) .setAttribute('target.owner', bucketData.target.owner) .setAttribute('target.bucket', bucketData.target.bucket) diff --git a/extensions/lifecycle/tasks/LifecycleUpdateExpirationTask.js b/extensions/lifecycle/tasks/LifecycleUpdateExpirationTask.js index a94928ab9..e90f60e43 100644 --- a/extensions/lifecycle/tasks/LifecycleUpdateExpirationTask.js +++ b/extensions/lifecycle/tasks/LifecycleUpdateExpirationTask.js @@ -150,6 +150,11 @@ class LifecycleUpdateExpirationTask extends BackbeatTask { processActionEntry(entry, done) { const startTime = Date.now(); const log = this.logger.newRequestLogger(); + const conductorScanId = entry.getContextAttribute( + 'conductorScanId'); + if (conductorScanId && typeof log.addDefaultFields === 'function') { + log.addDefaultFields({ conductorScanId }); + } entry.addLoggedAttributes({ bucketName: 'target.bucket', objectKey: 'target.key', diff --git a/extensions/lifecycle/tasks/LifecycleUpdateTransitionTask.js b/extensions/lifecycle/tasks/LifecycleUpdateTransitionTask.js index 55820bad8..9deb1d989 100644 --- a/extensions/lifecycle/tasks/LifecycleUpdateTransitionTask.js +++ b/extensions/lifecycle/tasks/LifecycleUpdateTransitionTask.js @@ -260,6 +260,11 @@ class LifecycleUpdateTransitionTask extends BackbeatTask { */ processActionEntry(entry, done) { const log = this.logger.newRequestLogger(); + const conductorScanId = entry.getContextAttribute( + 'conductorScanId'); + if (conductorScanId && typeof log.addDefaultFields === 'function') { + log.addDefaultFields({ conductorScanId }); + } entry.addLoggedAttributes({ bucketName: 'target.bucket', objectKey: 'target.key', diff --git a/monitoring/lifecycle/dashboard.json b/monitoring/lifecycle/dashboard.json index a714de95e..21086433a 100644 --- a/monitoring/lifecycle/dashboard.json +++ b/monitoring/lifecycle/dashboard.json @@ -4345,6 +4345,412 @@ "transparent": false, "type": "timeseries" }, + { + "datasource": "${DS_PROMETHEUS}", + "editable": true, + "error": false, + "fieldConfig": { + "defaults": { + "custom": {}, + "decimals": null, + "mappings": [], + "noValue": "-", + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "index": 0, + "line": true, + "op": "gt", + "value": "null", + "yaxis": "left" + }, + { + "color": "orange", + "index": 1, + "line": true, + "op": "gt", + "value": 60.0, + "yaxis": "left" + }, + { + "color": "red", + "index": 2, + "line": true, + "op": "gt", + "value": 300.0, + "yaxis": "left" + } + ] + }, + "unit": "s" + }, + "overrides": [] + }, + "gridPos": { + "h": 4, + "w": 8, + "x": 0, + "y": 111 + }, + "hideTimeOverride": false, + "id": 50, + "links": [], + "maxDataPoints": 100, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "last" + ], + "fields": "", + "values": false + }, + "textMode": "auto" + }, + "targets": [ + { + "datasource": null, + "expr": "s3_lifecycle_conductor_full_scan_elapsed_seconds{job=\"${job_lifecycle_producer}\", namespace=\"${namespace}\"}", + "format": "time_series", + "hide": false, + "instant": false, + "interval": "", + "intervalFactor": 1, + "legendFormat": "", + "metric": "", + "refId": "", + "step": 10, + "target": "" + } + ], + "title": "Full Scan Elapsed Time", + "transformations": [], + "transparent": false, + "type": "stat" + }, + { + "datasource": "${DS_PROMETHEUS}", + "description": "Number of buckets with lifecycle configuration found during the latest conductor scan. For mongodb/zookeeper sources, uses conductor metric. For bucketd source, uses aggregated bucket processor count since conductor cannot know which buckets have lifecycle at listing time.", + "editable": true, + "error": false, + "fieldConfig": { + "defaults": { + "custom": {}, + "decimals": 0, + "mappings": [], + "noValue": "0", + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "semi-dark-blue", + "index": 0, + "line": true, + "op": "gt", + "value": "null", + "yaxis": "left" + } + ] + }, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "h": 4, + "w": 8, + "x": 8, + "y": 111 + }, + "hideTimeOverride": false, + "id": 51, + "links": [], + "maxDataPoints": 100, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "last" + ], + "fields": "", + "values": false + }, + "textMode": "auto" + }, + "targets": [ + { + "datasource": null, + "expr": "(s3_lifecycle_conductor_scan_count{job=\"${job_lifecycle_producer}\", namespace=\"${namespace}\", type=\"lifecycle_bucket\", bucket_source=\"mongodb\"} or vector(0)) + (s3_lifecycle_conductor_scan_count{job=\"${job_lifecycle_producer}\", namespace=\"${namespace}\", type=\"lifecycle_bucket\", bucket_source=\"zookeeper\"} or vector(0)) + (sum(s3_lifecycle_bucket_processor_buckets_count{job=\"${job_lifecycle_bucket_processor}\", namespace=\"${namespace}\", bucket_source=\"bucketd\"}) or vector(0))", + "format": "time_series", + "hide": false, + "instant": false, + "interval": "", + "intervalFactor": 1, + "legendFormat": "", + "metric": "", + "refId": "", + "step": 10, + "target": "" + } + ], + "title": "Lifecycle Buckets", + "transformations": [], + "transparent": false, + "type": "stat" + }, + { + "datasource": "${DS_PROMETHEUS}", + "description": "Number of bucket task messages sent to Kafka during the latest conductor scan.", + "editable": true, + "error": false, + "fieldConfig": { + "defaults": { + "custom": {}, + "decimals": 0, + "mappings": [], + "noValue": "0", + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "semi-dark-blue", + "index": 0, + "line": true, + "op": "gt", + "value": "null", + "yaxis": "left" + } + ] + }, + "unit": "none" + }, + "overrides": [] + }, + "gridPos": { + "h": 4, + "w": 8, + "x": 16, + "y": 111 + }, + "hideTimeOverride": false, + "id": 52, + "links": [], + "maxDataPoints": 100, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "reduceOptions": { + "calcs": [ + "last" + ], + "fields": "", + "values": false + }, + "textMode": "auto" + }, + "targets": [ + { + "datasource": null, + "expr": "s3_lifecycle_conductor_scan_count{job=\"${job_lifecycle_producer}\", namespace=\"${namespace}\", type=\"workflow\"}", + "format": "time_series", + "hide": false, + "instant": false, + "interval": "", + "intervalFactor": 1, + "legendFormat": "", + "metric": "", + "refId": "", + "step": 10, + "target": "" + } + ], + "title": "Workflows", + "transformations": [], + "transparent": false, + "type": "stat" + }, + { + "datasource": "${DS_PROMETHEUS}", + "description": "Shows how long each bucket processor has been working on the current scan. Drops to 0 / disappears when idle.", + "editable": true, + "error": false, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineInterpolation": "smooth", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "log": 2, + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": {}, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [] + }, + "unit": "s" + }, + "overrides": [] + }, + "gridPos": { + "h": 7, + "w": 12, + "x": 0, + "y": 115 + }, + "hideTimeOverride": false, + "id": 53, + "links": [], + "maxDataPoints": 100, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom" + }, + "tooltip": { + "mode": "single" + } + }, + "targets": [ + { + "datasource": null, + "expr": "(time() * 1000 - s3_lifecycle_bucket_processor_scan_start_time{job=\"${job_lifecycle_bucket_processor}\", namespace=\"${namespace}\"}) / 1000 and s3_lifecycle_bucket_processor_scan_start_time{job=\"${job_lifecycle_bucket_processor}\", namespace=\"${namespace}\"} > 0", + "format": "time_series", + "hide": false, + "instant": false, + "interval": "", + "intervalFactor": 1, + "legendFormat": "{{pod}}", + "metric": "", + "refId": "", + "step": 10, + "target": "" + } + ], + "title": "Bucket Processor Scan Progress", + "transformations": [], + "transparent": false, + "type": "timeseries" + }, + { + "datasource": "${DS_PROMETHEUS}", + "description": "Running count of lifecycle-enabled bucket tasks processed by each bucket processor pod since it last detected a new scan. Resets when the pod receives the first message of a new conductor scan. As Kafka distributes messages across pods, not every pod sees every scan boundary, so values may accumulate across multiple scans.", + "editable": true, + "error": false, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineInterpolation": "smooth", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "log": 2, + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": {}, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [] + }, + "unit": "" + }, + "overrides": [] + }, + "gridPos": { + "h": 7, + "w": 12, + "x": 12, + "y": 115 + }, + "hideTimeOverride": false, + "id": 54, + "links": [], + "maxDataPoints": 100, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom" + }, + "tooltip": { + "mode": "single" + } + }, + "targets": [ + { + "datasource": null, + "expr": "s3_lifecycle_bucket_processor_buckets_count{job=\"${job_lifecycle_bucket_processor}\", namespace=\"${namespace}\"}", + "format": "time_series", + "hide": false, + "instant": false, + "interval": "", + "intervalFactor": 1, + "legendFormat": "{{pod}}", + "metric": "", + "refId": "", + "step": 10, + "target": "" + } + ], + "title": "Bucket Processor Lifecycle Buckets", + "transformations": [], + "transparent": false, + "type": "timeseries" + }, { "datasource": "${DS_PROMETHEUS}", "editable": true, @@ -4442,10 +4848,10 @@ "h": 7, "w": 8, "x": 0, - "y": 111 + "y": 122 }, "hideTimeOverride": false, - "id": 50, + "id": 55, "links": [], "maxDataPoints": 100, "options": { @@ -4555,10 +4961,10 @@ "h": 7, "w": 8, "x": 8, - "y": 111 + "y": 122 }, "hideTimeOverride": false, - "id": 51, + "id": 56, "links": [], "maxDataPoints": 100, "options": { @@ -4640,10 +5046,10 @@ "h": 7, "w": 8, "x": 16, - "y": 111 + "y": 122 }, "hideTimeOverride": false, - "id": 52, + "id": 57, "links": [], "maxDataPoints": 100, "options": { diff --git a/monitoring/lifecycle/dashboard.py b/monitoring/lifecycle/dashboard.py index bf11220ac..cd7c31811 100644 --- a/monitoring/lifecycle/dashboard.py +++ b/monitoring/lifecycle/dashboard.py @@ -73,6 +73,26 @@ class Metrics: 'status', job='${job_lifecycle_producer}', namespace='${namespace}', ) + FULL_SCAN_ELAPSED = metrics.Metric( + 's3_lifecycle_conductor_full_scan_elapsed_seconds', + job='${job_lifecycle_producer}', namespace='${namespace}', + ) + + SCAN_COUNT = metrics.Metric( + 's3_lifecycle_conductor_scan_count', + 'type', 'bucket_source', job='${job_lifecycle_producer}', namespace='${namespace}', + ) + + BUCKET_PROCESSOR_SCAN_START_TIME = metrics.Metric( + 's3_lifecycle_bucket_processor_scan_start_time', + job='${job_lifecycle_bucket_processor}', namespace='${namespace}', + ) + + BUCKET_PROCESSOR_BUCKETS_COUNT = metrics.Metric( + 's3_lifecycle_bucket_processor_buckets_count', + 'bucket_source', job='${job_lifecycle_bucket_processor}', namespace='${namespace}', + ) + S3_OPS = metrics.CounterMetric( 's3_lifecycle_s3_operations_total', 'origin', 'op', 'status', job=['$jobs'], namespace='${namespace}', @@ -730,6 +750,108 @@ def color_override(name, color): ] ) +lifecycle_full_scan_elapsed = Stat( + title="Full Scan Elapsed Time", + dataSource="${DS_PROMETHEUS}", + reduceCalc="last", + format=UNITS.SECONDS, + noValue='-', + targets=[ + Target( + expr=Metrics.FULL_SCAN_ELAPSED(), + ), + ], + thresholds=[ + Threshold('green', 0, 0.0), + Threshold('orange', 1, 60.0), + Threshold('red', 2, 300.0), + ], +) + +lifecycle_scan_count_buckets = Stat( + title="Lifecycle Buckets", + description="Number of buckets with lifecycle configuration " + "found during the latest conductor scan. For mongodb/zookeeper " + "sources, uses conductor metric. For bucketd source, uses aggregated " + "bucket processor count since conductor cannot know which buckets " + "have lifecycle at listing time.", + dataSource="${DS_PROMETHEUS}", + reduceCalc="last", + decimals=0, + noValue='0', + targets=[ + Target( + # mongodb/zookeeper: conductor metric + # bucketd: bucket processor count (conductor metric is always 0) + expr=( + f"({Metrics.SCAN_COUNT(type='lifecycle_bucket', bucket_source='mongodb')} or vector(0)) + " + f"({Metrics.SCAN_COUNT(type='lifecycle_bucket', bucket_source='zookeeper')} or vector(0)) + " + f"(sum({Metrics.BUCKET_PROCESSOR_BUCKETS_COUNT(bucket_source='bucketd')}) or vector(0))" + ), + ), + ], + thresholds=[ + Threshold('semi-dark-blue', 0, 0.0), + ], +) + +lifecycle_scan_count_workflows = Stat( + title="Workflows", + description="Number of bucket task messages sent to Kafka " + "during the latest conductor scan.", + dataSource="${DS_PROMETHEUS}", + reduceCalc="last", + decimals=0, + noValue='0', + targets=[ + Target( + expr=Metrics.SCAN_COUNT(type='workflow'), + ), + ], + thresholds=[ + Threshold('semi-dark-blue', 0, 0.0), + ], +) + +bucket_processor_scan_progress = TimeSeries( + title="Bucket Processor Scan Progress", + description="Shows how long each bucket processor has been working " + "on the current scan. Drops to 0 / disappears when idle.", + dataSource="${DS_PROMETHEUS}", + lineInterpolation="smooth", + unit=UNITS.SECONDS, + targets=[ + Target( + expr='(time() * 1000 - ' + + Metrics.BUCKET_PROCESSOR_SCAN_START_TIME() + + ') / 1000 and ' + + Metrics.BUCKET_PROCESSOR_SCAN_START_TIME() + + ' > 0', + legendFormat='{{pod}}', + ), + ], +) + +bucket_processor_buckets = TimeSeries( + title="Bucket Processor Lifecycle Buckets", + description="Running count of lifecycle-enabled bucket tasks " + "processed by each bucket processor pod since it last " + "detected a new scan. Resets when the pod receives the " + "first message of a new conductor scan. As Kafka " + "distributes messages across pods, not every pod sees " + "every scan boundary, so values may accumulate across " + "multiple scans.", + dataSource="${DS_PROMETHEUS}", + lineInterpolation="smooth", + decimals=0, + targets=[ + Target( + expr=Metrics.BUCKET_PROCESSOR_BUCKETS_COUNT(), + legendFormat='{{pod}}', + ), + ], +) + active_indexing_jobs = TimeSeries( title="Active Indexing jobs", dataSource="${DS_PROMETHEUS}", @@ -897,6 +1019,8 @@ def color_override(name, color): layout.row([s3_delete_object_ops, s3_delete_mpu_ops], height=8), RowPanel(title="Lifecycle Conductor"), layout.row([lifecycle_scans, trigger_latency], height=7), + layout.row([lifecycle_full_scan_elapsed, lifecycle_scan_count_buckets, lifecycle_scan_count_workflows], height=4), + layout.row([bucket_processor_scan_progress, bucket_processor_buckets], height=7), layout.row([lifecycle_scan_rate, active_indexing_jobs, legacy_tasks], height=7), ]), ) diff --git a/tests/functional/lifecycle/LifecycleConductor.spec.js b/tests/functional/lifecycle/LifecycleConductor.spec.js index 9943ab4c2..68b525254 100644 --- a/tests/functional/lifecycle/LifecycleConductor.spec.js +++ b/tests/functional/lifecycle/LifecycleConductor.spec.js @@ -41,7 +41,7 @@ const expected2Messages = (version='v2') => ([ { value: { action: 'processObjects', - contextInfo: { reqId: 'test-request-id' }, + contextInfo: { reqId: 'test-request-id', conductorScanId: 'test-scan-id' }, target: { bucket: 'bucket1', owner: 'owner1', taskVersion: version }, details: {}, }, @@ -49,7 +49,7 @@ const expected2Messages = (version='v2') => ([ { value: { action: 'processObjects', - contextInfo: { reqId: 'test-request-id' }, + contextInfo: { reqId: 'test-request-id', conductorScanId: 'test-scan-id' }, target: { bucket: 'bucket1-2', owner: 'owner1', taskVersion: version }, details: {}, }, @@ -60,7 +60,7 @@ const expected4Messages = (version='v2') => ([ { value: { action: 'processObjects', - contextInfo: { reqId: 'test-request-id' }, + contextInfo: { reqId: 'test-request-id', conductorScanId: 'test-scan-id' }, target: { bucket: 'bucket1', owner: 'owner1', taskVersion: version }, details: {}, }, @@ -68,7 +68,7 @@ const expected4Messages = (version='v2') => ([ { value: { action: 'processObjects', - contextInfo: { reqId: 'test-request-id' }, + contextInfo: { reqId: 'test-request-id', conductorScanId: 'test-scan-id' }, target: { bucket: 'bucket1-2', owner: 'owner1', taskVersion: version }, details: {}, }, @@ -76,7 +76,7 @@ const expected4Messages = (version='v2') => ([ { value: { action: 'processObjects', - contextInfo: { reqId: 'test-request-id' }, + contextInfo: { reqId: 'test-request-id', conductorScanId: 'test-scan-id' }, target: { bucket: 'bucket3', owner: 'owner3', taskVersion: version }, details: {}, }, @@ -84,7 +84,7 @@ const expected4Messages = (version='v2') => ([ { value: { action: 'processObjects', - contextInfo: { reqId: 'test-request-id' }, + contextInfo: { reqId: 'test-request-id', conductorScanId: 'test-scan-id' }, target: { bucket: 'bucket4', owner: 'owner4', taskVersion: version }, details: {}, }, diff --git a/tests/unit/lifecycle/LifecycleConductor.spec.js b/tests/unit/lifecycle/LifecycleConductor.spec.js index 598d0b4df..1bbde0b31 100644 --- a/tests/unit/lifecycle/LifecycleConductor.spec.js +++ b/tests/unit/lifecycle/LifecycleConductor.spec.js @@ -7,6 +7,7 @@ const { splitter } = require('arsenal').constants; const LifecycleConductor = require( '../../../extensions/lifecycle/conductor/LifecycleConductor'); +const { LifecycleMetrics } = require('../../../extensions/lifecycle/LifecycleMetrics'); const { lifecycleTaskVersions, indexesForFeature @@ -190,6 +191,86 @@ describe('Lifecycle Conductor', () => { conductor.processBuckets(done); }); + it('should publish full scan metrics at end of scan', done => { + conductor._mongodbClient = { + getIndexingJobs: (_, cb) => cb(null, ['job1']), + getCollection: () => ({ + find: () => ({ + project: () => ({ + hasNext: () => Promise.resolve(false), + }), + }), + }), + }; + conductor._zkClient = { + getData: (_, cb) => cb(null, null, null), + setData: (path, data, version, cb) => cb(null, { version: 1 }), + }; + + sinon.stub(conductor, '_controlBacklog').callsFake(cb => cb(null)); + const metricStub = sinon.stub(LifecycleMetrics, 'onConductorFullScan'); + + conductor.processBuckets(err => { + assert.ifError(err); + assert(metricStub.calledOnce); + const [, , bucketCount, workflowCount, lifecycleBucketCount] = + metricStub.firstCall.args; + assert.strictEqual(bucketCount, 0); + assert.strictEqual(workflowCount, 0); + assert.strictEqual(lifecycleBucketCount, 0); + done(); + }); + }); + + it('should generate a conductorScanId', done => { + conductor._mongodbClient = { + getIndexingJobs: (_, cb) => cb(null, []), + getCollection: () => ({ + find: () => ({ + project: () => ({ + hasNext: () => Promise.resolve(false), + }), + }), + }), + }; + conductor._zkClient = { + getData: (_, cb) => cb(null, null, null), + setData: (path, data, version, cb) => cb(null, { version: 1 }), + }; + conductor._producer = { send: (msg, cb) => cb(null, {}) }; + const addDefaultFieldsStub = sinon.stub(); + const testLog = conductor.logger.newRequestLogger(); + testLog.addDefaultFields = addDefaultFieldsStub; + sinon.stub(conductor.logger, 'newRequestLogger').returns(testLog); + + sinon.stub(conductor, '_controlBacklog').callsFake(cb => cb(null)); + let capturedScanId = null; + sinon.stub(conductor, 'listBuckets') + .callsFake((queue, scanId, log, cb) => { + // Verify scanId is a valid UUID + const uuidRegex = /^[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$/i; + assert(uuidRegex.test(scanId), + `conductorScanId should be a valid UUID v4, got: ${scanId}`); + capturedScanId = scanId; + assert(addDefaultFieldsStub.calledOnce); + assert.strictEqual(addDefaultFieldsStub.firstCall.args[0].conductorScanId, scanId); + cb(null, 0); + }); + const metricStub = sinon.stub(LifecycleMetrics, 'onConductorFullScan'); + + conductor.processBuckets(err => { + assert.ifError(err); + assert(metricStub.calledOnce); + const logPassedToMetric = metricStub.firstCall.args[0]; + assert.strictEqual(logPassedToMetric, testLog); + assert(capturedScanId); + const uuidRegex = /^[0-9a-f]{8}-[0-9a-f]{4}-4[0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$/i; + assert(uuidRegex.test(capturedScanId), + `captured conductorScanId should be a valid UUID v4, got: ${capturedScanId}`); + done(); + }); + }); + // tests that `activeIndexingJobRetrieved` is not reset until the e it('should not reset `activeIndexingJobsRetrieved` while async operations are in progress', done => { const order = []; @@ -202,7 +283,7 @@ describe('Lifecycle Conductor', () => { sinon.stub(conductor._mongodbClient, 'getIndexingJobs') .callsFake((_, cb) => cb(null, ['job1', 'job2'])); sinon.stub(conductor, 'listBuckets') - .callsFake((mQueue, log, cb) => { + .callsFake((mQueue, scanId, log, cb) => { mQueue.push({ bucketName: 'testbucket', canonicalId: 'testId', @@ -219,7 +300,7 @@ describe('Lifecycle Conductor', () => { setTimeout(() => cb(null, []), 1000); }); sinon.stub(conductor, '_createBucketTaskMessages') - .callsFake((_a, _b, cb) => { + .callsFake((_a, _b, _c, _d, cb) => { order.push(2); assert(conductor.activeIndexingJobsRetrieved); setTimeout(() => cb(null, []), 1000); @@ -244,6 +325,15 @@ describe('Lifecycle Conductor', () => { }); describe('_indexesGetOrCreate', () => { + it('should include conductor scan id in task context', () => { + const taskMessage = conductor._taskToMessage( + getTask(true), lifecycleTaskVersions.v2, + 'scan-id-test', 1706000000000, log); + const parsed = JSON.parse(taskMessage.message); + assert.strictEqual(parsed.contextInfo.conductorScanId, 'scan-id-test'); + assert.strictEqual(parsed.contextInfo.scanStartTimestamp, 1706000000000); + }); + it('should return v2 for bucketd bucket sources', done => { conductor._bucketSource = 'bucketd'; conductor._indexesGetOrCreate(getTask(undefined), log, (err, taskVersion) => { @@ -435,17 +525,19 @@ describe('Lifecycle Conductor', () => { const lcConductor = makeLifecycleConductorWithFilters({ bucketSource: 'zookeeper', }); - lcConductor.listBuckets(queue, fakeLogger, (err, length) => { + lcConductor.listBuckets(queue, 'test-scan-id', fakeLogger, (err, length) => { assert.strictEqual(length, 2); assert.strictEqual(queue.length(), 2); const expectedQueue = [ { canonicalId: account1, bucketName: bucket1, + isLifecycled: true, }, { canonicalId: account2, bucketName: bucket2, + isLifecycled: true, }, ]; assert.deepStrictEqual(queue.list(), expectedQueue); @@ -458,7 +550,7 @@ describe('Lifecycle Conductor', () => { const lcConductor = makeLifecycleConductorWithFilters({ bucketSource: 'bucketd', }, markers); - lcConductor.listBuckets(queue, fakeLogger, (err, length) => { + lcConductor.listBuckets(queue, 'test-scan-id', fakeLogger, (err, length) => { assert.strictEqual(length, 2); assert.strictEqual(queue.length(), 2); @@ -489,17 +581,19 @@ describe('Lifecycle Conductor', () => { 'invalid:bucketuid789', 'invalid', ]); - lcConductor.listBuckets(queue, fakeLogger, (err, length) => { + lcConductor.listBuckets(queue, 'test-scan-id', fakeLogger, (err, length) => { assert.strictEqual(length, 2); assert.strictEqual(queue.length(), 2); const expectedQueue = [ { canonicalId: account1, bucketName: bucket1, + isLifecycled: true, }, { canonicalId: account2, bucketName: bucket2, + isLifecycled: true, }, ]; assert.deepStrictEqual(queue.list(), expectedQueue); @@ -512,13 +606,14 @@ describe('Lifecycle Conductor', () => { bucketsDenied: [bucket1], bucketSource: 'zookeeper', }); - lcConductor.listBuckets(queue, fakeLogger, (err, length) => { + lcConductor.listBuckets(queue, 'test-scan-id', fakeLogger, (err, length) => { assert.strictEqual(length, 1); assert.strictEqual(queue.length(), 1); const expectedQueue = [ { canonicalId: account2, bucketName: bucket2, + isLifecycled: true, }, ]; assert.deepStrictEqual(queue.list(), expectedQueue); @@ -532,7 +627,7 @@ describe('Lifecycle Conductor', () => { bucketsDenied: [bucket1], bucketSource: 'bucketd', }, markers); - lcConductor.listBuckets(queue, fakeLogger, (err, length) => { + lcConductor.listBuckets(queue, 'test-scan-id', fakeLogger, (err, length) => { assert.strictEqual(length, 1); assert.strictEqual(queue.length(), 1); const expectedQueue = [ @@ -553,13 +648,14 @@ describe('Lifecycle Conductor', () => { accountsDenied: [`${accountName1}:${account1}`], bucketSource: 'zookeeper', }); - lcConductor.listBuckets(queue, fakeLogger, (err, length) => { + lcConductor.listBuckets(queue, 'test-scan-id', fakeLogger, (err, length) => { assert.strictEqual(length, 1); assert.strictEqual(queue.length(), 1); const expectedQueue = [ { canonicalId: account2, bucketName: bucket2, + isLifecycled: true, }, ]; assert.deepStrictEqual(queue.list(), expectedQueue); @@ -573,7 +669,7 @@ describe('Lifecycle Conductor', () => { accountsDenied: [`${accountName1}:${account1}`], bucketSource: 'bucketd', }, markers); - lcConductor.listBuckets(queue, fakeLogger, (err, length) => { + lcConductor.listBuckets(queue, 'test-scan-id', fakeLogger, (err, length) => { assert.strictEqual(length, 1); assert.strictEqual(queue.length(), 1); const expectedQueue = [ @@ -595,7 +691,7 @@ describe('Lifecycle Conductor', () => { bucketsDenied: [bucket2], bucketSource: 'zookeeper', }); - lcConductor.listBuckets(queue, fakeLogger, (err, length) => { + lcConductor.listBuckets(queue, 'test-scan-id', fakeLogger, (err, length) => { assert.strictEqual(length, 0); assert.strictEqual(queue.length(), 0); const expectedQueue = []; @@ -611,7 +707,7 @@ describe('Lifecycle Conductor', () => { bucketsDenied: [bucket2], bucketSource: 'bucketd', }, markers); - lcConductor.listBuckets(queue, fakeLogger, (err, length) => { + lcConductor.listBuckets(queue, 'test-scan-id', fakeLogger, (err, length) => { assert.strictEqual(length, 0); assert.strictEqual(queue.length(), 0); const expectedQueue = []; @@ -636,7 +732,7 @@ describe('Lifecycle Conductor', () => { getCollection: () => ({ find: findStub }) }; sinon.stub(lcConductor._zkClient, 'getData').yields(null, null, null); - lcConductor.listBuckets(queue, fakeLogger, err => { + lcConductor.listBuckets(queue, 'test-scan-id', fakeLogger, err => { assert.ifError(err); assert.deepEqual(findStub.getCall(0).args[0], {}); done(); @@ -658,7 +754,7 @@ describe('Lifecycle Conductor', () => { getCollection: () => ({ find: findStub }) }; sinon.stub(lcConductor._zkClient, 'getData').yields(null, null, null); - lcConductor.listBuckets(queue, fakeLogger, err => { + lcConductor.listBuckets(queue, 'test-scan-id', fakeLogger, err => { assert.ifError(err); assert.deepEqual(findStub.getCall(0).args[0], { 'value.owner': { @@ -686,7 +782,7 @@ describe('Lifecycle Conductor', () => { getCollection: () => ({ find: findStub }) }; sinon.stub(lcConductor._zkClient, 'getData').yields(null, null, null); - lcConductor.listBuckets(queue, fakeLogger, err => { + lcConductor.listBuckets(queue, 'test-scan-id', fakeLogger, err => { assert.ifError(err); assert.deepEqual(findStub.getCall(0).args[0], { _id: { @@ -711,7 +807,7 @@ describe('Lifecycle Conductor', () => { getCollection: () => ({ find: findStub }) }; sinon.stub(lcConductor._zkClient, 'getData').yields(null, Buffer.from('bucket1'), null); - lcConductor.listBuckets(queue, fakeLogger, err => { + lcConductor.listBuckets(queue, 'test-scan-id', fakeLogger, err => { assert.ifError(err); assert.deepEqual(findStub.getCall(0).args[0], { _id: { @@ -738,7 +834,7 @@ describe('Lifecycle Conductor', () => { getCollection: () => ({ find: findStub }) }; sinon.stub(lcConductor._zkClient, 'getData').yields(null, Buffer.from('bucket1'), null); - lcConductor.listBuckets(queue, fakeLogger, err => { + lcConductor.listBuckets(queue, 'test-scan-id', fakeLogger, err => { assert.ifError(err); assert.deepEqual(findStub.getCall(0).args[0], { '_id': { diff --git a/tests/unit/lifecycle/LifecycleMetrics.spec.js b/tests/unit/lifecycle/LifecycleMetrics.spec.js index 2acdba249..f1a85d39c 100644 --- a/tests/unit/lifecycle/LifecycleMetrics.spec.js +++ b/tests/unit/lifecycle/LifecycleMetrics.spec.js @@ -79,6 +79,70 @@ describe('LifecycleMetrics', () => { })); }); + it('should catch errors in onConductorFullScan', () => { + const metric = ZenkoMetrics.getMetric('s3_lifecycle_conductor_full_scan_elapsed_seconds'); + sinon.stub(metric, 'set').throws(new Error('Metric error')); + + LifecycleMetrics.onConductorFullScan(log, 5000, 10, 8, 5); + + assert(log.error.calledOnce); + assert(log.error.calledWithMatch('failed to update prometheus metrics', { + method: 'LifecycleMetrics.onConductorFullScan', + elapsedMs: 5000, + bucketCount: 10, + workflowCount: 8, + lifecycleBucketCount: 5, + })); + }); + + it('should set and reset bucket processor scan gauges', () => { + const scanStartMetric = ZenkoMetrics.getMetric( + 's3_lifecycle_bucket_processor_scan_start_time'); + const bucketsMetric = ZenkoMetrics.getMetric( + 's3_lifecycle_bucket_processor_buckets_count'); + const startSet = sinon.stub(scanStartMetric, 'set'); + const bucketsReset = sinon.stub(bucketsMetric, 'reset'); + const bucketsInc = sinon.stub(bucketsMetric, 'inc'); + + // start a scan + LifecycleMetrics.onBucketProcessorScanStart( + log, 1706000000000); + assert(startSet.calledOnce); + assert(startSet.calledWithMatch( + { origin: 'bucket_processor' }, 1706000000000)); + assert(bucketsReset.calledOnce); + + // process a bucket + LifecycleMetrics.onBucketProcessorBucketDone(log); + assert(bucketsInc.calledOnce); + + // end the scan + startSet.resetHistory(); + LifecycleMetrics.onBucketProcessorScanEnd(log); + assert(startSet.calledOnce); + assert(startSet.calledWithMatch( + { origin: 'bucket_processor' }, 0)); + + assert(log.error.notCalled); + }); + + it('should catch errors in onBucketProcessorScanStart', () => { + const scanStartMetric = ZenkoMetrics.getMetric( + 's3_lifecycle_bucket_processor_scan_start_time'); + sinon.stub(scanStartMetric, 'set') + .throws(new Error('Metric error')); + + LifecycleMetrics.onBucketProcessorScanStart( + log, 1706000000000); + + assert(log.error.calledOnce); + assert(log.error.calledWithMatch( + 'failed to update prometheus metrics', { + method: + 'LifecycleMetrics.onBucketProcessorScanStart', + })); + }); + it('should catch errors in onLifecycleTriggered', () => { LifecycleMetrics.onLifecycleTriggered(log, 'conductor', 'expiration', 'us-east-1', NaN); @@ -169,5 +233,28 @@ describe('LifecycleMetrics', () => { process: 'conductor', })); }); + + it('should set full scan metrics including lifecycle bucket count', () => { + const elapsedMetric = ZenkoMetrics.getMetric( + 's3_lifecycle_conductor_full_scan_elapsed_seconds'); + const countMetric = ZenkoMetrics.getMetric( + 's3_lifecycle_conductor_scan_count'); + const elapsedSet = sinon.stub(elapsedMetric, 'set'); + const countSet = sinon.stub(countMetric, 'set'); + + LifecycleMetrics.onConductorFullScan(log, 3000, 7, 6, 4); + + assert(elapsedSet.calledOnce); + assert(elapsedSet.calledWithMatch( + { origin: 'conductor' }, 3)); + assert.strictEqual(countSet.callCount, 3); + assert(countSet.getCall(0).calledWithMatch( + { origin: 'conductor', type: 'bucket' }, 7)); + assert(countSet.getCall(1).calledWithMatch( + { origin: 'conductor', type: 'lifecycle_bucket' }, 4)); + assert(countSet.getCall(2).calledWithMatch( + { origin: 'conductor', type: 'workflow' }, 6)); + assert(log.error.notCalled); + }); }); }); diff --git a/tests/utils/BackbeatTestConsumer.js b/tests/utils/BackbeatTestConsumer.js index 00ed404dd..2e690180f 100644 --- a/tests/utils/BackbeatTestConsumer.js +++ b/tests/utils/BackbeatTestConsumer.js @@ -32,6 +32,25 @@ class BackbeatTestConsumer extends BackbeatConsumer { // present assert(parsedMsg.contextInfo?.reqId, 'expected contextInfo.reqId field'); parsedMsg.contextInfo.reqId = expectedMsg.value.contextInfo?.reqId; + // conductorScanId is also generated per scan: check it exists, + // then normalize for comparison + if (expectedMsg.value.contextInfo?.conductorScanId === 'test-scan-id') { + assert(parsedMsg.contextInfo?.conductorScanId, + 'expected contextInfo.conductorScanId field'); + parsedMsg.contextInfo.conductorScanId = + expectedMsg.value.contextInfo.conductorScanId; + } + // bucketSource is added by conductor: normalize for comparison + if (parsedMsg.contextInfo?.bucketSource && + !expectedMsg.value.contextInfo?.bucketSource) { + delete parsedMsg.contextInfo.bucketSource; + } + // scanStartTimestamp is a dynamic conductor timestamp: + // normalize for comparison + if (parsedMsg.contextInfo?.scanStartTimestamp && + !expectedMsg.value.contextInfo?.scanStartTimestamp) { + delete parsedMsg.contextInfo.scanStartTimestamp; + } } assert.deepStrictEqual( parsedMsg, expectedMsg.value,