Skip to content

Commit f912bab

Browse files
author
Karolina Drabik
committed
add new replica-side tracing info - dma_counter and dma_size
1 parent 5518f2b commit f912bab

File tree

9 files changed

+111
-18
lines changed

9 files changed

+111
-18
lines changed

message/messaging_service.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1061,14 +1061,14 @@ future<> messaging_service::send_mutation_failed(msg_addr id, unsigned shard, re
10611061
return send_message_oneway(this, messaging_verb::MUTATION_FAILED, std::move(id), shard, std::move(response_id), num_failed, std::move(backlog));
10621062
}
10631063

1064-
void messaging_service::register_read_data(std::function<future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t>> (const rpc::client_info&, rpc::opt_time_point t, query::read_command cmd, ::compat::wrapping_partition_range pr, rpc::optional<query::digest_algorithm> oda)>&& func) {
1064+
void messaging_service::register_read_data(std::function<future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t, tracing::trace_state_ptr::dma_counter_t, tracing::trace_state_ptr::dma_size_t>> (const rpc::client_info&, rpc::opt_time_point t, query::read_command cmd, ::compat::wrapping_partition_range pr, rpc::optional<query::digest_algorithm> oda)>&& func) {
10651065
register_handler(this, netw::messaging_verb::READ_DATA, std::move(func));
10661066
}
10671067
future<> messaging_service::unregister_read_data() {
10681068
return unregister_handler(netw::messaging_verb::READ_DATA);
10691069
}
1070-
future<rpc::tuple<query::result, rpc::optional<cache_temperature>, tracing::trace_state_ptr::cache_counter_t>> messaging_service::send_read_data(msg_addr id, clock_type::time_point timeout, const query::read_command& cmd, const dht::partition_range& pr, query::digest_algorithm da) {
1071-
return send_message_timeout<future<rpc::tuple<query::result, rpc::optional<cache_temperature>, tracing::trace_state_ptr::cache_counter_t>>>(this, messaging_verb::READ_DATA, std::move(id), timeout, cmd, pr, da);
1070+
future<rpc::tuple<query::result, rpc::optional<cache_temperature>, tracing::trace_state_ptr::cache_counter_t, tracing::trace_state_ptr::dma_counter_t, tracing::trace_state_ptr::dma_size_t>> messaging_service::send_read_data(msg_addr id, clock_type::time_point timeout, const query::read_command& cmd, const dht::partition_range& pr, query::digest_algorithm da) {
1071+
return send_message_timeout<future<rpc::tuple<query::result, rpc::optional<cache_temperature>, tracing::trace_state_ptr::cache_counter_t, tracing::trace_state_ptr::dma_counter_t, tracing::trace_state_ptr::dma_size_t>>>(this, messaging_verb::READ_DATA, std::move(id), timeout, cmd, pr, da);
10721072
}
10731073

10741074
void messaging_service::register_get_schema_version(std::function<future<frozen_schema>(unsigned, table_schema_version)>&& func) {

message/messaging_service.hh

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -482,9 +482,9 @@ public:
482482

483483
// Wrapper for READ_DATA
484484
// Note: WTH is future<foreign_ptr<lw_shared_ptr<query::result>>
485-
void register_read_data(std::function<future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t>> (const rpc::client_info&, rpc::opt_time_point timeout, query::read_command cmd, ::compat::wrapping_partition_range pr, rpc::optional<query::digest_algorithm> digest)>&& func);
485+
void register_read_data(std::function<future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t, tracing::trace_state_ptr::dma_counter_t, tracing::trace_state_ptr::dma_size_t>> (const rpc::client_info&, rpc::opt_time_point timeout, query::read_command cmd, ::compat::wrapping_partition_range pr, rpc::optional<query::digest_algorithm> digest)>&& func);
486486
future<> unregister_read_data();
487-
future<rpc::tuple<query::result, rpc::optional<cache_temperature>, tracing::trace_state_ptr::cache_counter_t>> send_read_data(msg_addr id, clock_type::time_point timeout, const query::read_command& cmd, const dht::partition_range& pr, query::digest_algorithm da);
487+
future<rpc::tuple<query::result, rpc::optional<cache_temperature>, tracing::trace_state_ptr::cache_counter_t, tracing::trace_state_ptr::dma_counter_t, tracing::trace_state_ptr::dma_size_t>> send_read_data(msg_addr id, clock_type::time_point timeout, const query::read_command& cmd, const dht::partition_range& pr, query::digest_algorithm da);
488488

489489
// Wrapper for GET_SCHEMA_VERSION
490490
void register_get_schema_version(std::function<future<frozen_schema>(unsigned, table_schema_version)>&& func);

service/storage_proxy.cc

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3549,7 +3549,7 @@ class abstract_read_executor : public enable_shared_from_this<abstract_read_exec
35493549
});
35503550
}
35513551
}
3552-
future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t>> make_data_request(gms::inet_address ep, clock_type::time_point timeout, bool want_digest) {
3552+
future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t, tracing::trace_state_ptr::dma_counter_t, tracing::trace_state_ptr::dma_size_t>> make_data_request(gms::inet_address ep, clock_type::time_point timeout, bool want_digest) {
35533553
++_proxy->get_stats().data_read_attempts.get_ep_stat(ep);
35543554
auto opts = want_digest
35553555
? query::result_options{query::result_request::result_and_digest, digest_algorithm(*_proxy)}
@@ -3559,11 +3559,11 @@ class abstract_read_executor : public enable_shared_from_this<abstract_read_exec
35593559
return _proxy->query_result_local(_schema, _cmd, _partition_range, opts, _trace_state, timeout);
35603560
} else {
35613561
tracing::trace(_trace_state, "read_data: sending a message to /{}", ep);
3562-
return _proxy->_messaging.send_read_data(netw::messaging_service::msg_addr{ep, 0}, timeout, *_cmd, _partition_range, opts.digest_algo).then([this, ep](rpc::tuple<query::result, rpc::optional<cache_temperature>, tracing::trace_state_ptr::cache_counter_t> response) {
3563-
auto&& [result, hit_rate, cache_counter] = response;
3562+
return _proxy->_messaging.send_read_data(netw::messaging_service::msg_addr{ep, 0}, timeout, *_cmd, _partition_range, opts.digest_algo).then([this, ep](rpc::tuple<query::result, rpc::optional<cache_temperature>, tracing::trace_state_ptr::cache_counter_t, tracing::trace_state_ptr::dma_counter_t, tracing::trace_state_ptr::dma_size_t> response) {
3563+
auto&& [result, hit_rate, cache_counter, dma_counter, dma_size] = response;
35643564
tracing::trace(_trace_state, "read_data: got response from /{}", ep);
35653565
tracing::modify_cache_counter(_trace_state, cache_counter);
3566-
return make_ready_future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t>>(rpc::tuple(make_foreign(::make_lw_shared<query::result>(std::move(result))), hit_rate.value_or(cache_temperature::invalid()), cache_counter));
3566+
return make_ready_future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t, tracing::trace_state_ptr::dma_counter_t, tracing::trace_state_ptr::dma_size_t>>(rpc::tuple(make_foreign(::make_lw_shared<query::result>(std::move(result))), hit_rate.value_or(cache_temperature::invalid()), cache_counter, dma_counter, dma_size));
35673567
});
35683568
}
35693569
}
@@ -3606,7 +3606,7 @@ class abstract_read_executor : public enable_shared_from_this<abstract_read_exec
36063606
auto start = latency_clock::now();
36073607
for (const gms::inet_address& ep : boost::make_iterator_range(begin, end)) {
36083608
// Waited on indirectly, shared_from_this keeps `this` alive
3609-
(void)make_data_request(ep, timeout, want_digest).then_wrapped([this, resolver, ep, start, exec = shared_from_this()] (future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t>> f) {
3609+
(void)make_data_request(ep, timeout, want_digest).then_wrapped([this, resolver, ep, start, exec = shared_from_this()] (future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t, tracing::trace_state_ptr::dma_counter_t, tracing::trace_state_ptr::dma_size_t>> f) {
36103610
try {
36113611
auto v = f.get0();
36123612
_cf->set_hit_rate(ep, std::get<1>(v));
@@ -4007,13 +4007,13 @@ ::shared_ptr<abstract_read_executor> storage_proxy::get_read_executor(lw_shared_
40074007

40084008
future<rpc::tuple<query::result_digest, api::timestamp_type, cache_temperature>>
40094009
storage_proxy::query_result_local_digest(schema_ptr s, lw_shared_ptr<query::read_command> cmd, const dht::partition_range& pr, tracing::trace_state_ptr trace_state, storage_proxy::clock_type::time_point timeout, query::digest_algorithm da) {
4010-
return query_result_local(std::move(s), std::move(cmd), pr, query::result_options::only_digest(da), std::move(trace_state), timeout).then([] (rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t> result_and_hit_rate) {
4011-
auto&& [result, hit_rate, cache_counter] = result_and_hit_rate;
4010+
return query_result_local(std::move(s), std::move(cmd), pr, query::result_options::only_digest(da), std::move(trace_state), timeout).then([] (rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t, tracing::trace_state_ptr::dma_counter_t, tracing::trace_state_ptr::dma_size_t> result_and_hit_rate) {
4011+
auto&& [result, hit_rate, cache_counter, dma_counter, dma_size] = result_and_hit_rate;
40124012
return make_ready_future<rpc::tuple<query::result_digest, api::timestamp_type, cache_temperature>>(rpc::tuple(*result->digest(), result->last_modified(), hit_rate));
40134013
});
40144014
}
40154015

4016-
future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t>>
4016+
future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t, tracing::trace_state_ptr::dma_counter_t, tracing::trace_state_ptr::dma_size_t>>
40174017
storage_proxy::query_result_local(schema_ptr s, lw_shared_ptr<query::read_command> cmd, const dht::partition_range& pr, query::result_options opts,
40184018
tracing::trace_state_ptr trace_state, storage_proxy::clock_type::time_point timeout) {
40194019
cmd->slice.options.set_if<query::partition_slice::option::with_digest>(opts.request != query::result_request::only_result);
@@ -4026,7 +4026,7 @@ storage_proxy::query_result_local(schema_ptr s, lw_shared_ptr<query::read_comman
40264026
return db.query(gs, *cmd, opts, prv, trace_state, timeout).then([trace_state](std::tuple<lw_shared_ptr<query::result>, cache_temperature>&& f_ht) {
40274027
auto&& [f, ht] = f_ht;
40284028
tracing::trace(trace_state, "Querying is done");
4029-
return make_ready_future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t>>(rpc::tuple(make_foreign(std::move(f)), ht, get_cache_counter(trace_state)));
4029+
return make_ready_future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t, tracing::trace_state_ptr::dma_counter_t, tracing::trace_state_ptr::dma_size_t>>(rpc::tuple(make_foreign(std::move(f)), ht, get_cache_counter(trace_state), get_dma_counter(trace_state), get_dma_size(trace_state)));
40304030
});
40314031
});
40324032
} else {
@@ -4036,7 +4036,7 @@ storage_proxy::query_result_local(schema_ptr s, lw_shared_ptr<query::read_comman
40364036
[trace_state = std::move(trace_state)] (rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature>&& r_ht) {
40374037
auto&& [r, ht] = r_ht;
40384038
tracing::trace(trace_state, "Querying is done");
4039-
return make_ready_future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t>>(rpc::tuple(std::move(r), ht, get_cache_counter(trace_state)));
4039+
return make_ready_future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t, tracing::trace_state_ptr::dma_counter_t, tracing::trace_state_ptr::dma_size_t>>(rpc::tuple(std::move(r), ht, get_cache_counter(trace_state), get_dma_counter(trace_state), get_dma_size(trace_state)));
40404040
});
40414041
}
40424042
}

service/storage_proxy.hh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -377,7 +377,7 @@ private:
377377
const inet_address_vector_replica_set& preferred_endpoints,
378378
bool& is_bounced_read,
379379
service_permit permit);
380-
future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t>> query_result_local(schema_ptr, lw_shared_ptr<query::read_command> cmd, const dht::partition_range& pr,
380+
future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t, tracing::trace_state_ptr::dma_counter_t, tracing::trace_state_ptr::dma_size_t>> query_result_local(schema_ptr, lw_shared_ptr<query::read_command> cmd, const dht::partition_range& pr,
381381
query::result_options opts,
382382
tracing::trace_state_ptr trace_state,
383383
clock_type::time_point timeout);

sstables/index_reader.hh

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -316,7 +316,7 @@ inline file make_tracked_index_file(sstable& sst, reader_permit permit, tracing:
316316
use_caching caching) {
317317
auto f = caching ? sst.index_file() : sst.uncached_index_file();
318318
f = make_tracked_file(std::move(f), std::move(permit));
319-
if (!trace_state.has_tracing()) {
319+
if (!trace_state.has_tracing() && !trace_state.has_opentelemetry()) {
320320
return f;
321321
}
322322
return tracing::make_traced_file(std::move(f), std::move(trace_state), format("{}:", sst.filename(component_type::Index)));

sstables/sstables.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2286,7 +2286,7 @@ input_stream<char> sstable::data_stream(uint64_t pos, size_t len, const io_prior
22862286
options.dynamic_adjustments = std::move(history);
22872287

22882288
file f = make_tracked_file(_data_file, std::move(permit));
2289-
if (trace_state.has_tracing()) {
2289+
if (trace_state.has_tracing() || trace_state.has_opentelemetry()) {
22902290
f = tracing::make_traced_file(std::move(f), std::move(trace_state), format("{}:", get_filename()));
22912291
}
22922292

tracing/trace_state.cc

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -361,4 +361,16 @@ void opentelemetry_state_data::serialize_cache_counter(bytes& serialized) const
361361
const auto *counter_ptr = reinterpret_cast<const int8_t*>(&counter);
362362
serialized += bytes{counter_ptr, sizeof(counter)};
363363
}
364+
365+
void opentelemetry_state_data::serialize_dma_counter(bytes& serialized) const {
366+
const auto counter = htonl(_dma_counter);
367+
const auto *counter_ptr = reinterpret_cast<const int8_t*>(&counter);
368+
serialized += bytes{counter_ptr, sizeof(counter)};
369+
}
370+
371+
void opentelemetry_state_data::serialize_dma_size(bytes& serialized) const {
372+
const auto size = htonl(_dma_size);
373+
const auto *size_ptr = reinterpret_cast<const int8_t*>(&size);
374+
serialized += bytes{size_ptr, sizeof(size)};
375+
}
364376
}

tracing/trace_state.hh

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -510,13 +510,21 @@ private:
510510
class opentelemetry_state_data final {
511511
public:
512512
using cache_counter_t = int32_t;
513+
using dma_counter_t = int32_t;
514+
using dma_size_t = int32_t;
513515

514516
inet_address_vector_replica_set _replicas;
515517
// Number of read partitions that were found in cache.
516518
cache_counter_t _cache_counter{0};
519+
// Number of DMA reads that were executed.
520+
dma_counter_t _dma_counter{0};
521+
// Number of bytes read in DMA reads.
522+
dma_size_t _dma_size{0};
517523

518524
void serialize_replicas(bytes& serialized) const;
519525
void serialize_cache_counter(bytes& serialized) const;
526+
void serialize_dma_counter(bytes& serialized) const;
527+
void serialize_dma_size(bytes& serialized) const;
520528
};
521529

522530

@@ -536,6 +544,8 @@ private:
536544
to_reduce._replicas.begin(),
537545
to_reduce._replicas.end());
538546
_data._cache_counter += to_reduce._cache_counter;
547+
_data._dma_counter += to_reduce._dma_counter;
548+
_data._dma_size += to_reduce._dma_size;
539549
}
540550

541551
opentelemetry_state_data get() const {
@@ -591,6 +601,8 @@ public:
591601

592602
_data->local().serialize_replicas(serialized);
593603
_data->local().serialize_cache_counter(serialized);
604+
_data->local().serialize_dma_counter(serialized);
605+
_data->local().serialize_dma_size(serialized);
594606

595607
return serialized;
596608
}
@@ -613,13 +625,45 @@ public:
613625
_data->local()._cache_counter += count;
614626
}
615627

628+
/**
629+
* Increment counter of DMA reads.
630+
*
631+
* @param count number of reads
632+
*/
633+
void modify_dma_counter(opentelemetry_state_data::dma_counter_t count) {
634+
_data->local()._dma_counter += count;
635+
}
636+
637+
/**
638+
* Increment number of bytes read in DMA reads.
639+
*
640+
* @param size number of bytes.
641+
*/
642+
void modify_dma_size(opentelemetry_state_data::dma_size_t size) {
643+
_data->local()._dma_size += size;
644+
}
645+
616646
/**
617647
* @return number of partitions that were found in cache
618648
*/
619649
opentelemetry_state_data::cache_counter_t get_cache_counter() const {
620650
return _data->local()._cache_counter;
621651
}
622652

653+
/**
654+
* @return number of DMA reads that were executed
655+
*/
656+
opentelemetry_state_data::dma_counter_t get_dma_counter() const {
657+
return _data->local()._dma_counter;
658+
}
659+
660+
/**
661+
* @return number of bytes read in DMA reads
662+
*/
663+
opentelemetry_state_data::dma_size_t get_dma_size() const {
664+
return _data->local()._dma_size;
665+
}
666+
623667
/**
624668
* @return True if OpenTelemetry trace state is stored.
625669
*/
@@ -681,6 +725,8 @@ public:
681725
{}
682726

683727
using cache_counter_t = opentelemetry_state_data::cache_counter_t;
728+
using dma_counter_t = opentelemetry_state_data::dma_counter_t;
729+
using dma_size_t = opentelemetry_state_data::dma_size_t;
684730

685731
/**
686732
* @return True if classic trace state is stored.
@@ -962,6 +1008,18 @@ inline void modify_cache_counter(const trace_state_ptr& p, trace_state_ptr::cach
9621008
}
9631009
}
9641010

1011+
inline void modify_dma_counter(const trace_state_ptr& p, trace_state_ptr::dma_counter_t count) {
1012+
if (p.has_opentelemetry()) {
1013+
p.get_opentelemetry_ptr()->modify_dma_counter(count);
1014+
}
1015+
}
1016+
1017+
inline void modify_dma_size(const trace_state_ptr& p, trace_state_ptr::dma_size_t size) {
1018+
if (p.has_opentelemetry()) {
1019+
p.get_opentelemetry_ptr()->modify_dma_size(size);
1020+
}
1021+
}
1022+
9651023
inline trace_state_ptr::cache_counter_t get_cache_counter(const trace_state_ptr& p) {
9661024
if (p.has_opentelemetry()) {
9671025
return p.get_opentelemetry_ptr()->get_cache_counter();
@@ -970,6 +1028,22 @@ inline trace_state_ptr::cache_counter_t get_cache_counter(const trace_state_ptr&
9701028
return 0;
9711029
}
9721030

1031+
inline trace_state_ptr::dma_counter_t get_dma_counter(const trace_state_ptr& p) {
1032+
if (p.has_opentelemetry()) {
1033+
return p.get_opentelemetry_ptr()->get_dma_counter();
1034+
}
1035+
1036+
return 0;
1037+
}
1038+
1039+
inline trace_state_ptr::dma_size_t get_dma_size(const trace_state_ptr& p) {
1040+
if (p.has_opentelemetry()) {
1041+
return p.get_opentelemetry_ptr()->get_dma_size();
1042+
}
1043+
1044+
return 0;
1045+
}
1046+
9731047
inline future<> start(const trace_state_ptr& p) {
9741048
if (p.has_opentelemetry()) {
9751049
return p.get_opentelemetry_ptr()->start();

0 commit comments

Comments
 (0)