Skip to content

Commit f1f2fe8

Browse files
committed
Async for reader side of BR.
Signed-off-by: Xiaoxi Chen <[email protected]>
1 parent a81b1ee commit f1f2fe8

File tree

2 files changed

+51
-51
lines changed

2 files changed

+51
-51
lines changed

src/lib/homestore_backend/hs_homeobject.hpp

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,8 @@ class HSHomeObject : public HomeObjectImpl {
5959
trace_id_t tid) const override;
6060
BlobManager::NullAsyncResult _del_blob(ShardInfo const&, blob_id_t, trace_id_t tid) override;
6161

62-
PGManager::NullAsyncResult _create_pg(PGInfo&& pg_info, std::set< peer_id_t > const& peers, trace_id_t tid) override;
62+
PGManager::NullAsyncResult _create_pg(PGInfo&& pg_info, std::set< peer_id_t > const& peers,
63+
trace_id_t tid) override;
6364
PGManager::NullAsyncResult _replace_member(pg_id_t id, peer_id_t const& old_member, PGMember const& new_member,
6465
uint32_t commit_quorum, trace_id_t tid) override;
6566

@@ -435,9 +436,11 @@ class HSHomeObject : public HomeObjectImpl {
435436

436437
struct PGBlobIterator {
437438
struct blob_read_result {
439+
blob_id_t blob_id_;
438440
sisl::io_blob_safe blob_;
439441
ResyncBlobState state_;
440-
blob_read_result(sisl::io_blob_safe&& blob, ResyncBlobState state): blob_(std::move(blob)), state_(state) {}
442+
blob_read_result(blob_id_t blob_id, sisl::io_blob_safe&& blob, ResyncBlobState state) :
443+
blob_id_(blob_id), blob_(std::move(blob)), state_(state) {}
441444
};
442445
PGBlobIterator(HSHomeObject& home_obj, homestore::group_id_t group_id, uint64_t upto_lsn = 0);
443446
PG* get_pg_metadata();
@@ -639,7 +642,7 @@ class HSHomeObject : public HomeObjectImpl {
639642
static ShardInfo deserialize_shard_info(const char* shard_info_str, size_t size);
640643
static std::string serialize_shard_info(const ShardInfo& info);
641644
void local_create_shard(ShardInfo shard_info, homestore::chunk_num_t v_chunk_id, homestore::chunk_num_t p_chunk_id,
642-
homestore::blk_count_t blk_count, trace_id_t tid=0);
645+
homestore::blk_count_t blk_count, trace_id_t tid = 0);
643646
void add_new_shard_to_map(ShardPtr&& shard);
644647
void update_shard_in_map(const ShardInfo& shard_info);
645648

@@ -812,7 +815,7 @@ class HSHomeObject : public HomeObjectImpl {
812815
const homestore::MultiBlkId& pbas, cintrusive< homestore::repl_req_ctx >& hs_ctx);
813816
void on_blob_del_commit(int64_t lsn, sisl::blob const& header, sisl::blob const& key,
814817
cintrusive< homestore::repl_req_ctx >& hs_ctx);
815-
bool local_add_blob_info(pg_id_t pg_id, BlobInfo const& blob_info, trace_id_t tid=0);
818+
bool local_add_blob_info(pg_id_t pg_id, BlobInfo const& blob_info, trace_id_t tid = 0);
816819
homestore::ReplResult< homestore::blk_alloc_hints >
817820
blob_put_get_blk_alloc_hints(sisl::blob const& header, cintrusive< homestore::repl_req_ctx >& ctx);
818821
void compute_blob_payload_hash(BlobHeader::HashAlgorithm algorithm, const uint8_t* blob_bytes, size_t blob_size,
@@ -902,13 +905,13 @@ class HSHomeObject : public HomeObjectImpl {
902905

903906
// only leader will call incr and decr pending request num
904907
void incr_pending_request_num() const {
905-
uint64_t now=pending_request_num.fetch_add(1);
908+
uint64_t now = pending_request_num.fetch_add(1);
906909
LOGT("inc pending req, was {}", now);
907910
}
908-
void decr_pending_request_num() const {
911+
void decr_pending_request_num() const {
909912
uint64_t now = pending_request_num.fetch_sub(1);
910913
LOGT("desc pending req, was {}", now);
911-
DEBUG_ASSERT(now>0, "pending == 0 ");
914+
DEBUG_ASSERT(now > 0, "pending == 0 ");
912915
}
913916
};
914917

src/lib/homestore_backend/pg_blob_iterator.cpp

Lines changed: 41 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -187,8 +187,8 @@ bool HSHomeObject::PGBlobIterator::create_shard_snapshot_data(sisl::io_blob_safe
187187
return true;
188188
}
189189

190-
BlobManager::AsyncResult< HSHomeObject::PGBlobIterator::blob_read_result >
191-
HSHomeObject::PGBlobIterator::load_blob_data(const BlobInfo& blob_info) {
190+
typedef HSHomeObject::PGBlobIterator::blob_read_result blob_read_result;
191+
BlobManager::AsyncResult< blob_read_result > HSHomeObject::PGBlobIterator::load_blob_data(const BlobInfo& blob_info) {
192192
auto shard_id = blob_info.shard_id;
193193
auto blob_id = blob_info.blob_id;
194194
auto blkid = blob_info.pbas;
@@ -202,8 +202,8 @@ HSHomeObject::PGBlobIterator::load_blob_data(const BlobInfo& blob_info) {
202202
LOGD("Blob get request: shardID=0x{:x}, pg={}, shard=0x{:x}, blob_id={}, blkid={}", shard_id,
203203
(shard_id >> homeobject::shard_width), (shard_id & homeobject::shard_mask), blob_id, blkid.to_string());
204204
return repl_dev_->async_read(blkid, sgs, total_size)
205-
.thenValue([this, blob_id, shard_id, read_buf = std::move(read_buf)](
206-
auto&& result) mutable -> BlobManager::AsyncResult< HSHomeObject::PGBlobIterator::blob_read_result > {
205+
.thenValue([this, blob_id, shard_id, read_buf = std::move(read_buf)](auto&& result) mutable
206+
-> BlobManager::AsyncResult< HSHomeObject::PGBlobIterator::blob_read_result > {
207207
if (result) {
208208
LOGE("Failed to get blob, shardID=0x{:x}, pg={}, shard=0x{:x}, blob_id={}, err={}", shard_id,
209209
(shard_id >> homeobject::shard_width), (shard_id & homeobject::shard_mask), blob_id,
@@ -217,15 +217,15 @@ HSHomeObject::PGBlobIterator::load_blob_data(const BlobInfo& blob_info) {
217217
LOGE("Invalid header found, shardID=0x{:x}, pg={}, shard=0x{:x}, blob_id={}, [header={}]", shard_id,
218218
(shard_id >> homeobject::shard_width), (shard_id & homeobject::shard_mask), blob_id,
219219
header->to_string());
220-
return HSHomeObject::PGBlobIterator::blob_read_result(std::move(read_buf), ResyncBlobState::CORRUPTED);
220+
return blob_read_result(blob_id, std::move(read_buf), ResyncBlobState::CORRUPTED);
221221
}
222222

223223
if (header->shard_id != shard_id) {
224224
// The metrics for corrupted blob is handled on the follower side.
225225
LOGE("Invalid shard_id in header, shardID=0x{:x}, pg={}, shard=0x{:x}, blob_id={}, [header={}]",
226226
shard_id, (shard_id >> homeobject::shard_width), (shard_id & homeobject::shard_mask), blob_id,
227227
header->to_string());
228-
return HSHomeObject::PGBlobIterator::blob_read_result(std::move(read_buf), ResyncBlobState::CORRUPTED);
228+
return blob_read_result(blob_id, std::move(read_buf), ResyncBlobState::CORRUPTED);
229229
}
230230

231231
std::string user_key = header->user_key_size
@@ -242,12 +242,12 @@ HSHomeObject::PGBlobIterator::load_blob_data(const BlobInfo& blob_info) {
242242
"[{}] [computed={:np}]",
243243
shard_id, (shard_id >> homeobject::shard_width), (shard_id & homeobject::shard_mask), blob_id,
244244
header->to_string(), spdlog::to_hex(computed_hash, computed_hash + BlobHeader::blob_max_hash_len));
245-
return HSHomeObject::PGBlobIterator::blob_read_result(std::move(read_buf), ResyncBlobState::CORRUPTED);
245+
return blob_read_result(blob_id, std::move(read_buf), ResyncBlobState::CORRUPTED);
246246
}
247247

248248
LOGD("Blob get success: shardID=0x{:x}, pg={}, shard=0x{:x}, blob_id={}", shard_id,
249249
(shard_id >> homeobject::shard_width), (shard_id & homeobject::shard_mask), blob_id);
250-
return HSHomeObject::PGBlobIterator::blob_read_result(std::move(read_buf), ResyncBlobState::NORMAL);
250+
return blob_read_result(blob_id, std::move(read_buf), ResyncBlobState::NORMAL);
251251
});
252252
}
253253

@@ -259,9 +259,10 @@ bool HSHomeObject::PGBlobIterator::create_blobs_snapshot_data(sisl::io_blob_safe
259259
uint64_t total_bytes = 0;
260260
auto idx = cur_start_blob_idx_;
261261

262+
std::vector< BlobManager::AsyncResult< blob_read_result > > futs;
263+
folly::InlineExecutor executor;
262264
while (total_bytes < max_batch_size_ && idx < cur_blob_list_.size()) {
263265
auto info = cur_blob_list_[idx++];
264-
ResyncBlobState state = ResyncBlobState::NORMAL;
265266
// handle deleted object
266267
if (info.pbas == tombstone_pbas) {
267268
LOGT("Blob is deleted: shardID=0x{:x}, pg={}, shard=0x{:x}, blob_id={}, blkid={}", info.shard_id,
@@ -271,11 +272,10 @@ bool HSHomeObject::PGBlobIterator::create_blobs_snapshot_data(sisl::io_blob_safe
271272
continue;
272273
}
273274

274-
auto blob_start = Clock::now();
275-
276275
#ifdef _PRERELEASE
277276
if (iomgr_flip::instance()->test_flip("pg_blob_iterator_load_blob_data_error")) {
278277
LOGW("Simulating loading blob data error");
278+
futs.emplace_back(folly::makeUnexpected(BlobError(BlobErrorCode::READ_FAILED)));
279279
return false;
280280
}
281281
auto delay = iomgr_flip::instance()->get_test_flip< long >("simulate_read_snapshot_load_blob_delay",
@@ -286,39 +286,36 @@ bool HSHomeObject::PGBlobIterator::create_blobs_snapshot_data(sisl::io_blob_safe
286286
std::this_thread::sleep_for(std::chrono::milliseconds(delay.get()));
287287
}
288288
#endif
289-
sisl::io_blob_safe blob;
290-
291-
struct blob_read_result {
292-
sisl::io_blob_safe blob;
293-
int state;
294-
};
295-
296-
uint8_t retries = HS_BACKEND_DYNAMIC_CONFIG(snapshot_blob_load_retry);
297-
for (int i = 0; i < retries; i++) {
298-
auto result = load_blob_data(info).get();
299-
if (result.hasError() && result.error().code == BlobErrorCode::READ_FAILED) {
300-
LOGW("Failed to retrieve blob for shardID=0x{:x}, pg={}, shard=0x{:x} blob={} pbas={}, err={}, "
301-
"attempt={}",
302-
info.shard_id, (info.shard_id >> homeobject::shard_width),
303-
(info.shard_id & homeobject::shard_mask), info.blob_id, info.pbas.to_string(), result.error(), i);
304-
} else {
305-
blob = std::move(result.value().blob_);
306-
state = result.value().state_;
307-
break;
308-
}
309-
}
310-
if (blob.size() == 0) {
311-
LOGE("Failed to retrieve blob for shardID=0x{:x}, pg={}, shard=0x{:x} blob={} pbas={}", info.shard_id,
312-
(info.shard_id >> homeobject::shard_width), (info.shard_id & homeobject::shard_mask), info.blob_id,
313-
info.pbas.to_string());
314-
COUNTER_INCREMENT(*metrics_, snp_dnr_error_count, 1);
315-
return false;
316-
}
317-
318-
HISTOGRAM_OBSERVE(*metrics_, snp_dnr_blob_process_latency, get_elapsed_time_us(blob_start));
319-
std::vector< uint8_t > data(blob.cbytes(), blob.cbytes() + blob.size());
320-
blob_entries.push_back(CreateResyncBlobDataDirect(builder_, info.blob_id, (uint8_t)state, &data));
321-
total_bytes += blob.size();
289+
auto blob_start = Clock::now();
290+
auto executorKeepAlive = folly::getKeepAliveToken(executor);
291+
292+
// Fixme: Re-enable retries uint8_t retries = HS_BACKEND_DYNAMIC_CONFIG(snapshot_blob_load_retry);
293+
futs.emplace_back(
294+
std::move(load_blob_data(info))
295+
.via(std::move(executorKeepAlive))
296+
.thenValue(
297+
[this, info, blob_start](auto&& result) mutable -> BlobManager::AsyncResult< blob_read_result > {
298+
if (result.hasError() && result.error().code == BlobErrorCode::READ_FAILED) {
299+
LOGE("Failed to retrieve blob for shardID=0x{:x}, pg={}, shard=0x{:x} blob={} pbas={}",
300+
info.shard_id, (info.shard_id >> homeobject::shard_width),
301+
(info.shard_id & homeobject::shard_mask), info.blob_id, info.pbas.to_string());
302+
COUNTER_INCREMENT(*metrics_, snp_dnr_error_count, 1);
303+
} else {
304+
HISTOGRAM_OBSERVE(*metrics_, snp_dnr_blob_process_latency, get_elapsed_time_us(blob_start));
305+
}
306+
return result;
307+
}));
308+
309+
auto const expect_blob_size = info.pbas.blk_count() * repl_dev_->get_blk_size();
310+
total_bytes += expect_blob_size;
311+
}
312+
// collect futs and add to flatbuffer.
313+
for (auto it = futs.begin(); it != futs.end(); it++) {
314+
auto res = std::move(*it).get();
315+
// fail whole batch.
316+
if (res.hasError()) { return false; }
317+
std::vector< uint8_t > data(res->blob_.cbytes(), res->blob_.cbytes() + res->blob_.size());
318+
blob_entries.push_back(CreateResyncBlobDataDirect(builder_, res->blob_id_, (uint8_t)res->state_, &data));
322319
}
323320
// should include the deleted blobs
324321
cur_batch_blob_count_ = idx - cur_start_blob_idx_;

0 commit comments

Comments
 (0)