Skip to content

Commit cc33d9b

Browse files
nits
1 parent 0690894 commit cc33d9b

1 file changed

Lines changed: 7 additions & 6 deletions

File tree

chain/src/indexer/backfiller/consumer.rs

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ impl<E: Spawner + Clock + Storage + Metrics, C: Client> Consumer<E, C> {
167167
Self::wait_for_uploadable_block(&context, &marshal, &uploads, digest, retry)
168168
.await
169169
else {
170-
debug!(?digest, "consumer observed live upload before block upload");
170+
debug!(?digest, "skipping previously uploaded block");
171171
return Completion::Skipped { position, height };
172172
};
173173

@@ -180,7 +180,7 @@ impl<E: Spawner + Clock + Storage + Metrics, C: Client> Consumer<E, C> {
180180
};
181181
match decision {
182182
Decision::Skip => {
183-
debug!(?digest, "consumer observed live upload before block upload");
183+
debug!(?digest, "skipping previously uploaded block");
184184
return Completion::Skipped { position, height };
185185
}
186186
Decision::Wait => {
@@ -193,7 +193,7 @@ impl<E: Spawner + Clock + Storage + Metrics, C: Client> Consumer<E, C> {
193193
match client.block_upload(block.clone()).await {
194194
Ok(()) => {
195195
upload_results.inc(status::Status::Success);
196-
debug!(?digest, "consumer uploaded block");
196+
debug!(?digest, "uploaded block by digest");
197197
return Completion::Uploaded {
198198
position,
199199
height,
@@ -205,7 +205,7 @@ impl<E: Spawner + Clock + Storage + Metrics, C: Client> Consumer<E, C> {
205205
// not ack the entry until success or until the live
206206
// certificate path proves the block was uploaded.
207207
upload_results.inc(status::Status::Failure);
208-
warn!(?e, ?digest, "consumer failed to upload block, retrying");
208+
warn!(?e, ?digest, "retrying block upload by digest");
209209
context.sleep(retry).await;
210210
}
211211
}
@@ -267,9 +267,9 @@ impl<E: Spawner + Clock + Storage + Metrics, C: Client> Consumer<E, C> {
267267
}
268268

269269
async fn complete(&mut self, completion: Completion) {
270+
// Record the success before acking so the in-memory dedupe tracker
271+
// stays aligned with the queue state.
270272
let (position, height) = match completion {
271-
// Record the success before acking so the in-memory dedupe tracker
272-
// stays aligned with the queue state.
273273
Completion::Uploaded {
274274
position,
275275
height,
@@ -281,6 +281,7 @@ impl<E: Spawner + Clock + Storage + Metrics, C: Client> Consumer<E, C> {
281281
Completion::Skipped { position, height } => (position, height),
282282
};
283283

284+
// Acknowledge the queue entry and advance the queue floor if needed.
284285
let floor = self.reader.ack_floor().await;
285286
self.reader.ack(position).await.expect("failed to ack");
286287
let floor_advanced = self.reader.ack_floor().await > floor;

0 commit comments

Comments
 (0)