Skip to content

Commit 2c2773d

Browse files
committed
tracing: make opentelemetry state data sharded
Move gathered telemetry data to opentelemetry_state_data and make it sharded.
1 parent d95d219 commit 2c2773d

File tree

6 files changed

+292
-183
lines changed

6 files changed

+292
-183
lines changed

service/storage_proxy.cc

Lines changed: 21 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3559,8 +3559,8 @@ class abstract_read_executor : public enable_shared_from_this<abstract_read_exec
35593559
return _proxy->query_result_local(_schema, _cmd, _partition_range, opts, _trace_state, timeout);
35603560
} else {
35613561
tracing::trace(_trace_state, "read_data: sending a message to /{}", ep);
3562-
return _proxy->_messaging.send_read_data(netw::messaging_service::msg_addr{ep, 0}, timeout, *_cmd, _partition_range, opts.digest_algo).then([this, ep](rpc::tuple<query::result, rpc::optional<cache_temperature>, tracing::trace_state_ptr::cache_counter_t> result_hit_rate) {
3563-
auto&& [result, hit_rate, cache_counter] = result_hit_rate;
3562+
return _proxy->_messaging.send_read_data(netw::messaging_service::msg_addr{ep, 0}, timeout, *_cmd, _partition_range, opts.digest_algo).then([this, ep](rpc::tuple<query::result, rpc::optional<cache_temperature>, tracing::trace_state_ptr::cache_counter_t> response) {
3563+
auto&& [result, hit_rate, cache_counter] = response;
35643564
tracing::trace(_trace_state, "read_data: got response from /{}", ep);
35653565
tracing::modify_cache_counter(_trace_state, cache_counter);
35663566
return make_ready_future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t>>(rpc::tuple(make_foreign(::make_lw_shared<query::result>(std::move(result))), hit_rate.value_or(cache_temperature::invalid()), cache_counter));
@@ -5109,22 +5109,25 @@ void storage_proxy::init_messaging_service(shared_ptr<migration_manager> mm) {
51095109
auto& cfg = sp->local_db().get_config();
51105110
cmd.max_result_size.emplace(cfg.max_memory_for_unlimited_query_soft_limit(), cfg.max_memory_for_unlimited_query_hard_limit());
51115111
}
5112-
return do_with(std::move(pr), std::move(sp), std::move(trace_state_ptr), [&cinfo, cmd = make_lw_shared<query::read_command>(std::move(cmd)), src_addr = std::move(src_addr), da, t, mm] (::compat::wrapping_partition_range& pr, shared_ptr<storage_proxy>& p, tracing::trace_state_ptr& trace_state_ptr) mutable {
5113-
p->get_stats().replica_data_reads++;
5114-
auto src_ip = src_addr.addr;
5115-
return mm->get_schema_for_read(cmd->schema_version, std::move(src_addr), p->_messaging).then([cmd, da, &pr, &p, &trace_state_ptr, t] (schema_ptr s) {
5116-
auto pr2 = ::compat::unwrap(std::move(pr), *s);
5117-
if (pr2.second) {
5118-
// this function assumes singular queries but doesn't validate
5119-
throw std::runtime_error("READ_DATA called with wrapping range");
5120-
}
5121-
query::result_options opts;
5122-
opts.digest_algo = da;
5123-
opts.request = da == query::digest_algorithm::none ? query::result_request::only_result : query::result_request::result_and_digest;
5124-
auto timeout = t ? *t : db::no_timeout;
5125-
return p->query_result_local(std::move(s), cmd, std::move(pr2.first), opts, trace_state_ptr, timeout);
5126-
}).finally([&trace_state_ptr, src_ip] () mutable {
5127-
tracing::trace(trace_state_ptr, "read_data handling is done, sending a response to /{}", src_ip);
5112+
return tracing::start(trace_state_ptr).then([pr = std::move(pr), sp = std::move(sp), trace_state_ptr, &cinfo, cmd = make_lw_shared<query::read_command>(std::move(cmd)), src_addr = std::move(src_addr), da, t, mm] () {
5113+
return do_with(std::move(pr), std::move(sp), std::move(trace_state_ptr), [&cinfo, cmd = std::move(cmd), src_addr = std::move(src_addr), da, t, mm] (::compat::wrapping_partition_range& pr, shared_ptr<storage_proxy>& p, tracing::trace_state_ptr& trace_state_ptr) mutable {
5114+
p->get_stats().replica_data_reads++;
5115+
auto src_ip = src_addr.addr;
5116+
return mm->get_schema_for_read(cmd->schema_version, std::move(src_addr), p->_messaging).then([cmd, da, &pr, &p, &trace_state_ptr, t] (schema_ptr s) {
5117+
auto pr2 = ::compat::unwrap(std::move(pr), *s);
5118+
if (pr2.second) {
5119+
// this function assumes singular queries but doesn't validate
5120+
throw std::runtime_error("READ_DATA called with wrapping range");
5121+
}
5122+
query::result_options opts;
5123+
opts.digest_algo = da;
5124+
opts.request = da == query::digest_algorithm::none ? query::result_request::only_result : query::result_request::result_and_digest;
5125+
auto timeout = t ? *t : db::no_timeout;
5126+
return p->query_result_local(std::move(s), cmd, std::move(pr2.first), opts, trace_state_ptr, timeout);
5127+
}).finally([&trace_state_ptr, src_ip] () mutable {
5128+
tracing::trace(trace_state_ptr, "read_data handling is done, sending a response to /{}", src_ip);
5129+
return tracing::stop(trace_state_ptr);
5130+
});
51285131
});
51295132
});
51305133
});

tracing/trace_state.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -339,7 +339,7 @@ sstring trace_state::raw_value_to_sstring(const cql3::raw_value_view& v, const d
339339
}
340340
}
341341

342-
void opentelemetry_state::serialize_replicas(bytes& serialized) const {
342+
void opentelemetry_state_data::serialize_replicas(bytes& serialized) const {
343343
const auto size = htonl(_replicas.size());
344344
const auto *size_ptr = reinterpret_cast<const int8_t*>(&size);
345345
serialized += bytes{size_ptr, sizeof(size)};
@@ -356,11 +356,11 @@ void opentelemetry_state::serialize_replicas(bytes& serialized) const {
356356
}
357357
}
358358

359-
void opentelemetry_state::serialize_statement_type(bytes& serialized) const {
359+
void opentelemetry_state_data::serialize_statement_type(bytes& serialized) const {
360360
serialized += bytes{reinterpret_cast<const signed char*>(_statement_type.c_str()), _statement_type.length()};
361361
}
362362

363-
void opentelemetry_state::serialize_cache_counter(bytes& serialized) const {
363+
void opentelemetry_state_data::serialize_cache_counter(bytes& serialized) const {
364364
const auto counter = htonl(_cache_counter);
365365
const auto *counter_ptr = reinterpret_cast<const int8_t*>(&counter);
366366
serialized += bytes{counter_ptr, sizeof(counter)};

tracing/trace_state.hh

Lines changed: 107 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -507,13 +507,10 @@ private:
507507
};
508508

509509

510-
class opentelemetry_state final {
510+
class opentelemetry_state_data final {
511511
public:
512512
using cache_counter_t = int32_t;
513513

514-
private:
515-
lw_shared_ptr<trace_state> _state_ptr;
516-
bool const _opentelemetry_tracing{false};
517514
inet_address_vector_replica_set _replicas;
518515
sstring _statement_type;
519516
// Number of read partitions that were found in cache.
@@ -522,25 +519,82 @@ private:
522519
void serialize_replicas(bytes& serialized) const;
523520
void serialize_statement_type(bytes& serialized) const;
524521
void serialize_cache_counter(bytes& serialized) const;
522+
};
523+
524+
525+
class opentelemetry_state final {
526+
527+
private:
528+
lw_shared_ptr<trace_state> _state_ptr;
529+
std::shared_ptr<sharded<opentelemetry_state_data>> _data;
530+
531+
class reducer {
532+
private:
533+
opentelemetry_state_data _data;
534+
535+
public:
536+
void operator()(const opentelemetry_state_data& to_reduce) {
537+
_data._replicas.insert(_data._replicas.end(),
538+
to_reduce._replicas.begin(),
539+
to_reduce._replicas.end());
540+
_data._cache_counter += to_reduce._cache_counter;
541+
_data._statement_type += to_reduce._statement_type;
542+
}
543+
544+
opentelemetry_state_data get() const {
545+
return _data;
546+
}
547+
};
525548

526549
public:
527550
opentelemetry_state() = default;
528-
opentelemetry_state(lw_shared_ptr<trace_state> state_ptr, bool opentelemetry_tracing = false)
529-
: _state_ptr(std::move(state_ptr)), _opentelemetry_tracing(opentelemetry_tracing)
551+
opentelemetry_state(lw_shared_ptr<trace_state> state_ptr, std::shared_ptr<sharded<opentelemetry_state_data>> data)
552+
: _state_ptr(std::move(state_ptr)), _data(std::move(data))
553+
{}
554+
opentelemetry_state(std::nullptr_t , std::shared_ptr<sharded<opentelemetry_state_data>> data)
555+
: _state_ptr(nullptr), _data(std::move(data))
530556
{}
531-
opentelemetry_state(std::nullptr_t, bool opentelemetry_tracing = false)
532-
: _state_ptr(nullptr), _opentelemetry_tracing(opentelemetry_tracing)
557+
template<typename TraceState>
558+
opentelemetry_state(TraceState state_ptr, bool opentelemetry_tracing = false)
559+
: opentelemetry_state(std::move(state_ptr),
560+
opentelemetry_tracing
561+
? std::make_shared<sharded<opentelemetry_state_data>>()
562+
: nullptr)
533563
{}
534564

565+
/**
566+
* Helper function to initialize sharded data in opentelemetry_state.
567+
*/
568+
future<> start() {
569+
return _data->start();
570+
}
571+
572+
/**
573+
* Helper function to properly stop sharded data in opentelemetry_state.
574+
*/
575+
future<> stop() {
576+
return _data->stop();
577+
}
578+
579+
/**
580+
* Helper function to collect opentelemetry data from all shards.
581+
* It should be called before serializing opentelemetry_state.
582+
*/
583+
future<> collect_data() {
584+
return _data->map_reduce(reducer{}, std::identity{}).then([&local_data = _data->local()] (auto data) {
585+
local_data = data;
586+
});
587+
}
588+
535589
/**
536590
* @return serialized opentelemetry state.
537591
*/
538592
bytes serialize() const noexcept {
539593
bytes serialized{};
540594

541-
serialize_replicas(serialized);
542-
serialize_cache_counter(serialized);
543-
serialize_statement_type(serialized);
595+
_data->local().serialize_replicas(serialized);
596+
_data->local().serialize_cache_counter(serialized);
597+
_data->local().serialize_statement_type(serialized);
544598

545599
return serialized;
546600
}
@@ -551,7 +605,7 @@ public:
551605
* @param replicas list of contacted replicas
552606
*/
553607
void set_replicas(const inet_address_vector_replica_set& replicas) {
554-
_replicas = replicas;
608+
_data->local()._replicas = replicas;
555609
}
556610

557611
/**
@@ -560,30 +614,32 @@ public:
560614
* @param statement_type type of prepared statement.
561615
*/
562616
void set_statement_type(const sstring& statement_type) {
563-
_statement_type = statement_type;
617+
_data->local()._statement_type = statement_type;
564618
}
565619

566620
/**
567621
* Increment counter of partitions read from cache.
568622
*
569623
* @param count number of partitions
570624
*/
571-
void modify_cache_counter(cache_counter_t count) {
572-
_cache_counter += count;
625+
void modify_cache_counter(opentelemetry_state_data::cache_counter_t count) {
626+
if (_data->local_is_initialized()) {
627+
_data->local()._cache_counter += count;
628+
}
573629
}
574630

575631
/**
576632
* @return number of partitions that were found in cache
577633
*/
578-
cache_counter_t get_cache_counter() const {
579-
return _cache_counter;
634+
opentelemetry_state_data::cache_counter_t get_cache_counter() const {
635+
return _data->local()._cache_counter;
580636
}
581637

582638
/**
583639
* @return True if OpenTelemetry trace state is stored.
584640
*/
585641
bool has_opentelemetry() const noexcept {
586-
return _opentelemetry_tracing;
642+
return __builtin_expect(bool(_data), false);
587643
};
588644

589645
/**
@@ -606,6 +662,13 @@ public:
606662
trace_state& get_tracing() const noexcept {
607663
return *_state_ptr;
608664
}
665+
666+
/**
667+
* @return A pointer to sharded data.
668+
*/
669+
std::shared_ptr<sharded<opentelemetry_state_data>> get_data() const {
670+
return _data;
671+
}
609672
};
610673

611674

@@ -632,7 +695,7 @@ public:
632695
: _state_ptr(nullptr)
633696
{}
634697

635-
using cache_counter_t = opentelemetry_state::cache_counter_t;
698+
using cache_counter_t = opentelemetry_state_data::cache_counter_t;
636699

637700
/**
638701
* @return True if classic trace state is stored.
@@ -884,7 +947,7 @@ inline std::optional<trace_info> make_trace_info(const trace_state_ptr& state) {
884947
if (state.has_opentelemetry()) {
885948
trace_state_props_set props{};
886949
props.set(trace_state_props::opentelemetry);
887-
return trace_info{props};
950+
return trace_info{props, state.get_opentelemetry().get_data()};
888951
}
889952

890953
return std::nullopt;
@@ -928,6 +991,30 @@ inline trace_state_ptr::cache_counter_t get_cache_counter(const trace_state_ptr&
928991
return 0;
929992
}
930993

994+
inline future<> start(const trace_state_ptr& p) {
995+
if (p.has_opentelemetry()) {
996+
return p.get_opentelemetry_ptr()->start();
997+
}
998+
999+
return make_ready_future<>();
1000+
}
1001+
1002+
inline future<> stop(const trace_state_ptr& p) {
1003+
if (p.has_opentelemetry()) {
1004+
return p.get_opentelemetry_ptr()->stop();
1005+
}
1006+
1007+
return make_ready_future<>();
1008+
}
1009+
1010+
inline future<> collect_data(const trace_state_ptr& p) {
1011+
if (p.has_opentelemetry()) {
1012+
return p.get_opentelemetry_ptr()->collect_data();
1013+
}
1014+
1015+
return make_ready_future<>();
1016+
}
1017+
9311018
// global_trace_state_ptr is a helper class that may be used for creating spans
9321019
// of an existing tracing session on other shards. When a tracing span on a
9331020
// different shard is needed global_trace_state_ptr would create a secondary

tracing/tracing.cc

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -141,19 +141,19 @@ trace_state_ptr tracing::create_session(const trace_info& secondary_session_info
141141
try {
142142
bool opentelemetry_tracing = secondary_session_info.state_props.contains(trace_state_props::opentelemetry);
143143
if (!started()) {
144-
return opentelemetry_tracing ? make_lw_shared<opentelemetry_state>(nullptr, opentelemetry_tracing) : nullptr;
144+
return opentelemetry_tracing ? make_lw_shared<opentelemetry_state>(nullptr, secondary_session_info.otel_data) : nullptr;
145145
}
146146

147147
// Don't create a session if its records are likely to be dropped
148148
if (!may_create_new_session(secondary_session_info.session_id)) {
149-
return opentelemetry_tracing ? make_lw_shared<opentelemetry_state>(nullptr, opentelemetry_tracing) : nullptr;
149+
return opentelemetry_tracing ? make_lw_shared<opentelemetry_state>(nullptr, secondary_session_info.otel_data) : nullptr;
150150
}
151151

152152
++_active_sessions;
153153
if (secondary_session_info.state_props.contains(trace_state_props::classic)) {
154-
return make_lw_shared<opentelemetry_state>(make_lw_shared<trace_state>(secondary_session_info), opentelemetry_tracing);
154+
return make_lw_shared<opentelemetry_state>(make_lw_shared<trace_state>(secondary_session_info), secondary_session_info.otel_data);
155155
}
156-
return make_lw_shared<opentelemetry_state>(nullptr, opentelemetry_tracing);
156+
return make_lw_shared<opentelemetry_state>(nullptr, secondary_session_info.otel_data);
157157
} catch (...) {
158158
// return an uninitialized state in case of any error (OOM?)
159159
return nullptr;

tracing/tracing.hh

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,8 @@ using trace_state_props_set = enum_set<super_enum<trace_state_props,
141141
trace_state_props::opentelemetry,
142142
trace_state_props::classic>>;
143143

144+
class opentelemetry_state_data;
145+
144146
class trace_info {
145147
public:
146148
utils::UUID session_id;
@@ -150,6 +152,7 @@ public:
150152
uint32_t slow_query_threshold_us; // in microseconds
151153
uint32_t slow_query_ttl_sec; // in seconds
152154
span_id parent_id;
155+
std::shared_ptr<seastar::sharded<opentelemetry_state_data>> otel_data;
153156

154157
public:
155158
trace_info(utils::UUID sid, trace_type t, bool w_o_c, trace_state_props_set s_p, uint32_t slow_query_threshold, uint32_t slow_query_ttl, span_id p_id)
@@ -164,7 +167,9 @@ public:
164167
state_props.set_if<trace_state_props::write_on_close>(write_on_close);
165168
}
166169

167-
trace_info(trace_state_props_set s_p) : state_props(s_p) {}
170+
trace_info(trace_state_props_set s_p, std::shared_ptr<seastar::sharded<opentelemetry_state_data>> otel_data)
171+
: state_props(s_p), otel_data(otel_data)
172+
{}
168173
};
169174

170175
struct one_session_records;

0 commit comments

Comments
 (0)