Skip to content

Commit c1f3da6

Browse files
committed
tracing: make opentelemetry state data sharded
Move gathered telemetry data to opentelemetry_state_data and make it sharded.
1 parent c107273 commit c1f3da6

File tree

6 files changed

+283
-177
lines changed

6 files changed

+283
-177
lines changed

service/storage_proxy.cc

Lines changed: 19 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5109,22 +5109,25 @@ void storage_proxy::init_messaging_service(shared_ptr<migration_manager> mm) {
51095109
auto& cfg = sp->local_db().get_config();
51105110
cmd.max_result_size.emplace(cfg.max_memory_for_unlimited_query_soft_limit(), cfg.max_memory_for_unlimited_query_hard_limit());
51115111
}
5112-
return do_with(std::move(pr), std::move(sp), std::move(trace_state_ptr), [&cinfo, cmd = make_lw_shared<query::read_command>(std::move(cmd)), src_addr = std::move(src_addr), da, t, mm] (::compat::wrapping_partition_range& pr, shared_ptr<storage_proxy>& p, tracing::trace_state_ptr& trace_state_ptr) mutable {
5113-
p->get_stats().replica_data_reads++;
5114-
auto src_ip = src_addr.addr;
5115-
return mm->get_schema_for_read(cmd->schema_version, std::move(src_addr), p->_messaging).then([cmd, da, &pr, &p, &trace_state_ptr, t] (schema_ptr s) {
5116-
auto pr2 = ::compat::unwrap(std::move(pr), *s);
5117-
if (pr2.second) {
5118-
// this function assumes singular queries but doesn't validate
5119-
throw std::runtime_error("READ_DATA called with wrapping range");
5120-
}
5121-
query::result_options opts;
5122-
opts.digest_algo = da;
5123-
opts.request = da == query::digest_algorithm::none ? query::result_request::only_result : query::result_request::result_and_digest;
5124-
auto timeout = t ? *t : db::no_timeout;
5125-
return p->query_result_local(std::move(s), cmd, std::move(pr2.first), opts, trace_state_ptr, timeout);
5126-
}).finally([&trace_state_ptr, src_ip] () mutable {
5127-
tracing::trace(trace_state_ptr, "read_data handling is done, sending a response to /{}", src_ip);
5112+
return tracing::start(trace_state_ptr).then([pr = std::move(pr), sp = std::move(sp), trace_state_ptr, &cinfo, cmd = make_lw_shared<query::read_command>(std::move(cmd)), src_addr = std::move(src_addr), da, t, mm] () {
5113+
return do_with(std::move(pr), std::move(sp), std::move(trace_state_ptr), [&cinfo, cmd = std::move(cmd), src_addr = std::move(src_addr), da, t, mm] (::compat::wrapping_partition_range& pr, shared_ptr<storage_proxy>& p, tracing::trace_state_ptr& trace_state_ptr) mutable {
5114+
p->get_stats().replica_data_reads++;
5115+
auto src_ip = src_addr.addr;
5116+
return mm->get_schema_for_read(cmd->schema_version, std::move(src_addr), p->_messaging).then([cmd, da, &pr, &p, &trace_state_ptr, t] (schema_ptr s) {
5117+
auto pr2 = ::compat::unwrap(std::move(pr), *s);
5118+
if (pr2.second) {
5119+
// this function assumes singular queries but doesn't validate
5120+
throw std::runtime_error("READ_DATA called with wrapping range");
5121+
}
5122+
query::result_options opts;
5123+
opts.digest_algo = da;
5124+
opts.request = da == query::digest_algorithm::none ? query::result_request::only_result : query::result_request::result_and_digest;
5125+
auto timeout = t ? *t : db::no_timeout;
5126+
return p->query_result_local(std::move(s), cmd, std::move(pr2.first), opts, trace_state_ptr, timeout);
5127+
}).finally([&trace_state_ptr, src_ip] () mutable {
5128+
tracing::trace(trace_state_ptr, "read_data handling is done, sending a response to /{}", src_ip);
5129+
return tracing::stop(trace_state_ptr);
5130+
});
51285131
});
51295132
});
51305133
});

tracing/trace_state.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -339,7 +339,7 @@ sstring trace_state::raw_value_to_sstring(const cql3::raw_value_view& v, const d
339339
}
340340
}
341341

342-
void opentelemetry_state::serialize_replicas(bytes& serialized) const {
342+
void opentelemetry_state_data::serialize_replicas(bytes& serialized) const {
343343
const auto size = htonl(_replicas.size());
344344
const auto *size_ptr = reinterpret_cast<const int8_t*>(&size);
345345
serialized += bytes{size_ptr, sizeof(size)};
@@ -356,7 +356,7 @@ void opentelemetry_state::serialize_replicas(bytes& serialized) const {
356356
}
357357
}
358358

359-
void opentelemetry_state::serialize_cache_counter(bytes& serialized) const {
359+
void opentelemetry_state_data::serialize_cache_counter(bytes& serialized) const {
360360
const auto counter = htonl(_cache_counter);
361361
const auto *counter_ptr = reinterpret_cast<const int8_t*>(&counter);
362362
serialized += bytes{counter_ptr, sizeof(counter)};

tracing/trace_state.hh

Lines changed: 102 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -507,37 +507,90 @@ private:
507507
};
508508

509509

510-
class opentelemetry_state final {
510+
class opentelemetry_state_data final {
511511
public:
512512
using cache_counter_t = int32_t;
513513

514-
private:
515-
lw_shared_ptr<trace_state> _state_ptr;
516-
bool const _opentelemetry_tracing{false};
517514
inet_address_vector_replica_set _replicas;
518515
// Number of read partitions that were found in cache.
519516
cache_counter_t _cache_counter{0};
520517

521518
void serialize_replicas(bytes& serialized) const;
522519
void serialize_cache_counter(bytes& serialized) const;
520+
};
521+
522+
523+
class opentelemetry_state final {
524+
525+
private:
526+
lw_shared_ptr<trace_state> _state_ptr;
527+
std::shared_ptr<seastar::sharded<opentelemetry_state_data>> _data;
528+
529+
class reducer {
530+
private:
531+
opentelemetry_state_data _data;
532+
533+
public:
534+
void operator()(const opentelemetry_state_data& to_reduce) {
535+
_data._replicas.insert(_data._replicas.end(),
536+
to_reduce._replicas.begin(),
537+
to_reduce._replicas.end());
538+
_data._cache_counter += to_reduce._cache_counter;
539+
}
540+
541+
opentelemetry_state_data get() const {
542+
return _data;
543+
}
544+
};
523545

524546
public:
525547
opentelemetry_state() = default;
526-
opentelemetry_state(lw_shared_ptr<trace_state> state_ptr, bool opentelemetry_tracing = false)
527-
: _state_ptr(std::move(state_ptr)), _opentelemetry_tracing(opentelemetry_tracing)
548+
opentelemetry_state(lw_shared_ptr<trace_state> state_ptr, std::shared_ptr<seastar::sharded<opentelemetry_state_data>> data)
549+
: _state_ptr(std::move(state_ptr)), _data(std::move(data))
550+
{}
551+
opentelemetry_state(std::nullptr_t , std::shared_ptr<seastar::sharded<opentelemetry_state_data>> data)
552+
: _state_ptr(nullptr), _data(std::move(data))
528553
{}
529-
opentelemetry_state(std::nullptr_t, bool opentelemetry_tracing = false)
530-
: _state_ptr(nullptr), _opentelemetry_tracing(opentelemetry_tracing)
554+
template<typename TraceState>
555+
opentelemetry_state(TraceState state_ptr, bool opentelemetry_tracing = false)
556+
: opentelemetry_state(std::move(state_ptr),
557+
opentelemetry_tracing
558+
? std::make_shared<seastar::sharded<opentelemetry_state_data>>()
559+
: nullptr)
531560
{}
532561

562+
/**
563+
* Helper function to initialize sharded data in opentelemetry_state.
564+
*/
565+
future<> start() {
566+
return _data->start();
567+
}
568+
569+
/**
570+
* Helper function to properly stop sharded data in opentelemetry_state.
571+
*/
572+
future<> stop() {
573+
return _data->stop();
574+
}
575+
576+
/**
577+
* Helper function to collect opentelemetry data from all shards.
578+
* It should be called before serializing opentelemetry_state.
579+
*/
580+
future<> collect_data() {
581+
return _data->map_reduce(reducer{}, std::identity{}).then([&local_data = _data->local()] (auto data) {
582+
local_data = data;
583+
});
584+
}
585+
533586
/**
534587
* @return serialized opentelemetry state.
535588
*/
536589
bytes serialize() const noexcept {
537590
bytes serialized{};
538591

539-
serialize_replicas(serialized);
540-
serialize_cache_counter(serialized);
592+
_data->local().serialize_replicas(serialized);
593+
_data->local().serialize_cache_counter(serialized);
541594

542595
return serialized;
543596
}
@@ -548,30 +601,30 @@ public:
548601
* @param replicas list of contacted replicas
549602
*/
550603
void set_replicas(const inet_address_vector_replica_set& replicas) {
551-
_replicas = replicas;
604+
_data->local()._replicas = replicas;
552605
}
553606

554607
/**
555608
* Increment counter of partitions read from cache.
556609
*
557610
* @param count number of partitions
558611
*/
559-
void modify_cache_counter(cache_counter_t count) {
560-
_cache_counter += count;
612+
void modify_cache_counter(opentelemetry_state_data::cache_counter_t count) {
613+
_data->local()._cache_counter += count;
561614
}
562615

563616
/**
564617
* @return number of partitions that were found in cache
565618
*/
566-
cache_counter_t get_cache_counter() const {
567-
return _cache_counter;
619+
opentelemetry_state_data::cache_counter_t get_cache_counter() const {
620+
return _data->local()._cache_counter;
568621
}
569622

570623
/**
571624
* @return True if OpenTelemetry trace state is stored.
572625
*/
573626
bool has_opentelemetry() const noexcept {
574-
return _opentelemetry_tracing;
627+
return __builtin_expect(bool(_data), false);
575628
};
576629

577630
/**
@@ -594,6 +647,13 @@ public:
594647
trace_state& get_tracing() const noexcept {
595648
return *_state_ptr;
596649
}
650+
651+
/**
652+
* @return A pointer to sharded data.
653+
*/
654+
std::shared_ptr<seastar::sharded<opentelemetry_state_data>> get_data() const {
655+
return _data;
656+
}
597657
};
598658

599659

@@ -620,7 +680,7 @@ public:
620680
: _state_ptr(nullptr)
621681
{}
622682

623-
using cache_counter_t = opentelemetry_state::cache_counter_t;
683+
using cache_counter_t = opentelemetry_state_data::cache_counter_t;
624684

625685
/**
626686
* @return True if classic trace state is stored.
@@ -872,7 +932,7 @@ inline std::optional<trace_info> make_trace_info(const trace_state_ptr& state) {
872932
if (state.has_opentelemetry()) {
873933
trace_state_props_set props{};
874934
props.set(trace_state_props::opentelemetry);
875-
return trace_info{props};
935+
return trace_info{props, state.get_opentelemetry().get_data()};
876936
}
877937

878938
return std::nullopt;
@@ -910,6 +970,30 @@ inline trace_state_ptr::cache_counter_t get_cache_counter(const trace_state_ptr&
910970
return 0;
911971
}
912972

973+
inline future<> start(const trace_state_ptr& p) {
974+
if (p.has_opentelemetry()) {
975+
return p.get_opentelemetry_ptr()->start();
976+
}
977+
978+
return make_ready_future<>();
979+
}
980+
981+
inline future<> stop(const trace_state_ptr& p) {
982+
if (p.has_opentelemetry()) {
983+
return p.get_opentelemetry_ptr()->stop();
984+
}
985+
986+
return make_ready_future<>();
987+
}
988+
989+
inline future<> collect_data(const trace_state_ptr& p) {
990+
if (p.has_opentelemetry()) {
991+
return p.get_opentelemetry_ptr()->collect_data();
992+
}
993+
994+
return make_ready_future<>();
995+
}
996+
913997
// global_trace_state_ptr is a helper class that may be used for creating spans
914998
// of an existing tracing session on other shards. When a tracing span on a
915999
// different shard is needed global_trace_state_ptr would create a secondary

tracing/tracing.cc

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -141,19 +141,19 @@ trace_state_ptr tracing::create_session(const trace_info& secondary_session_info
141141
try {
142142
bool opentelemetry_tracing = secondary_session_info.state_props.contains(trace_state_props::opentelemetry);
143143
if (!started()) {
144-
return opentelemetry_tracing ? make_lw_shared<opentelemetry_state>(nullptr, opentelemetry_tracing) : nullptr;
144+
return opentelemetry_tracing ? make_lw_shared<opentelemetry_state>(nullptr, secondary_session_info.otel_data) : nullptr;
145145
}
146146

147147
// Don't create a session if its records are likely to be dropped
148148
if (!may_create_new_session(secondary_session_info.session_id)) {
149-
return opentelemetry_tracing ? make_lw_shared<opentelemetry_state>(nullptr, opentelemetry_tracing) : nullptr;
149+
return opentelemetry_tracing ? make_lw_shared<opentelemetry_state>(nullptr, secondary_session_info.otel_data) : nullptr;
150150
}
151151

152152
++_active_sessions;
153153
if (secondary_session_info.state_props.contains(trace_state_props::classic)) {
154-
return make_lw_shared<opentelemetry_state>(make_lw_shared<trace_state>(secondary_session_info), opentelemetry_tracing);
154+
return make_lw_shared<opentelemetry_state>(make_lw_shared<trace_state>(secondary_session_info), secondary_session_info.otel_data);
155155
}
156-
return make_lw_shared<opentelemetry_state>(nullptr, opentelemetry_tracing);
156+
return make_lw_shared<opentelemetry_state>(nullptr, secondary_session_info.otel_data);
157157
} catch (...) {
158158
// return an uninitialized state in case of any error (OOM?)
159159
return nullptr;

tracing/tracing.hh

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,8 @@ using trace_state_props_set = enum_set<super_enum<trace_state_props,
141141
trace_state_props::opentelemetry,
142142
trace_state_props::classic>>;
143143

144+
class opentelemetry_state_data;
145+
144146
class trace_info {
145147
public:
146148
utils::UUID session_id;
@@ -150,6 +152,7 @@ public:
150152
uint32_t slow_query_threshold_us; // in microseconds
151153
uint32_t slow_query_ttl_sec; // in seconds
152154
span_id parent_id;
155+
std::shared_ptr<seastar::sharded<opentelemetry_state_data>> otel_data;
153156

154157
public:
155158
trace_info(utils::UUID sid, trace_type t, bool w_o_c, trace_state_props_set s_p, uint32_t slow_query_threshold, uint32_t slow_query_ttl, span_id p_id)
@@ -164,7 +167,9 @@ public:
164167
state_props.set_if<trace_state_props::write_on_close>(write_on_close);
165168
}
166169

167-
trace_info(trace_state_props_set s_p) : state_props(s_p) {}
170+
trace_info(trace_state_props_set s_p, std::shared_ptr<seastar::sharded<opentelemetry_state_data>> otel_data)
171+
: state_props(s_p), otel_data(otel_data)
172+
{}
168173
};
169174

170175
struct one_session_records;

0 commit comments

Comments
 (0)