Skip to content

Commit 4ac7e6a

Browse files
Karolina Drabik authored and margdoc committed
add new replica-side tracing info - dma_counter and dma_size
1 parent 2c2773d commit 4ac7e6a

File tree

9 files changed

+115
-18
lines changed

9 files changed

+115
-18
lines changed

message/messaging_service.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1061,14 +1061,14 @@ future<> messaging_service::send_mutation_failed(msg_addr id, unsigned shard, re
10611061
return send_message_oneway(this, messaging_verb::MUTATION_FAILED, std::move(id), shard, std::move(response_id), num_failed, std::move(backlog));
10621062
}
10631063

1064-
void messaging_service::register_read_data(std::function<future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t>> (const rpc::client_info&, rpc::opt_time_point t, query::read_command cmd, ::compat::wrapping_partition_range pr, rpc::optional<query::digest_algorithm> oda)>&& func) {
1064+
// Registers `func` as the handler for the READ_DATA verb by forwarding it to register_handler().
// The handler's result tuple carries the query result, the cache temperature, and the tracing
// cache counter, DMA counter and DMA size values, in that order.
void messaging_service::register_read_data(std::function<future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t, tracing::trace_state_ptr::dma_counter_t, tracing::trace_state_ptr::dma_size_t>> (const rpc::client_info&, rpc::opt_time_point t, query::read_command cmd, ::compat::wrapping_partition_range pr, rpc::optional<query::digest_algorithm> oda)>&& func) {
10651065
register_handler(this, netw::messaging_verb::READ_DATA, std::move(func));
10661066
}
10671067
// Removes the READ_DATA verb handler; returns the future produced by unregister_handler().
future<> messaging_service::unregister_read_data() {
10681068
return unregister_handler(netw::messaging_verb::READ_DATA);
10691069
}
1070-
future<rpc::tuple<query::result, rpc::optional<cache_temperature>, tracing::trace_state_ptr::cache_counter_t>> messaging_service::send_read_data(msg_addr id, clock_type::time_point timeout, const query::read_command& cmd, const dht::partition_range& pr, query::digest_algorithm da) {
1071-
return send_message_timeout<future<rpc::tuple<query::result, rpc::optional<cache_temperature>, tracing::trace_state_ptr::cache_counter_t>>>(this, messaging_verb::READ_DATA, std::move(id), timeout, cmd, pr, da);
1070+
// Sends a READ_DATA message to `id` through send_message_timeout(), forwarding `timeout`,
// `cmd`, `pr` and `da`. The reply tuple holds the query result, an optional cache temperature,
// and the tracing cache counter, DMA counter and DMA size values, in that order.
future<rpc::tuple<query::result, rpc::optional<cache_temperature>, tracing::trace_state_ptr::cache_counter_t, tracing::trace_state_ptr::dma_counter_t, tracing::trace_state_ptr::dma_size_t>> messaging_service::send_read_data(msg_addr id, clock_type::time_point timeout, const query::read_command& cmd, const dht::partition_range& pr, query::digest_algorithm da) {
1071+
return send_message_timeout<future<rpc::tuple<query::result, rpc::optional<cache_temperature>, tracing::trace_state_ptr::cache_counter_t, tracing::trace_state_ptr::dma_counter_t, tracing::trace_state_ptr::dma_size_t>>>(this, messaging_verb::READ_DATA, std::move(id), timeout, cmd, pr, da);
10721072
}
10731073

10741074
void messaging_service::register_get_schema_version(std::function<future<frozen_schema>(unsigned, table_schema_version)>&& func) {

message/messaging_service.hh

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -482,9 +482,9 @@ public:
482482

483483
// Wrapper for READ_DATA
484484
// Note: WTH is future<foreign_ptr<lw_shared_ptr<query::result>>
485-
void register_read_data(std::function<future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t>> (const rpc::client_info&, rpc::opt_time_point timeout, query::read_command cmd, ::compat::wrapping_partition_range pr, rpc::optional<query::digest_algorithm> digest)>&& func);
485+
void register_read_data(std::function<future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t, tracing::trace_state_ptr::dma_counter_t, tracing::trace_state_ptr::dma_size_t>> (const rpc::client_info&, rpc::opt_time_point timeout, query::read_command cmd, ::compat::wrapping_partition_range pr, rpc::optional<query::digest_algorithm> digest)>&& func);
486486
future<> unregister_read_data();
487-
future<rpc::tuple<query::result, rpc::optional<cache_temperature>, tracing::trace_state_ptr::cache_counter_t>> send_read_data(msg_addr id, clock_type::time_point timeout, const query::read_command& cmd, const dht::partition_range& pr, query::digest_algorithm da);
487+
future<rpc::tuple<query::result, rpc::optional<cache_temperature>, tracing::trace_state_ptr::cache_counter_t, tracing::trace_state_ptr::dma_counter_t, tracing::trace_state_ptr::dma_size_t>> send_read_data(msg_addr id, clock_type::time_point timeout, const query::read_command& cmd, const dht::partition_range& pr, query::digest_algorithm da);
488488

489489
// Wrapper for GET_SCHEMA_VERSION
490490
void register_get_schema_version(std::function<future<frozen_schema>(unsigned, table_schema_version)>&& func);

service/storage_proxy.cc

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3549,7 +3549,7 @@ class abstract_read_executor : public enable_shared_from_this<abstract_read_exec
35493549
});
35503550
}
35513551
}
3552-
future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t>> make_data_request(gms::inet_address ep, clock_type::time_point timeout, bool want_digest) {
3552+
future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t, tracing::trace_state_ptr::dma_counter_t, tracing::trace_state_ptr::dma_size_t>> make_data_request(gms::inet_address ep, clock_type::time_point timeout, bool want_digest) {
35533553
++_proxy->get_stats().data_read_attempts.get_ep_stat(ep);
35543554
auto opts = want_digest
35553555
? query::result_options{query::result_request::result_and_digest, digest_algorithm(*_proxy)}
@@ -3559,11 +3559,11 @@ class abstract_read_executor : public enable_shared_from_this<abstract_read_exec
35593559
return _proxy->query_result_local(_schema, _cmd, _partition_range, opts, _trace_state, timeout);
35603560
} else {
35613561
tracing::trace(_trace_state, "read_data: sending a message to /{}", ep);
3562-
return _proxy->_messaging.send_read_data(netw::messaging_service::msg_addr{ep, 0}, timeout, *_cmd, _partition_range, opts.digest_algo).then([this, ep](rpc::tuple<query::result, rpc::optional<cache_temperature>, tracing::trace_state_ptr::cache_counter_t> response) {
3563-
auto&& [result, hit_rate, cache_counter] = response;
3562+
return _proxy->_messaging.send_read_data(netw::messaging_service::msg_addr{ep, 0}, timeout, *_cmd, _partition_range, opts.digest_algo).then([this, ep](rpc::tuple<query::result, rpc::optional<cache_temperature>, tracing::trace_state_ptr::cache_counter_t, tracing::trace_state_ptr::dma_counter_t, tracing::trace_state_ptr::dma_size_t> response) {
3563+
auto&& [result, hit_rate, cache_counter, dma_counter, dma_size] = response;
35643564
tracing::trace(_trace_state, "read_data: got response from /{}", ep);
35653565
tracing::modify_cache_counter(_trace_state, cache_counter);
3566-
return make_ready_future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t>>(rpc::tuple(make_foreign(::make_lw_shared<query::result>(std::move(result))), hit_rate.value_or(cache_temperature::invalid()), cache_counter));
3566+
return make_ready_future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t, tracing::trace_state_ptr::dma_counter_t, tracing::trace_state_ptr::dma_size_t>>(rpc::tuple(make_foreign(::make_lw_shared<query::result>(std::move(result))), hit_rate.value_or(cache_temperature::invalid()), cache_counter, dma_counter, dma_size));
35673567
});
35683568
}
35693569
}
@@ -3606,7 +3606,7 @@ class abstract_read_executor : public enable_shared_from_this<abstract_read_exec
36063606
auto start = latency_clock::now();
36073607
for (const gms::inet_address& ep : boost::make_iterator_range(begin, end)) {
36083608
// Waited on indirectly, shared_from_this keeps `this` alive
3609-
(void)make_data_request(ep, timeout, want_digest).then_wrapped([this, resolver, ep, start, exec = shared_from_this()] (future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t>> f) {
3609+
(void)make_data_request(ep, timeout, want_digest).then_wrapped([this, resolver, ep, start, exec = shared_from_this()] (future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t, tracing::trace_state_ptr::dma_counter_t, tracing::trace_state_ptr::dma_size_t>> f) {
36103610
try {
36113611
auto v = f.get0();
36123612
_cf->set_hit_rate(ep, std::get<1>(v));
@@ -4007,13 +4007,13 @@ ::shared_ptr<abstract_read_executor> storage_proxy::get_read_executor(lw_shared_
40074007

40084008
// Runs query_result_local() with only_digest(da) result options and reduces its reply to a
// (digest, last-modified timestamp, cache temperature) tuple.
future<rpc::tuple<query::result_digest, api::timestamp_type, cache_temperature>>
40094009
storage_proxy::query_result_local_digest(schema_ptr s, lw_shared_ptr<query::read_command> cmd, const dht::partition_range& pr, tracing::trace_state_ptr trace_state, storage_proxy::clock_type::time_point timeout, query::digest_algorithm da) {
4010-
return query_result_local(std::move(s), std::move(cmd), pr, query::result_options::only_digest(da), std::move(trace_state), timeout).then([] (rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t> result_and_hit_rate) {
4011-
auto&& [result, hit_rate, cache_counter] = result_and_hit_rate;
4010+
return query_result_local(std::move(s), std::move(cmd), pr, query::result_options::only_digest(da), std::move(trace_state), timeout).then([] (rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t, tracing::trace_state_ptr::dma_counter_t, tracing::trace_state_ptr::dma_size_t> result_and_hit_rate) {
4011+
auto&& [result, hit_rate, cache_counter, dma_counter, dma_size] = result_and_hit_rate;
40124012
// cache_counter, dma_counter and dma_size are unpacked but not used: the value returned
// below is built only from the digest, last_modified() and hit_rate.
return make_ready_future<rpc::tuple<query::result_digest, api::timestamp_type, cache_temperature>>(rpc::tuple(*result->digest(), result->last_modified(), hit_rate));
40134013
});
40144014
}
40154015

4016-
future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t>>
4016+
future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t, tracing::trace_state_ptr::dma_counter_t, tracing::trace_state_ptr::dma_size_t>>
40174017
storage_proxy::query_result_local(schema_ptr s, lw_shared_ptr<query::read_command> cmd, const dht::partition_range& pr, query::result_options opts,
40184018
tracing::trace_state_ptr trace_state, storage_proxy::clock_type::time_point timeout) {
40194019
cmd->slice.options.set_if<query::partition_slice::option::with_digest>(opts.request != query::result_request::only_result);
@@ -4026,7 +4026,7 @@ storage_proxy::query_result_local(schema_ptr s, lw_shared_ptr<query::read_comman
40264026
return db.query(gs, *cmd, opts, prv, trace_state, timeout).then([trace_state](std::tuple<lw_shared_ptr<query::result>, cache_temperature>&& f_ht) {
40274027
auto&& [f, ht] = f_ht;
40284028
tracing::trace(trace_state, "Querying is done");
4029-
return make_ready_future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t>>(rpc::tuple(make_foreign(std::move(f)), ht, get_cache_counter(trace_state)));
4029+
return make_ready_future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t, tracing::trace_state_ptr::dma_counter_t, tracing::trace_state_ptr::dma_size_t>>(rpc::tuple(make_foreign(std::move(f)), ht, get_cache_counter(trace_state), get_dma_counter(trace_state), get_dma_size(trace_state)));
40304030
});
40314031
});
40324032
} else {
@@ -4036,7 +4036,7 @@ storage_proxy::query_result_local(schema_ptr s, lw_shared_ptr<query::read_comman
40364036
[trace_state = std::move(trace_state)] (rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature>&& r_ht) {
40374037
auto&& [r, ht] = r_ht;
40384038
tracing::trace(trace_state, "Querying is done");
4039-
return make_ready_future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t>>(rpc::tuple(std::move(r), ht, get_cache_counter(trace_state)));
4039+
return make_ready_future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t, tracing::trace_state_ptr::dma_counter_t, tracing::trace_state_ptr::dma_size_t>>(rpc::tuple(std::move(r), ht, get_cache_counter(trace_state), get_dma_counter(trace_state), get_dma_size(trace_state)));
40404040
});
40414041
}
40424042
}

service/storage_proxy.hh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -377,7 +377,7 @@ private:
377377
const inet_address_vector_replica_set& preferred_endpoints,
378378
bool& is_bounced_read,
379379
service_permit permit);
380-
future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t>> query_result_local(schema_ptr, lw_shared_ptr<query::read_command> cmd, const dht::partition_range& pr,
380+
future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t, tracing::trace_state_ptr::dma_counter_t, tracing::trace_state_ptr::dma_size_t>> query_result_local(schema_ptr, lw_shared_ptr<query::read_command> cmd, const dht::partition_range& pr,
381381
query::result_options opts,
382382
tracing::trace_state_ptr trace_state,
383383
clock_type::time_point timeout);

sstables/index_reader.hh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -316,7 +316,7 @@ inline file make_tracked_index_file(sstable& sst, reader_permit permit, tracing:
316316
use_caching caching) {
317317
auto f = caching ? sst.index_file() : sst.uncached_index_file();
318318
f = make_tracked_file(std::move(f), std::move(permit));
319-
if (!trace_state.has_tracing()) {
319+
if (!trace_state.has_tracing() && !trace_state.has_opentelemetry()) {
320320
return f;
321321
}
322322
return tracing::make_traced_file(std::move(f), std::move(trace_state), format("{}:", sst.filename(component_type::Index)));

sstables/sstables.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2286,7 +2286,7 @@ input_stream<char> sstable::data_stream(uint64_t pos, size_t len, const io_prior
22862286
options.dynamic_adjustments = std::move(history);
22872287

22882288
file f = make_tracked_file(_data_file, std::move(permit));
2289-
if (trace_state.has_tracing()) {
2289+
if (trace_state.has_tracing() || trace_state.has_opentelemetry()) {
22902290
f = tracing::make_traced_file(std::move(f), std::move(trace_state), format("{}:", get_filename()));
22912291
}
22922292

tracing/trace_state.cc

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -365,4 +365,16 @@ void opentelemetry_state_data::serialize_cache_counter(bytes& serialized) const
365365
const auto *counter_ptr = reinterpret_cast<const int8_t*>(&counter);
366366
serialized += bytes{counter_ptr, sizeof(counter)};
367367
}
368+
369+
void opentelemetry_state_data::serialize_dma_counter(bytes& serialized) const {
370+
const auto counter = htonl(_dma_counter);
371+
const auto *counter_ptr = reinterpret_cast<const int8_t*>(&counter);
372+
serialized += bytes{counter_ptr, sizeof(counter)};
373+
}
374+
375+
void opentelemetry_state_data::serialize_dma_size(bytes& serialized) const {
376+
const auto size = htonl(_dma_size);
377+
const auto *size_ptr = reinterpret_cast<const int8_t*>(&size);
378+
serialized += bytes{size_ptr, sizeof(size)};
379+
}
368380
}

tracing/trace_state.hh

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -510,15 +510,23 @@ private:
510510
// Data tracked for an OpenTelemetry trace: replica addresses, statement type, and the
// cache/DMA read counters. Each field has a matching serialize_* member that takes the
// output buffer by reference.
class opentelemetry_state_data final {
511511
public:
512512
using cache_counter_t = int32_t;
513+
using dma_counter_t = int32_t;
514+
// NOTE(review): 32-bit signed — byte totals of 2 GiB or more do not fit; confirm this
// range is sufficient for the reads being traced.
using dma_size_t = int32_t;
513515

514516
inet_address_vector_replica_set _replicas;
515517
sstring _statement_type;
516518
// Number of read partitions that were found in cache.
517519
cache_counter_t _cache_counter{0};
520+
// Number of DMA reads that were executed.
521+
dma_counter_t _dma_counter{0};
522+
// Number of bytes read in DMA reads.
523+
dma_size_t _dma_size{0};
518524

519525
void serialize_replicas(bytes& serialized) const;
520526
void serialize_statement_type(bytes& serialized) const;
521527
void serialize_cache_counter(bytes& serialized) const;
528+
void serialize_dma_counter(bytes& serialized) const;
529+
void serialize_dma_size(bytes& serialized) const;
522530
};
523531

524532

@@ -538,6 +546,8 @@ private:
538546
to_reduce._replicas.begin(),
539547
to_reduce._replicas.end());
540548
_data._cache_counter += to_reduce._cache_counter;
549+
_data._dma_counter += to_reduce._dma_counter;
550+
_data._dma_size += to_reduce._dma_size;
541551
_data._statement_type += to_reduce._statement_type;
542552
}
543553

@@ -594,6 +604,8 @@ public:
594604

595605
_data->local().serialize_replicas(serialized);
596606
_data->local().serialize_cache_counter(serialized);
607+
_data->local().serialize_dma_counter(serialized);
608+
_data->local().serialize_dma_size(serialized);
597609
_data->local().serialize_statement_type(serialized);
598610

599611
return serialized;
@@ -628,13 +640,49 @@ public:
628640
}
629641
}
630642

643+
/**
644+
* Increment counter of DMA reads.
645+
*
646+
* @param count number of reads
647+
*/
648+
void modify_dma_counter(opentelemetry_state_data::dma_counter_t count) {
649+
// Nothing is recorded unless the local instance of _data has been initialized;
// in that case the call is a no-op.
if (_data->local_is_initialized()) {
650+
_data->local()._dma_counter += count;
651+
}
652+
}
653+
654+
/**
655+
* Increment number of bytes read in DMA reads.
656+
*
657+
* @param size number of bytes.
658+
*/
659+
void modify_dma_size(opentelemetry_state_data::dma_size_t size) {
660+
// Nothing is recorded unless the local instance of _data has been initialized;
// in that case the call is a no-op.
if (_data->local_is_initialized()) {
661+
_data->local()._dma_size += size;
662+
}
663+
}
664+
631665
/**
632666
* @return number of partitions that were found in cache
633667
*/
634668
opentelemetry_state_data::cache_counter_t get_cache_counter() const {
635669
// NOTE(review): local() is read without a local_is_initialized() check —
// confirm callers guarantee the local instance exists.
return _data->local()._cache_counter;
636670
}
637671

672+
/**
673+
* @return number of DMA reads that were executed
674+
*/
675+
opentelemetry_state_data::dma_counter_t get_dma_counter() const {
676+
// NOTE(review): unlike modify_dma_counter(), local() is read here without a
// local_is_initialized() check — confirm callers guarantee the local instance exists.
return _data->local()._dma_counter;
677+
}
678+
679+
/**
680+
* @return number of bytes read in DMA reads
681+
*/
682+
opentelemetry_state_data::dma_size_t get_dma_size() const {
683+
// NOTE(review): unlike modify_dma_size(), local() is read here without a
// local_is_initialized() check — confirm callers guarantee the local instance exists.
return _data->local()._dma_size;
684+
}
685+
638686
/**
639687
* @return True if OpenTelemetry trace state is stored.
640688
*/
@@ -696,6 +744,8 @@ public:
696744
{}
697745

698746
using cache_counter_t = opentelemetry_state_data::cache_counter_t;
747+
using dma_counter_t = opentelemetry_state_data::dma_counter_t;
748+
using dma_size_t = opentelemetry_state_data::dma_size_t;
699749

700750
/**
701751
* @return True if classic trace state is stored.
@@ -983,6 +1033,18 @@ inline void modify_cache_counter(const trace_state_ptr& p, trace_state_ptr::cach
9831033
}
9841034
}
9851035

1036+
// Adds `count` to the DMA read counter of the OpenTelemetry state held by `p`.
// Does nothing when `p` has no OpenTelemetry state.
inline void modify_dma_counter(const trace_state_ptr& p, trace_state_ptr::dma_counter_t count) {
1037+
if (p.has_opentelemetry()) {
1038+
p.get_opentelemetry_ptr()->modify_dma_counter(count);
1039+
}
1040+
}
1041+
1042+
// Adds `size` to the DMA byte total of the OpenTelemetry state held by `p`.
// Does nothing when `p` has no OpenTelemetry state.
inline void modify_dma_size(const trace_state_ptr& p, trace_state_ptr::dma_size_t size) {
1043+
if (p.has_opentelemetry()) {
1044+
p.get_opentelemetry_ptr()->modify_dma_size(size);
1045+
}
1046+
}
1047+
9861048
inline trace_state_ptr::cache_counter_t get_cache_counter(const trace_state_ptr& p) {
9871049
if (p.has_opentelemetry()) {
9881050
return p.get_opentelemetry_ptr()->get_cache_counter();
@@ -991,6 +1053,22 @@ inline trace_state_ptr::cache_counter_t get_cache_counter(const trace_state_ptr&
9911053
return 0;
9921054
}
9931055

1056+
// Returns the DMA read counter from the OpenTelemetry state held by `p`,
// or 0 when `p` has no OpenTelemetry state.
inline trace_state_ptr::dma_counter_t get_dma_counter(const trace_state_ptr& p) {
1057+
if (p.has_opentelemetry()) {
1058+
return p.get_opentelemetry_ptr()->get_dma_counter();
1059+
}
1060+
1061+
return 0;
1062+
}
1063+
1064+
// Returns the DMA byte total from the OpenTelemetry state held by `p`,
// or 0 when `p` has no OpenTelemetry state.
inline trace_state_ptr::dma_size_t get_dma_size(const trace_state_ptr& p) {
1065+
if (p.has_opentelemetry()) {
1066+
return p.get_opentelemetry_ptr()->get_dma_size();
1067+
}
1068+
1069+
return 0;
1070+
}
1071+
9941072
inline future<> start(const trace_state_ptr& p) {
9951073
if (p.has_opentelemetry()) {
9961074
return p.get_opentelemetry_ptr()->start();

0 commit comments

Comments
 (0)