-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathspan_links.rs
More file actions
106 lines (84 loc) · 3 KB
/
Copy pathspan_links.rs
File metadata and controls
106 lines (84 loc) · 3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
/*!
This example demonstrates how to add links to spans.
Links relate spans outside of the normal parent-child hierarchy. In this example, we
have a dummy messaging system where a message includes the trace context of its producer.
Instead of linking the span for the worker to the producer as a child, we link it
using a span link instead.
The messaging infrastructure here isn't important. What's important is the way the span link
is attached to the span created on the worker thread, which happens in the `worker` function.
*/
use std::{
collections::VecDeque,
sync::{Arc, Mutex, Weak},
thread,
time::Duration,
};
type MessageQueue<T> = Mutex<VecDeque<Message<T>>>;
struct Message<T> {
producer_ctxt: emit::span::SpanCtxt,
data: T,
}
/*
The producer of a span link.
*/
#[emit::span("produce")]
fn produce(queue: Arc<MessageQueue<i32>>, data: i32) {
let msg = Message {
producer_ctxt: emit::span::SpanCtxt::current(emit::ctxt()),
data,
};
queue.lock().unwrap().push_back(msg);
}
/*
The consumer of a span link.
*/
fn worker<T>(queue: Weak<MessageQueue<T>>, mut process: impl FnMut(T)) {
loop {
let process = &mut process;
let Some(queue) = queue.upgrade() else {
// The other side has hung up; return
return;
};
let Some(msg) = queue.lock().unwrap().pop_front() else {
// Nothing to do; wait for a bit
thread::sleep(Duration::from_micros(1));
continue;
};
// 1. Convert the producer's span context into a span link
// This could be done any number of ways depending on the system.
// In this example, we're just converting an `emit::span::SpanCtxt` into
// an `emit::span::SpanLink`.
let mut span_links = emit::span::SpanLinkSet::new();
if let (Some(trace_id), Some(span_id)) =
(msg.producer_ctxt.trace_id(), msg.producer_ctxt.span_id())
{
span_links.insert(emit::span::SpanLink::new(*trace_id, *span_id));
}
// 2. Create a span for the worker that will include the span link
// This is an inline alternative to creating a function with `#[emit::span]`
// on it, which we could also use
let (mut span, frame) = emit::span_guard!(evt_props: emit::props! { span_links }, "worker");
frame.call(move || {
span.start();
// Invoke the worker closure in the context of the span
process(msg.data);
})
}
}
fn main() {
let rt = emit::setup()
.emit_to(emit::emitter::from_fn(|evt| println!("{evt:?}")))
.init();
let queue = Arc::new(MessageQueue::<i32>::default());
let worker = thread::spawn({
let queue = Arc::downgrade(&queue);
move || {
worker(queue, |data: i32| {
emit::info!("processing {data}");
})
}
});
produce(queue, 42);
worker.join().unwrap();
rt.blocking_flush(Duration::from_secs(5));
}