@@ -507,37 +507,90 @@ private:
507507};
508508
509509
510- class opentelemetry_state final {
510+ class opentelemetry_state_data final {
511511public:
512512 using cache_counter_t = int32_t ;
513513
514- private:
515- lw_shared_ptr<trace_state> _state_ptr;
516- bool const _opentelemetry_tracing{false };
517514 inet_address_vector_replica_set _replicas;
518515 // Number of read partitions that were found in cache.
519516 cache_counter_t _cache_counter{0 };
520517
521518 void serialize_replicas (bytes& serialized) const ;
522519 void serialize_cache_counter (bytes& serialized) const ;
520+ };
521+
522+
523+ class opentelemetry_state final {
524+
525+ private:
526+ lw_shared_ptr<trace_state> _state_ptr;
527+ std::shared_ptr<sharded<opentelemetry_state_data>> _data;
528+
529+ class reducer {
530+ private:
531+ opentelemetry_state_data _data;
532+
533+ public:
534+ void operator ()(const opentelemetry_state_data& to_reduce) {
535+ _data._replicas .insert (_data._replicas .end (),
536+ to_reduce._replicas .begin (),
537+ to_reduce._replicas .end ());
538+ _data._cache_counter += to_reduce._cache_counter ;
539+ }
540+
541+ opentelemetry_state_data get () const {
542+ return _data;
543+ }
544+ };
523545
524546public:
525547 opentelemetry_state () = default ;
526- opentelemetry_state (lw_shared_ptr<trace_state> state_ptr, bool opentelemetry_tracing = false )
527- : _state_ptr(std::move(state_ptr)), _opentelemetry_tracing(opentelemetry_tracing)
548+ opentelemetry_state (lw_shared_ptr<trace_state> state_ptr, std::shared_ptr<sharded<opentelemetry_state_data>> data)
549+ : _state_ptr(std::move(state_ptr)), _data(std::move(data))
550+ {}
551+ opentelemetry_state (std::nullptr_t , std::shared_ptr<sharded<opentelemetry_state_data>> data)
552+ : _state_ptr(nullptr ), _data(std::move(data))
528553 {}
529- opentelemetry_state (std::nullptr_t , bool opentelemetry_tracing = false )
530- : _state_ptr(nullptr ), _opentelemetry_tracing(opentelemetry_tracing)
554+ template <typename TraceState>
555+ opentelemetry_state (TraceState state_ptr, bool opentelemetry_tracing = false )
556+ : opentelemetry_state(std::move(state_ptr),
557+ opentelemetry_tracing
558+ ? std::make_shared<sharded<opentelemetry_state_data>>()
559+ : nullptr )
531560 {}
532561
562+ /* *
563+ * Helper function to initialize sharded data in opentelemetry_state.
564+ */
565+ future<> start () {
566+ return _data->start ();
567+ }
568+
569+ /* *
570+ * Helper function to properly stop sharded data in opentelemetry_state.
571+ */
572+ future<> stop () {
573+ return _data->stop ();
574+ }
575+
576+ /* *
577+ * Helper function to collect opentelemetry data from all shards.
578+ * It should be called before serializing opentelemetry_state.
579+ */
580+ future<> collect_data () {
581+ return _data->map_reduce (reducer{}, std::identity{}).then ([&local_data = _data->local ()] (auto data) {
582+ local_data = data;
583+ });
584+ }
585+
533586 /* *
534587 * @return serialized opentelemetry state.
535588 */
536589 bytes serialize () const noexcept {
537590 bytes serialized{};
538591
539- serialize_replicas (serialized);
540- serialize_cache_counter (serialized);
592+ _data-> local (). serialize_replicas (serialized);
593+ _data-> local (). serialize_cache_counter (serialized);
541594
542595 return serialized;
543596 }
@@ -548,30 +601,30 @@ public:
548601 * @param replicas list of contacted replicas
549602 */
550603 void set_replicas (const inet_address_vector_replica_set& replicas) {
551- _replicas = replicas;
604+ _data-> local (). _replicas = replicas;
552605 }
553606
554607 /* *
555608 * Increment counter of partitions read from cache.
556609 *
557610 * @param count number of partitions
558611 */
559- void modify_cache_counter (cache_counter_t count) {
560- _cache_counter += count;
612+ void modify_cache_counter (opentelemetry_state_data:: cache_counter_t count) {
613+ _data-> local (). _cache_counter += count;
561614 }
562615
563616 /* *
564617 * @return number of partitions that were found in cache
565618 */
566- cache_counter_t get_cache_counter () const {
567- return _cache_counter;
619+ opentelemetry_state_data:: cache_counter_t get_cache_counter () const {
620+ return _data-> local (). _cache_counter ;
568621 }
569622
570623 /* *
571624 * @return True if OpenTelemetry trace state is stored.
572625 */
573626 bool has_opentelemetry () const noexcept {
574- return _opentelemetry_tracing ;
627+ return __builtin_expect ( bool (_data), false ) ;
575628 };
576629
577630 /* *
@@ -594,6 +647,13 @@ public:
594647 trace_state& get_tracing () const noexcept {
595648 return *_state_ptr;
596649 }
650+
651+ /* *
652+ * @return A pointer to sharded data.
653+ */
654+ std::shared_ptr<sharded<opentelemetry_state_data>> get_data () const {
655+ return _data;
656+ }
597657};
598658
599659
@@ -620,7 +680,7 @@ public:
620680 : _state_ptr(nullptr )
621681 {}
622682
623- using cache_counter_t = opentelemetry_state ::cache_counter_t ;
683+ using cache_counter_t = opentelemetry_state_data ::cache_counter_t ;
624684
625685 /* *
626686 * @return True if classic trace state is stored.
@@ -872,7 +932,7 @@ inline std::optional<trace_info> make_trace_info(const trace_state_ptr& state) {
872932 if (state.has_opentelemetry ()) {
873933 trace_state_props_set props{};
874934 props.set (trace_state_props::opentelemetry);
875- return trace_info{props};
935+ return trace_info{props, state. get_opentelemetry (). get_data () };
876936 }
877937
878938 return std::nullopt ;
@@ -910,6 +970,30 @@ inline trace_state_ptr::cache_counter_t get_cache_counter(const trace_state_ptr&
910970 return 0 ;
911971}
912972
973+ inline future<> start (const trace_state_ptr& p) {
974+ if (p.has_opentelemetry ()) {
975+ return p.get_opentelemetry_ptr ()->start ();
976+ }
977+
978+ return make_ready_future<>();
979+ }
980+
981+ inline future<> stop (const trace_state_ptr& p) {
982+ if (p.has_opentelemetry ()) {
983+ return p.get_opentelemetry_ptr ()->stop ();
984+ }
985+
986+ return make_ready_future<>();
987+ }
988+
989+ inline future<> collect_data (const trace_state_ptr& p) {
990+ if (p.has_opentelemetry ()) {
991+ return p.get_opentelemetry_ptr ()->collect_data ();
992+ }
993+
994+ return make_ready_future<>();
995+ }
996+
913997// global_trace_state_ptr is a helper class that may be used for creating spans
914998// of an existing tracing session on other shards. When a tracing span on a
915999// different shard is needed global_trace_state_ptr would create a secondary
0 commit comments