@@ -507,13 +507,10 @@ private:
507507};
508508
509509
510- class opentelemetry_state final {
510+ class opentelemetry_state_data final {
511511public:
512512 using cache_counter_t = int32_t ;
513513
514- private:
515- lw_shared_ptr<trace_state> _state_ptr;
516- bool const _opentelemetry_tracing{false };
517514 inet_address_vector_replica_set _replicas;
518515 sstring _statement_type;
519516 // Number of read partitions that were found in cache.
@@ -522,25 +519,82 @@ private:
522519 void serialize_replicas (bytes& serialized) const ;
523520 void serialize_statement_type (bytes& serialized) const ;
524521 void serialize_cache_counter (bytes& serialized) const ;
522+ };
523+
524+
525+ class opentelemetry_state final {
526+
527+ private:
528+ lw_shared_ptr<trace_state> _state_ptr;
529+ std::shared_ptr<sharded<opentelemetry_state_data>> _data;
530+
531+ class reducer {
532+ private:
533+ opentelemetry_state_data _data;
534+
535+ public:
536+ void operator ()(const opentelemetry_state_data& to_reduce) {
537+ _data._replicas .insert (_data._replicas .end (),
538+ to_reduce._replicas .begin (),
539+ to_reduce._replicas .end ());
540+ _data._cache_counter += to_reduce._cache_counter ;
541+ _data._statement_type += to_reduce._statement_type ;
542+ }
543+
544+ opentelemetry_state_data get () const {
545+ return _data;
546+ }
547+ };
525548
526549public:
527550 opentelemetry_state () = default ;
528- opentelemetry_state (lw_shared_ptr<trace_state> state_ptr, bool opentelemetry_tracing = false )
529- : _state_ptr(std::move(state_ptr)), _opentelemetry_tracing(opentelemetry_tracing)
551+ opentelemetry_state (lw_shared_ptr<trace_state> state_ptr, std::shared_ptr<sharded<opentelemetry_state_data>> data)
552+ : _state_ptr(std::move(state_ptr)), _data(std::move(data))
553+ {}
554+ opentelemetry_state (std::nullptr_t , std::shared_ptr<sharded<opentelemetry_state_data>> data)
555+ : _state_ptr(nullptr ), _data(std::move(data))
530556 {}
531- opentelemetry_state (std::nullptr_t , bool opentelemetry_tracing = false )
532- : _state_ptr(nullptr ), _opentelemetry_tracing(opentelemetry_tracing)
557+ template <typename TraceState>
558+ opentelemetry_state (TraceState state_ptr, bool opentelemetry_tracing = false )
559+ : opentelemetry_state(std::move(state_ptr),
560+ opentelemetry_tracing
561+ ? std::make_shared<sharded<opentelemetry_state_data>>()
562+ : nullptr )
533563 {}
534564
565+ /* *
566+ * Helper function to initialize sharded data in opentelemetry_state.
567+ */
568+ future<> start () {
569+ return _data->start ();
570+ }
571+
572+ /* *
573+ * Helper function to properly stop sharded data in opentelemetry_state.
574+ */
575+ future<> stop () {
576+ return _data->stop ();
577+ }
578+
579+ /* *
580+ * Helper function to collect opentelemetry data from all shards.
581+ * It should be called before serializing opentelemetry_state.
582+ */
583+ future<> collect_data () {
584+ return _data->map_reduce (reducer{}, std::identity{}).then ([&local_data = _data->local ()] (auto data) {
585+ local_data = data;
586+ });
587+ }
588+
535589 /* *
536590 * @return serialized opentelemetry state.
537591 */
538592 bytes serialize () const noexcept {
539593 bytes serialized{};
540594
541- serialize_replicas (serialized);
542- serialize_cache_counter (serialized);
543- serialize_statement_type (serialized);
595+ _data-> local (). serialize_replicas (serialized);
596+ _data-> local (). serialize_cache_counter (serialized);
597+ _data-> local (). serialize_statement_type (serialized);
544598
545599 return serialized;
546600 }
@@ -551,7 +605,7 @@ public:
551605 * @param replicas list of contacted replicas
552606 */
553607 void set_replicas (const inet_address_vector_replica_set& replicas) {
554- _replicas = replicas;
608+ _data-> local (). _replicas = replicas;
555609 }
556610
557611 /* *
@@ -560,30 +614,32 @@ public:
560614 * @param statement_type type of prepared statement.
561615 */
562616 void set_statement_type (const sstring& statement_type) {
563- _statement_type = statement_type;
617+ _data-> local (). _statement_type = statement_type;
564618 }
565619
566620 /* *
567621 * Increment counter of partitions read from cache.
568622 *
569623 * @param count number of partitions
570624 */
571- void modify_cache_counter (cache_counter_t count) {
572- _cache_counter += count;
625+ void modify_cache_counter (opentelemetry_state_data::cache_counter_t count) {
626+ if (_data->local_is_initialized ()) {
627+ _data->local ()._cache_counter += count;
628+ }
573629 }
574630
575631 /* *
576632 * @return number of partitions that were found in cache
577633 */
578- cache_counter_t get_cache_counter () const {
579- return _cache_counter;
634+ opentelemetry_state_data:: cache_counter_t get_cache_counter () const {
635+ return _data-> local (). _cache_counter ;
580636 }
581637
582638 /* *
583639 * @return True if OpenTelemetry trace state is stored.
584640 */
585641 bool has_opentelemetry () const noexcept {
586- return _opentelemetry_tracing ;
642+ return __builtin_expect ( bool (_data), false ) ;
587643 };
588644
589645 /* *
@@ -606,6 +662,13 @@ public:
606662 trace_state& get_tracing () const noexcept {
607663 return *_state_ptr;
608664 }
665+
666+ /* *
667+ * @return A pointer to sharded data.
668+ */
669+ std::shared_ptr<sharded<opentelemetry_state_data>> get_data () const {
670+ return _data;
671+ }
609672};
610673
611674
@@ -632,7 +695,7 @@ public:
632695 : _state_ptr(nullptr )
633696 {}
634697
635- using cache_counter_t = opentelemetry_state ::cache_counter_t ;
698+ using cache_counter_t = opentelemetry_state_data ::cache_counter_t ;
636699
637700 /* *
638701 * @return True if classic trace state is stored.
@@ -884,7 +947,7 @@ inline std::optional<trace_info> make_trace_info(const trace_state_ptr& state) {
884947 if (state.has_opentelemetry ()) {
885948 trace_state_props_set props{};
886949 props.set (trace_state_props::opentelemetry);
887- return trace_info{props};
950+ return trace_info{props, state. get_opentelemetry (). get_data () };
888951 }
889952
890953 return std::nullopt ;
@@ -928,6 +991,30 @@ inline trace_state_ptr::cache_counter_t get_cache_counter(const trace_state_ptr&
928991 return 0 ;
929992}
930993
994+ inline future<> start (const trace_state_ptr& p) {
995+ if (p.has_opentelemetry ()) {
996+ return p.get_opentelemetry_ptr ()->start ();
997+ }
998+
999+ return make_ready_future<>();
1000+ }
1001+
1002+ inline future<> stop (const trace_state_ptr& p) {
1003+ if (p.has_opentelemetry ()) {
1004+ return p.get_opentelemetry_ptr ()->stop ();
1005+ }
1006+
1007+ return make_ready_future<>();
1008+ }
1009+
1010+ inline future<> collect_data (const trace_state_ptr& p) {
1011+ if (p.has_opentelemetry ()) {
1012+ return p.get_opentelemetry_ptr ()->collect_data ();
1013+ }
1014+
1015+ return make_ready_future<>();
1016+ }
1017+
9311018// global_trace_state_ptr is a helper class that may be used for creating spans
9321019// of an existing tracing session on other shards. When a tracing span on a
9331020// different shard is needed global_trace_state_ptr would create a secondary
0 commit comments