Skip to content

Commit be71140

Browse files
yawzhangBesroy
yawzhang
authored andcommitted
use trace id from upper layer
1 parent 87ed223 commit be71140

21 files changed

+186
-153
lines changed

conanfile.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
class HomeObjectConan(ConanFile):
1111
name = "homeobject"
12-
version = "2.3.14"
12+
version = "2.3.15"
1313

1414
homepage = "https://github.com/eBay/HomeObject"
1515
description = "Blob Store built on HomeReplication"
@@ -49,7 +49,7 @@ def build_requirements(self):
4949

5050
def requirements(self):
5151
self.requires("sisl/[^12.2]@oss/master", transitive_headers=True)
52-
self.requires("homestore/[~6.12.2]@oss/master")
52+
self.requires("homestore/[^6.12]@oss/master")
5353
self.requires("iomgr/[^11.3]@oss/master")
5454
self.requires("lz4/1.9.4", override=True)
5555
self.requires("openssl/3.3.1", override=True)

src/include/homeobject/blob_manager.hpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,10 @@ struct Blob {
4141

4242
class BlobManager : public Manager< BlobError > {
4343
public:
44-
virtual AsyncResult< blob_id_t > put(shard_id_t shard, Blob&&) = 0;
44+
virtual AsyncResult< blob_id_t > put(shard_id_t shard, Blob&&, trace_id_t tid = 0) = 0;
4545
virtual AsyncResult< Blob > get(shard_id_t shard, blob_id_t const& blob, uint64_t off = 0,
46-
uint64_t len = 0) const = 0;
47-
virtual NullAsyncResult del(shard_id_t shard, blob_id_t const& blob) = 0;
46+
uint64_t len = 0, trace_id_t tid = 0) const = 0;
47+
virtual NullAsyncResult del(shard_id_t shard, blob_id_t const& blob, trace_id_t tid = 0) = 0;
4848
};
4949

5050
} // namespace homeobject

src/include/homeobject/pg_manager.hpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,9 +85,9 @@ struct PGStats {
8585

8686
class PGManager : public Manager< PGError > {
8787
public:
88-
virtual NullAsyncResult create_pg(PGInfo&& pg_info) = 0;
88+
virtual NullAsyncResult create_pg(PGInfo&& pg_info, trace_id_t tid = 0) = 0;
8989
virtual NullAsyncResult replace_member(pg_id_t id, peer_id_t const& old_member, PGMember const& new_member,
90-
u_int32_t commit_quorum = 0) = 0;
90+
u_int32_t commit_quorum = 0, trace_id_t tid = 0) = 0;
9191

9292
/**
9393
* Retrieves the statistics for a specific PG (Placement Group) identified by its ID.

src/include/homeobject/shard_manager.hpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,10 @@ class ShardManager : public Manager< ShardError > {
4242
static uint64_t max_shard_size(); // Static function forces runtime evaluation.
4343
static uint64_t max_shard_num_in_pg();
4444

45-
virtual AsyncResult< ShardInfo > get_shard(shard_id_t id) const = 0;
46-
virtual AsyncResult< InfoList > list_shards(pg_id_t id) const = 0;
47-
virtual AsyncResult< ShardInfo > create_shard(pg_id_t pg_owner, uint64_t size_bytes) = 0;
48-
virtual AsyncResult< ShardInfo > seal_shard(shard_id_t id) = 0;
45+
virtual AsyncResult< ShardInfo > get_shard(shard_id_t id, trace_id_t tid = 0) const = 0;
46+
virtual AsyncResult< InfoList > list_shards(pg_id_t id, trace_id_t tid = 0) const = 0;
47+
virtual AsyncResult< ShardInfo > create_shard(pg_id_t pg_owner, uint64_t size_bytes, trace_id_t tid = 0) = 0;
48+
virtual AsyncResult< ShardInfo > seal_shard(shard_id_t id, trace_id_t tid = 0) = 0;
4949
};
5050

5151
} // namespace homeobject

src/lib/blob_manager.cpp

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,27 +5,28 @@ namespace homeobject {
55
std::shared_ptr< BlobManager > HomeObjectImpl::blob_manager() { return shared_from_this(); }
66

77
BlobManager::AsyncResult< Blob > HomeObjectImpl::get(shard_id_t shard, blob_id_t const& blob_id, uint64_t off,
8-
uint64_t len) const {
9-
return _get_shard(shard).thenValue([this, blob_id, off, len](auto const e) -> BlobManager::AsyncResult< Blob > {
10-
if (!e) return folly::makeUnexpected(BlobError(BlobErrorCode::UNKNOWN_SHARD));
11-
return _get_blob(e.value(), blob_id, off, len);
12-
});
8+
uint64_t len, trace_id_t tid) const {
9+
return _get_shard(shard, tid).thenValue(
10+
[this, blob_id, off, len, tid](auto const e) -> BlobManager::AsyncResult< Blob > {
11+
if (!e) return folly::makeUnexpected(BlobError(BlobErrorCode::UNKNOWN_SHARD));
12+
return _get_blob(e.value(), blob_id, off, len, tid);
13+
});
1314
}
1415

15-
BlobManager::AsyncResult< blob_id_t > HomeObjectImpl::put(shard_id_t shard, Blob&& blob) {
16-
return _get_shard(shard).thenValue(
17-
[this, blob = std::move(blob)](auto const e) mutable -> BlobManager::AsyncResult< blob_id_t > {
16+
BlobManager::AsyncResult< blob_id_t > HomeObjectImpl::put(shard_id_t shard, Blob&& blob, trace_id_t tid) {
17+
return _get_shard(shard, tid).thenValue(
18+
[this, blob = std::move(blob), tid](auto const e) mutable -> BlobManager::AsyncResult< blob_id_t > {
1819
if (!e) return folly::makeUnexpected(BlobError(BlobErrorCode::UNKNOWN_SHARD));
1920
if (ShardInfo::State::SEALED == e.value().state) return folly::makeUnexpected(BlobError(BlobErrorCode::SEALED_SHARD));
2021
if (blob.body.size() == 0) return folly::makeUnexpected(BlobError(BlobErrorCode::INVALID_ARG));
21-
return _put_blob(e.value(), std::move(blob));
22+
return _put_blob(e.value(), std::move(blob), tid);
2223
});
2324
}
2425

25-
BlobManager::NullAsyncResult HomeObjectImpl::del(shard_id_t shard, blob_id_t const& blob) {
26-
return _get_shard(shard).thenValue([this, blob](auto const e) mutable -> BlobManager::NullAsyncResult {
26+
BlobManager::NullAsyncResult HomeObjectImpl::del(shard_id_t shard, blob_id_t const& blob, trace_id_t tid) {
27+
return _get_shard(shard, tid).thenValue([this, blob, tid](auto const e) mutable -> BlobManager::NullAsyncResult {
2728
if (!e) return folly::makeUnexpected(BlobError(BlobErrorCode::UNKNOWN_SHARD));
28-
return _del_blob(e.value(), blob);
29+
return _del_blob(e.value(), blob, tid);
2930
});
3031
}
3132

src/lib/homeobject_impl.hpp

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -87,18 +87,20 @@ class HomeObjectImpl : public HomeObject,
8787
public std::enable_shared_from_this< HomeObjectImpl > {
8888

8989
/// Implementation defines these
90-
virtual ShardManager::AsyncResult< ShardInfo > _create_shard(pg_id_t, uint64_t size_bytes) = 0;
91-
virtual ShardManager::AsyncResult< ShardInfo > _seal_shard(ShardInfo const&) = 0;
90+
virtual ShardManager::AsyncResult< ShardInfo > _create_shard(pg_id_t, uint64_t size_bytes, trace_id_t tid) = 0;
91+
virtual ShardManager::AsyncResult< ShardInfo > _seal_shard(ShardInfo const&, trace_id_t tid) = 0;
9292

93-
virtual BlobManager::AsyncResult< blob_id_t > _put_blob(ShardInfo const&, Blob&&) = 0;
94-
virtual BlobManager::AsyncResult< Blob > _get_blob(ShardInfo const&, blob_id_t, uint64_t off = 0,
95-
uint64_t len = 0) const = 0;
96-
virtual BlobManager::NullAsyncResult _del_blob(ShardInfo const&, blob_id_t) = 0;
93+
virtual BlobManager::AsyncResult< blob_id_t > _put_blob(ShardInfo const&, Blob&&, trace_id_t tid) = 0;
94+
virtual BlobManager::AsyncResult< Blob > _get_blob(ShardInfo const&, blob_id_t, uint64_t off, uint64_t len,
95+
trace_id_t tid) const = 0;
96+
virtual BlobManager::NullAsyncResult _del_blob(ShardInfo const&, blob_id_t, trace_id_t tid) = 0;
9797
///
9898

99-
virtual PGManager::NullAsyncResult _create_pg(PGInfo&& pg_info, std::set< peer_id_t > const& peers) = 0;
99+
virtual PGManager::NullAsyncResult _create_pg(PGInfo&& pg_info, std::set< peer_id_t > const& peers,
100+
trace_id_t tid) = 0;
100101
virtual PGManager::NullAsyncResult _replace_member(pg_id_t id, peer_id_t const& old_member,
101-
PGMember const& new_member, uint32_t commit_quorum) = 0;
102+
PGMember const& new_member, uint32_t commit_quorum,
103+
trace_id_t trace_id) = 0;
102104
virtual bool _get_stats(pg_id_t id, PGStats& stats) const = 0;
103105
virtual void _get_pg_ids(std::vector< pg_id_t >& pg_ids) const = 0;
104106

@@ -122,7 +124,7 @@ class HomeObjectImpl : public HomeObject,
122124
///
123125

124126
auto _defer() const { return folly::makeSemiFuture().via(executor_); }
125-
folly::Future< ShardManager::Result< ShardInfo > > _get_shard(shard_id_t id) const;
127+
folly::Future< ShardManager::Result< ShardInfo > > _get_shard(shard_id_t id, trace_id_t tid) const;
126128

127129
public:
128130
explicit HomeObjectImpl(std::weak_ptr< HomeObjectApplication >&& application);
@@ -143,25 +145,25 @@ class HomeObjectImpl : public HomeObject,
143145
HomeObjectStats get_stats() const final { return _get_stats(); }
144146

145147
/// PgManager
146-
PGManager::NullAsyncResult create_pg(PGInfo&& pg_info) final;
148+
PGManager::NullAsyncResult create_pg(PGInfo&& pg_info, trace_id_t tid) final;
147149
PGManager::NullAsyncResult replace_member(pg_id_t id, peer_id_t const& old_member, PGMember const& new_member,
148-
u_int32_t commit_quorum) final;
150+
u_int32_t commit_quorum, trace_id_t trace_id) final;
149151
// see api comments in base class;
150152
bool get_stats(pg_id_t id, PGStats& stats) const final;
151153
void get_pg_ids(std::vector< pg_id_t >& pg_ids) const final;
152154

153155
/// ShardManager
154-
ShardManager::AsyncResult< ShardInfo > get_shard(shard_id_t id) const final;
155-
ShardManager::AsyncResult< ShardInfo > create_shard(pg_id_t pg_owner, uint64_t size_bytes) final;
156-
ShardManager::AsyncResult< InfoList > list_shards(pg_id_t pg) const final;
157-
ShardManager::AsyncResult< ShardInfo > seal_shard(shard_id_t id) final;
156+
ShardManager::AsyncResult< ShardInfo > get_shard(shard_id_t id, trace_id_t tid) const final;
157+
ShardManager::AsyncResult< ShardInfo > create_shard(pg_id_t pg_owner, uint64_t size_bytes, trace_id_t tid) final;
158+
ShardManager::AsyncResult< InfoList > list_shards(pg_id_t pg, trace_id_t tid) const final;
159+
ShardManager::AsyncResult< ShardInfo > seal_shard(shard_id_t id, trace_id_t tid) final;
158160
uint64_t get_current_timestamp();
159161

160162
/// BlobManager
161-
BlobManager::AsyncResult< blob_id_t > put(shard_id_t shard, Blob&&) final;
163+
BlobManager::AsyncResult< blob_id_t > put(shard_id_t shard, Blob&&, trace_id_t tid) final;
162164
BlobManager::AsyncResult< Blob > get(shard_id_t shard, blob_id_t const& blob, uint64_t off,
163-
uint64_t len) const final;
164-
BlobManager::NullAsyncResult del(shard_id_t shard, blob_id_t const& blob) final;
165+
uint64_t len, trace_id_t tid) const final;
166+
BlobManager::NullAsyncResult del(shard_id_t shard, blob_id_t const& blob, trace_id_t tid) final;
165167
};
166168

167169
} // namespace homeobject

src/lib/homestore_backend/hs_blob_manager.cpp

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -83,14 +83,13 @@ struct put_blob_req_ctx : public repl_result_ctx< BlobManager::Result< HSHomeObj
8383
sisl::io_blob_safe& blob_header_buf() { return data_bufs_[blob_header_idx_]; }
8484
};
8585

86-
BlobManager::AsyncResult< blob_id_t > HSHomeObject::_put_blob(ShardInfo const& shard, Blob&& blob) {
86+
BlobManager::AsyncResult< blob_id_t > HSHomeObject::_put_blob(ShardInfo const& shard, Blob&& blob, trace_id_t tid) {
8787

8888
if (is_shutting_down()) {
8989
LOGI("service is being shut down");
9090
return folly::makeUnexpected(BlobErrorCode::SHUTTING_DOWN);
9191
}
9292
incr_pending_request_num();
93-
auto tid = generateRandomTraceId();
9493
auto& pg_id = shard.placement_group;
9594
shared< homestore::ReplDev > repl_dev;
9695
blob_id_t new_blob_id;
@@ -106,7 +105,7 @@ BlobManager::AsyncResult< blob_id_t > HSHomeObject::_put_blob(ShardInfo const& s
106105
"exhausted all available blob ids");
107106
}
108107
RELEASE_ASSERT(repl_dev != nullptr, "Repl dev instance null");
109-
BLOGD(tid, shard.id, new_blob_id, "Blob Put request:, pg={}, group={}, shard=0x{:x}, length={}", pg_id,
108+
BLOGD(tid, shard.id, new_blob_id, "Blob Put request: pg={}, group={}, shard=0x{:x}, length={}", pg_id,
110109
repl_dev->group_id(), shard.id, blob.body.size());
111110

112111
if (!repl_dev->is_leader()) {
@@ -182,7 +181,7 @@ BlobManager::AsyncResult< blob_id_t > HSHomeObject::_put_blob(ShardInfo const& s
182181
BLOGT(tid, req->blob_header()->shard_id, req->blob_header()->blob_id, "Put blob: header={} sgs={}",
183182
req->blob_header()->to_string(), req->data_sgs_string());
184183

185-
repl_dev->async_alloc_write(req->cheader_buf(), req->ckey_buf(), req->data_sgs(), req, tid);
184+
repl_dev->async_alloc_write(req->cheader_buf(), req->ckey_buf(), req->data_sgs(), req, false/* part_of_batch */, tid);
186185
return req->result().deferValue([this, req, repl_dev, tid](const auto& result) -> BlobManager::AsyncResult< blob_id_t > {
187186
if (result.hasError()) {
188187
auto err = result.error();
@@ -270,13 +269,12 @@ void HSHomeObject::on_blob_put_commit(int64_t lsn, sisl::blob const& header, sis
270269
}
271270

272271
BlobManager::AsyncResult< Blob > HSHomeObject::_get_blob(ShardInfo const& shard, blob_id_t blob_id, uint64_t req_offset,
273-
uint64_t req_len) const {
272+
uint64_t req_len, trace_id_t tid) const {
274273
if (is_shutting_down()) {
275274
LOGI("service is being shut down");
276275
return folly::makeUnexpected(BlobErrorCode::SHUTTING_DOWN);
277276
}
278277
incr_pending_request_num();
279-
auto trace_id = generateRandomTraceId();
280278
auto& pg_id = shard.placement_group;
281279
auto hs_pg = get_hs_pg(pg_id);
282280
RELEASE_ASSERT(hs_pg, "PG not found");
@@ -293,16 +291,16 @@ BlobManager::AsyncResult< Blob > HSHomeObject::_get_blob(ShardInfo const& shard,
293291
return folly::makeUnexpected(BlobError(BlobErrorCode::RETRY_REQUEST));
294292
}
295293

296-
BLOGD(trace_id, shard.id, blob_id, "Blob Get request: pd={}, group={}, shard=0x{:x}, blob={}, offset={}, len={}",
294+
BLOGD(tid, shard.id, blob_id, "Blob Get request: pd={}, group={}, shard=0x{:x}, blob={}, offset={}, len={}",
297295
pg_id, repl_dev->group_id(), shard.id, blob_id, req_offset, req_len);
298296
auto r = get_blob_from_index_table(index_table, shard.id, blob_id);
299297
if (!r) {
300-
BLOGE(trace_id, shard.id, blob_id, "Blob not found in index during get blob");
298+
BLOGE(tid, shard.id, blob_id, "Blob not found in index during get blob");
301299
decr_pending_request_num();
302300
return folly::makeUnexpected(r.error());
303301
}
304302

305-
return _get_blob_data(repl_dev, shard.id, blob_id, req_offset, req_len, r.value() /* blkid*/, trace_id);
303+
return _get_blob_data(repl_dev, shard.id, blob_id, req_offset, req_len, r.value() /* blkid*/, tid);
306304
}
307305

308306
BlobManager::AsyncResult< Blob > HSHomeObject::_get_blob_data(const shared< homestore::ReplDev >& repl_dev,
@@ -437,13 +435,12 @@ HSHomeObject::blob_put_get_blk_alloc_hints(sisl::blob const& header, cintrusive<
437435
return hints;
438436
}
439437

440-
BlobManager::NullAsyncResult HSHomeObject::_del_blob(ShardInfo const& shard, blob_id_t blob_id) {
438+
BlobManager::NullAsyncResult HSHomeObject::_del_blob(ShardInfo const& shard, blob_id_t blob_id, trace_id_t tid) {
441439
if (is_shutting_down()) {
442440
LOGI("service is being shut down");
443441
return folly::makeUnexpected(BlobErrorCode::SHUTTING_DOWN);
444442
}
445443
incr_pending_request_num();
446-
auto tid = generateRandomTraceId();
447444
BLOGT(tid, shard.id, blob_id, "deleting blob");
448445
auto& pg_id = shard.placement_group;
449446
auto hs_pg = get_hs_pg(pg_id);
@@ -479,7 +476,7 @@ BlobManager::NullAsyncResult HSHomeObject::_del_blob(ShardInfo const& shard, blo
479476
// Populate the key
480477
std::memcpy(req->key_buf().bytes(), &blob_id, sizeof(blob_id_t));
481478

482-
repl_dev->async_alloc_write(req->cheader_buf(), req->ckey_buf(), sisl::sg_list{}, req, tid);
479+
repl_dev->async_alloc_write(req->cheader_buf(), req->ckey_buf(), sisl::sg_list{}, req, false/* part_of_batch */, tid);
483480
return req->result().deferValue([this, repl_dev, tid](const auto& result) -> folly::Expected< folly::Unit, BlobError > {
484481
if (result.hasError()) {
485482
auto err = result.error();

src/lib/homestore_backend/hs_homeobject.hpp

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -51,17 +51,17 @@ class HSHomeObject : public HomeObjectImpl {
5151
///
5252

5353
/// Overridable Helpers
54-
ShardManager::AsyncResult< ShardInfo > _create_shard(pg_id_t, uint64_t size_bytes) override;
55-
ShardManager::AsyncResult< ShardInfo > _seal_shard(ShardInfo const&) override;
54+
ShardManager::AsyncResult< ShardInfo > _create_shard(pg_id_t, uint64_t size_bytes, trace_id_t tid) override;
55+
ShardManager::AsyncResult< ShardInfo > _seal_shard(ShardInfo const&, trace_id_t tid) override;
5656

57-
BlobManager::AsyncResult< blob_id_t > _put_blob(ShardInfo const&, Blob&&) override;
58-
BlobManager::AsyncResult< Blob > _get_blob(ShardInfo const&, blob_id_t, uint64_t off = 0,
59-
uint64_t len = 0) const override;
60-
BlobManager::NullAsyncResult _del_blob(ShardInfo const&, blob_id_t) override;
57+
BlobManager::AsyncResult< blob_id_t > _put_blob(ShardInfo const&, Blob&&, trace_id_t tid) override;
58+
BlobManager::AsyncResult< Blob > _get_blob(ShardInfo const&, blob_id_t, uint64_t off, uint64_t len,
59+
trace_id_t tid) const override;
60+
BlobManager::NullAsyncResult _del_blob(ShardInfo const&, blob_id_t, trace_id_t tid) override;
6161

62-
PGManager::NullAsyncResult _create_pg(PGInfo&& pg_info, std::set< peer_id_t > const& peers) override;
62+
PGManager::NullAsyncResult _create_pg(PGInfo&& pg_info, std::set< peer_id_t > const& peers, trace_id_t tid) override;
6363
PGManager::NullAsyncResult _replace_member(pg_id_t id, peer_id_t const& old_member, PGMember const& new_member,
64-
uint32_t commit_quorum) override;
64+
uint32_t commit_quorum, trace_id_t tid) override;
6565

6666
bool _get_stats(pg_id_t id, PGStats& stats) const override;
6767
void _get_pg_ids(std::vector< pg_id_t >& pg_ids) const override;

0 commit comments

Comments
 (0)