Skip to content

Commit 0501da0

Browse files
committed
Async for reader side of BR.
Signed-off-by: Xiaoxi Chen <[email protected]>
1 parent 11087d0 commit 0501da0

File tree

2 files changed

+44
-45
lines changed

2 files changed

+44
-45
lines changed

src/lib/homestore_backend/hs_homeobject.hpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -442,9 +442,11 @@ class HSHomeObject : public HomeObjectImpl {
442442

443443
struct PGBlobIterator {
444444
struct blob_read_result { // Outcome of reading one blob: its id, the buffer that was read, and its resync state.
445+
blob_id_t blob_id_; // id of the blob this result belongs to (set from the constructor's blob_id argument)
445446
sisl::io_blob_safe blob_; // buffer handed over by the reader; moved in, not copied
446447
ResyncBlobState state_; // resync state reported for the blob (callers in this diff pass NORMAL or CORRUPTED)
447-
blob_read_result(sisl::io_blob_safe&& blob, ResyncBlobState state): blob_(std::move(blob)), state_(state) {}
448+
blob_read_result(blob_id_t blob_id, sisl::io_blob_safe&& blob, ResyncBlobState state) : // takes ownership of `blob`
449+
blob_id_(blob_id), blob_(std::move(blob)), state_(state) {}
448450
};
449451
PGBlobIterator(HSHomeObject& home_obj, homestore::group_id_t group_id, uint64_t upto_lsn = 0);
450452
PG* get_pg_metadata();

src/lib/homestore_backend/pg_blob_iterator.cpp

Lines changed: 41 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -187,8 +187,8 @@ bool HSHomeObject::PGBlobIterator::create_shard_snapshot_data(sisl::io_blob_safe
187187
return true;
188188
}
189189

190-
BlobManager::AsyncResult< HSHomeObject::PGBlobIterator::blob_read_result >
191-
HSHomeObject::PGBlobIterator::load_blob_data(const BlobInfo& blob_info) {
190+
using blob_read_result = HSHomeObject::PGBlobIterator::blob_read_result; // shorthand for the iterator's nested result type
191+
BlobManager::AsyncResult< blob_read_result > HSHomeObject::PGBlobIterator::load_blob_data(const BlobInfo& blob_info) {
192192
auto shard_id = blob_info.shard_id;
193193
auto blob_id = blob_info.blob_id;
194194
auto blkid = blob_info.pbas;
@@ -202,8 +202,8 @@ HSHomeObject::PGBlobIterator::load_blob_data(const BlobInfo& blob_info) {
202202
LOGD("Blob get request: shardID=0x{:x}, pg={}, shard=0x{:x}, blob_id={}, blkid={}", shard_id,
203203
(shard_id >> homeobject::shard_width), (shard_id & homeobject::shard_mask), blob_id, blkid.to_string());
204204
return repl_dev_->async_read(blkid, sgs, total_size)
205-
.thenValue([this, blob_id, shard_id, read_buf = std::move(read_buf)](
206-
auto&& result) mutable -> BlobManager::AsyncResult< HSHomeObject::PGBlobIterator::blob_read_result > {
205+
.thenValue([this, blob_id, shard_id, read_buf = std::move(read_buf)](auto&& result) mutable
206+
-> BlobManager::AsyncResult< HSHomeObject::PGBlobIterator::blob_read_result > {
207207
if (result) {
208208
LOGE("Failed to get blob, shardID=0x{:x}, pg={}, shard=0x{:x}, blob_id={}, err={}", shard_id,
209209
(shard_id >> homeobject::shard_width), (shard_id & homeobject::shard_mask), blob_id,
@@ -217,15 +217,15 @@ HSHomeObject::PGBlobIterator::load_blob_data(const BlobInfo& blob_info) {
217217
LOGE("Invalid header found, shardID=0x{:x}, pg={}, shard=0x{:x}, blob_id={}, [header={}]", shard_id,
218218
(shard_id >> homeobject::shard_width), (shard_id & homeobject::shard_mask), blob_id,
219219
header->to_string());
220-
return HSHomeObject::PGBlobIterator::blob_read_result(std::move(read_buf), ResyncBlobState::CORRUPTED);
220+
return blob_read_result(blob_id, std::move(read_buf), ResyncBlobState::CORRUPTED);
221221
}
222222

223223
if (header->shard_id != shard_id) {
224224
// The metrics for corrupted blob is handled on the follower side.
225225
LOGE("Invalid shard_id in header, shardID=0x{:x}, pg={}, shard=0x{:x}, blob_id={}, [header={}]",
226226
shard_id, (shard_id >> homeobject::shard_width), (shard_id & homeobject::shard_mask), blob_id,
227227
header->to_string());
228-
return HSHomeObject::PGBlobIterator::blob_read_result(std::move(read_buf), ResyncBlobState::CORRUPTED);
228+
return blob_read_result(blob_id, std::move(read_buf), ResyncBlobState::CORRUPTED);
229229
}
230230

231231
std::string user_key = header->user_key_size
@@ -242,12 +242,12 @@ HSHomeObject::PGBlobIterator::load_blob_data(const BlobInfo& blob_info) {
242242
"[{}] [computed={:np}]",
243243
shard_id, (shard_id >> homeobject::shard_width), (shard_id & homeobject::shard_mask), blob_id,
244244
header->to_string(), spdlog::to_hex(computed_hash, computed_hash + BlobHeader::blob_max_hash_len));
245-
return HSHomeObject::PGBlobIterator::blob_read_result(std::move(read_buf), ResyncBlobState::CORRUPTED);
245+
return blob_read_result(blob_id, std::move(read_buf), ResyncBlobState::CORRUPTED);
246246
}
247247

248248
LOGD("Blob get success: shardID=0x{:x}, pg={}, shard=0x{:x}, blob_id={}", shard_id,
249249
(shard_id >> homeobject::shard_width), (shard_id & homeobject::shard_mask), blob_id);
250-
return HSHomeObject::PGBlobIterator::blob_read_result(std::move(read_buf), ResyncBlobState::NORMAL);
250+
return blob_read_result(blob_id, std::move(read_buf), ResyncBlobState::NORMAL);
251251
});
252252
}
253253

@@ -259,9 +259,10 @@ bool HSHomeObject::PGBlobIterator::create_blobs_snapshot_data(sisl::io_blob_safe
259259
uint64_t total_bytes = 0;
260260
auto idx = cur_start_blob_idx_;
261261

262+
std::vector< BlobManager::AsyncResult< blob_read_result > > futs;
263+
folly::InlineExecutor executor;
262264
while (total_bytes < max_batch_size_ && idx < cur_blob_list_.size()) {
263265
auto info = cur_blob_list_[idx++];
264-
ResyncBlobState state = ResyncBlobState::NORMAL;
265266
// handle deleted object
266267
if (info.pbas == tombstone_pbas) {
267268
LOGT("Blob is deleted: shardID=0x{:x}, pg={}, shard=0x{:x}, blob_id={}, blkid={}", info.shard_id,
@@ -271,11 +272,10 @@ bool HSHomeObject::PGBlobIterator::create_blobs_snapshot_data(sisl::io_blob_safe
271272
continue;
272273
}
273274

274-
auto blob_start = Clock::now();
275-
276275
#ifdef _PRERELEASE
277276
if (iomgr_flip::instance()->test_flip("pg_blob_iterator_load_blob_data_error")) {
278277
LOGW("Simulating loading blob data error");
278+
futs.emplace_back(folly::makeUnexpected(BlobError(BlobErrorCode::READ_FAILED)));
279279
return false;
280280
}
281281
auto delay = iomgr_flip::instance()->get_test_flip< long >("simulate_read_snapshot_load_blob_delay",
@@ -286,39 +286,36 @@ bool HSHomeObject::PGBlobIterator::create_blobs_snapshot_data(sisl::io_blob_safe
286286
std::this_thread::sleep_for(std::chrono::milliseconds(delay.get()));
287287
}
288288
#endif
289-
sisl::io_blob_safe blob;
290-
291-
struct blob_read_result {
292-
sisl::io_blob_safe blob;
293-
int state;
294-
};
295-
296-
uint8_t retries = HS_BACKEND_DYNAMIC_CONFIG(snapshot_blob_load_retry);
297-
for (int i = 0; i < retries; i++) {
298-
auto result = load_blob_data(info).get();
299-
if (result.hasError() && result.error().code == BlobErrorCode::READ_FAILED) {
300-
LOGW("Failed to retrieve blob for shardID=0x{:x}, pg={}, shard=0x{:x} blob={} pbas={}, err={}, "
301-
"attempt={}",
302-
info.shard_id, (info.shard_id >> homeobject::shard_width),
303-
(info.shard_id & homeobject::shard_mask), info.blob_id, info.pbas.to_string(), result.error(), i);
304-
} else {
305-
blob = std::move(result.value().blob_);
306-
state = result.value().state_;
307-
break;
308-
}
309-
}
310-
if (blob.size() == 0) {
311-
LOGE("Failed to retrieve blob for shardID=0x{:x}, pg={}, shard=0x{:x} blob={} pbas={}", info.shard_id,
312-
(info.shard_id >> homeobject::shard_width), (info.shard_id & homeobject::shard_mask), info.blob_id,
313-
info.pbas.to_string());
314-
COUNTER_INCREMENT(*metrics_, snp_dnr_error_count, 1);
315-
return false;
316-
}
317-
318-
HISTOGRAM_OBSERVE(*metrics_, snp_dnr_blob_process_latency, get_elapsed_time_us(blob_start));
319-
std::vector< uint8_t > data(blob.cbytes(), blob.cbytes() + blob.size());
320-
blob_entries.push_back(CreateResyncBlobDataDirect(builder_, info.blob_id, (uint8_t)state, &data));
321-
total_bytes += blob.size();
289+
auto blob_start = Clock::now();
290+
auto executorKeepAlive = folly::getKeepAliveToken(executor);
291+
292+
// FIXME: Re-enable retries. Previously: uint8_t retries = HS_BACKEND_DYNAMIC_CONFIG(snapshot_blob_load_retry);
293+
futs.emplace_back(
294+
std::move(load_blob_data(info))
295+
.via(std::move(executorKeepAlive))
296+
.thenValue(
297+
[this, info, blob_start](auto&& result) mutable -> BlobManager::AsyncResult< blob_read_result > {
298+
if (result.hasError() && result.error().code == BlobErrorCode::READ_FAILED) {
299+
LOGE("Failed to retrieve blob for shardID=0x{:x}, pg={}, shard=0x{:x} blob={} pbas={}",
300+
info.shard_id, (info.shard_id >> homeobject::shard_width),
301+
(info.shard_id & homeobject::shard_mask), info.blob_id, info.pbas.to_string());
302+
COUNTER_INCREMENT(*metrics_, snp_dnr_error_count, 1);
303+
} else {
304+
HISTOGRAM_OBSERVE(*metrics_, snp_dnr_blob_process_latency, get_elapsed_time_us(blob_start));
305+
}
306+
return result;
307+
}));
308+
309+
auto const expect_blob_size = info.pbas.blk_count() * repl_dev_->get_blk_size();
310+
total_bytes += expect_blob_size;
311+
}
312+
// collect futs and add to flatbuffer.
313+
for (auto it = futs.begin(); it != futs.end(); it++) {
314+
auto res = std::move(*it).get();
315+
// fail whole batch.
316+
if (res.hasError()) { return false; }
317+
std::vector< uint8_t > data(res->blob_.cbytes(), res->blob_.cbytes() + res->blob_.size());
318+
blob_entries.push_back(CreateResyncBlobDataDirect(builder_, res->blob_id_, (uint8_t)res->state_, &data));
322319
}
323320
// should include the deleted blobs
324321
cur_batch_blob_count_ = idx - cur_start_blob_idx_;

0 commit comments

Comments
 (0)