Move snapshot data write to async mode. #299

Open · wants to merge 2 commits into main
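Summary of the change: the previous per-blob synchronous write (async_write(...).get() followed by commit_blk and local_add_blob_info) is replaced by submitting one write future per blob and collecting the whole batch once via collect_all_futures(). The sketch below shows that submit-then-collect pattern in isolation; it is an assumption-laden illustration that uses std::async/std::future and a stub write so it compiles stand-alone, whereas the actual diff uses folly futures and homestore's data_service() calls.

// Minimal stand-alone sketch of the submit-then-collect pattern this PR adopts.
// Hypothetical stub in place of homestore::data_service().async_write(); only the
// pattern (submit all, then gather errors once per batch) mirrors the real change.
#include <future>
#include <iostream>
#include <system_error>
#include <vector>

std::future< std::error_code > async_write_stub(int blob_id) {
    return std::async(std::launch::async, [blob_id] {
        (void)blob_id;            // a real write would persist this blob's data
        return std::error_code{}; // empty error_code means success
    });
}

int main() {
    std::vector< std::future< std::error_code > > futs;

    // 1. Submit every write in the batch without blocking on any single one.
    for (int blob_id = 0; blob_id < 4; ++blob_id) {
        futs.emplace_back(async_write_stub(blob_id));
    }

    // 2. Collect all results once; keep the first error, as collect_all_futures() does.
    std::error_code first_err;
    for (auto& f : futs) {
        if (auto ec = f.get(); ec && !first_err) { first_err = ec; }
    }

    // 3. Fail the whole batch if any write failed (the PR returns WRITE_DATA_ERR).
    std::cout << (first_err ? first_err.message() : "batch persisted") << '\n';
    return first_err ? 1 : 0;
}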
conanfile.py (2 changes: 1 addition & 1 deletion)
@@ -9,7 +9,7 @@

class HomeObjectConan(ConanFile):
name = "homeobject"
version = "2.3.19"
version = "2.3.20"

homepage = "https://github.com/eBay/HomeObject"
description = "Blob Store built on HomeReplication"
src/lib/homestore_backend/snapshot_receive_handler.cpp (130 changes: 75 additions & 55 deletions)
@@ -57,8 +57,8 @@ int HSHomeObject::SnapshotReceiveHandler::process_pg_snapshot_data(ResyncPGMetaD

// update metrics
std::unique_lock< std::shared_mutex > lock(ctx_->progress_lock);
ctx_->progress.start_time = std::chrono::duration_cast< std::chrono::seconds >(
std::chrono::system_clock::now().time_since_epoch()).count();
ctx_->progress.start_time =
std::chrono::duration_cast< std::chrono::seconds >(std::chrono::system_clock::now().time_since_epoch()).count();
ctx_->progress.total_shards = ctx_->shard_list.size();
ctx_->progress.total_blobs = pg_meta.total_blobs_to_transfer();
ctx_->progress.total_bytes = pg_meta.total_bytes_to_transfer();
@@ -138,10 +138,22 @@ int HSHomeObject::SnapshotReceiveHandler::process_shard_snapshot_data(ResyncShar
return 0;
}

static auto collect_all_futures(std::vector< folly::Future< std::error_code > >& futs) {
return folly::collectAllUnsafe(futs).thenValue([](auto&& vf) {
for (auto const& err_c : vf) {
if (sisl_unlikely(err_c.value())) {
auto ec = err_c.value();
return folly::makeFuture< std::error_code >(std::move(ec));
}
}
return folly::makeFuture< std::error_code >(std::error_code{});
});
}

int HSHomeObject::SnapshotReceiveHandler::process_blobs_snapshot_data(ResyncBlobDataBatch const& data_blobs,
const snp_batch_id_t batch_num,
bool is_last_batch) {
//retry mesg, need to handle duplicate batch, reset progress
// Retry message: handle the duplicate batch and reset progress
if (ctx_->cur_batch_num == batch_num) {
std::unique_lock< std::shared_mutex > lock(ctx_->progress_lock);
ctx_->progress.complete_blobs -= ctx_->progress.cur_batch_blobs;
@@ -159,6 +171,10 @@ int HSHomeObject::SnapshotReceiveHandler::process_blobs_snapshot_data(ResyncBlob
hints.chunk_id_hint = *p_chunk_id;

uint64_t total_bytes = 0;

std::vector< folly::Future< std::error_code > > futs;
std::vector< std::shared_ptr< sisl::io_blob_safe > > data_bufs;

for (unsigned int i = 0; i < data_blobs.blob_list()->size(); i++) {
const auto blob = data_blobs.blob_list()->Get(i);

@@ -227,79 +243,83 @@

// Alloc & persist blob data
auto data_size = blob->data()->size();
sisl::io_blob_safe aligned_buf(sisl::round_up(data_size, io_align), io_align);
std::memcpy(aligned_buf.bytes(), blob_data, data_size);
std::shared_ptr< sisl::io_blob_safe > aligned_buf =
make_shared< sisl::io_blob_safe >(sisl::round_up(data_size, io_align), io_align);
std::memcpy(aligned_buf->bytes(), blob_data, data_size);
data_bufs.emplace_back(aligned_buf);

homestore::MultiBlkId blk_id;
auto status = homestore::data_service().alloc_blks(
sisl::round_up(aligned_buf.size(), homestore::data_service().get_blk_size()), hints, blk_id);
sisl::round_up(aligned_buf->size(), homestore::data_service().get_blk_size()), hints, blk_id);
if (status != homestore::BlkAllocStatus::SUCCESS) {
LOGE("Failed to allocate blocks for shardID=0x{:x}, pg={}, shard=0x{:x} blob {}", ctx_->shard_cursor,
(ctx_->shard_cursor >> homeobject::shard_width), (ctx_->shard_cursor & homeobject::shard_mask),
blob->blob_id());
std::unique_lock< std::shared_mutex > lock(ctx_->progress_lock);
ctx_->progress.error_count++;
return ALLOC_BLK_ERR;
break;
}

auto free_allocated_blks = [blk_id]() {
homestore::data_service().async_free_blk(blk_id).thenValue([blk_id](auto&& err) {
LOGD("Freed blk_id={} due to failure in adding blob info, err {}", blk_id.to_string(),
err ? err.message() : "nil");
});
};

#ifdef _PRERELEASE
if (iomgr_flip::instance()->test_flip("snapshot_receiver_blob_write_data_error")) {
LOGW("Simulating blob snapshot write data error");
std::unique_lock< std::shared_mutex > lock(ctx_->progress_lock);
ctx_->progress.error_count++;
free_allocated_blks();
return WRITE_DATA_ERR;
futs.emplace_back(folly::makeFuture< std::error_code >(std::make_error_code(std::errc::invalid_argument)));
continue;
}
#endif
const auto ret = homestore::data_service()
.async_write(r_cast< char const* >(aligned_buf.cbytes()), aligned_buf.size(), blk_id)
.thenValue([&blk_id](auto&& err) -> BlobManager::AsyncResult< blob_id_t > {
// TODO: do we need to update repl_dev metrics?
if (err) {
LOGE("Failed to write blob info to blk_id={}", blk_id.to_string());
return folly::makeUnexpected(BlobError(BlobErrorCode::REPLICATION_ERROR));
}
LOGD("Blob info written to blk_id={}", blk_id.to_string());
return 0;
})
.get();
if (ret.hasError()) {
LOGE("Failed to write blob info of blob_id {} to blk_id={}", blob->blob_id(), blk_id.to_string());
free_allocated_blks();
std::unique_lock< std::shared_mutex > lock(ctx_->progress_lock);
ctx_->progress.error_count++;
return WRITE_DATA_ERR;
}
if (homestore::data_service().commit_blk(blk_id) != homestore::BlkAllocStatus::SUCCESS) {
LOGE("Failed to commit blk_id={} for blob_id={}", blk_id.to_string(), blob->blob_id());
std::unique_lock< std::shared_mutex > lock(ctx_->progress_lock);
ctx_->progress.error_count++;
free_allocated_blks();
return COMMIT_BLK_ERR;
}
auto blob_id = blob->blob_id();
LOGW("Writing Blob {} to blk_id {}", blob_id, blk_id.to_string());

// TODO: limit the max concurrency?
futs.emplace_back(
homestore::data_service()
.async_write(r_cast< char const* >(aligned_buf->cbytes()), aligned_buf->size(), blk_id)
.thenValue([this, blk_id, start, blob_id](auto&& err) -> folly::Future< std::error_code > {
// TODO: do we need to update repl_dev metrics?
if (err) {
LOGE("Failed to write blob info to blk_id={}, free the blk.", blk_id.to_string());
homestore::data_service().async_free_blk(*rit).get();
return err;
}
LOGD("Blob {} written to blk_id={}", blob_id, blk_id.to_string());

if (homestore::data_service().commit_blk(blk_id) != homestore::BlkAllocStatus::SUCCESS) {
LOGE("Failed to commit blk_id={} for blob_id={}", blk_id.to_string(), blob_id);
// err is still success here; report the commit failure explicitly
return std::make_error_code(std::errc::io_error);
}
// Add local blob info to index & PG
bool success =
home_obj_.local_add_blob_info(ctx_->pg_id, BlobInfo{ctx_->shard_cursor, blob_id, blk_id});
if (!success) {
LOGE("Failed to add blob info for blob_id={}", blob_id);
// err is still success here; report the index-update failure explicitly
return std::make_error_code(std::errc::io_error);
}

auto duration = get_elapsed_time_us(start);
HISTOGRAM_OBSERVE(*metrics_, snp_rcvr_blob_process_time, duration);
LOGD("Persisted blob_id={} in {}us", blob_id, duration);
return std::error_code{};
}));
total_bytes += data_size;
}
auto ec = collect_all_futures(futs).get();
// An allocation failure breaks out of the loop above early, so fewer futures than blobs may have been submitted.
auto all_io_submitted = (futs.size() == data_blobs.blob_list()->size());

// Add local blob info to index & PG
bool success =
home_obj_.local_add_blob_info(ctx_->pg_id, BlobInfo{ctx_->shard_cursor, blob->blob_id(), blk_id});
if (!success) {
LOGE("Failed to add blob info for blob_id={}", blob->blob_id());
std::unique_lock< std::shared_mutex > lock(ctx_->progress_lock);
ctx_->progress.error_count++;
free_allocated_blks();
return ADD_BLOB_INDEX_ERR;
if (!all_io_submitted || ec != std::error_code{}) {
if (!all_io_submitted) {
LOGE("Errors in submitting the batch, expect {} blobs, submitted {}.", data_blobs.blob_list()->size(), futs.size());
} else {
LOGE("Errors in writing this batch, code={}, message={}", ec.value(), ec.message());
}
total_bytes += data_size;
auto duration = get_elapsed_time_us(start);
HISTOGRAM_OBSERVE(*metrics_, snp_rcvr_blob_process_time, duration);
LOGD("Persisted blob_id={} in {}us", blob->blob_id(), duration);
std::unique_lock< std::shared_mutex > lock(ctx_->progress_lock);
ctx_->progress.error_count++;
return WRITE_DATA_ERR;
}
futs.clear();
data_bufs.clear();

// update metrics
{