Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 143 additions & 0 deletions extensions/lifecycle/LifecycleMetrics.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ const LIFECYCLE_LABEL_OP = 'op';
const LIFECYCLE_LABEL_STATUS = 'status';
const LIFECYCLE_LABEL_LOCATION = 'location';
const LIFECYCLE_LABEL_TYPE = 'type';
const LIFECYCLE_LABEL_BUCKET_SOURCE = 'bucket_source';

const LIFECYCLE_MARKER_METRICS_LOCATION = '-delete-marker-';

Expand Down Expand Up @@ -50,6 +51,32 @@ const lifecycleLegacyTask = ZenkoMetrics.createCounter({
labelNames: [LIFECYCLE_LABEL_ORIGIN, LIFECYCLE_LABEL_STATUS],
});

// Gauge: duration (in seconds) of the most recent complete conductor scan.
// Overwritten at the end of each full scan by onConductorFullScan().
const conductorFullScanElapsedSeconds = ZenkoMetrics.createGauge({
    name: 's3_lifecycle_conductor_full_scan_elapsed_seconds',
    help: 'Elapsed time for the latest full lifecycle conductor scan, in seconds',
    labelNames: [LIFECYCLE_LABEL_ORIGIN],
});

// Gauge: per-scan counts published by the conductor, one time series per
// "type" label ('bucket', 'lifecycle_bucket', 'workflow') and bucket source.
const conductorScanCount = ZenkoMetrics.createGauge({
    name: 's3_lifecycle_conductor_scan_count',
    help: 'Number of buckets and queued workflows for the latest full lifecycle conductor scan',
    labelNames: [LIFECYCLE_LABEL_ORIGIN, LIFECYCLE_LABEL_TYPE, LIFECYCLE_LABEL_BUCKET_SOURCE],
});

// Gauge: start timestamp (ms) of the scan this bucket processor is working
// on; reset to 0 by onBucketProcessorScanEnd() to signal the processor is idle.
const bucketProcessorScanStartTime = ZenkoMetrics.createGauge({
    name: 's3_lifecycle_bucket_processor_scan_start_time',
    help: 'Timestamp (ms) of the scan currently being processed '
    + 'by this bucket processor (0 when idle)',
    labelNames: [LIFECYCLE_LABEL_ORIGIN],
});

// Gauge: buckets completed by this processor in the current scan, labelled by
// bucket source; reset() at scan start, inc()'d per successful bucket task.
const bucketProcessorBucketsCount = ZenkoMetrics.createGauge({
    name: 's3_lifecycle_bucket_processor_buckets_count',
    help: 'Number of buckets successfully processed by '
    + 'this bucket processor during the current scan',
    labelNames: [LIFECYCLE_LABEL_ORIGIN, LIFECYCLE_LABEL_BUCKET_SOURCE],
});

const lifecycleS3Operations = ZenkoMetrics.createCounter({
name: 's3_lifecycle_s3_operations_total',
help: 'Total number of S3 operations by the lifecycle processes',
Expand Down Expand Up @@ -172,6 +199,122 @@ class LifecycleMetrics {
}
}

/**
* Record metrics at the end of a full conductor scan.
* @param {Object} log - logger
* @param {number} elapsedMs - wall-clock time for the full scan
* @param {number} bucketCount - total buckets scanned
* @param {number} workflowCount - messages delivered to bucket-tasks topic
* @param {number} lifecycleBucketCount - buckets that have lifecycle enabled
* @param {string} bucketSource - source of buckets (mongodb, zookeeper, bucketd)
*/
static onConductorFullScan(
log, elapsedMs, bucketCount, workflowCount, lifecycleBucketCount, bucketSource
) {
try {
conductorFullScanElapsedSeconds.set(
{ [LIFECYCLE_LABEL_ORIGIN]: 'conductor' },
elapsedMs / 1000
);
conductorScanCount.set({
[LIFECYCLE_LABEL_ORIGIN]: 'conductor',
[LIFECYCLE_LABEL_TYPE]: 'bucket',
[LIFECYCLE_LABEL_BUCKET_SOURCE]: bucketSource || 'unknown',
}, bucketCount);
// For bucketd, we can't know lifecycle buckets at conductor level,
// so lifecycleBucketCount will be 0. For accurate bucketd counts,
// use bucketProcessorBucketsCount aggregated across all bucket processor
// instances in Grafana dashboards.
conductorScanCount.set({
[LIFECYCLE_LABEL_ORIGIN]: 'conductor',
[LIFECYCLE_LABEL_TYPE]: 'lifecycle_bucket',
[LIFECYCLE_LABEL_BUCKET_SOURCE]: bucketSource || 'unknown',
}, lifecycleBucketCount);
conductorScanCount.set({
[LIFECYCLE_LABEL_ORIGIN]: 'conductor',
[LIFECYCLE_LABEL_TYPE]: 'workflow',
[LIFECYCLE_LABEL_BUCKET_SOURCE]: bucketSource || 'unknown',
}, workflowCount);
} catch (err) {
LifecycleMetrics.handleError(
log, err, 'LifecycleMetrics.onConductorFullScan', {
elapsedMs, bucketCount, workflowCount,
lifecycleBucketCount, bucketSource,
}
);
}
}

/**
* Signal that a bucket processor has started working on a scan.
* Called when the first message of a new conductorScanId arrives.
* @param {Object} log - logger
* @param {number} scanStartTimestamp - timestamp (ms) from the
* conductor scan start
*/
static onBucketProcessorScanStart(log, scanStartTimestamp) {
try {
bucketProcessorScanStartTime.set(
{ [LIFECYCLE_LABEL_ORIGIN]: 'bucket_processor' },
scanStartTimestamp
);
bucketProcessorBucketsCount.reset();
} catch (err) {
LifecycleMetrics.handleError(
log, err,
'LifecycleMetrics.onBucketProcessorScanStart',
{ scanStartTimestamp }
);
}
}

/**
* Increment the count of lifecycle-enabled buckets successfully processed
* during the current scan. This is called after the bucket task completes
* successfully, not when it is dispatched to the scheduler.
*
* Note: For bucketd source, the conductor cannot know which buckets have
* lifecycle at listing time.
* @param {Object} log - logger
* @param {string} bucketSource - source of bucket (mongodb, zookeeper, bucketd)
* @param {string} conductorScanId - scan ID from conductor
*/
static onBucketProcessorBucketDone(log, bucketSource, conductorScanId) {
try {
bucketProcessorBucketsCount.inc({
[LIFECYCLE_LABEL_ORIGIN]: 'bucket_processor',
[LIFECYCLE_LABEL_BUCKET_SOURCE]: bucketSource || 'unknown',
});
} catch (err) {
LifecycleMetrics.handleError(
log, err,
'LifecycleMetrics.onBucketProcessorBucketDone',
{ bucketSource, conductorScanId }
);
}
}

/**
* Signal that a bucket processor has finished all tasks for
* the current scan (queue drained).
* Resets the scan timestamp to 0 so Prometheus can detect
* that this processor is idle.
* @param {Object} log - logger
*/
static onBucketProcessorScanEnd(log) {
try {
bucketProcessorScanStartTime.set(
{ [LIFECYCLE_LABEL_ORIGIN]: 'bucket_processor' },
0
);
} catch (err) {
LifecycleMetrics.handleError(
log, err,
'LifecycleMetrics.onBucketProcessorScanEnd'
);
}
}

static onLifecycleTriggered(log, process, type, location, latencyMs) {
try {
lifecycleTriggerLatency.observe({
Expand Down
76 changes: 75 additions & 1 deletion extensions/lifecycle/bucketProcessor/LifecycleBucketProcessor.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,16 @@ const {
extractBucketProcessorCircuitBreakerConfigs,
} = require('../CircuitBreakerGroup');
const { lifecycleTaskVersions } = require('../../../lib/constants');
const { LifecycleMetrics } = require('../LifecycleMetrics');
const locations = require('../../../conf/locationConfig.json');

const PROCESS_OBJECTS_ACTION = 'processObjects';

// Delay before declaring a scan finished after the internal task
// queue drains. This prevents premature idle signals when the
// queue empties between Kafka message batches within the same scan.
const SCAN_END_DEBOUNCE_DELAY_MS = 30000;

/**
* @class LifecycleBucketProcessor
*
Expand Down Expand Up @@ -97,6 +103,12 @@ class LifecycleBucketProcessor {
this._pausedLocations = new Set();
this._locationStatusStream = null;

// Conductor scan ID for metrics; kept across internal queue drains so
// Kafka batches from the same scan do not retrigger onBucketProcessorScanStart.
this._metricsScanId = null;
this._processorScanMetricsActive = false;
this._scanEndTimeout = null;

this.clientManager = new ClientManager({
id: 'lifecycle',
authConfig: lcConfig.bucketProcessor.auth || lcConfig.auth,
Expand Down Expand Up @@ -145,12 +157,32 @@ class LifecycleBucketProcessor {
}, this._lcConfig.bucketProcessor.concurrency);

// Listen for errors from any task being processed.
// When the internal task queue empties, schedule a delayed
// reset of the scan-in-progress gauge. The delay avoids
// false idle signals when the queue drains between Kafka
// message batches that belong to the same conductor scan.
this._internalTaskScheduler.drain(err => {
if (err) {
this._log.error('error occurred during task processing', {
error: err,
});
}
if (this._processorScanMetricsActive && !this._scanEndTimeout) {
this._log.info('bucket processor task queue drained, scheduling scan end', {
method: 'LifecycleBucketProcessor.drain',
conductorScanId: this._metricsScanId,
debounceMs: SCAN_END_DEBOUNCE_DELAY_MS,
});
this._scanEndTimeout = setTimeout(() => {
this._scanEndTimeout = null;
this._log.info('bucket processor scan end confirmed', {
method: 'LifecycleBucketProcessor.drain',
conductorScanId: this._metricsScanId,
});
LifecycleMetrics.onBucketProcessorScanEnd(this._log);
this._processorScanMetricsActive = false;
}, SCAN_END_DEBOUNCE_DELAY_MS);
}
});

const globalCircuitBreakerConfig = extractBucketProcessorCircuitBreakerConfigs(
Expand Down Expand Up @@ -277,12 +309,41 @@ class LifecycleBucketProcessor {
return process.nextTick(() => cb(errors.InternalError));
}
const { bucket, owner, accountId, taskVersion } = result.target;
const conductorScanId = result.contextInfo && result.contextInfo.conductorScanId;
if (conductorScanId && conductorScanId !== this._metricsScanId) {
// New scan: cancel any pending debounce, full metric reset
if (this._scanEndTimeout) {
clearTimeout(this._scanEndTimeout);
this._scanEndTimeout = null;
}
this._metricsScanId = conductorScanId;
LifecycleMetrics.onBucketProcessorScanStart(
this._log, Date.now()
);
this._processorScanMetricsActive = true;
this._log.info('new conductor scan detected', {
method:
'LifecycleBucketProcessor._processBucketEntry',
conductorScanId,
});
} else if (conductorScanId && this._scanEndTimeout) {
// Same scan, but the debounce was pending: cancel it
// because more messages are still arriving.
clearTimeout(this._scanEndTimeout);
this._scanEndTimeout = null;
this._log.debug('cancelled scan end debounce, more messages arriving', {
method: 'LifecycleBucketProcessor._processBucketEntry',
conductorScanId,
});
}

if (!bucket || !owner || (!accountId && this._authConfig.type === authTypeAssumeRole)) {
this._log.error('kafka bucket entry missing required fields', {
method: 'LifecycleBucketProcessor._processBucketEntry',
bucket,
owner,
accountId,
conductorScanId,
});
return process.nextTick(() => cb(errors.InternalError));
}
Expand All @@ -291,6 +352,7 @@ class LifecycleBucketProcessor {
bucket,
owner,
accountId,
conductorScanId,
});

const s3Client = this.clientManager.getS3Client(accountId);
Expand Down Expand Up @@ -345,14 +407,26 @@ class LifecycleBucketProcessor {
owner,
details: result.details,
taskName: task.constructor.name,
conductorScanId,
});
return this._internalTaskScheduler.push({
task,
rules: config.Rules,
value: result,
s3target: s3Client,
backbeatMetadataProxy,
}, cb);
}, err => {
// Increment metric after task successfully completes
// This ensures we count actual processing completion, not just dispatch
// Only count successful completions, not failures
if (!err) {
const bucketSource = result.contextInfo && result.contextInfo.bucketSource;
LifecycleMetrics.onBucketProcessorBucketDone(
this._log, bucketSource, conductorScanId
);
}
return cb(err);
});
});
}

Expand Down
Loading
Loading