@@ -56,7 +56,7 @@ int HSHomeObject::SnapshotReceiveHandler::process_pg_snapshot_data(ResyncPGMetaD
5656 [&pg_meta](auto & de) { de.blob_sequence_num .store (pg_meta.blob_seq_num (), std::memory_order_relaxed); });
5757
5858 // update metrics
59- std::unique_lock<std::shared_mutex> lock (mutex );
59+ std::unique_lock< std::shared_mutex > lock (ctx_-> progress_lock );
6060 ctx_->progress .start_time = std::chrono::duration_cast< std::chrono::seconds >(
6161 std::chrono::system_clock::now ().time_since_epoch ()).count ();
6262 ctx_->progress .total_shards = ctx_->shard_list .size ();
@@ -135,7 +135,6 @@ int HSHomeObject::SnapshotReceiveHandler::process_shard_snapshot_data(ResyncShar
135135 home_obj_.local_create_shard (shard_sb->info , shard_sb->v_chunk_id , shard_sb->p_chunk_id , blk_id.blk_count ());
136136 ctx_->shard_cursor = shard_meta.shard_id ();
137137 ctx_->cur_batch_num = 0 ;
138- std::unique_lock<std::shared_mutex> lock (mutex);
139138 return 0 ;
140139}
141140
@@ -144,7 +143,7 @@ int HSHomeObject::SnapshotReceiveHandler::process_blobs_snapshot_data(ResyncBlob
144143 bool is_last_batch) {
145144 // retry mesg, need to handle duplicate batch, reset progress
146145 if (ctx_->cur_batch_num == batch_num) {
147- std::unique_lock<std::shared_mutex> lock (mutex );
146+ std::unique_lock< std::shared_mutex > lock (ctx_-> progress_lock );
148147 ctx_->progress .complete_blobs -= ctx_->progress .cur_batch_blobs ;
149148 ctx_->progress .complete_bytes -= ctx_->progress .cur_batch_bytes ;
150149 ctx_->progress .cur_batch_blobs = 0 ;
@@ -199,7 +198,7 @@ int HSHomeObject::SnapshotReceiveHandler::process_blobs_snapshot_data(ResyncBlob
199198 if (blob->state () != static_cast < uint8_t >(ResyncBlobState::CORRUPTED)) {
200199 auto header = r_cast< BlobHeader const * >(blob_data);
201200 if (!header->valid ()) {
202- std::unique_lock<std::shared_mutex> lock (mutex );
201+ std::unique_lock< std::shared_mutex > lock (ctx_-> progress_lock );
203202 ctx_->progress .error_count ++;
204203 LOGE (" Invalid header found for blob_id={}: [header={}]" , blob->blob_id (), header->to_string ());
205204 return INVALID_BLOB_HEADER;
@@ -215,14 +214,14 @@ int HSHomeObject::SnapshotReceiveHandler::process_blobs_snapshot_data(ResyncBlob
215214 if (std::memcmp (computed_hash, header->hash , BlobHeader::blob_max_hash_len) != 0 ) {
216215 LOGE (" Hash mismatch for blob_id={}: header [{}] [computed={:np}]" , blob->blob_id (), header->to_string (),
217216 spdlog::to_hex (computed_hash, computed_hash + BlobHeader::blob_max_hash_len));
218- std::unique_lock<std::shared_mutex> lock (mutex );
217+ std::unique_lock< std::shared_mutex > lock (ctx_-> progress_lock );
219218 ctx_->progress .error_count ++;
220219 return BLOB_DATA_CORRUPTED;
221220 }
222221 } else {
223222 LOGW (" find corrupted_blobs={} in shardID=0x{:x}, pg={}, shard=0x{:x}" , blob->blob_id (), ctx_->shard_cursor ,
224223 (ctx_->shard_cursor >> homeobject::shard_width), (ctx_->shard_cursor & homeobject::shard_mask));
225- std::unique_lock<std::shared_mutex> lock (mutex );
224+ std::unique_lock< std::shared_mutex > lock (ctx_-> progress_lock );
226225 ctx_->progress .corrupted_blobs ++;
227226 }
228227
@@ -238,7 +237,7 @@ int HSHomeObject::SnapshotReceiveHandler::process_blobs_snapshot_data(ResyncBlob
238237 LOGE (" Failed to allocate blocks for shardID=0x{:x}, pg={}, shard=0x{:x} blob {}" , ctx_->shard_cursor ,
239238 (ctx_->shard_cursor >> homeobject::shard_width), (ctx_->shard_cursor & homeobject::shard_mask),
240239 blob->blob_id ());
241- std::unique_lock<std::shared_mutex> lock (mutex );
240+ std::unique_lock< std::shared_mutex > lock (ctx_-> progress_lock );
242241 ctx_->progress .error_count ++;
243242 return ALLOC_BLK_ERR;
244243 }
@@ -253,7 +252,7 @@ int HSHomeObject::SnapshotReceiveHandler::process_blobs_snapshot_data(ResyncBlob
253252#ifdef _PRERELEASE
254253 if (iomgr_flip::instance ()->test_flip (" snapshot_receiver_blob_write_data_error" )) {
255254 LOGW (" Simulating blob snapshot write data error" );
256- std::unique_lock<std::shared_mutex> lock (mutex );
255+ std::unique_lock< std::shared_mutex > lock (ctx_-> progress_lock );
257256 ctx_->progress .error_count ++;
258257 free_allocated_blks ();
259258 return WRITE_DATA_ERR;
@@ -274,13 +273,13 @@ int HSHomeObject::SnapshotReceiveHandler::process_blobs_snapshot_data(ResyncBlob
274273 if (ret.hasError ()) {
275274 LOGE (" Failed to write blob info of blob_id {} to blk_id={}" , blob->blob_id (), blk_id.to_string ());
276275 free_allocated_blks ();
277- std::unique_lock<std::shared_mutex> lock (mutex );
276+ std::unique_lock< std::shared_mutex > lock (ctx_-> progress_lock );
278277 ctx_->progress .error_count ++;
279278 return WRITE_DATA_ERR;
280279 }
281280 if (homestore::data_service ().commit_blk (blk_id) != homestore::BlkAllocStatus::SUCCESS) {
282281 LOGE (" Failed to commit blk_id={} for blob_id={}" , blk_id.to_string (), blob->blob_id ());
283- std::unique_lock<std::shared_mutex> lock (mutex );
282+ std::unique_lock< std::shared_mutex > lock (ctx_-> progress_lock );
284283 ctx_->progress .error_count ++;
285284 free_allocated_blks ();
286285 return COMMIT_BLK_ERR;
@@ -291,7 +290,7 @@ int HSHomeObject::SnapshotReceiveHandler::process_blobs_snapshot_data(ResyncBlob
291290 home_obj_.local_add_blob_info (ctx_->pg_id , BlobInfo{ctx_->shard_cursor , blob->blob_id (), blk_id});
292291 if (!success) {
293292 LOGE (" Failed to add blob info for blob_id={}" , blob->blob_id ());
294- std::unique_lock<std::shared_mutex> lock (mutex );
293+ std::unique_lock< std::shared_mutex > lock (ctx_-> progress_lock );
295294 ctx_->progress .error_count ++;
296295 free_allocated_blks ();
297296 return ADD_BLOB_INDEX_ERR;
@@ -304,7 +303,7 @@ int HSHomeObject::SnapshotReceiveHandler::process_blobs_snapshot_data(ResyncBlob
304303
305304 // update metrics
306305 {
307- std::unique_lock<std::shared_mutex> lock (mutex );
306+ std::unique_lock< std::shared_mutex > lock (ctx_-> progress_lock );
308307 ctx_->progress .cur_batch_blobs = data_blobs.blob_list ()->size ();
309308 ctx_->progress .cur_batch_bytes = total_bytes;
310309 ctx_->progress .complete_blobs += ctx_->progress .cur_batch_blobs ;
@@ -323,7 +322,7 @@ int HSHomeObject::SnapshotReceiveHandler::process_blobs_snapshot_data(ResyncBlob
323322 home_obj_.chunk_selector ()->release_chunk (ctx_->pg_id , v_chunk_id.value ());
324323 }
325324 {
326- std::unique_lock<std::shared_mutex> lock (mutex );
325+ std::unique_lock< std::shared_mutex > lock (ctx_-> progress_lock );
327326 ctx_->progress .complete_shards ++;
328327 }
329328 // We only update the snp info superblk on completion of each shard, since resumption is also shard-level
@@ -425,7 +424,7 @@ void HSHomeObject::SnapshotReceiveHandler::update_snp_info_sb(bool init) {
425424 sb->shard_cursor = get_next_shard ();
426425
427426 {
428- std::unique_lock<std::shared_mutex> lock (mutex );
427+ std::unique_lock< std::shared_mutex > lock (ctx_-> progress_lock );
429428 sb->progress .start_time = ctx_->progress .start_time ;
430429 sb->progress .total_blobs = ctx_->progress .total_blobs ;
431430 sb->progress .total_bytes = ctx_->progress .total_bytes ;
0 commit comments