@@ -260,22 +260,26 @@ bool HSHomeObject::PGBlobIterator::create_blobs_snapshot_data(sisl::io_blob_safe
260
260
auto idx = cur_start_blob_idx_;
261
261
262
262
std::vector< BlobManager::AsyncResult< blob_read_result > > futs;
263
+ auto total_blobs = 0 ;
264
+ auto skipped_blobs = 0 ;
263
265
while (total_bytes < max_batch_size_ && idx < cur_blob_list_.size ()) {
264
266
auto info = cur_blob_list_[idx++];
267
+ total_blobs++;
265
268
// handle deleted object
266
269
if (info.pbas == tombstone_pbas) {
267
270
LOGT (" Blob is deleted: shardID=0x{:x}, pg={}, shard=0x{:x}, blob_id={}, blkid={}" , info.shard_id ,
268
271
(info.shard_id >> homeobject::shard_width), (info.shard_id & homeobject::shard_mask), info.blob_id ,
269
272
info.pbas .to_string ());
270
273
// ignore
274
+ skipped_blobs++;
271
275
continue ;
272
276
}
273
277
274
278
#ifdef _PRERELEASE
275
279
if (iomgr_flip::instance ()->test_flip (" pg_blob_iterator_load_blob_data_error" )) {
276
280
LOGW (" Simulating loading blob data error" );
277
281
futs.emplace_back (folly::makeUnexpected (BlobError (BlobErrorCode::READ_FAILED)));
278
- return false ;
282
+ continue ;
279
283
}
280
284
auto delay = iomgr_flip::instance ()->get_test_flip < long >(" simulate_read_snapshot_load_blob_delay" ,
281
285
static_cast < long >(info.blob_id ));
@@ -286,20 +290,21 @@ bool HSHomeObject::PGBlobIterator::create_blobs_snapshot_data(sisl::io_blob_safe
286
290
}
287
291
#endif
288
292
auto blob_start = Clock::now ();
289
- auto executorKeepAlive = folly::getKeepAliveToken (executor);
290
293
294
+ LOGT (" submitting io for blob {}" , info.blob_id );
291
295
// Fixme: Re-enable retries uint8_t retries = HS_BACKEND_DYNAMIC_CONFIG(snapshot_blob_load_retry);
292
296
futs.emplace_back (
293
297
std::move (load_blob_data (info))
294
- .via (folly::getKeepAliveToken (InlineExecutor::instance ()))
298
+ .via (folly::getKeepAliveToken (folly:: InlineExecutor::instance ()))
295
299
.thenValue (
296
- [this , info, blob_start](auto && result) mutable -> BlobManager::AsyncResult< blob_read_result > {
300
+ [& , info, blob_start](auto && result) mutable -> BlobManager::AsyncResult< blob_read_result > {
297
301
if (result.hasError () && result.error ().code == BlobErrorCode::READ_FAILED) {
298
302
LOGE (" Failed to retrieve blob for shardID=0x{:x}, pg={}, shard=0x{:x} blob={} pbas={}" ,
299
303
info.shard_id , (info.shard_id >> homeobject::shard_width),
300
304
(info.shard_id & homeobject::shard_mask), info.blob_id , info.pbas .to_string ());
301
305
COUNTER_INCREMENT (*metrics_, snp_dnr_error_count, 1 );
302
306
} else {
307
+ LOGT (" retrieved blob, blob={} pbas={}" , info.blob_id , info.pbas .to_string ());
303
308
HISTOGRAM_OBSERVE (*metrics_, snp_dnr_blob_process_latency, get_elapsed_time_us (blob_start));
304
309
}
305
310
return result;
@@ -309,12 +314,26 @@ bool HSHomeObject::PGBlobIterator::create_blobs_snapshot_data(sisl::io_blob_safe
309
314
total_bytes += expect_blob_size;
310
315
}
311
316
// collect futs and add to flatbuffer.
317
+ bool hit_error = false ;
318
+ if (skipped_blobs + (long )futs.size () != total_blobs) {
319
+ LOGE (" total {} blobs, skipped {}, expect {} but only has {} futs" , total_blobs, skipped_blobs,
320
+ total_blobs - skipped_blobs, futs.size ());
321
+ hit_error = true ;
322
+ }
312
323
for (auto it = futs.begin (); it != futs.end (); it++) {
313
324
auto res = std::move (*it).get ();
314
- // fail whole batch.
315
- if (res.hasError ()) { return false ; }
316
- std::vector< uint8_t > data (res->blob_ .cbytes (), res->blob_ .cbytes () + res->blob_ .size ());
317
- blob_entries.push_back (CreateResyncBlobDataDirect (builder_, res->blob_id_ , (uint8_t )res->state_ , &data));
325
+ // Although the whole batch fails if a single blob fails, we still need to wait until all in-flight ops have
326
+ // finished: nuraft might release the iterator after an error is returned, while the "thenValue" continuation
327
+ // still accesses the metrics ptr.
328
+ if (res.hasError ()) { hit_error = true ; }
329
+ if (!hit_error) {
330
+ std::vector< uint8_t > data (res->blob_ .cbytes (), res->blob_ .cbytes () + res->blob_ .size ());
331
+ blob_entries.push_back (CreateResyncBlobDataDirect (builder_, res->blob_id_ , (uint8_t )res->state_ , &data));
332
+ }
333
+ }
334
+ if (hit_error) {
335
+ builder_.Clear ();
336
+ return false ;
318
337
}
319
338
// should include the deleted blobs
320
339
cur_batch_blob_count_ = idx - cur_start_blob_idx_;
0 commit comments