@@ -57,8 +57,8 @@ int HSHomeObject::SnapshotReceiveHandler::process_pg_snapshot_data(ResyncPGMetaD
     // update metrics
     std::unique_lock< std::shared_mutex > lock(ctx_->progress_lock);
-    ctx_->progress.start_time = std::chrono::duration_cast< std::chrono::seconds >(
-        std::chrono::system_clock::now().time_since_epoch()).count();
+    ctx_->progress.start_time =
+        std::chrono::duration_cast< std::chrono::seconds >(std::chrono::system_clock::now().time_since_epoch()).count();
     ctx_->progress.total_shards = ctx_->shard_list.size();
     ctx_->progress.total_blobs = pg_meta.total_blobs_to_transfer();
     ctx_->progress.total_bytes = pg_meta.total_bytes_to_transfer();
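
The reworked assignment above only reflows the expression; in both versions progress.start_time records the current wall-clock time as whole seconds since the Unix epoch. A minimal standalone sketch of the same std::chrono idiom (the helper name is illustrative, not from the repo):

    #include <chrono>
    #include <cstdint>

    // Current wall-clock time as whole seconds since the Unix epoch, matching the
    // duration_cast< seconds >(system_clock::now().time_since_epoch()).count() chain above.
    inline int64_t epoch_seconds_now() {
        using namespace std::chrono;
        return duration_cast< seconds >(system_clock::now().time_since_epoch()).count();
    }
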
@@ -138,10 +138,22 @@ int HSHomeObject::SnapshotReceiveHandler::process_shard_snapshot_data(ResyncShar
     return 0;
 }
 
+static auto collect_all_futures(std::vector< folly::Future< std::error_code > >& futs) {
+    return folly::collectAllUnsafe(futs).thenValue([](auto&& vf) {
+        for (auto const& err_c : vf) {
+            if (sisl_unlikely(err_c.value())) {
+                auto ec = err_c.value();
+                return folly::makeFuture< std::error_code >(std::move(ec));
+            }
+        }
+        return folly::makeFuture< std::error_code >(std::error_code{});
+    });
+}
+
 int HSHomeObject::SnapshotReceiveHandler::process_blobs_snapshot_data(ResyncBlobDataBatch const& data_blobs,
                                                                       const snp_batch_id_t batch_num,
                                                                       bool is_last_batch) {
-    // retry mesg, need to handle duplicate batch, reset progress
+    // retry mesg, need to handle duplicate batch, reset progress
     if (ctx_->cur_batch_num == batch_num) {
         std::unique_lock< std::shared_mutex > lock(ctx_->progress_lock);
         ctx_->progress.complete_blobs -= ctx_->progress.cur_batch_blobs;
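
The collect_all_futures helper added above waits on every per-blob write future and resolves to the first non-success std::error_code it finds, or a default-constructed (success) error_code otherwise. A self-contained sketch of the same folly pattern, using only std types and dropping the sisl_unlikely branch hint; it also shows one way to fold a stored exception into an error_code, which the helper itself does not do:

    #include <folly/futures/Future.h>
    #include <system_error>
    #include <vector>

    // Collapse a batch of Future<std::error_code> into one future carrying the
    // first failure, or a success (default) error_code if every write succeeded.
    folly::Future< std::error_code > first_error(std::vector< folly::Future< std::error_code > >& futs) {
        return folly::collectAllUnsafe(futs).thenValue([](auto&& results) {
            for (auto const& t : results) {                    // each entry is a folly::Try<std::error_code>
                if (t.hasException()) { return std::make_error_code(std::errc::io_error); }
                if (t.value()) { return t.value(); }           // non-zero error_code means that write failed
            }
            return std::error_code{};
        });
    }

A caller blocks on the combined result the same way the handler does, e.g. auto ec = first_error(futs).get();
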
@@ -159,6 +171,11 @@ int HSHomeObject::SnapshotReceiveHandler::process_blobs_snapshot_data(ResyncBlob
     hints.chunk_id_hint = *p_chunk_id;
 
     uint64_t total_bytes = 0;
+
+    std::vector< homestore::MultiBlkId > allocated_blk_ids;
+    std::vector< folly::Future< std::error_code > > futs;
+    std::vector< std::shared_ptr< sisl::io_blob_safe > > data_bufs;
+
     for (unsigned int i = 0; i < data_blobs.blob_list()->size(); i++) {
         const auto blob = data_blobs.blob_list()->Get(i);
@@ -227,12 +244,14 @@ int HSHomeObject::SnapshotReceiveHandler::process_blobs_snapshot_data(ResyncBlob
         // Alloc & persist blob data
         auto data_size = blob->data()->size();
-        sisl::io_blob_safe aligned_buf(sisl::round_up(data_size, io_align), io_align);
-        std::memcpy(aligned_buf.bytes(), blob_data, data_size);
+        std::shared_ptr< sisl::io_blob_safe > aligned_buf =
+            make_shared< sisl::io_blob_safe >(sisl::round_up(data_size, io_align), io_align);
+        std::memcpy(aligned_buf->bytes(), blob_data, data_size);
+        data_bufs.emplace_back(aligned_buf);
 
         homestore::MultiBlkId blk_id;
         auto status = homestore::data_service().alloc_blks(
-            sisl::round_up(aligned_buf.size(), homestore::data_service().get_blk_size()), hints, blk_id);
+            sisl::round_up(aligned_buf->size(), homestore::data_service().get_blk_size()), hints, blk_id);
         if (status != homestore::BlkAllocStatus::SUCCESS) {
             LOGE("Failed to allocate blocks for shardID=0x{:x}, pg={}, shard=0x{:x} blob {}", ctx_->shard_cursor,
                  (ctx_->shard_cursor >> homeobject::shard_width), (ctx_->shard_cursor & homeobject::shard_mask),
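
Replacing the stack-allocated sisl::io_blob_safe with a shared_ptr that is also pushed into data_bufs is what keeps each aligned buffer alive until its asynchronous write completes; in the old code the buffer only had to survive a blocking .get(). A generic sketch of that lifetime pattern, assuming a hypothetical async_write() rather than the homestore data service API:

    #include <cstddef>
    #include <folly/futures/Future.h>
    #include <memory>
    #include <system_error>
    #include <vector>

    // Hypothetical device write: takes a raw pointer/length and completes later.
    // Stubbed to complete immediately so the sketch is self-contained.
    folly::Future< std::error_code > async_write(const char* /*data*/, size_t /*len*/) {
        return folly::makeFuture< std::error_code >(std::error_code{});
    }

    void write_batch(const std::vector< std::vector< char > >& payloads) {
        std::vector< std::shared_ptr< std::vector< char > > > bufs;   // keeps buffers alive
        std::vector< folly::Future< std::error_code > > futs;
        for (auto const& p : payloads) {
            auto buf = std::make_shared< std::vector< char > >(p);    // heap copy of the payload
            futs.emplace_back(async_write(buf->data(), buf->size()));
            bufs.emplace_back(std::move(buf));                        // must outlive every future
        }
        folly::collectAllUnsafe(futs).wait();                         // only now may bufs go away
    }
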
@@ -241,65 +260,68 @@ int HSHomeObject::SnapshotReceiveHandler::process_blobs_snapshot_data(ResyncBlob
             ctx_->progress.error_count++;
             return ALLOC_BLK_ERR;
         }
-
-        auto free_allocated_blks = [blk_id]() {
-            homestore::data_service().async_free_blk(blk_id).thenValue([blk_id](auto&& err) {
-                LOGD("Freed blk_id={} due to failure in adding blob info, err {}", blk_id.to_string(),
-                     err ? err.message() : "nil");
-            });
-        };
+        allocated_blk_ids.emplace_back(blk_id);
 
 #ifdef _PRERELEASE
         if (iomgr_flip::instance()->test_flip("snapshot_receiver_blob_write_data_error")) {
             LOGW("Simulating blob snapshot write data error");
             std::unique_lock< std::shared_mutex > lock(ctx_->progress_lock);
             ctx_->progress.error_count++;
-            free_allocated_blks();
-            return WRITE_DATA_ERR;
+            futs.emplace_back(folly::makeFuture< std::error_code >(std::make_error_code(std::errc::invalid_argument)));
+            continue;
         }
 #endif
-        const auto ret = homestore::data_service()
-                             .async_write(r_cast< char const* >(aligned_buf.cbytes()), aligned_buf.size(), blk_id)
-                             .thenValue([&blk_id](auto&& err) -> BlobManager::AsyncResult< blob_id_t > {
-                                 // TODO: do we need to update repl_dev metrics?
-                                 if (err) {
-                                     LOGE("Failed to write blob info to blk_id={}", blk_id.to_string());
-                                     return folly::makeUnexpected(BlobError(BlobErrorCode::REPLICATION_ERROR));
-                                 }
-                                 LOGD("Blob info written to blk_id={}", blk_id.to_string());
-                                 return 0;
-                             })
-                             .get();
-        if (ret.hasError()) {
-            LOGE("Failed to write blob info of blob_id {} to blk_id={}", blob->blob_id(), blk_id.to_string());
-            free_allocated_blks();
-            std::unique_lock< std::shared_mutex > lock(ctx_->progress_lock);
-            ctx_->progress.error_count++;
-            return WRITE_DATA_ERR;
-        }
-        if (homestore::data_service().commit_blk(blk_id) != homestore::BlkAllocStatus::SUCCESS) {
-            LOGE("Failed to commit blk_id={} for blob_id={}", blk_id.to_string(), blob->blob_id());
-            std::unique_lock< std::shared_mutex > lock(ctx_->progress_lock);
-            ctx_->progress.error_count++;
-            free_allocated_blks();
-            return COMMIT_BLK_ERR;
-        }
-
-        // Add local blob info to index & PG
-        bool success =
-            home_obj_.local_add_blob_info(ctx_->pg_id, BlobInfo{ctx_->shard_cursor, blob->blob_id(), blk_id});
-        if (!success) {
-            LOGE("Failed to add blob info for blob_id={}", blob->blob_id());
-            std::unique_lock< std::shared_mutex > lock(ctx_->progress_lock);
-            ctx_->progress.error_count++;
-            free_allocated_blks();
-            return ADD_BLOB_INDEX_ERR;
-        }
+        auto blob_id = blob->blob_id();
+        LOGW("Writing Blob {} to blk_id {}", blob_id, blk_id.to_string());
+
+        futs.emplace_back(
+            homestore::data_service()
+                .async_write(r_cast< char const* >(aligned_buf->cbytes()), aligned_buf->size(), blk_id)
+                .thenValue([this, blk_id, start, blob_id](auto&& err) -> folly::Future< std::error_code > {
+                    // TODO: do we need to update repl_dev metrics?
+                    if (err) {
+                        LOGE("Failed to write blob info to blk_id={}", blk_id.to_string());
+                        return err;
+                    }
+                    LOGD("Blob {} written to blk_id={}", blob_id, blk_id.to_string());
+
+                    if (homestore::data_service().commit_blk(blk_id) != homestore::BlkAllocStatus::SUCCESS) {
+                        LOGE("Failed to commit blk_id={} for blob_id={}", blk_id.to_string(), blob_id);
+                        return err;
+                    }
+                    // Add local blob info to index & PG
+                    bool success =
+                        home_obj_.local_add_blob_info(ctx_->pg_id, BlobInfo{ctx_->shard_cursor, blob_id, blk_id});
+                    if (!success) {
+                        LOGE("Failed to add blob info for blob_id={}", blob_id);
+                        return err;
+                    }
+
+                    auto duration = get_elapsed_time_us(start);
+                    HISTOGRAM_OBSERVE(*metrics_, snp_rcvr_blob_process_time, duration);
+                    LOGD("Persisted blob_id={} in {}us", blob_id, duration);
+                    return std::error_code{};
+                }));
         total_bytes += data_size;
-        auto duration = get_elapsed_time_us(start);
-        HISTOGRAM_OBSERVE(*metrics_, snp_rcvr_blob_process_time, duration);
-        LOGD("Persisted blob_id={} in {}us", blob->blob_id(), duration);
     }
+    auto ec = collect_all_futures(futs).get();
+
+    if (ec != std::error_code{}) {
+        LOGE("Errors in writing this batch, code={}, message={}", ec.value(), ec.message());
+        for (auto rit = allocated_blk_ids.rbegin(); rit != allocated_blk_ids.rend(); ++rit) {
+            auto err = homestore::data_service().async_free_blk(*rit).get();
+            if (err) {
+                LOGD("Freed blk_id={} due to failure in adding blob info, err {}", rit->to_string(),
+                     err ? err.message() : "nil");
+            }
+        }
+        std::unique_lock< std::shared_mutex > lock(mutex);
+        ctx_->progress.error_count++;
+        return WRITE_DATA_ERR;
+    }
+    futs.clear();
+    data_bufs.clear();
+    allocated_blk_ids.clear();
 
     // update metrics
     {
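
With these changes the batch is effectively all-or-nothing: every blob write is issued, the combined future is awaited once, and on any failure the blocks allocated for the batch are freed in reverse order before WRITE_DATA_ERR is returned. A condensed sketch of that control flow with placeholder allocate/write/free functions (the real code uses homestore::MultiBlkId with alloc_blks, async_write, and async_free_blk):

    #include <cstddef>
    #include <folly/futures/Future.h>
    #include <system_error>
    #include <vector>

    // Placeholder handle and operations standing in for block allocation, the
    // asynchronous data write, and the asynchronous free used for rollback.
    using BlkHandle = int;
    BlkHandle alloc_blk() { static BlkHandle next = 0; return next++; }
    folly::Future< std::error_code > write_blk(BlkHandle) { return std::error_code{}; }
    folly::Future< std::error_code > free_blk(BlkHandle) { return std::error_code{}; }

    std::error_code write_batch(size_t n) {
        std::vector< BlkHandle > allocated;
        std::vector< folly::Future< std::error_code > > futs;
        for (size_t i = 0; i < n; ++i) {
            auto blk = alloc_blk();                    // allocation stays synchronous
            allocated.push_back(blk);
            futs.emplace_back(write_blk(blk));         // writes complete asynchronously
        }
        // Wait for the whole batch, keeping the first failure (if any).
        std::error_code ec;
        for (auto const& t : folly::collectAllUnsafe(futs).get()) {
            if (t.hasValue() && t.value()) { ec = t.value(); break; }
        }
        if (ec) {
            // Roll back in reverse allocation order, mirroring the handler's cleanup loop.
            for (auto it = allocated.rbegin(); it != allocated.rend(); ++it) { free_blk(*it).get(); }
        }
        return ec;
    }
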