Commit 6b8d20c
Move snapshot data write to async mode.
This provides a higher queue depth for the disk, which lets the IO scheduler merge requests; it also allows IO and compute (checksum) to run in parallel.
Signed-off-by: Xiaoxi Chen <[email protected]>
1 parent 2fd6b35 commit 6b8d20c
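
The pattern in a nutshell, as a minimal self-contained sketch (illustrative only, not HomeObject code; fake_async_write is a hypothetical stand-in for homestore's async_write): every write is issued before anyone blocks, so the disk sees a deep queue, and the batch result is checked once at the end.

#include <folly/futures/Future.h>
#include <folly/init/Init.h>
#include <system_error>
#include <vector>

// Hypothetical stand-in for homestore::data_service().async_write():
// completes immediately with success.
static folly::Future< std::error_code > fake_async_write() {
    return folly::makeFuture< std::error_code >(std::error_code{});
}

int main(int argc, char** argv) {
    folly::Init init(&argc, &argv);

    // Fan out: all writes are in flight before anyone waits, so the disk
    // scheduler sees a deep queue and can merge adjacent requests.
    std::vector< folly::Future< std::error_code > > futs;
    for (int i = 0; i < 8; ++i) {
        futs.emplace_back(fake_async_write());
    }

    // Collect once: resolve to the first error, or success if none failed.
    auto ec = folly::collectAllUnsafe(futs)
                  .thenValue([](auto&& tries) {
                      for (auto const& t : tries) {
                          if (t.value()) return t.value(); // first failure wins
                      }
                      return std::error_code{};
                  })
                  .get();
    return ec ? 1 : 0;
}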

2 files changed (+70 −56 lines)

conanfile.py

Lines changed: 1 addition & 1 deletion
@@ -9,7 +9,7 @@
 
 class HomeObjectConan(ConanFile):
     name = "homeobject"
-    version = "2.3.19"
+    version = "2.3.20"
 
     homepage = "https://github.com/eBay/HomeObject"
     description = "Blob Store built on HomeReplication"

src/lib/homestore_backend/snapshot_receive_handler.cpp

Lines changed: 69 additions & 55 deletions
@@ -57,8 +57,8 @@ int HSHomeObject::SnapshotReceiveHandler::process_pg_snapshot_data(ResyncPGMetaD
 
     // update metrics
     std::unique_lock< std::shared_mutex > lock(ctx_->progress_lock);
-    ctx_->progress.start_time = std::chrono::duration_cast< std::chrono::seconds >(
-        std::chrono::system_clock::now().time_since_epoch()).count();
+    ctx_->progress.start_time =
+        std::chrono::duration_cast< std::chrono::seconds >(std::chrono::system_clock::now().time_since_epoch()).count();
     ctx_->progress.total_shards = ctx_->shard_list.size();
     ctx_->progress.total_blobs = pg_meta.total_blobs_to_transfer();
     ctx_->progress.total_bytes = pg_meta.total_bytes_to_transfer();
@@ -138,10 +138,22 @@ int HSHomeObject::SnapshotReceiveHandler::process_shard_snapshot_data(ResyncShar
     return 0;
 }
 
+static auto collect_all_futures(std::vector< folly::Future< std::error_code > >& futs) {
+    return folly::collectAllUnsafe(futs).thenValue([](auto&& vf) {
+        for (auto const& err_c : vf) {
+            if (sisl_unlikely(err_c.value())) {
+                auto ec = err_c.value();
+                return folly::makeFuture< std::error_code >(std::move(ec));
+            }
+        }
+        return folly::makeFuture< std::error_code >(std::error_code{});
+    });
+}
+
 int HSHomeObject::SnapshotReceiveHandler::process_blobs_snapshot_data(ResyncBlobDataBatch const& data_blobs,
                                                                       const snp_batch_id_t batch_num,
                                                                       bool is_last_batch) {
-    //retry mesg, need to handle duplicate batch, reset progress
+    // Retry message: handle the duplicate batch and reset progress
     if (ctx_->cur_batch_num == batch_num) {
         std::unique_lock< std::shared_mutex > lock(ctx_->progress_lock);
         ctx_->progress.complete_blobs -= ctx_->progress.cur_batch_blobs;
@@ -159,6 +171,10 @@ int HSHomeObject::SnapshotReceiveHandler::process_blobs_snapshot_data(ResyncBlob
     hints.chunk_id_hint = *p_chunk_id;
 
     uint64_t total_bytes = 0;
+
+    std::vector< folly::Future< std::error_code > > futs;
+    std::vector< std::shared_ptr< sisl::io_blob_safe > > data_bufs;
+
     for (unsigned int i = 0; i < data_blobs.blob_list()->size(); i++) {
         const auto blob = data_blobs.blob_list()->Get(i);
 
@@ -227,12 +243,14 @@ int HSHomeObject::SnapshotReceiveHandler::process_blobs_snapshot_data(ResyncBlob
 
         // Alloc & persist blob data
         auto data_size = blob->data()->size();
-        sisl::io_blob_safe aligned_buf(sisl::round_up(data_size, io_align), io_align);
-        std::memcpy(aligned_buf.bytes(), blob_data, data_size);
+        std::shared_ptr< sisl::io_blob_safe > aligned_buf =
+            std::make_shared< sisl::io_blob_safe >(sisl::round_up(data_size, io_align), io_align);
+        std::memcpy(aligned_buf->bytes(), blob_data, data_size);
+        data_bufs.emplace_back(aligned_buf);
 
         homestore::MultiBlkId blk_id;
         auto status = homestore::data_service().alloc_blks(
-            sisl::round_up(aligned_buf.size(), homestore::data_service().get_blk_size()), hints, blk_id);
+            sisl::round_up(aligned_buf->size(), homestore::data_service().get_blk_size()), hints, blk_id);
         if (status != homestore::BlkAllocStatus::SUCCESS) {
             LOGE("Failed to allocate blocks for shardID=0x{:x}, pg={}, shard=0x{:x} blob {}", ctx_->shard_cursor,
                  (ctx_->shard_cursor >> homeobject::shard_width), (ctx_->shard_cursor & homeobject::shard_mask),
@@ -242,64 +260,60 @@ int HSHomeObject::SnapshotReceiveHandler::process_blobs_snapshot_data(ResyncBlob
             return ALLOC_BLK_ERR;
         }
 
-        auto free_allocated_blks = [blk_id]() {
-            homestore::data_service().async_free_blk(blk_id).thenValue([blk_id](auto&& err) {
-                LOGD("Freed blk_id={} due to failure in adding blob info, err {}", blk_id.to_string(),
-                     err ? err.message() : "nil");
-            });
-        };
-
 #ifdef _PRERELEASE
         if (iomgr_flip::instance()->test_flip("snapshot_receiver_blob_write_data_error")) {
             LOGW("Simulating blob snapshot write data error");
             std::unique_lock< std::shared_mutex > lock(ctx_->progress_lock);
             ctx_->progress.error_count++;
-            free_allocated_blks();
-            return WRITE_DATA_ERR;
+            futs.emplace_back(folly::makeFuture< std::error_code >(std::make_error_code(std::errc::invalid_argument)));
+            continue;
         }
 #endif
-        const auto ret = homestore::data_service()
-                             .async_write(r_cast< char const* >(aligned_buf.cbytes()), aligned_buf.size(), blk_id)
-                             .thenValue([&blk_id](auto&& err) -> BlobManager::AsyncResult< blob_id_t > {
-                                 // TODO: do we need to update repl_dev metrics?
-                                 if (err) {
-                                     LOGE("Failed to write blob info to blk_id={}", blk_id.to_string());
-                                     return folly::makeUnexpected(BlobError(BlobErrorCode::REPLICATION_ERROR));
-                                 }
-                                 LOGD("Blob info written to blk_id={}", blk_id.to_string());
-                                 return 0;
-                             })
-                             .get();
-        if (ret.hasError()) {
-            LOGE("Failed to write blob info of blob_id {} to blk_id={}", blob->blob_id(), blk_id.to_string());
-            free_allocated_blks();
-            std::unique_lock< std::shared_mutex > lock(ctx_->progress_lock);
-            ctx_->progress.error_count++;
-            return WRITE_DATA_ERR;
-        }
-        if (homestore::data_service().commit_blk(blk_id) != homestore::BlkAllocStatus::SUCCESS) {
-            LOGE("Failed to commit blk_id={} for blob_id={}", blk_id.to_string(), blob->blob_id());
-            std::unique_lock< std::shared_mutex > lock(ctx_->progress_lock);
-            ctx_->progress.error_count++;
-            free_allocated_blks();
-            return COMMIT_BLK_ERR;
-        }
-
-        // Add local blob info to index & PG
-        bool success =
-            home_obj_.local_add_blob_info(ctx_->pg_id, BlobInfo{ctx_->shard_cursor, blob->blob_id(), blk_id});
-        if (!success) {
-            LOGE("Failed to add blob info for blob_id={}", blob->blob_id());
-            std::unique_lock< std::shared_mutex > lock(ctx_->progress_lock);
-            ctx_->progress.error_count++;
-            free_allocated_blks();
-            return ADD_BLOB_INDEX_ERR;
-        }
+        auto blob_id = blob->blob_id();
+        LOGD("Writing blob {} to blk_id={}", blob_id, blk_id.to_string());
+
+        // TODO: limit the max concurrency?
+        futs.emplace_back(
+            homestore::data_service()
+                .async_write(r_cast< char const* >(aligned_buf->cbytes()), aligned_buf->size(), blk_id)
+                .thenValue([this, blk_id, start, blob_id](auto&& err) -> folly::Future< std::error_code > {
+                    // TODO: do we need to update repl_dev metrics?
+                    if (err) {
+                        LOGE("Failed to write blob data to blk_id={}, freeing the blk", blk_id.to_string());
+                        homestore::data_service().async_free_blk(blk_id).get();
+                        return err;
+                    }
+                    LOGD("Blob {} written to blk_id={}", blob_id, blk_id.to_string());
+
+                    if (homestore::data_service().commit_blk(blk_id) != homestore::BlkAllocStatus::SUCCESS) {
+                        LOGE("Failed to commit blk_id={} for blob_id={}", blk_id.to_string(), blob_id);
+                        return std::make_error_code(std::errc::io_error);
+                    }
+                    // Add local blob info to index & PG
+                    bool success =
+                        home_obj_.local_add_blob_info(ctx_->pg_id, BlobInfo{ctx_->shard_cursor, blob_id, blk_id});
+                    if (!success) {
+                        LOGE("Failed to add blob info for blob_id={}", blob_id);
+                        return std::make_error_code(std::errc::io_error);
+                    }
+
+                    auto duration = get_elapsed_time_us(start);
+                    HISTOGRAM_OBSERVE(*metrics_, snp_rcvr_blob_process_time, duration);
+                    LOGD("Persisted blob_id={} in {}us", blob_id, duration);
+                    return std::error_code{};
+                }));
         total_bytes += data_size;
-        auto duration = get_elapsed_time_us(start);
-        HISTOGRAM_OBSERVE(*metrics_, snp_rcvr_blob_process_time, duration);
-        LOGD("Persisted blob_id={} in {}us", blob->blob_id(), duration);
     }
+    auto ec = collect_all_futures(futs).get();
+
+    if (ec != std::error_code{}) {
+        LOGE("Errors in writing this batch, code={}, message={}", ec.value(), ec.message());
+        std::unique_lock< std::shared_mutex > lock(ctx_->progress_lock);
+        ctx_->progress.error_count++;
+        return WRITE_DATA_ERR;
+    }
+    futs.clear();
+    data_bufs.clear();
 
     // update metrics
     {
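
A related subtlety the diff handles with data_bufs: async_write() only borrows a raw pointer into the buffer, so each aligned_buf must outlive its in-flight write; a stack-local buffer (as in the old synchronous code) would no longer be safe. A minimal sketch of the pinning pattern, illustrative only (fake_async_write is again a hypothetical stand-in):

#include <folly/futures/Future.h>
#include <cstddef>
#include <memory>
#include <string>
#include <system_error>
#include <vector>

// Hypothetical async write that will read [data, data + len) some time later.
static folly::Future< std::error_code > fake_async_write(const char* /*data*/, size_t /*len*/) {
    return folly::makeFuture< std::error_code >(std::error_code{});
}

void write_batch(std::vector< std::string > const& payloads) {
    std::vector< folly::Future< std::error_code > > futs;
    // Pins each buffer until the batch completes, like data_bufs in the diff.
    std::vector< std::shared_ptr< std::string > > bufs;

    for (auto const& p : payloads) {
        auto buf = std::make_shared< std::string >(p);
        bufs.emplace_back(buf); // without this, the buffer could be freed mid-write
        futs.emplace_back(fake_async_write(buf->data(), buf->size()));
    }

    // bufs stays alive past this point, i.e. past every in-flight write.
    folly::collectAllUnsafe(futs).get();
}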
