@@ -3549,7 +3549,7 @@ class abstract_read_executor : public enable_shared_from_this<abstract_read_exec
35493549 });
35503550 }
35513551 }
3552- future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t >> make_data_request (gms::inet_address ep, clock_type::time_point timeout, bool want_digest) {
3552+ future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t , tracing::trace_state_ptr:: dma_counter_t , tracing::trace_state_ptr:: dma_size_t >> make_data_request (gms::inet_address ep, clock_type::time_point timeout, bool want_digest) {
35533553 ++_proxy->get_stats ().data_read_attempts .get_ep_stat (ep);
35543554 auto opts = want_digest
35553555 ? query::result_options{query::result_request::result_and_digest, digest_algorithm (*_proxy)}
@@ -3559,11 +3559,11 @@ class abstract_read_executor : public enable_shared_from_this<abstract_read_exec
35593559 return _proxy->query_result_local (_schema, _cmd, _partition_range, opts, _trace_state, timeout);
35603560 } else {
35613561 tracing::trace (_trace_state, " read_data: sending a message to /{}" , ep);
3562- return _proxy->_messaging .send_read_data (netw::messaging_service::msg_addr{ep, 0 }, timeout, *_cmd, _partition_range, opts.digest_algo ).then ([this , ep](rpc::tuple<query::result, rpc::optional<cache_temperature>, tracing::trace_state_ptr::cache_counter_t > result_hit_rate) {
3563- auto && [result, hit_rate, cache_counter] = result_hit_rate;
3562+ return _proxy->_messaging .send_read_data (netw::messaging_service::msg_addr{ep, 0 }, timeout, *_cmd, _partition_range, opts.digest_algo ).then ([this , ep](rpc::tuple<query::result, rpc::optional<cache_temperature>, tracing::trace_state_ptr::cache_counter_t , tracing::trace_state_ptr:: dma_counter_t , tracing::trace_state_ptr:: dma_size_t > result_hit_rate) {
3563+ auto && [result, hit_rate, cache_counter, dma_counter, dma_size ] = result_hit_rate;
35643564 tracing::trace (_trace_state, " read_data: got response from /{}" , ep);
35653565 tracing::modify_cache_counter (_trace_state, cache_counter);
3566- return make_ready_future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t >>(rpc::tuple (make_foreign (::make_lw_shared<query::result>(std::move (result))), hit_rate.value_or (cache_temperature::invalid ()), cache_counter));
3566+ return make_ready_future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t , tracing::trace_state_ptr:: dma_counter_t , tracing::trace_state_ptr:: dma_size_t >>(rpc::tuple (make_foreign (::make_lw_shared<query::result>(std::move (result))), hit_rate.value_or (cache_temperature::invalid ()), cache_counter, dma_counter, dma_size ));
35673567 });
35683568 }
35693569 }
@@ -3606,7 +3606,7 @@ class abstract_read_executor : public enable_shared_from_this<abstract_read_exec
36063606 auto start = latency_clock::now ();
36073607 for (const gms::inet_address& ep : boost::make_iterator_range (begin, end)) {
36083608 // Waited on indirectly, shared_from_this keeps `this` alive
3609- (void )make_data_request (ep, timeout, want_digest).then_wrapped ([this , resolver, ep, start, exec = shared_from_this ()] (future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t >> f) {
3609+ (void )make_data_request (ep, timeout, want_digest).then_wrapped ([this , resolver, ep, start, exec = shared_from_this ()] (future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t , tracing::trace_state_ptr:: dma_counter_t , tracing::trace_state_ptr:: dma_size_t >> f) {
36103610 try {
36113611 auto v = f.get0 ();
36123612 _cf->set_hit_rate (ep, std::get<1 >(v));
@@ -4007,13 +4007,13 @@ ::shared_ptr<abstract_read_executor> storage_proxy::get_read_executor(lw_shared_
40074007
40084008future<rpc::tuple<query::result_digest, api::timestamp_type, cache_temperature>>
40094009storage_proxy::query_result_local_digest (schema_ptr s, lw_shared_ptr<query::read_command> cmd, const dht::partition_range& pr, tracing::trace_state_ptr trace_state, storage_proxy::clock_type::time_point timeout, query::digest_algorithm da) {
4010- return query_result_local (std::move (s), std::move (cmd), pr, query::result_options::only_digest (da), std::move (trace_state), timeout).then ([] (rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t > result_and_hit_rate) {
4011- auto && [result, hit_rate, cache_counter] = result_and_hit_rate;
4010+ return query_result_local (std::move (s), std::move (cmd), pr, query::result_options::only_digest (da), std::move (trace_state), timeout).then ([] (rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t , tracing::trace_state_ptr:: dma_counter_t , tracing::trace_state_ptr:: dma_size_t > result_and_hit_rate) {
4011+ auto && [result, hit_rate, cache_counter, dma_counter, dma_size ] = result_and_hit_rate;
40124012 return make_ready_future<rpc::tuple<query::result_digest, api::timestamp_type, cache_temperature>>(rpc::tuple (*result->digest (), result->last_modified (), hit_rate));
40134013 });
40144014}
40154015
4016- future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t >>
4016+ future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t , tracing::trace_state_ptr:: dma_counter_t , tracing::trace_state_ptr:: dma_size_t >>
40174017storage_proxy::query_result_local (schema_ptr s, lw_shared_ptr<query::read_command> cmd, const dht::partition_range& pr, query::result_options opts,
40184018 tracing::trace_state_ptr trace_state, storage_proxy::clock_type::time_point timeout) {
40194019 cmd->slice .options .set_if <query::partition_slice::option::with_digest>(opts.request != query::result_request::only_result);
@@ -4026,7 +4026,7 @@ storage_proxy::query_result_local(schema_ptr s, lw_shared_ptr<query::read_comman
40264026 return db.query (gs, *cmd, opts, prv, trace_state, timeout).then ([trace_state](std::tuple<lw_shared_ptr<query::result>, cache_temperature>&& f_ht) {
40274027 auto && [f, ht] = f_ht;
40284028 tracing::trace (trace_state, " Querying is done" );
4029- return make_ready_future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t >>(rpc::tuple (make_foreign (std::move (f)), ht, get_cache_counter (trace_state)));
4029+ return make_ready_future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t , tracing::trace_state_ptr:: dma_counter_t , tracing::trace_state_ptr:: dma_size_t >>(rpc::tuple (make_foreign (std::move (f)), ht, get_cache_counter (trace_state), get_dma_counter (trace_state), get_dma_size (trace_state)));
40304030 });
40314031 });
40324032 } else {
@@ -4036,7 +4036,7 @@ storage_proxy::query_result_local(schema_ptr s, lw_shared_ptr<query::read_comman
40364036 [trace_state = std::move (trace_state)] (rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature>&& r_ht) {
40374037 auto && [r, ht] = r_ht;
40384038 tracing::trace (trace_state, " Querying is done" );
4039- return make_ready_future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t >>(rpc::tuple (std::move (r), ht, get_cache_counter (trace_state)));
4039+ return make_ready_future<rpc::tuple<foreign_ptr<lw_shared_ptr<query::result>>, cache_temperature, tracing::trace_state_ptr::cache_counter_t , tracing::trace_state_ptr:: dma_counter_t , tracing::trace_state_ptr:: dma_size_t >>(rpc::tuple (std::move (r), ht, get_cache_counter (trace_state), get_dma_counter (trace_state), get_dma_size (trace_state)));
40404040 });
40414041 }
40424042}
0 commit comments