Skip to content

Commit e9c1bc9

Browse files
committed
Add lifecycle pipeline tracing logs for overlap debugging
Add info-level logging at key points in the lifecycle pipeline to trace message flow through bucket tasks, object actions, and deletions. Each log entry includes the Kafka partition/offset, context info, and the backlog control check results.
1 parent f6624b5 commit e9c1bc9

File tree

6 files changed

+98
-19
lines changed

6 files changed

+98
-19
lines changed

extensions/lifecycle/bucketProcessor/LifecycleBucketProcessor.js

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -285,11 +285,15 @@ class LifecycleBucketProcessor {
285285
});
286286
return process.nextTick(() => cb(errors.InternalError));
287287
}
288-
this._log.debug('processing bucket entry', {
288+
this._log.info('consuming bucket task entry', {
289289
method: 'LifecycleBucketProcessor._processBucketEntry',
290290
bucket,
291291
owner,
292292
accountId,
293+
kafkaPartition: entry.partition,
294+
kafkaOffset: entry.offset,
295+
contextInfo: result.contextInfo,
296+
details: result.details,
293297
});
294298

295299
const s3 = this.clientManager.getS3Client(accountId);
@@ -343,6 +347,7 @@ class LifecycleBucketProcessor {
343347
bucket,
344348
owner,
345349
details: result.details,
350+
contextInfo: result.contextInfo,
346351
taskName: task.constructor.name,
347352
});
348353
return this._internalTaskScheduler.push({

extensions/lifecycle/conductor/LifecycleConductor.js

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -868,6 +868,8 @@ class LifecycleConductor {
868868
checkResults);
869869
return done(errors.Throttling);
870870
}
871+
this.logger.info('backlog control check passed, ' +
872+
'all consumers caught up', checkResults);
871873
return done();
872874
});
873875
}

extensions/lifecycle/objectProcessor/LifecycleObjectProcessor.js

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -192,8 +192,12 @@ class LifecycleObjectProcessor extends EventEmitter {
192192
});
193193
return process.nextTick(done);
194194
}
195-
this._log.debug('processing lifecycle object entry',
196-
actionEntry.getLogInfo());
195+
this._log.info('processing lifecycle object entry',
196+
Object.assign({
197+
kafkaPartition: kafkaEntry.partition,
198+
kafkaOffset: kafkaEntry.offset,
199+
actionContext: actionEntry.getContext(),
200+
}, actionEntry.getLogInfo()));
197201
const task = this.getTask(actionEntry);
198202

199203
if (task === null) {

extensions/lifecycle/tasks/LifecycleDeleteObjectTask.js

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -308,15 +308,25 @@ class LifecycleDeleteObjectTask extends BackbeatTask {
308308
return done();
309309
}
310310
if (err && err.statusCode === 404) {
311-
log.debug('Unable to find object to delete, skipping',
312-
entry.getLogInfo());
311+
log.info('object already deleted, skipping',
312+
Object.assign({
313+
method: 'LifecycleDeleteObjectTask.processActionEntry',
314+
actionContext: entry.getContext(),
315+
}, entry.getLogInfo()));
313316
return done();
314317
}
315318
if (err && err.statusCode === 412) {
316319
log.info('Object was modified after delete entry ' +
317320
'created so object was not deleted',
318321
entry.getLogInfo());
319322
}
323+
if (!err) {
324+
log.info('successfully deleted object', Object.assign({
325+
method: 'LifecycleDeleteObjectTask.processActionEntry',
326+
actionContext: entry.getContext(),
327+
durationMs: Date.now() - startTime,
328+
}, entry.getLogInfo()));
329+
}
320330
return done(err);
321331
});
322332
}

extensions/lifecycle/tasks/LifecycleTask.js

Lines changed: 55 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -115,10 +115,23 @@ class LifecycleTask extends BackbeatTask {
115115
*/
116116
_sendBucketEntry(entry, cb) {
117117
const entries = [{ message: JSON.stringify(entry) }];
118-
this.producer.sendToTopic(this.bucketTasksTopic, entries, err => {
119-
LifecycleMetrics.onKafkaPublish(null, 'BucketTopic', 'bucket', err, 1);
120-
return cb(err);
121-
});
118+
this.producer.sendToTopic(this.bucketTasksTopic, entries,
119+
(err, deliveryReports) => {
120+
LifecycleMetrics.onKafkaPublish(
121+
null, 'BucketTopic', 'bucket', err, 1);
122+
if (!err && deliveryReports && deliveryReports[0]) {
123+
const dr = deliveryReports[0];
124+
this.log.info('produced bucket task entry', {
125+
method: 'LifecycleTask._sendBucketEntry',
126+
bucket: entry.target && entry.target.bucket,
127+
kafkaPartition: dr.partition,
128+
kafkaOffset: dr.offset,
129+
contextInfo: entry.contextInfo,
130+
details: entry.details,
131+
});
132+
}
133+
return cb(err);
134+
});
122135
}
123136

124137
/**
@@ -177,10 +190,25 @@ class LifecycleTask extends BackbeatTask {
177190
Date.now() - entry.getAttribute('transitionTime'));
178191

179192
const entries = [{ message: entry.toKafkaMessage() }];
180-
this.producer.sendToTopic(this.objectTasksTopic, entries, err => {
181-
LifecycleMetrics.onKafkaPublish(null, 'ObjectTopic', 'bucket', err, 1);
182-
return cb(err);
183-
});
193+
this.producer.sendToTopic(this.objectTasksTopic, entries,
194+
(err, deliveryReports) => {
195+
LifecycleMetrics.onKafkaPublish(
196+
null, 'ObjectTopic', 'bucket', err, 1);
197+
if (!err) {
198+
const dr = deliveryReports && deliveryReports[0];
199+
this.log.info('queued object lifecycle action', {
200+
method: 'LifecycleTask._sendObjectAction',
201+
action: entry.getActionType(),
202+
bucket: entry.getAttribute('target.bucket'),
203+
key: entry.getAttribute('target.key'),
204+
version: entry.getAttribute('target.version'),
205+
kafkaPartition: dr && dr.partition,
206+
kafkaOffset: dr && dr.offset,
207+
actionContext: entry.getContext(),
208+
});
209+
}
210+
return cb(err);
211+
});
184212
}
185213

186214
/**
@@ -233,9 +261,13 @@ class LifecycleTask extends BackbeatTask {
233261
});
234262
this._sendBucketEntry(entry, err => {
235263
if (!err) {
236-
log.debug(
237-
'sent kafka entry for bucket consumption', {
264+
log.info(
265+
're-queued truncated bucket listing ' +
266+
'for next page', {
238267
method: 'LifecycleTask._getObjectList',
268+
bucket: bucketData.target.bucket,
269+
marker,
270+
contextInfo: entry.contextInfo,
239271
});
240272
}
241273
});
@@ -412,9 +444,13 @@ class LifecycleTask extends BackbeatTask {
412444
});
413445
this._sendBucketEntry(entry, err => {
414446
if (!err) {
415-
log.debug('sent kafka entry for bucket ' +
416-
'consumption', {
447+
log.info('re-queued truncated bucket listing ' +
448+
'for next page', {
417449
method: 'LifecycleTask._getObjectVersions',
450+
bucket: bucketData.target.bucket,
451+
nextKeyMarker: data.NextKeyMarker,
452+
nextVersionIdMarker: data.NextVersionIdMarker,
453+
contextInfo: entry.contextInfo,
418454
});
419455
}
420456
});
@@ -497,9 +533,14 @@ class LifecycleTask extends BackbeatTask {
497533
});
498534
return this._sendBucketEntry(entry, err => {
499535
if (!err) {
500-
log.debug(
501-
'sent kafka entry for bucket consumption', {
536+
log.info(
537+
're-queued truncated MPU listing ' +
538+
'for next page', {
502539
method: 'LifecycleTask._getMPUs',
540+
bucket: bucketData.target.bucket,
541+
nextKeyMarker: data.NextKeyMarker,
542+
nextUploadIdMarker: data.NextUploadIdMarker,
543+
contextInfo: entry.contextInfo,
503544
});
504545
}
505546
return next(null, data);

lib/KafkaBacklogMetrics.js

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -492,6 +492,7 @@ class KafkaBacklogMetrics extends EventEmitter {
492492

493493
_checkConsumerOffsetsGeneric(topic, groupId, maxLag, snapshotName, cb) {
494494
let checkInfo = undefined;
495+
const allPartitionsInfo = [];
495496
const partitionsZkPath = this._getPartitionsOffsetsZkPath(topic);
496497
this._zookeeper.getChildren(partitionsZkPath, (err, partitions) => {
497498
if (err) {
@@ -570,6 +571,7 @@ class KafkaBacklogMetrics extends EventEmitter {
570571
} else {
571572
info.topicOffset = targetOffset;
572573
}
574+
allPartitionsInfo.push(info);
573575
this._log.debug('lag computed for consumer/topic',
574576
info);
575577
if (lag > maxLag && !checkInfo) {
@@ -585,6 +587,12 @@ class KafkaBacklogMetrics extends EventEmitter {
585587
}, err => {
586588
if (err) {
587589
if (err === CheckConditionError) {
590+
this._log.info('consumer lag check failed', {
591+
topic, groupId, maxLag, snapshotName,
592+
result: 'lag_detected',
593+
failedPartition: checkInfo,
594+
partitions: allPartitionsInfo,
595+
});
588596
return cb(null, checkInfo);
589597
}
590598
if (err === NoNodeError) {
@@ -594,10 +602,19 @@ class KafkaBacklogMetrics extends EventEmitter {
594602
// sent to the snapshotted topic. We can
595603
// consider "everything" has been processed
596604
// then and satisfy the check.
605+
this._log.info('consumer lag check passed', {
606+
topic, groupId, maxLag, snapshotName,
607+
result: 'no_snapshot_node',
608+
});
597609
return cb();
598610
}
599611
return cb(err);
600612
}
613+
this._log.info('consumer lag check passed', {
614+
topic, groupId, maxLag, snapshotName,
615+
result: 'all_caught_up',
616+
partitions: allPartitionsInfo,
617+
});
601618
return cb();
602619
});
603620
});

0 commit comments

Comments
 (0)