-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathpropagator.rs
More file actions
109 lines (95 loc) · 3.6 KB
/
propagator.rs
File metadata and controls
109 lines (95 loc) · 3.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
use opentelemetry::propagation::{Extractor, Injector};
use rdkafka::{
message::{BorrowedHeaders, Headers, OwnedHeaders},
producer::FutureRecord,
};
use tracing::{debug, Span};
use tracing_opentelemetry::OpenTelemetrySpanExt;
struct HeaderInjector<'a>(pub &'a mut OwnedHeaders);
impl Injector for HeaderInjector<'_> {
fn set(&mut self, key: &str, value: String) {
let mut new = OwnedHeaders::new().insert(rdkafka::message::Header {
key,
value: Some(&value),
});
for header in self.0.iter() {
let s = String::from_utf8(header.value.unwrap().to_vec()).unwrap();
new = new.insert(rdkafka::message::Header {
key: header.key,
value: Some(&s),
});
}
self.0.clone_from(&new);
}
}
struct HeaderExtractor<'a>(pub &'a BorrowedHeaders);
impl Extractor for HeaderExtractor<'_> {
fn get(&self, key: &str) -> Option<&str> {
for i in 0..self.0.count() {
if let Ok(val) = self.0.get_as::<str>(i) {
if val.key == key {
return val.value;
}
}
}
None
}
fn keys(&self) -> Vec<&str> {
self.0.iter().map(|kv| kv.key).collect::<Vec<_>>()
}
}
/// May be used when the component produces messages.
/// The `conditional_` prefix indicates a bool should be passed,
/// indicating whether OpenTelemetry is used.
/// If this is false, the methods usually do nothing.
pub trait FutureRecordTracerExt {
fn optional_headers(self, headers: Option<OwnedHeaders>) -> Self;
fn conditional_inject_current_span_into_headers(self, use_otel: bool) -> Self;
fn conditional_inject_span_into_headers(self, use_otel: bool, span: &Span) -> Self;
}
impl FutureRecordTracerExt for FutureRecord<'_, str, [u8]> {
fn optional_headers(self, headers: Option<OwnedHeaders>) -> Self {
if let Some(headers) = headers {
self.headers(headers)
} else {
self
}
}
fn conditional_inject_current_span_into_headers(self, use_otel: bool) -> Self {
self.conditional_inject_span_into_headers(use_otel, &tracing::Span::current())
}
fn conditional_inject_span_into_headers(self, use_otel: bool, span: &Span) -> Self {
if use_otel {
let mut headers = self.headers.clone().unwrap_or_default();
opentelemetry::global::get_text_map_propagator(|propagator| {
propagator.inject_context(&span.context(), &mut HeaderInjector(&mut headers))
});
self.headers(headers)
} else {
self
}
}
}
/// May be used when the component consumne messages.
/// The `conditional_` prefix indicates a bool should be passed,
/// indicating whether OpenTelemetry is used.
/// If this is false, the methods usually do nothing.
pub trait OptionalHeaderTracerExt {
fn conditional_extract_to_current_span(self, use_otel: bool);
fn conditional_extract_to_span(self, use_otel: bool, span: &Span);
}
impl OptionalHeaderTracerExt for Option<&BorrowedHeaders> {
fn conditional_extract_to_current_span(self, use_otel: bool) {
self.conditional_extract_to_span(use_otel, &tracing::Span::current())
}
fn conditional_extract_to_span(self, use_otel: bool, span: &Span) {
if let Some(headers) = self {
if use_otel {
debug!("Kafka Header Found");
span.set_parent(opentelemetry::global::get_text_map_propagator(
|propagator| propagator.extract(&HeaderExtractor(headers)),
));
}
}
}
}