Skip to content

Commit 10e6316

Browse files
author
Karolina Drabik
committed
add new replica-side tracing info - dma_counter and dma_size
1 parent 7c539a4 commit 10e6316

File tree

9 files changed

+111
-18
lines changed

9 files changed

+111
-18
lines changed

message/messaging_service.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1061,14 +1061,14 @@ future<> messaging_service::send_mutation_failed(msg_addr id, unsigned shard, re
10611061
return send_message_oneway(this, messaging_verb::MUTATION_FAILED, std::move(id), shard, std::move(response_id), num_failed, std::move(backlog));
10621062
}
10631063

1064-
void messaging_service::register_read_data(std::function<future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t>> (const rpc::client_info&, rpc::opt_time_point t, query::read_command cmd, ::compat::wrapping_partition_range pr, rpc::optional<query::digest_algorithm> oda)>&& func) {
1064+
void messaging_service::register_read_data(std::function<future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t, tracing::trace_state_ptr::dma_counter_t, tracing::trace_state_ptr::dma_size_t>> (const rpc::client_info&, rpc::opt_time_point t, query::read_command cmd, ::compat::wrapping_partition_range pr, rpc::optional<query::digest_algorithm> oda)>&& func) {
10651065
register_handler(this, netw::messaging_verb::READ_DATA, std::move(func));
10661066
}
10671067
// Removes the READ_DATA handler; the returned future is the one produced by
// unregister_handler() for that verb.
future<> messaging_service::unregister_read_data() {
    constexpr auto verb = netw::messaging_verb::READ_DATA;
    return unregister_handler(verb);
}
1070-
future<rpc::tuple<query::result, rpc::optional<cache_temperature>, tracing::trace_state_ptr::cache_counter_t>> messaging_service::send_read_data(msg_addr id, clock_type::time_point timeout, const query::read_command& cmd, const dht::partition_range& pr, query::digest_algorithm da) {
1071-
return send_message_timeout<future<rpc::tuple<query::result, rpc::optional<cache_temperature>, tracing::trace_state_ptr::cache_counter_t>>>(this, messaging_verb::READ_DATA, std::move(id), timeout, cmd, pr, da);
1070+
// Sends a READ_DATA request (command, partition range, digest algorithm) to
// `id`, bounded by `timeout`. The reply tuple holds the query result, an
// optional cache temperature, and the replica's tracing cache counter, DMA
// read counter and DMA byte count.
//
// NOTE(review): the cache temperature is wrapped in rpc::optional while the
// three tracing fields are not -- confirm that replies from peers which do
// not send these fields are handled as intended.
future<rpc::tuple<query::result, rpc::optional<cache_temperature>, tracing::trace_state_ptr::cache_counter_t, tracing::trace_state_ptr::dma_counter_t, tracing::trace_state_ptr::dma_size_t>> messaging_service::send_read_data(msg_addr id, clock_type::time_point timeout, const query::read_command& cmd, const dht::partition_range& pr, query::digest_algorithm da) {
    using reply_type = rpc::tuple<
            query::result,
            rpc::optional<cache_temperature>,
            tracing::trace_state_ptr::cache_counter_t,
            tracing::trace_state_ptr::dma_counter_t,
            tracing::trace_state_ptr::dma_size_t>;
    return send_message_timeout<future<reply_type>>(
            this, messaging_verb::READ_DATA, std::move(id), timeout, cmd, pr, da);
}
10731073

10741074
void messaging_service::register_get_schema_version(std::function<future<frozen_schema>(unsigned, table_schema_version)>&& func) {

message/messaging_service.hh

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -482,9 +482,9 @@ public:
482482

483483
// Wrapper for READ_DATA
484484
// Note: WTH is future<foreign_ptr<lw_shared_ptr<query::result>>
485-
void register_read_data(std::function<future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t>> (const rpc::client_info&, rpc::opt_time_point timeout, query::read_command cmd, ::compat::wrapping_partition_range pr, rpc::optional<query::digest_algorithm> digest)>&& func);
485+
void register_read_data(std::function<future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t, tracing::trace_state_ptr::dma_counter_t, tracing::trace_state_ptr::dma_size_t>> (const rpc::client_info&, rpc::opt_time_point timeout, query::read_command cmd, ::compat::wrapping_partition_range pr, rpc::optional<query::digest_algorithm> digest)>&& func);
486486
future<> unregister_read_data();
487-
future<rpc::tuple<query::result, rpc::optional<cache_temperature>, tracing::trace_state_ptr::cache_counter_t>> send_read_data(msg_addr id, clock_type::time_point timeout, const query::read_command& cmd, const dht::partition_range& pr, query::digest_algorithm da);
487+
future<rpc::tuple<query::result, rpc::optional<cache_temperature>, tracing::trace_state_ptr::cache_counter_t, tracing::trace_state_ptr::dma_counter_t, tracing::trace_state_ptr::dma_size_t>> send_read_data(msg_addr id, clock_type::time_point timeout, const query::read_command& cmd, const dht::partition_range& pr, query::digest_algorithm da);
488488

489489
// Wrapper for GET_SCHEMA_VERSION
490490
void register_get_schema_version(std::function<future<frozen_schema>(unsigned, table_schema_version)>&& func);

service/storage_proxy.cc

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3549,7 +3549,7 @@ class abstract_read_executor : public enable_shared_from_this<abstract_read_exec
35493549
});
35503550
}
35513551
}
3552-
future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t>> make_data_request(gms::inet_address ep, clock_type::time_point timeout, bool want_digest) {
3552+
future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t, tracing::trace_state_ptr::dma_counter_t, tracing::trace_state_ptr::dma_size_t>> make_data_request(gms::inet_address ep, clock_type::time_point timeout, bool want_digest) {
35533553
++_proxy->get_stats().data_read_attempts.get_ep_stat(ep);
35543554
auto opts = want_digest
35553555
? query::result_options{query::result_request::result_and_digest, digest_algorithm(*_proxy)}
@@ -3559,11 +3559,11 @@ class abstract_read_executor : public enable_shared_from_this<abstract_read_exec
35593559
return _proxy->query_result_local(_schema, _cmd, _partition_range, opts, _trace_state, timeout);
35603560
} else {
35613561
tracing::trace(_trace_state, "read_data: sending a message to /{}", ep);
3562-
return _proxy->_messaging.send_read_data(netw::messaging_service::msg_addr{ep, 0}, timeout, *_cmd, _partition_range, opts.digest_algo).then([this, ep](rpc::tuple<query::result, rpc::optional<cache_temperature>, tracing::trace_state_ptr::cache_counter_t> response) {
3563-
auto&& [result, hit_rate, cache_counter] = response;
3562+
return _proxy->_messaging.send_read_data(netw::messaging_service::msg_addr{ep, 0}, timeout, *_cmd, _partition_range, opts.digest_algo).then([this, ep](rpc::tuple<query::result, rpc::optional<cache_temperature>, tracing::trace_state_ptr::cache_counter_t, tracing::trace_state_ptr::dma_counter_t, tracing::trace_state_ptr::dma_size_t> response) {
3563+
auto&& [result, hit_rate, cache_counter, dma_counter, dma_size] = response;
35643564
tracing::trace(_trace_state, "read_data: got response from /{}", ep);
35653565
tracing::modify_cache_counter(_trace_state, cache_counter);
3566-
return make_ready_future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t>>(rpc::tuple(make_foreign(::make_lw_shared<query::result>(std::move(result))), hit_rate.value_or(cache_temperature::invalid()), cache_counter));
3566+
return make_ready_future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t, tracing::trace_state_ptr::dma_counter_t, tracing::trace_state_ptr::dma_size_t>>(rpc::tuple(make_foreign(::make_lw_shared<query::result>(std::move(result))), hit_rate.value_or(cache_temperature::invalid()), cache_counter, dma_counter, dma_size));
35673567
});
35683568
}
35693569
}
@@ -3606,7 +3606,7 @@ class abstract_read_executor : public enable_shared_from_this<abstract_read_exec
36063606
auto start = latency_clock::now();
36073607
for (const gms::inet_address& ep : boost::make_iterator_range(begin, end)) {
36083608
// Waited on indirectly, shared_from_this keeps `this` alive
3609-
(void)make_data_request(ep, timeout, want_digest).then_wrapped([this, resolver, ep, start, exec = shared_from_this()] (future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t>> f) {
3609+
(void)make_data_request(ep, timeout, want_digest).then_wrapped([this, resolver, ep, start, exec = shared_from_this()] (future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t, tracing::trace_state_ptr::dma_counter_t, tracing::trace_state_ptr::dma_size_t>> f) {
36103610
try {
36113611
auto v = f.get0();
36123612
_cf->set_hit_rate(ep, std::get<1>(v));
@@ -4007,13 +4007,13 @@ ::shared_ptr<abstract_read_executor> storage_proxy::get_read_executor(lw_shared_
40074007

40084008
// Runs query_result_local() with digest-only result options and reduces its
// reply to (digest, last-modified timestamp, cache temperature). The tracing
// counters carried in the local reply are not forwarded.
future<rpc::tuple<query::result_digest, api::timestamp_type, cache_temperature>>
storage_proxy::query_result_local_digest(schema_ptr s, lw_shared_ptr<query::read_command> cmd, const dht::partition_range& pr, tracing::trace_state_ptr trace_state, storage_proxy::clock_type::time_point timeout, query::digest_algorithm da) {
    using local_reply = rpc::tuple<
            foreign_ptr<lw_shared_ptr<query::result>>,
            cache_temperature,
            tracing::trace_state_ptr::cache_counter_t,
            tracing::trace_state_ptr::dma_counter_t,
            tracing::trace_state_ptr::dma_size_t>;
    using digest_reply = rpc::tuple<query::result_digest, api::timestamp_type, cache_temperature>;

    return query_result_local(std::move(s), std::move(cmd), pr, query::result_options::only_digest(da), std::move(trace_state), timeout).then([] (local_reply reply) {
        // Only the first two tuple elements are needed here.
        auto& result = std::get<0>(reply);
        auto& hit_rate = std::get<1>(reply);
        return make_ready_future<digest_reply>(rpc::tuple(*result->digest(), result->last_modified(), hit_rate));
    });
}
40154015

4016-
future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t>>
4016+
future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t, tracing::trace_state_ptr::dma_counter_t, tracing::trace_state_ptr::dma_size_t>>
40174017
storage_proxy::query_result_local(schema_ptr s, lw_shared_ptr<query::read_command> cmd, const dht::partition_range& pr, query::result_options opts,
40184018
tracing::trace_state_ptr trace_state, storage_proxy::clock_type::time_point timeout) {
40194019
cmd->slice.options.set_if<query::partition_slice::option::with_digest>(opts.request != query::result_request::only_result);
@@ -4026,7 +4026,7 @@ storage_proxy::query_result_local(schema_ptr s, lw_shared_ptr<query::read_comman
40264026
return db.query(gs, *cmd, opts, prv, trace_state, timeout).then([trace_state](std::tuple<lw_shared_ptr<query::result>, cache_temperature>&& f_ht) {
40274027
auto&& [f, ht] = f_ht;
40284028
tracing::trace(trace_state, "Querying is done");
4029-
return make_ready_future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t>>(rpc::tuple(make_foreign(std::move(f)), ht, get_cache_counter(trace_state)));
4029+
return make_ready_future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t, tracing::trace_state_ptr::dma_counter_t, tracing::trace_state_ptr::dma_size_t>>(rpc::tuple(make_foreign(std::move(f)), ht, get_cache_counter(trace_state), get_dma_counter(trace_state), get_dma_size(trace_state)));
40304030
});
40314031
});
40324032
} else {
@@ -4036,7 +4036,7 @@ storage_proxy::query_result_local(schema_ptr s, lw_shared_ptr<query::read_comman
40364036
[trace_state = std::move(trace_state)] (rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature>&& r_ht) {
40374037
auto&& [r, ht] = r_ht;
40384038
tracing::trace(trace_state, "Querying is done");
4039-
return make_ready_future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t>>(rpc::tuple(std::move(r), ht, get_cache_counter(trace_state)));
4039+
return make_ready_future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t, tracing::trace_state_ptr::dma_counter_t, tracing::trace_state_ptr::dma_size_t>>(rpc::tuple(std::move(r), ht, get_cache_counter(trace_state), get_dma_counter(trace_state), get_dma_size(trace_state)));
40404040
});
40414041
}
40424042
}

service/storage_proxy.hh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -377,7 +377,7 @@ private:
377377
const inet_address_vector_replica_set& preferred_endpoints,
378378
bool& is_bounced_read,
379379
service_permit permit);
380-
future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t>> query_result_local(schema_ptr, lw_shared_ptr<query::read_command> cmd, const dht::partition_range& pr,
380+
future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t, tracing::trace_state_ptr::dma_counter_t, tracing::trace_state_ptr::dma_size_t>> query_result_local(schema_ptr, lw_shared_ptr<query::read_command> cmd, const dht::partition_range& pr,
381381
query::result_options opts,
382382
tracing::trace_state_ptr trace_state,
383383
clock_type::time_point timeout);

sstables/index_reader.hh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -316,7 +316,7 @@ inline file make_tracked_index_file(sstable& sst, reader_permit permit, tracing:
316316
use_caching caching) {
317317
auto f = caching ? sst.index_file() : sst.uncached_index_file();
318318
f = make_tracked_file(std::move(f), std::move(permit));
319-
if (!trace_state.has_tracing()) {
319+
if (!trace_state.has_tracing() && !trace_state.has_opentelemetry()) {
320320
return f;
321321
}
322322
return tracing::make_traced_file(std::move(f), std::move(trace_state), format("{}:", sst.filename(component_type::Index)));

sstables/sstables.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2286,7 +2286,7 @@ input_stream<char> sstable::data_stream(uint64_t pos, size_t len, const io_prior
22862286
options.dynamic_adjustments = std::move(history);
22872287

22882288
file f = make_tracked_file(_data_file, std::move(permit));
2289-
if (trace_state.has_tracing()) {
2289+
if (trace_state.has_tracing() || trace_state.has_opentelemetry()) {
22902290
f = tracing::make_traced_file(std::move(f), std::move(trace_state), format("{}:", get_filename()));
22912291
}
22922292

tracing/trace_state.cc

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -365,4 +365,16 @@ void opentelemetry_state_data::serialize_cache_counter(bytes& serialized) const
365365
const auto *counter_ptr = reinterpret_cast<const int8_t*>(&counter);
366366
serialized += bytes{counter_ptr, sizeof(counter)};
367367
}
368+
369+
// Appends _dma_counter to `serialized`, converted to network byte order with
// htonl() and written as the raw bytes of the converted value.
void opentelemetry_state_data::serialize_dma_counter(bytes& serialized) const {
    const auto be_count = htonl(_dma_counter);
    const auto* raw = reinterpret_cast<const int8_t*>(&be_count);
    const bytes chunk{raw, sizeof(be_count)};
    serialized += chunk;
}
374+
375+
// Appends _dma_size to `serialized`, converted to network byte order with
// htonl() and written as the raw bytes of the converted value.
void opentelemetry_state_data::serialize_dma_size(bytes& serialized) const {
    const auto be_size = htonl(_dma_size);
    const auto* raw = reinterpret_cast<const int8_t*>(&be_size);
    const bytes chunk{raw, sizeof(be_size)};
    serialized += chunk;
}
368380
}

tracing/trace_state.hh

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -510,15 +510,23 @@ private:
510510
class opentelemetry_state_data final {
511511
public:
512512
using cache_counter_t = int32_t;
513+
using dma_counter_t = int32_t;
514+
using dma_size_t = int32_t;
513515

514516
inet_address_vector_replica_set _replicas;
515517
sstring _statement_type;
516518
// Number of read partitions that were found in cache.
517519
cache_counter_t _cache_counter{0};
520+
// Number of DMA reads that were executed.
521+
dma_counter_t _dma_counter{0};
522+
// Number of bytes read in DMA reads.
523+
dma_size_t _dma_size{0};
518524

519525
void serialize_replicas(bytes& serialized) const;
520526
void serialize_statement_type(bytes& serialized) const;
521527
void serialize_cache_counter(bytes& serialized) const;
528+
void serialize_dma_counter(bytes& serialized) const;
529+
void serialize_dma_size(bytes& serialized) const;
522530
};
523531

524532

@@ -538,6 +546,8 @@ private:
538546
to_reduce._replicas.begin(),
539547
to_reduce._replicas.end());
540548
_data._cache_counter += to_reduce._cache_counter;
549+
_data._dma_counter += to_reduce._dma_counter;
550+
_data._dma_size += to_reduce._dma_size;
541551
_data._statement_type += to_reduce._statement_type;
542552
}
543553

@@ -594,6 +604,8 @@ public:
594604

595605
_data->local().serialize_replicas(serialized);
596606
_data->local().serialize_cache_counter(serialized);
607+
_data->local().serialize_dma_counter(serialized);
608+
_data->local().serialize_dma_size(serialized);
597609
_data->local().serialize_statement_type(serialized);
598610

599611
return serialized;
@@ -626,13 +638,45 @@ public:
626638
_data->local()._cache_counter += count;
627639
}
628640

641+
/**
642+
* Increment counter of DMA reads.
643+
*
644+
* @param count number of reads
645+
*/
646+
void modify_dma_counter(opentelemetry_state_data::dma_counter_t count) {
    // Add `count` to the running DMA-read total held in the local state.
    auto& state = _data->local();
    state._dma_counter += count;
}
649+
650+
/**
651+
* Increment number of bytes read in DMA reads.
652+
*
653+
* @param size number of bytes.
654+
*/
655+
void modify_dma_size(opentelemetry_state_data::dma_size_t size) {
    // Add `size` bytes to the running DMA byte total held in the local state.
    // NOTE(review): dma_size_t is a signed 32-bit integer, so this sum is not
    // protected against overflow once the total nears 2^31 - 1 bytes --
    // confirm callers cannot reach that.
    auto& state = _data->local();
    state._dma_size += size;
}
658+
629659
/**
630660
* @return number of partitions that were found in cache
631661
*/
632662
opentelemetry_state_data::cache_counter_t get_cache_counter() const {
    // Read the cache-hit counter from the local state.
    const auto& state = _data->local();
    return state._cache_counter;
}
635665

666+
/**
667+
* @return number of DMA reads that were executed
668+
*/
669+
opentelemetry_state_data::dma_counter_t get_dma_counter() const {
    // Read the DMA-read counter from the local state.
    const auto& state = _data->local();
    return state._dma_counter;
}
672+
673+
/**
674+
* @return number of bytes read in DMA reads
675+
*/
676+
opentelemetry_state_data::dma_size_t get_dma_size() const {
    // Read the DMA byte total from the local state.
    const auto& state = _data->local();
    return state._dma_size;
}
679+
636680
/**
637681
* @return True if OpenTelemetry trace state is stored.
638682
*/
@@ -694,6 +738,8 @@ public:
694738
{}
695739

696740
using cache_counter_t = opentelemetry_state_data::cache_counter_t;
741+
using dma_counter_t = opentelemetry_state_data::dma_counter_t;
742+
using dma_size_t = opentelemetry_state_data::dma_size_t;
697743

698744
/**
699745
* @return True if classic trace state is stored.
@@ -981,6 +1027,18 @@ inline void modify_cache_counter(const trace_state_ptr& p, trace_state_ptr::cach
9811027
}
9821028
}
9831029

1030+
inline void modify_dma_counter(const trace_state_ptr& p, trace_state_ptr::dma_counter_t count) {
1031+
if (p.has_opentelemetry()) {
1032+
p.get_opentelemetry_ptr()->modify_dma_counter(count);
1033+
}
1034+
}
1035+
1036+
inline void modify_dma_size(const trace_state_ptr& p, trace_state_ptr::dma_size_t size) {
1037+
if (p.has_opentelemetry()) {
1038+
p.get_opentelemetry_ptr()->modify_dma_size(size);
1039+
}
1040+
}
1041+
9841042
inline trace_state_ptr::cache_counter_t get_cache_counter(const trace_state_ptr& p) {
9851043
if (p.has_opentelemetry()) {
9861044
return p.get_opentelemetry_ptr()->get_cache_counter();
@@ -989,6 +1047,22 @@ inline trace_state_ptr::cache_counter_t get_cache_counter(const trace_state_ptr&
9891047
return 0;
9901048
}
9911049

1050+
// Returns the DMA-read counter of `p`'s OpenTelemetry state, or 0 when `p`
// carries no OpenTelemetry state.
inline trace_state_ptr::dma_counter_t get_dma_counter(const trace_state_ptr& p) {
    if (!p.has_opentelemetry()) {
        return 0;
    }
    return p.get_opentelemetry_ptr()->get_dma_counter();
}
1057+
1058+
// Returns the DMA byte total of `p`'s OpenTelemetry state, or 0 when `p`
// carries no OpenTelemetry state.
inline trace_state_ptr::dma_size_t get_dma_size(const trace_state_ptr& p) {
    if (!p.has_opentelemetry()) {
        return 0;
    }
    return p.get_opentelemetry_ptr()->get_dma_size();
}
1065+
9921066
inline future<> start(const trace_state_ptr& p) {
9931067
if (p.has_opentelemetry()) {
9941068
return p.get_opentelemetry_ptr()->start();

0 commit comments

Comments
 (0)